apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0
7.89k stars 4.27k forks source link

[Bug][Or a Feature Request]: Overwrite BigQuery TestClient to work with another API endpoint #23598

Open ruodingt opened 2 years ago

ruodingt commented 2 years ago

What happened?

Thanks for reviewing my issue:)

I am trying to use the test_client arg while declaring a WriteToBigQuery instance, hoping I can make the BQ client work with a BigQuery emulator.

I did notice this one: https://github.com/apache/beam/blob/786ba8b54c023d6f7a24cbf8321a46f329ca7027/sdks/python/apache_beam/io/gcp/bigquery.py#L1443

which leads to the assumption that test_client is an instance of BigqueryV2

Here is the pipeline code:

def test_write_big_query():
    project_id = "test-project"
    dataset_id = 'poc-test-dataset'
    table_1_id = 'poc-test-table-1'

    pipeline_options = PipelineOptions(save_main_session=True)
    tp = TestPipeline(options=pipeline_options)

    x = tp | Create([
        {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())},
        {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())}
    ])

    from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2
    from apache_beam.internal.http_client import get_new_http
    from apache_beam.internal.gcp import auth

    test_client_bq = BigqueryV2(... # I tried different versions here while trying to make this work

    x = x | WriteToBigQuery(table=table_resolve,
                            dataset=dataset_id,
                            project=project_id,
                            test_client=test_client_bq,
                            method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False)

    tp.run()

I keep running into trouble while trying different ways to construct a working BigqueryV2 instance, but it keeps failing...

Experiment 1

test_client_bq is defined as :

test_client_bq = BigqueryV2(url='http://localhost:9050/bigquery/v2/')
error msg ``` unit/test_io.py:301 (test_write_big_query) encoded = b'QlpoOTFBWSZTWURKFI8ABwF/6v////ve///1v///ov////pz5K4AIABIAEU1YAgfelh9D32+fON6ALc55NTzVnlLoaSGphoU2p6ZRP0DU0ynqPE0ymEa...T9hSQuqjjMVZ+10rH+O4z7uw6tc2/eQw6eVNTFVY9LZqrermFypf6cBJfsdjNmqjKJjfaP/RsCy366TEl+MGJiyaqHmRJ+5hGaEiD/4u5IpwoSCIlCkeA=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" c = base64.b64decode(encoded) if use_zlib: s = zlib.decompress(c) else: s = bz2.decompress(c) del c # Free up some possibly large and no-longer-needed memory. with _pickle_lock: try: > return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:285: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ str = b'\x80\x04\x956\x10\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.' 
ignore = None, kwds = {}, file = <_io.BytesIO object at 0x13392df90> def loads(str, ignore=None, **kwds): """unpickle an object from a string""" file = StringIO(str) > return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ file = <_io.BytesIO object at 0x13392df90>, ignore = None, kwds = {} def load(file, ignore=None, **kwds): """unpickle an object from a file""" > return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = def load(self): #NOTE: if settings change, need to update attributes > obj = StockUnpickler.load(self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = [], sequence = ['*/*'] def extend(self, sequence): """Validate extension of list.""" > self.__field.validate(sequence) E AttributeError: 'FieldList' object has no attribute '_FieldList__field' /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apitools/base/protorpclite/messages.py:1147: AttributeError During handling of the above exception, another exception occurred: def test_write_big_query(): project_id = "test-project" dataset_id = 'poc-test-dataset' table_1_id = 'poc-test-table-1' pipeline_options = PipelineOptions(save_main_session=True) tp = TestPipeline(options=pipeline_options) x = tp | Create([ {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())}, {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())} ]) from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2 from apache_beam.internal.http_client import get_new_http from apache_beam.internal.gcp import auth # from 
apache_beam.io.gcp.bigquery_tools import BigQueryWrapper # bqc_ = BigqueryV2(url='http://localhost:9050/bigquery/v2/') # bq_test_client = BigQueryWrapper(client=bqc_) test_client_bq = BigqueryV2( url='http://localhost:9050/bigquery/v2/' # http=get_new_http(), # credentials=auth.get_service_credentials(None), # response_encoding='utf8', # additional_http_headers={ # "user-agent": "apache-beam-x" # } ) > x = x | WriteToBigQuery(table=table_resolve, dataset=dataset_id, project=project_id, test_client=test_client_bq, method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False) test_io.py:332: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply return m(transform, input, options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2308: in expand outputs = pcoll | _StreamToBigQuery( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply return m(transform, input, options) 
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2031: in expand | 'StreamInsertRows' >> ParDo( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/core.py:1416: in __init__ super().__init__(fn, *args, **kwargs) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:864: in __init__ self.fn = pickler.loads(pickler.dumps(self.fn)) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/pickler.py:51: in loads return desired_pickle_lib.loads( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:289: in loads return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: in loads return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: in load return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: in load obj = StockUnpickler.load(self) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = [], sequence = ['*/*'] def extend(self, sequence): """Validate extension of list.""" > self.__field.validate(sequence) E AttributeError: 'FieldList' object has no attribute '_FieldList__field' /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apitools/base/protorpclite/messages.py:1147: AttributeError ```

Experiment 2

since I see this line:

https://github.com/apache/beam/blob/786ba8b54c023d6f7a24cbf8321a46f329ca7027/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L332

I put test_client_bq as :

bigquery.BigqueryV2(
        http=get_new_http(),
        credentials=auth.get_service_credentials(None),
        response_encoding='utf8',
        additional_http_headers={
            "user-agent": "apache-beam-%s" % apache_beam.__version__
        })
error ``` unit/test_io.py:301 (test_write_big_query) encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" c = base64.b64decode(encoded) if use_zlib: s = zlib.decompress(c) else: s = bz2.decompress(c) del c # Free up some possibly large and no-longer-needed memory. with _pickle_lock: try: > return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:285: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ str = b'\x80\x04\x95a\x11\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.' 
ignore = None, kwds = {}, file = <_io.BytesIO object at 0x133cd2a40> def loads(str, ignore=None, **kwds): """unpickle an object from a string""" file = StringIO(str) > return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ file = <_io.BytesIO object at 0x133cd2a40>, ignore = None, kwds = {} def load(file, ignore=None, **kwds): """unpickle an object from a file""" > return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = def load(self): #NOTE: if settings change, need to update attributes > obj = StockUnpickler.load(self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = attr = '__setstate__' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) E RecursionError: maximum recursion depth exceeded 
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: RecursionError !!! Recursion detected (same locals & position) During handling of the above exception, another exception occurred: self = fn = args = (), kwargs = {} def __init__(self, fn, *args, **kwargs): # type: (WithTypeHints, *Any, **Any) -> None if isinstance(fn, type) and issubclass(fn, WithTypeHints): # Don't treat Fn class objects as callables. raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__)) self.fn = self.make_fn(fn, bool(args or kwargs)) # Now that we figure out the label, initialize the super-class. super().__init__() if (any(isinstance(v, pvalue.PCollection) for v in args) or any(isinstance(v, pvalue.PCollection) for v in kwargs.values())): raise error.SideInputError( 'PCollection used directly as side input argument. Specify ' 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' 'PCollection is to be used.') self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args( args, kwargs, pvalue.AsSideInput) self.raw_side_inputs = args, kwargs # Prevent name collisions with fns of the form ' at ...>' self._cached_fn = self.fn # Ensure fn and side inputs are picklable for remote execution. 
try: > self.fn = pickler.loads(pickler.dumps(self.fn)) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:864: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" > return desired_pickle_lib.loads( encoded, enable_trace=enable_trace, use_zlib=use_zlib) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/pickler.py:51: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" c = base64.b64decode(encoded) if use_zlib: s = zlib.decompress(c) else: s = bz2.decompress(c) del c # Free up some possibly large and no-longer-needed memory. 
with _pickle_lock: try: return dill.loads(s) except Exception: # pylint: disable=broad-except if enable_trace: dill.dill._trace(True) # pylint: disable=protected-access > return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:289: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ str = b'\x80\x04\x95a\x11\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.' ignore = None, kwds = {}, file = <_io.BytesIO object at 0x133d2d4f0> def loads(str, ignore=None, **kwds): """unpickle an object from a string""" file = StringIO(str) > return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ file = <_io.BytesIO object at 0x133d2d4f0>, ignore = None, kwds = {} def load(file, ignore=None, **kwds): """unpickle an object from a file""" > return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = def load(self): #NOTE: if settings change, need to update attributes > obj = StockUnpickler.load(self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = attr = '__setstate__' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
self = attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) E RecursionError: maximum recursion depth exceeded while calling a Python object /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: RecursionError !!! Recursion detected (same locals & position) During handling of the above exception, another exception occurred: def test_write_big_query(): project_id = "test-project" dataset_id = 'poc-test-dataset' table_1_id = 'poc-test-table-1' pipeline_options = PipelineOptions(save_main_session=True) tp = TestPipeline(options=pipeline_options) x = tp | Create([ {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())}, {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())} ]) from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2 from apache_beam.internal.http_client import get_new_http from apache_beam.internal.gcp import auth # from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper # bqc_ = BigqueryV2(url='http://localhost:9050/bigquery/v2/') # bq_test_client = BigQueryWrapper(client=bqc_) # test_client_bq = BigqueryV2( # url='http://localhost:9050/bigquery/v2/' # # http=get_new_http(), # # credentials=auth.get_service_credentials(None), # # response_encoding='utf8', # # additional_http_headers={ # # "user-agent": "apache-beam-x" # # } # ) test_client_bq = BigqueryV2( http=get_new_http(), credentials=auth.get_service_credentials(None), 
response_encoding='utf8', additional_http_headers={ "user-agent": "apache-beam-2.41.0" }) > x = x | WriteToBigQuery(table=table_resolve, dataset=dataset_id, project=project_id, test_client=test_client_bq, method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False) test_io.py:340: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply return m(transform, input, options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2308: in expand outputs = pcoll | _StreamToBigQuery( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply return m(transform, input, options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2031: in expand | 'StreamInsertRows' >> ParDo( 
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/core.py:1416: in __init__ super().__init__(fn, *args, **kwargs) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = fn = args = (), kwargs = {} def __init__(self, fn, *args, **kwargs): # type: (WithTypeHints, *Any, **Any) -> None if isinstance(fn, type) and issubclass(fn, WithTypeHints): # Don't treat Fn class objects as callables. raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__)) self.fn = self.make_fn(fn, bool(args or kwargs)) # Now that we figure out the label, initialize the super-class. super().__init__() if (any(isinstance(v, pvalue.PCollection) for v in args) or any(isinstance(v, pvalue.PCollection) for v in kwargs.values())): raise error.SideInputError( 'PCollection used directly as side input argument. Specify ' 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' 'PCollection is to be used.') self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args( args, kwargs, pvalue.AsSideInput) self.raw_side_inputs = args, kwargs # Prevent name collisions with fns of the form ' at ...>' self._cached_fn = self.fn # Ensure fn and side inputs are picklable for remote execution. try: self.fn = pickler.loads(pickler.dumps(self.fn)) except RuntimeError as e: > raise RuntimeError('Unable to pickle fn %s: %s' % (self.fn, e)) E RuntimeError: Unable to pickle fn : maximum recursion depth exceeded while calling a Python object /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:866: RuntimeError ```

ENV

pip3 freeze ``` allure-pytest==2.9.45 allure-python-commons==2.9.45 anyio==3.6.1 apache-beam==2.41.0 appdirs==1.4.4 attrs==22.1.0 avro==1.11.0 bigquery-schema-generator==1.5 cachetools==4.2.4 certifi==2022.6.15 charset-normalizer==2.1.1 click==8.1.3 cloudevents==1.2.0 cloudpickle==2.1.0 confluent-kafka==1.8.2 coverage==6.4.4 crcmod==1.7 deprecation==2.1.0 dill==0.3.1.1 docopt==0.6.2 fastapi==0.79.0 fastavro==1.6.0 fasteners==0.18 google-api-core==2.10.0 google-apitools==0.5.32 google-auth==2.11.0 google-auth-httplib2==0.1.0 google-cloud-bigquery==2.34.4 google-cloud-bigquery-storage==2.13.2 google-cloud-bigtable==1.7.2 google-cloud-core==2.3.2 google-cloud-datastore==1.15.5 google-cloud-dlp==3.9.2 google-cloud-language==1.3.2 google-cloud-pubsub==2.13.9 google-cloud-pubsublite==1.5.0 google-cloud-recommendations-ai==0.7.1 google-cloud-spanner==1.19.3 google-cloud-videointelligence==1.16.3 google-cloud-vision==1.0.2 google-crc32c==1.5.0 google-resumable-media==2.4.0 googleapis-common-protos==1.56.4 greenlet==1.1.3 grpc-google-iam-v1==0.12.4 grpcio==1.48.1 grpcio-gcp==0.2.2 grpcio-status==1.48.1 h11==0.13.0 hdfs==2.7.0 httplib2==0.20.4 idna==3.3 importlib-metadata==4.12.0 libcst==0.4.7 more-itertools==8.14.0 MouseInfo==0.1.3 mypy-extensions==0.4.3 numpy==1.22.4 oauth2client==4.1.3 orjson==3.7.12 overrides==6.5.0 packaging==21.3 pandas==1.4.4 Pillow==9.2.0 pluggy==0.13.1 proto-plus==1.22.0 protobuf==3.20.3 py==1.11.0 pyarrow==7.0.0 pyasn1==0.4.8 pyasn1-modules==0.2.8 PyAutoGUI==0.9.53 pydantic==1.10.1 pydot==1.4.2 pyee==8.2.2 PyGetWindow==0.0.9 PyJWT==2.4.0 pymongo==3.12.3 PyMsgBox==1.0.9 pyobjc==8.5.1 pyobjc-core==8.5.1 pyobjc-framework-Accessibility==8.5.1 pyobjc-framework-Accounts==8.5.1 pyobjc-framework-AddressBook==8.5.1 pyobjc-framework-AdServices==8.5.1 pyobjc-framework-AdSupport==8.5.1 pyobjc-framework-AppleScriptKit==8.5.1 pyobjc-framework-AppleScriptObjC==8.5.1 pyobjc-framework-ApplicationServices==8.5.1 pyobjc-framework-AppTrackingTransparency==8.5.1 
pyobjc-framework-AudioVideoBridging==8.5.1 pyobjc-framework-AuthenticationServices==8.5.1 pyobjc-framework-AutomaticAssessmentConfiguration==8.5.1 pyobjc-framework-Automator==8.5.1 pyobjc-framework-AVFoundation==8.5.1 pyobjc-framework-AVKit==8.5.1 pyobjc-framework-BusinessChat==8.5.1 pyobjc-framework-CalendarStore==8.5.1 pyobjc-framework-CallKit==8.5.1 pyobjc-framework-CFNetwork==8.5.1 pyobjc-framework-ClassKit==8.5.1 pyobjc-framework-CloudKit==8.5.1 pyobjc-framework-Cocoa==8.5.1 pyobjc-framework-Collaboration==8.5.1 pyobjc-framework-ColorSync==8.5.1 pyobjc-framework-Contacts==8.5.1 pyobjc-framework-ContactsUI==8.5.1 pyobjc-framework-CoreAudio==8.5.1 pyobjc-framework-CoreAudioKit==8.5.1 pyobjc-framework-CoreBluetooth==8.5.1 pyobjc-framework-CoreData==8.5.1 pyobjc-framework-CoreHaptics==8.5.1 pyobjc-framework-CoreLocation==8.5.1 pyobjc-framework-CoreMedia==8.5.1 pyobjc-framework-CoreMediaIO==8.5.1 pyobjc-framework-CoreMIDI==8.5.1 pyobjc-framework-CoreML==8.5.1 pyobjc-framework-CoreMotion==8.5.1 pyobjc-framework-CoreServices==8.5.1 pyobjc-framework-CoreSpotlight==8.5.1 pyobjc-framework-CoreText==8.5.1 pyobjc-framework-CoreWLAN==8.5.1 pyobjc-framework-CryptoTokenKit==8.5.1 pyobjc-framework-DataDetection==8.5.1 pyobjc-framework-DeviceCheck==8.5.1 pyobjc-framework-DictionaryServices==8.5.1 pyobjc-framework-DiscRecording==8.5.1 pyobjc-framework-DiscRecordingUI==8.5.1 pyobjc-framework-DiskArbitration==8.5.1 pyobjc-framework-DVDPlayback==8.5.1 pyobjc-framework-EventKit==8.5.1 pyobjc-framework-ExceptionHandling==8.5.1 pyobjc-framework-ExecutionPolicy==8.5.1 pyobjc-framework-ExternalAccessory==8.5.1 pyobjc-framework-FileProvider==8.5.1 pyobjc-framework-FileProviderUI==8.5.1 pyobjc-framework-FinderSync==8.5.1 pyobjc-framework-FSEvents==8.5.1 pyobjc-framework-GameCenter==8.5.1 pyobjc-framework-GameController==8.5.1 pyobjc-framework-GameKit==8.5.1 pyobjc-framework-GameplayKit==8.5.1 pyobjc-framework-ImageCaptureCore==8.5.1 pyobjc-framework-IMServicePlugIn==8.5.1 
pyobjc-framework-InputMethodKit==8.5.1 pyobjc-framework-InstallerPlugins==8.5.1 pyobjc-framework-InstantMessage==8.5.1 pyobjc-framework-Intents==8.5.1 pyobjc-framework-IntentsUI==8.5.1 pyobjc-framework-IOSurface==8.5.1 pyobjc-framework-iTunesLibrary==8.5.1 pyobjc-framework-KernelManagement==8.5.1 pyobjc-framework-LatentSemanticMapping==8.5.1 pyobjc-framework-LaunchServices==8.5.1 pyobjc-framework-libdispatch==8.5.1 pyobjc-framework-LinkPresentation==8.5.1 pyobjc-framework-LocalAuthentication==8.5.1 pyobjc-framework-LocalAuthenticationEmbeddedUI==8.5.1 pyobjc-framework-MailKit==8.5.1 pyobjc-framework-MapKit==8.5.1 pyobjc-framework-MediaAccessibility==8.5.1 pyobjc-framework-MediaLibrary==8.5.1 pyobjc-framework-MediaPlayer==8.5.1 pyobjc-framework-MediaToolbox==8.5.1 pyobjc-framework-Metal==8.5.1 pyobjc-framework-MetalKit==8.5.1 pyobjc-framework-MetalPerformanceShaders==8.5.1 pyobjc-framework-MetalPerformanceShadersGraph==8.5.1 pyobjc-framework-MetricKit==8.5.1 pyobjc-framework-MLCompute==8.5.1 pyobjc-framework-ModelIO==8.5.1 pyobjc-framework-MultipeerConnectivity==8.5.1 pyobjc-framework-NaturalLanguage==8.5.1 pyobjc-framework-NetFS==8.5.1 pyobjc-framework-Network==8.5.1 pyobjc-framework-NetworkExtension==8.5.1 pyobjc-framework-NotificationCenter==8.5.1 pyobjc-framework-OpenDirectory==8.5.1 pyobjc-framework-OSAKit==8.5.1 pyobjc-framework-OSLog==8.5.1 pyobjc-framework-PassKit==8.5.1 pyobjc-framework-PencilKit==8.5.1 pyobjc-framework-Photos==8.5.1 pyobjc-framework-PhotosUI==8.5.1 pyobjc-framework-PreferencePanes==8.5.1 pyobjc-framework-PushKit==8.5.1 pyobjc-framework-Quartz==8.5.1 pyobjc-framework-QuickLookThumbnailing==8.5.1 pyobjc-framework-ReplayKit==8.5.1 pyobjc-framework-SafariServices==8.5.1 pyobjc-framework-SceneKit==8.5.1 pyobjc-framework-ScreenCaptureKit==8.5.1 pyobjc-framework-ScreenSaver==8.5.1 pyobjc-framework-ScreenTime==8.5.1 pyobjc-framework-ScriptingBridge==8.5.1 pyobjc-framework-SearchKit==8.5.1 pyobjc-framework-Security==8.5.1 
pyobjc-framework-SecurityFoundation==8.5.1 pyobjc-framework-SecurityInterface==8.5.1 pyobjc-framework-ServiceManagement==8.5.1 pyobjc-framework-ShazamKit==8.5.1 pyobjc-framework-Social==8.5.1 pyobjc-framework-SoundAnalysis==8.5.1 pyobjc-framework-Speech==8.5.1 pyobjc-framework-SpriteKit==8.5.1 pyobjc-framework-StoreKit==8.5.1 pyobjc-framework-SyncServices==8.5.1 pyobjc-framework-SystemConfiguration==8.5.1 pyobjc-framework-SystemExtensions==8.5.1 pyobjc-framework-UniformTypeIdentifiers==8.5.1 pyobjc-framework-UserNotifications==8.5.1 pyobjc-framework-UserNotificationsUI==8.5.1 pyobjc-framework-VideoSubscriberAccount==8.5.1 pyobjc-framework-VideoToolbox==8.5.1 pyobjc-framework-Virtualization==8.5.1 pyobjc-framework-Vision==8.5.1 pyobjc-framework-WebKit==8.5.1 pyparsing==3.0.9 pyperclip==1.8.2 pyppeteer==1.0.2 PyRect==0.2.0 PyScreeze==0.1.28 pytest==5.4.3 pytest-order==1.0.1 python-dateutil==2.8.2 pytweening==1.0.4 pytz==2022.2.1 PyYAML==6.0 regex==2022.9.13 requests==2.28.1 rsa==4.9 rubicon-objc==0.4.2 six==1.16.0 sniffio==1.3.0 SQLAlchemy==1.4.39 sqlparse==0.4.2 starlette==0.19.1 tqdm==4.64.1 typing-inspect==0.8.0 typing_extensions==4.3.0 urllib3==1.26.11 uvicorn==0.18.2 wcwidth==0.2.5 websockets==10.3 zipp==3.8.1 zstandard==0.18.0 ```

Other context

code to set up bigquery emulator `docker-compose` to set up the emulator ``` version: '3.6' services: bigquery-emulator: hostname: bigquery container_name: bigquery image: ghcr.io/goccy/bigquery-emulator:latest ports: - 9050:9050 command: --project=${PROJECT_ID} --port=${BIGQUERY_PORT} ``` The python code to setup dataset/table ``` def set_up_bq_instance(): from google.cloud.bigquery import TableReference, DatasetReference from google.api_core.client_options import ClientOptions from google.auth.credentials import AnonymousCredentials from google.cloud import bigquery project_id = "test-project" client_options = ClientOptions(api_endpoint="http://localhost:9050") bq_test_client = bq_test_client = bigquery.Client( project=project_id, client_options=client_options, credentials=AnonymousCredentials(), ) dataset_ref = DatasetReference(project=project_id, dataset_id='poc-test-dataset') try: bq_test_client.create_dataset(dataset=dataset_ref, exists_ok=True, retry=None) except InternalServerError: pass table_ref_1 = TableReference(dataset_ref=dataset_ref, table_id='poc-test-table-1') table_ref_2 = TableReference(dataset_ref=dataset_ref, table_id='poc-test-table-2') schema_1 = [ bigquery.SchemaField("type2", "STRING", mode="REQUIRED"), bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("id2", "STRING", mode="REQUIRED"), ] schema_2 = [ bigquery.SchemaField("type2", "STRING", mode="REQUIRED"), bigquery.SchemaField("full_addr", "STRING", mode="REQUIRED"), bigquery.SchemaField("id2", "STRING", mode="REQUIRED"), ] table_1 = bigquery.Table(table_ref=table_ref_1, schema=schema_1) table_2 = bigquery.Table(table_ref=table_ref_2, schema=schema_2) try: _ = bq_test_client.create_table(table_1, exists_ok=True, retry=None) except InternalServerError: pass try: _ = bq_test_client.create_table(table_2, exists_ok=True, retry=None) except InternalServerError: pass list_dataset = list(bq_test_client.list_datasets()) list_table = 
list(bq_test_client.list_tables(dataset_ref)) return bq_test_client ```

Issue Priority

Priority: 2

Issue Component

Component: io-py-gcp

kennknowles commented 1 year ago

@johnjcasey @Abacn do you understand the question here?