googleapis / python-bigquery-pandas

Google BigQuery connector for pandas
https://googleapis.dev/python/pandas-gbq/latest/index.html
BSD 3-Clause "New" or "Revised" License

Rare Block on GBQ write: #672

Open adonoho opened 11 months ago

adonoho commented 11 months ago

Environment details

Steps to reproduce

  1. Run lots of jobs, writing to GBQ 100,000+ times. In this case, the writes to GBQ have succeeded for many thousands of rows.
  2. Twice, the write has stalled and needed to be interrupted.

Code example

By the time this method runs, the DB is already set up and the credentials are not None. The stall happens inside the df.to_gbq() call; no exception is thrown to be caught.

    def _push_to_database(self):
        df = pd.concat(self.results)
        # Store remotely for flexibility.
        if self.remote is not None:
            try:
                with self.remote.connect() as rdb:
                    df.to_sql(self.table_name, rdb, if_exists='append', method='multi')
            except SQLAlchemyError as e:
                logging.error("%s", e)
        if self.credentials is not None:
            try:
                df.to_gbq(f'EMS.{self.table_name}',
                          if_exists='append',
                          progress_bar=False,
                          credentials=self.credentials)
            except pandas_gbq.exceptions.GenericGBQException as e:
                logging.error("%s", e)
        elif self.project_id is not None:
            try:
                df.to_gbq(f'EMS.{self.table_name}',
                          if_exists='append',
                          progress_bar=False,
                          project_id=self.project_id)
            except pandas_gbq.exceptions.GenericGBQException as e:
                logging.error("%s", e)

        # Store locally for durability.
        with self.local.connect() as ldb:
            df.to_sql(self.table_name, ldb, if_exists='append', method='multi')
        self.results = []

Stack trace

Traceback (most recent call last):
  File "/Users/awd/Projects/MultiverseExperiments/AMP_matrix_recovery/run_amp_normal_bayes.py", line 494, in <module>
    do_coiled_experiment('exp_dicts/AMP_matrix_recovery_normal_bayes_3_2.json')
  File "/Users/awd/Projects/MultiverseExperiments/AMP_matrix_recovery/run_amp_normal_bayes.py", line 449, in do_coiled_experiment
    do_on_cluster(exp, run_amp_instance, client, credentials=get_gbq_credentials())
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/EMS/manager.py", line 412, in do_on_cluster
    db.push_batch()
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/EMS/manager.py", line 117, in push_batch
    self._push_to_database()
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/EMS/manager.py", line 73, in _push_to_database
    df.to_gbq(f'EMS.{self.table_name}',
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas/core/frame.py", line 2161, in to_gbq
    gbq.to_gbq(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas/io/gbq.py", line 223, in to_gbq
    pandas_gbq.to_gbq(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas_gbq/gbq.py", line 1220, in to_gbq
    connector.load_data(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas_gbq/gbq.py", line 602, in load_data
    chunks = load.load_chunks(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas_gbq/load.py", line 243, in load_chunks
    load_parquet(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas_gbq/load.py", line 137, in load_parquet
    ).result()
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 922, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/api_core/future/polling.py", line 256, in result
    self._blocking_poll(timeout=timeout, retry=retry, polling=polling)
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/api_core/future/polling.py", line 137, in _blocking_poll
    polling(self._done_or_raise)(retry=retry)
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/api_core/retry.py", line 349, in retry_wrapped_func
    return retry_target(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/api_core/retry.py", line 191, in retry_target
    return target()
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/api_core/future/polling.py", line 119, in _done_or_raise
    if not self.done(retry=retry):
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 889, in done
    self.reload(retry=retry, timeout=timeout)
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 781, in reload
    api_response = client._call_api(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 816, in _call_api
    return call()
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 482, in api_request
    response = self._make_request(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 341, in _make_request
    return self._do_request(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 379, in _do_request
    return self.http.request(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/auth/transport/requests.py", line 542, in request
    response = super(AuthorizedSession, self).request(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/requests/adapters.py", line 486, in send
    resp = conn.urlopen(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/urllib3/connectionpool.py", line 703, in urlopen
    httplib_response = self._make_request(
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/urllib3/connectionpool.py", line 449, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/urllib3/connectionpool.py", line 444, in _make_request
    httplib_response = conn.getresponse()
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/http/client.py", line 1375, in getresponse
    response.begin()
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/http/client.py", line 318, in begin
    version, status, reason = self._read_status()
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/http/client.py", line 279, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/ssl.py", line 1274, in recv_into
    return self.read(nbytes, buffer)
  File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/ssl.py", line 1130, in read
    return self._sslobj.read(len, buffer)


Linchin commented 11 months ago

Hi @adonoho, thank you for reporting this issue. I tried, but haven't been able to reproduce it. I suspect this has something to do with your network being unstable. Maybe you can add a timeout to the df.to_gbq() call and retry if it stalls? Also, you said "Run lots of jobs, write to GBQ 100,000+ times", but I hit a quota error after just 1,500 insertions. How did you get around this?
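
For example, something along these lines might work as a stopgap. This is a minimal sketch, assuming df.to_gbq() is safe to run in a worker thread; MAX_RETRIES and TIMEOUT_SECONDS are illustrative names, not pandas-gbq options:

    # Sketch: impose a wall-clock timeout on df.to_gbq() by running it in a
    # worker thread and retrying on timeout. The stalled thread cannot be
    # killed; it is abandoned and may still complete in the background.
    import concurrent.futures
    import logging

    MAX_RETRIES = 3
    TIMEOUT_SECONDS = 300

    def to_gbq_with_timeout(df, destination, **to_gbq_kwargs):
        for attempt in range(1, MAX_RETRIES + 1):
            pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = pool.submit(df.to_gbq, destination, **to_gbq_kwargs)
            try:
                result = future.result(timeout=TIMEOUT_SECONDS)
                pool.shutdown(wait=True)
                return result
            except concurrent.futures.TimeoutError:
                pool.shutdown(wait=False)  # abandon the stalled worker
                logging.warning("to_gbq stalled (attempt %d/%d); retrying",
                                attempt, MAX_RETRIES)
        raise TimeoutError("df.to_gbq() stalled on every attempt")

One caveat: if the load job actually completed server-side and only the final status poll stalled, retrying with if_exists='append' would duplicate the batch.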

adonoho commented 11 months ago

@Linchin This is a program that collects values from a compute cluster. Each function returns a single row of a data frame. The rows are concatenated and then written to GBQ. I've had jobs create 5M rows in 4K-row chunks, i.e. a push every minute or every 4K rows, whichever occurs sooner. I will explore using a timeout and retrying. I am happy to instrument my code however you might wish to help find this problem. BTW, df.to_gbq() doesn't appear to support a timeout parameter. (I am new to Google APIs. Please forgive me if it is documented in a place that is non-obvious to me.)
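
To make the data flow concrete, here is a rough sketch of that accumulation pattern (all names are illustrative, not the actual EMS code):

    # Sketch: buffer one-row result DataFrames and flush every 4,000 rows or
    # every 60 seconds, whichever comes first.
    import time
    import pandas as pd

    class RowBuffer:
        def __init__(self, flush_rows=4_000, flush_seconds=60.0):
            self.flush_rows = flush_rows
            self.flush_seconds = flush_seconds
            self.rows = []
            self.last_flush = time.monotonic()

        def add(self, one_row_df):
            """Buffer a row; return a concatenated batch when a flush is due."""
            self.rows.append(one_row_df)
            due = (len(self.rows) >= self.flush_rows
                   or time.monotonic() - self.last_flush >= self.flush_seconds)
            if not due:
                return None
            batch = pd.concat(self.rows)
            self.rows = []
            self.last_flush = time.monotonic()
            return batch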

Linchin commented 11 months ago

Indeed, df.to_gbq() doesn't have a timeout option. I was thinking more of implementing the timeout in Python yourself, along the lines of the examples here.
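
The linked examples aren't quoted in the thread, but a common pure-Python pattern of that kind uses signal.alarm(). It is Unix-only and works only in the main thread, which is the caveat raised in the next comment. A minimal sketch:

    # Sketch: interrupt a stalled call with SIGALRM (Unix only, main thread only).
    import signal

    class StallTimeout(Exception):
        pass

    def _raise_timeout(signum, frame):
        raise StallTimeout("operation timed out")

    def call_with_alarm(func, seconds, *args, **kwargs):
        old_handler = signal.signal(signal.SIGALRM, _raise_timeout)
        signal.alarm(seconds)  # schedule SIGALRM after `seconds`
        try:
            return func(*args, **kwargs)
        finally:
            signal.alarm(0)  # cancel any pending alarm
            signal.signal(signal.SIGALRM, old_handler)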

adonoho commented 11 months ago

Presumably, the underlying Google API calls support timeouts? Is a better answer to surface exceptions that involve timeouts? (I followed the link you mentioned, and because they say it doesn't play well with threads, I will rule it out.) FTR, this is a Dask app that gathers data via Tornado and presents it to the single-threaded __main__ code. I am really quite happy to implement timeout-catching code instead of making the loop potentially unstable.
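
They do. The .result() call visible in the stack trace accepts a timeout, so one option is to bypass to_gbq() and call google-cloud-bigquery directly, turning a silent stall into a catchable exception. A sketch, assuming default credentials and an illustrative destination name:

    # Sketch: load via google-cloud-bigquery so a timeout can be passed to
    # job.result(); a stall then raises concurrent.futures.TimeoutError.
    import concurrent.futures
    import logging

    from google.cloud import bigquery

    def load_with_timeout(df, destination="EMS.my_table", timeout=300.0):
        client = bigquery.Client()
        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        )
        job = client.load_table_from_dataframe(df, destination,
                                               job_config=job_config)
        try:
            job.result(timeout=timeout)  # wait at most `timeout` seconds
        except concurrent.futures.TimeoutError:
            logging.error("BigQuery load job %s timed out", job.job_id)
            raise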

adonoho commented 11 months ago

From the above trace, I found the following interesting TODO in load_chunks() at line 252:

    if api_method == "load_parquet":
        load_parquet(
            client,
            dataframe,
            destination_table_ref,
            write_disposition,
            location,
            schema,
            billing_project=billing_project,
        )
        # TODO: yield progress depending on result() with timeout
        return [0]

Clearly, the new load_parquet() method is not yet complete. What can I do to help fix this code? (Bear in mind that, due to inexperience with Google APIs, I am uncertain how the maintenance team manages timeout issues in pandas_gbq.)
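
For reference, one shape such a fix might take. This is purely a hypothetical sketch of the idea behind that TODO, not the maintainers' plan, and load_parquet_with_timeout is an illustrative name rather than a real pandas_gbq API:

    # Sketch: pass a timeout down to the load job's result() call so a stall
    # raises instead of polling forever.
    import concurrent.futures

    from google.cloud import bigquery

    def load_parquet_with_timeout(client, dataframe, destination_table_ref,
                                  write_disposition, timeout=None):
        job_config = bigquery.LoadJobConfig(write_disposition=write_disposition)
        job = client.load_table_from_dataframe(
            dataframe, destination_table_ref, job_config=job_config
        )
        # The upstream TODO sits at this call: result() accepts a timeout and
        # raises concurrent.futures.TimeoutError once it elapses.
        job.result(timeout=timeout)
        return [0]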