Closed - yantrikskaran closed this issue 4 years ago
Just to clarify, which client library is this question about? BigQuery?
Yes, it is regarding BigQuery and I am using google.cloud.bigquery.client.
Just to add more details - below is the snippet of my code.

client = bigquery.Client()
select_query = client.query(query)
result = select_query.result()

for row in result:
    print(row)
After printing around 1000 records I can see the next record comes after a gap of 10 seconds. It looks like each page holds around 1000 records, and fetching the next page is what takes the time.
LOGS - see the lines below.
2020-05-18 03:33:36,380 MainThread (ProcessId : 7056) INFO create_tfRecords.py:(78) Completed writing to file
2020-05-18 03:33:36,380 MainThread (ProcessId : 7056) INFO util.py:(86) Memory Used By Process : 1000.03 MB
2020-05-18 03:33:47,691 MainThread (ProcessId : 7056) INFO create_tfRecords.py:(150) Start : 157053
2020-05-18 03:33:47,691 MainThread (ProcessId : 7056) INFO create_tfRecords.py:(210) year_week : 201931
@yantrikskaran Thanks for the report, I transferred it to the corresponding repository.
client.query() returns a QueryJob instance, and the latter's result() method accepts a page_size argument.
Would increasing the page size to N * 1000 help? After all, that's effectively the same as fetching N pages of 1000 rows in a single batch.
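For reference, a minimal sketch of what that might look like - the query string is only a placeholder and 10 * 1000 is just an example value:

from google.cloud import bigquery

client = bigquery.Client()
query = "SELECT * FROM `bigquery-public-data.stackoverflow.tags`"  # placeholder query

select_query = client.query(query)
result = select_query.result(page_size=10 * 1000)  # larger pages, fewer page fetches

for row in result:
    print(row)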
@plamut Thanks a lot for the response. Let me try page size and update this thread.
One doubt - when I increase the page size, I think it will add delay in loading the next page.
Example - suppose I have to process 500 pages, each with 1000 rows, and each row is around x MB. Currently, with default pagination, the next page after the initial one takes 10 seconds. If I change the page size to 10*1000 records per page, won't the next page after the initial one take even more time, since it has to load more rows/data at once?
If I change the page size to 10*1000 records per page, won't the next page after the initial one take even more time, since it has to load more rows/data at once?
@yantrikskaran If the delay is due to the network bottleneck, then yes, fetching a bigger page will take longer, although the total delay might be lower than individually fetching 10 pages of 1000 records.
Since fetching the data is an I/O operation, you can also parallelize it and start fetching the next page while processing the current page by moving either the processing or fetching the data to its own thread.
You might also have a look at the new (much) faster BigQuery Storage API, which will become the default in one of the next releases. For now, if it's OK to receive data in a pandas DataFrame and you have the required dependencies, you can try processing table data as a stream of DataFrames:
from google.cloud import bigquery_storage_v1

bqstorage_client = bigquery_storage_v1.BigQueryReadClient()
dataframes = client.query(query).to_dataframe_iterable(bqstorage_client=bqstorage_client)

for df in dataframes:
    # process the dataframe
    ...
@plamut - Yes, I tried changing the page size, and yes, it adds delay when fetching the next page.
I had also tried using to_dataframe_iterable(), but I ran into an issue where I cannot convert the result to a DataFrame as it has a type which is not supported. I don't want to add more logic to my SQL to create a DataFrame-compatible result.
You have mentioned: "Since fetching the data is an I/O operation, you can also parallelize it and start fetching the next page while processing the current page by moving either the processing or fetching the data to its own thread."
Do you have sample code which can help me try this parallelization?
@plamut - I think there is some major issue either at the library or BigQuery level.
My query is supposed to return 4500 records.
The query.result() implementation doesn't help - I can still see it is loading around 730 records per page.
from google.cloud import bigquery

client = bigquery.Client()
select_query = client.query(query)
result = select_query.result(page_size=10000, max_results=10000)

for row in result:
    print(row)
I also tried table.list_rows, but that implementation doesn't help either.
from google.cloud import bigquery

client = bigquery.Client()
select_query = client.query(query)
result = select_query.result(page_size=10000, max_results=10000)

destination_table = select_query.destination
result = client.list_rows(destination_table, start_index=0, max_results=10000, page_size=10000)

for row in result:
    print(row)
It also shows the same behaviour - around 730 records per page.
@yantrikskaran 4500 records indeed doesn't sound that much, unless rows are excessively large. What's the typical row size in your data and what's the internet connection speed on the machine downloading the data?
Can you maybe also try reproducing the issue by fetching something from one of the public datasets? If yes, I can try fetching the same data and see how it behaves on my local machine.
@plamut Yes, the number is not high, but I am sure the row size is very large. Each row may be around 2 MB or a little more. It is a JSON row which I am constructing as part of my SQL.
Internet speed should not be an issue - I am facing the problem when running on Google Compute Engine, and my local connection speed is 100 Mbps.
@yantrikskaran I took a closer look at this and the following seems to be happening:
QueryJob.result() returns a RowIterator instance, a subclass of the Iterator from api_core that implements the iteration logic.
When iterating over it, the first page is fetched and iterated over, yielding individual rows. The next page is only requested when all the rows of the current page have been yielded, causing the delay you are observing.
This can also be reproduced with much smaller result sets and small page sizes:
import logging
import time

from google.cloud import bigquery

logging.basicConfig(level=logging.INFO)

TABLE_ID = "bigquery-public-data.stackoverflow.tags"
PAGE_SIZE = 5
MAX_RESULTS = 100
ROW_PROCESSING_TIME = 0.1
query = f"""
SELECT id, REVERSE(tag_name) as tag_name, count, excerpt_post_id, wiki_post_id
FROM `{TABLE_ID}`
"""
client = bigquery.Client()
logging.info("START")
query_job = client.query(query)
row_iterator = query_job.result(page_size=PAGE_SIZE, max_results=MAX_RESULTS)
for row in row_iterator:
    logging.info("Processing row: %s", row)
    time.sleep(ROW_PROCESSING_TIME)
logging.info("DONE.")
I customized the log output format and added a few extra log messages to the iterator to track page fetching, and this was the output:
INFO [2020-05-20 11:51:14,856] MainThread [root] [issue_112.py:35][iterate_rows] START
INFO [2020-05-20 11:51:16,586] MainThread [google.api_core.page_iterator] [page_iterator.py:249][_page_iter] Fetching first page...
INFO [2020-05-20 11:51:16,892] MainThread [google.api_core.page_iterator] [page_iterator.py:251][_page_iter] ... fetched first page <google.api_core.page_iterator.Page object at 0x7f21a4bcb208>.
INFO [2020-05-20 11:51:16,892] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((1255, 'aiw', 256, 8843419, 8843418), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 11:51:16,993] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((26602, 'birtnoc-tna', 256, 8924398, 8924397), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 11:51:17,094] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((35050, 'gnidaolerp', 256, 41349882, 41349881), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 11:51:17,194] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((37223, 'ecapselbat', 256, 20811171, 20811170), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 11:51:17,295] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((39137, 'edulcni-php', 256, 10065834, 10065833), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 11:51:17,396] MainThread [google.api_core.page_iterator] [page_iterator.py:257][_page_iter] Fetching next page...
INFO [2020-05-20 11:51:17,712] MainThread [google.api_core.page_iterator] [page_iterator.py:259][_page_iter] ... fetched next page <google.api_core.page_iterator.Page object at 0x7f21a4bf2a90>.
INFO [2020-05-20 11:51:17,712] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((44801, 'revres-neg', 256, 10314429, 10314428), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 11:51:17,812] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((46127, 'msaf', 256, 5845134, 5845133), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 11:51:17,913] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((46246, 'enigneyeknomj', 256, 7040300, 7040299), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 11:51:18,014] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((46708, 'xum', 256, 47876370, 47876369), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 11:51:18,114] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((56649, 'noitamotua-iu-tfosorcim', 256, 8706533, 8706532), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 11:51:18,215] MainThread [google.api_core.page_iterator] [page_iterator.py:257][_page_iter] Fetching next page...
INFO [2020-05-20 11:51:19,146] MainThread [google.api_core.page_iterator] [page_iterator.py:259][_page_iter] ... fetched next page <google.api_core.page_iterator.Page object at 0x7f21a4bf25c0>.
INFO [2020-05-20 11:51:19,147] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((59146, 'stroperelcaro', 256, 7732834, 7732833), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 11:51:19,247] MainThread [root] [issue_112.py:41][iterate_rows] Processing row: Row((59415, 'reweiv-fdp', 256, 31914620, 31914619), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
...
INFO [2020-05-20 11:51:38,697] MainThread [google.api_core.page_iterator] [page_iterator.py:257][_page_iter] Fetching next page...
INFO [2020-05-20 11:51:38,697] MainThread [google.api_core.page_iterator] [page_iterator.py:259][_page_iter] ... fetched next page None.
INFO [2020-05-20 11:51:38,698] MainThread [root] [issue_112.py:44][iterate_rows] DONE.
It's clear that the processing gets blocked every time a new result page needs to be fetched, which takes around 0.3 - 1.0 seconds on my machine. This delay is quite significant when considering the fact that processing a single page of 5 rows takes around half a second in this setup.
The entire script runs for well over 20 seconds, and it gets even worse if the page size is further reduced, due to the overhead of fetching each page.
The solution to this is to increase the page size closer to what the memory and various quota limits allow, and to fetch data in parallel with its processing. I will come up with a sketch of this idea shortly.
@yantrikskaran
Here's the idea - data is fetched in a background thread, and the thread iterates over result pages so that we control when the next page is requested. Every fetched page is put into out_queue, where it is grabbed by the main thread that processes the rows.
At the beginning, and before processing of each new page starts, an item is put into in_queue. This is a signal to the data-fetching thread to start fetching the next page in the background. The same queue can also be used to shut down the thread by putting None into it.
import logging
import queue
import threading
import time

logging.basicConfig(level=logging.INFO)

# Reuses client, query, and the PAGE_SIZE / MAX_RESULTS / ROW_PROCESSING_TIME
# constants from the previous snippet.


def fetch_data(in_queue, out_queue):
    query_job = client.query(query)
    row_iterator = query_job.result(page_size=PAGE_SIZE, max_results=MAX_RESULTS)
    page_iterator = row_iterator.pages

    while True:
        item = in_queue.get()
        if item is None:
            logging.info("Poison pill, aborting")
            break

        logging.info("Requesting next page")
        try:
            page = next(page_iterator)
        except Exception as exc:
            logging.info("Error getting next page: %s", repr(exc))
            out_queue.put(None)
            break
        else:
            logging.info("Got next page, putting in out queue: %s", page)
            out_queue.put(page)


def process_parallel():
    logging.info("START")

    in_queue = queue.Queue()
    out_queue = queue.Queue()

    fetcher_thread = threading.Thread(
        target=fetch_data,
        args=(in_queue, out_queue),
        name="Thread-Fetcher",
        daemon=True,
    )
    fetcher_thread.start()

    in_queue.put("fetch it!")

    while True:
        page = out_queue.get()
        if page is None:
            in_queue.put(None)
            break

        # Signal the fetcher to start on the next page while we process this one.
        in_queue.put("fetch it!")

        for row in page:
            logging.info("Processing row: %s", row)
            time.sleep(ROW_PROCESSING_TIME)

    fetcher_thread.join(timeout=1.0)
    if fetcher_thread.is_alive():
        logging.warning(f"{fetcher_thread.name} did not terminate.")

    logging.info("DONE")


process_parallel()
Running this version of the script with the same parameters (PAGE_SIZE, etc.) reduces the total time to under 20 seconds on my machine, and the logs show that processing and fetching are indeed done in parallel:
...
INFO [2020-05-20 12:31:13,254] MainThread [root] [issue_112.py:112][process_parallel] Processing row: Row((87248, 'tikloot-gc', 1, 14102253, 14102252), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 12:31:13,254] Thread-Fetcher [root] [issue_112.py:76][fetch_data] Requesting next page
INFO [2020-05-20 12:31:13,254] Thread-Fetcher [google.api_core.page_iterator] [page_iterator.py:257][_page_iter] Fetching next page...
INFO [2020-05-20 12:31:13,354] MainThread [root] [issue_112.py:112][process_parallel] Processing row: Row((87277, 'ypocoidua-ipam', 1, 14117646, 14117645), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 12:31:13,455] MainThread [root] [issue_112.py:112][process_parallel] Processing row: Row((87278, 'etsapoidua-ipam', 1, 14117652, 14117651), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 12:31:13,556] MainThread [root] [issue_112.py:112][process_parallel] Processing row: Row((87904, 'zyr', 1, 14427246, 14427245), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 12:31:13,560] Thread-Fetcher [google.api_core.page_iterator] [page_iterator.py:259][_page_iter] ... fetched next page <google.api_core.page_iterator.Page object at 0x7f51a1375668>.
INFO [2020-05-20 12:31:13,560] Thread-Fetcher [root] [issue_112.py:84][fetch_data] Got next page, putting in out queue: <google.api_core.page_iterator.Page object at 0x7f51a1375668>
INFO [2020-05-20 12:31:13,656] MainThread [root] [issue_112.py:112][process_parallel] Processing row: Row((87955, 'sjemarf', 1, 14450032, 14450031), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 12:31:13,757] MainThread [root] [issue_112.py:112][process_parallel] Processing row: Row((87989, 'desacpot', 1, 14497586, 14497585), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO [2020-05-20 12:31:13,757] Thread-Fetcher [root] [issue_112.py:76][fetch_data] Requesting next page
INFO [2020-05-20 12:31:13,757] Thread-Fetcher [google.api_core.page_iterator] [page_iterator.py:257][_page_iter] Fetching next page...
INFO [2020-05-20 12:31:13,857] MainThread [root] [issue_112.py:112][process_parallel] Processing row: Row((88060, 'stneve-noidirt', 1, 14520018, 14520017), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
...
The actual implementation would likely require more advanced error handling and/or more sophisticated logic for deciding when to start page transfers, but you should get the idea. Or give the BQ Storage API a shot, if that's feasible.
@plamut Thanks a lot for confirming the issue. Yes, I am trying to implement the same logic, where multiple threads fetch the data from the result and push it to a queue, and the main thread consumes it via the queue.
Thanks for sharing the snippet to implement the same.
Yes, I am trying to use BQ Storage API as well. Hope that solves the issue.
Can we implement fetching pages in the background as part of the library, so that it is taken care of automatically? It looks like the BQ Storage API does this?
I had tried using to_dataframe_iterable() also and but I had an issue where I cannot convert the result to a DF as it has a type which is not supported.
@yantrikskaran would it be possible to provide more details on which type isn't supported? A recent release of pyarrow (0.17.0) fixed some conversion issues.
My SQL has columns aggregated with ARRAY_AGG and STRUCT. When I try to get the output as a DataFrame I am getting some error.
Will share the actual error and data in a day or two.
Can we implement fetching pages in the background as part of the library? So that it was taken care. Looks like BQ Storage API does this?
@yantrikskaran The REST API is generally more suitable for small to medium result sets. While it would be possible to implement automatic pre-fetching of pages in the background, it's really a problem that is already better addressed by the BQ Storage API. The latter streams the rows to the client and also supports multiple streams that can be read concurrently.
Generally, it shouldn't take too much code for somebody to implement pre-fetching of the REST pages on their own, if desired, but since it's probably not a typical use case, I don't think it will be added to the client library.
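In case it helps, here is a rough, untested sketch of reading a table through the BigQuery Storage API with several streams consumed concurrently. The project, dataset, and table names are placeholders, and the exact call signatures can differ slightly between versions of google-cloud-bigquery-storage, so treat it as a starting point rather than a drop-in solution:

from concurrent.futures import ThreadPoolExecutor

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

bqstorage_client = bigquery_storage_v1.BigQueryReadClient()

# Placeholder identifiers - substitute your own project, dataset, and table.
table = "projects/my-project/datasets/my_dataset/tables/my_table"

session = bqstorage_client.create_read_session(
    parent="projects/my-project",
    read_session=types.ReadSession(table=table, data_format=types.DataFormat.AVRO),
    max_stream_count=4,  # ask the backend for up to 4 parallel streams
)

def consume(stream):
    # Each stream can be read independently, e.g. in its own thread.
    reader = bqstorage_client.read_rows(stream.name)
    for row in reader.rows(session):
        pass  # process the row (a dict when using the AVRO data format)

with ThreadPoolExecutor(max_workers=len(session.streams) or 1) as executor:
    list(executor.map(consume, session.streams))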
@plamut Yes, it makes sense. Thanks a lot!
Since the original issue has been analyzed and two decent (IMHO) solutions proposed, I will now close this.
@yantrikskaran Feel free to re-open if you think more information is needed. As for the issue with some data types, I suggest opening a separate issue, if necessary, since that is not directly related to result pagination.
If I may ask when running:
job = client.query(query)
res = job.result()
for row in res:
    yield dict(row)
Let's say the query is SELECT * FROM table.
Is the entire data from the table loaded into memory?
@plamut I tried implementing something similar to your post on background fetches and it didn't work. Each background fetch gets a page worth of data, but it is full of duplicates, so I end up with massive data loss. I was using start_index=K*Page_size for each thread.
@Tunneller If you are sure that each fetch requested a different (sub)set of the results, I suggest opening a new issue describing a possible problem with start_index.
Please also note that I am no longer a maintainer of this library (since early 2022), thus I am not up to date with any internal changes that have been made to the library in the last two years or so.
I do hope that you will get to the bottom of this!
Currently I am facing an issue where I have to process a big number of records coming back as part of a query result. The results are available in multiple pages.
When I try to process the rows from the result, I notice that it takes around 10 seconds to load the next page. I am able to process one page in a second or two, and then I have to wait 10 seconds for the next page. It is time consuming.
I cannot load the entire result in memory as the response can be more than 10 GB and we start hitting various quota limits.
Is there a way I can load a batch of 100 pages in memory at once, start my processing, and have the next batch of 100 pages loaded in the background while the first batch is being processed?