apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0
7.8k stars 4.22k forks

[Feature Request]: Add support for BigQuery structs in BigQueryIO with BEAM_ROW output type #27166

Open jubebo opened 1 year ago

jubebo commented 1 year ago

What would you like to happen?

The BigQuery-to-Python type conversion in bigquery_schema_tools.py currently supports only primitive types. I would like to contribute by adding support for REPEATED RECORD fields. Since I am new to this project, I would be thankful for some support with this.

@svetakvsundhar can you help me understand if the following can be achieved:

I think a similar feature request was implemented in the BigQuery->Avro->Beam conversion for the Java SDK: Pull request 9089.

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

jubebo commented 1 year ago

I.e. the feature request would enable calling

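import apache_beam as beam

# Read nested (non-flattened) rows via the Storage Read API as Beam rows.
beam.io.ReadFromBigQuery(
    project="your-project",
    dataset="your-dataset",
    table="your-table",
    method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
    flatten_results=False,
    output_type="BEAM_ROW"
)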

for tables with nested entries - e.g. tables with the following schema

<TableSchema
    fields: [
        <TableFieldSchema
            fields: []
            name: 'primitive_1'
            type: 'INTEGER'>,
        <TableFieldSchema
            fields: []
            name: 'primitive_2'
            type: 'STRING'>,
        <TableFieldSchema
            fields: [
                <TableFieldSchema
                    fields: []
                    name: 'nested_lv1_1'
                    type: 'STRING'>,
                <TableFieldSchema
                    fields: []
                    name: 'nested_lv1_2'
                    type: 'STRING'>,
                <TableFieldSchema
                    fields: [
                        <TableFieldSchema
                            fields: []
                            name: 'nested_lv2_1'
                            type: 'STRING'>,
                        <TableFieldSchema
                            fields: []
                            name: 'nested_lv2_2'
                            type: 'FLOAT'>]
                    mode: 'REPEATED'
                    name: 'nested_lv1_3'
                    type: 'RECORD'>]
            mode: 'REPEATED'
            name: 'nested'
            type: 'RECORD'>]>

This schema would be the input to https://github.com/apache/beam/blob/5223ab466e78f403680a22036d3a769897d0cf9d/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py#L56 as discussed above.
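To make the idea concrete, here is a minimal sketch of the recursive conversion in plain Python. It is not the code in bigquery_schema_tools.py: the function name schema_to_type, the PRIMITIVE_TYPES table, and the dict-based schema representation are assumptions made for illustration, and a missing mode is treated as NULLABLE.

```python
from typing import Any, Dict, List, NamedTuple, Optional, Sequence

# Hypothetical type table for illustration; not Beam's actual mapping.
PRIMITIVE_TYPES = {"INTEGER": int, "STRING": str, "FLOAT": float, "BOOLEAN": bool}


def schema_to_type(name: str, fields: List[Dict[str, Any]]) -> type:
    """Recursively build a NamedTuple type from BigQuery-style field dicts."""
    annotations = []
    for field in fields:
        if field["type"] == "RECORD":
            # Nested record: build the inner type first, then wrap it below.
            py_type = schema_to_type(field["name"], field.get("fields", []))
        else:
            py_type = PRIMITIVE_TYPES[field["type"]]
        mode = field.get("mode", "NULLABLE")
        if mode == "REPEATED":
            py_type = Sequence[py_type]
        elif mode == "NULLABLE":
            py_type = Optional[py_type]
        annotations.append((field["name"], py_type))
    return NamedTuple(name, annotations)


# The schema from this comment, written as plain dicts.
schema = [
    {"name": "primitive_1", "type": "INTEGER"},
    {"name": "primitive_2", "type": "STRING"},
    {"name": "nested", "type": "RECORD", "mode": "REPEATED", "fields": [
        {"name": "nested_lv1_1", "type": "STRING"},
        {"name": "nested_lv1_2", "type": "STRING"},
        {"name": "nested_lv1_3", "type": "RECORD", "mode": "REPEATED", "fields": [
            {"name": "nested_lv2_1", "type": "STRING"},
            {"name": "nested_lv2_2", "type": "FLOAT"},
        ]},
    ]},
]

RowType = schema_to_type("BqRow", schema)
print(RowType._fields)
```

Wrapping the inner type in Sequence only when the mode is REPEATED keeps a single nested RECORD and a repeated one distinguishable in the generated type.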

svetakvsundhar commented 1 year ago

Hi @jubebo, thanks for raising this and for your interest. Yes, I think that approach would work generally. Let me know what assistance you may need.

jubebo commented 1 year ago

Awesome. I was taking a closer look at this today and implemented first changes.

This is what I've done

Open questions - guidance needed

  1. Is the PTransform implementation already aware of 'nested' row entries? As far as I understand, the named tuple object returned by generate_user_type_from_bq_schema will be mapped onto the existing PCollection within the transform's expand method.
  2. Is it desired that nested schemas get registered multiple times in this setup? See https://github.com/apache/beam/blob/5e942ae3790bc95148413c43ab7e43a01a2d82ae/sdks/python/apache_beam/typehints/schemas.py#L533-L542
  3. Can we drop the calls to named_fields_to_schema and named_tuple_from_schema in generate_user_type_from_bq_schema altogether and create the (nested) named tuple object manually? If so, we would of course lose the type checking functionality built into these two methods.
  4. Unfortunately, the current implementation does not yet execute without errors. Even when only adding the recursive call to generate_user_type_from_bq_schema, the following error occurs. Can you help me understand the issue?

Stack trace

../../own_packages/apache_beam/pipeline.py:600: in __exit__
    self.result = self.run()
../../own_packages/apache_beam/pipeline.py:577: in run
    return self.runner.run_pipeline(self, self._options)
../../own_packages/apache_beam/runners/direct/direct_runner.py:129: in run_pipeline
    return runner.run_pipeline(pipeline, options)
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:204: in run_pipeline
    options)
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:224: in run_via_runner_api
    return self.run_stages(stage_context, stages)
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:456: in run_stages
    runner_execution_context, bundle_context_manager, bundle_input)
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:789: in _execute_bundle
    bundle_manager))
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1013: in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
../../own_packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1348: in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
../../own_packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:379: in push
    response = self.worker.do_instruction(request)
../../own_packages/apache_beam/runners/worker/sdk_worker.py:630: in do_instruction
    getattr(request, request_type), request.instruction_id)
../../own_packages/apache_beam/runners/worker/sdk_worker.py:667: in process_bundle
    bundle_processor.process_bundle(instruction_id))
../../own_packages/apache_beam/runners/worker/bundle_processor.py:1062: in process_bundle
    element.data)
../../own_packages/apache_beam/runners/worker/bundle_processor.py:231: in process_encoded
    self.output(decoded_value)
../../own_packages/apache_beam/runners/worker/operations.py:528: in output
    _cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:240: in receive
    self.consumer.process(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:1031: in process
    o)
../../own_packages/apache_beam/runners/common.py:1436: in process_with_sized_restriction
    watermark_estimator_state=estimator_state)
../../own_packages/apache_beam/runners/common.py:819: in invoke_process
    windowed_value, additional_args, additional_kwargs)
../../own_packages/apache_beam/runners/common.py:985: in _invoke_process_per_window
    self.threadsafe_watermark_estimator)
../../own_packages/apache_beam/runners/common.py:1582: in handle_process_outputs
    self._write_value_to_tag(tag, windowed_value, watermark_estimator)
../../own_packages/apache_beam/runners/common.py:1695: in _write_value_to_tag
    self.main_receivers.receive(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:240: in receive
    self.consumer.process(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:908: in process
    delayed_applications = self.dofn_runner.process(o)
../../own_packages/apache_beam/runners/common.py:1420: in process
    self._reraise_augmented(exn)
../../own_packages/apache_beam/runners/common.py:1492: in _reraise_augmented
    raise exn
../../own_packages/apache_beam/runners/common.py:1418: in process
    return self.do_fn_invoker.invoke_process(windowed_value)
../../own_packages/apache_beam/runners/common.py:625: in invoke_process
    windowed_value, self.process_method(windowed_value.value))
../../own_packages/apache_beam/runners/common.py:1582: in handle_process_outputs
    self._write_value_to_tag(tag, windowed_value, watermark_estimator)
../../own_packages/apache_beam/runners/common.py:1695: in _write_value_to_tag
    self.main_receivers.receive(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:240: in receive
    self.consumer.process(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:908: in process
    delayed_applications = self.dofn_runner.process(o)
../../own_packages/apache_beam/runners/common.py:1420: in process
    self._reraise_augmented(exn)
../../own_packages/apache_beam/runners/common.py:1508: in _reraise_augmented
    raise new_exn.with_traceback(tb)
../../own_packages/apache_beam/runners/common.py:1418: in process
    return self.do_fn_invoker.invoke_process(windowed_value)
../../own_packages/apache_beam/runners/common.py:625: in invoke_process
    windowed_value, self.process_method(windowed_value.value))
../../own_packages/apache_beam/runners/common.py:1582: in handle_process_outputs
    self._write_value_to_tag(tag, windowed_value, watermark_estimator)
../../own_packages/apache_beam/runners/common.py:1695: in _write_value_to_tag
    self.main_receivers.receive(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:239: in receive
    self.update_counters_start(windowed_value)
../../own_packages/apache_beam/runners/worker/operations.py:198: in update_counters_start
    self.opcounter.update_from(windowed_value)
../../own_packages/apache_beam/runners/worker/opcounters.py:213: in update_from
    self.do_sample(windowed_value)
../../own_packages/apache_beam/runners/worker/opcounters.py:265: in do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
../../own_packages/apache_beam/coders/coder_impl.py:1507: in get_estimated_size_and_observables
    value.value, nested=nested))
../../own_packages/apache_beam/coders/coder_impl.py:209: in get_estimated_size_and_observables
    return self.estimate_size(value, nested), []
../../own_packages/apache_beam/coders/coder_impl.py:248: in estimate_size
    self.encode_to_stream(value, out, nested)
../../own_packages/apache_beam/coders/coder_impl.py:1769: in encode_to_stream
    component_coder.encode_to_stream(attr, out, True)
../../own_packages/apache_beam/coders/coder_impl.py:1735: in encode_to_stream
    attrs = [getattr(value, name) for name in self.field_names]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

.0 = <list_iterator object at 0x7fe1f6b25610>

>   attrs = [getattr(value, name) for name in self.field_names]
E   AttributeError: 'list' object has no attribute 'nested_lv1_1' [while running 'QueryTable/ParDo(BeamSchemaConversionDoFn)']
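
One plausible reading of this trace, offered as an assumption rather than a confirmed diagnosis: the value of the REPEATED RECORD field is a Python list of rows, but the coder chosen for that field treats it as a single row and looks up field names on the list itself. The toy classes below (ToyRowEncoder and ToyIterableEncoder are hypothetical names, not Beam classes) reproduce the same attribute-lookup pattern and show how wrapping the row encoder in a per-element encoder avoids the error.

```python
from collections import namedtuple
from typing import Any, List, Sequence


class ToyRowEncoder:
    """Toy stand-in for a row coder: reads one attribute per field name."""

    def __init__(self, field_names: Sequence[str]):
        self.field_names = list(field_names)

    def encode(self, value: Any) -> List[Any]:
        # Same attribute-lookup pattern as the failing line in the trace.
        return [getattr(value, name) for name in self.field_names]


class ToyIterableEncoder:
    """Toy stand-in for an iterable coder: encodes each element in turn."""

    def __init__(self, element_encoder: ToyRowEncoder):
        self.element_encoder = element_encoder

    def encode(self, values: Sequence[Any]) -> List[List[Any]]:
        return [self.element_encoder.encode(v) for v in values]


Inner = namedtuple("Inner", ["nested_lv1_1", "nested_lv1_2"])
repeated_value = [Inner("a", "b"), Inner("c", "d")]
row_encoder = ToyRowEncoder(Inner._fields)

# Treating the list as a single row fails like the trace above.
error_message = None
try:
    row_encoder.encode(repeated_value)
except AttributeError as exc:
    error_message = str(exc)
print(error_message)

# Encoding element by element succeeds.
encoded = ToyIterableEncoder(row_encoder).encode(repeated_value)
print(encoded)
```

If this reading is right, the generated type for a REPEATED RECORD field would need to be a sequence of the nested row type rather than the nested row type itself.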

jubebo commented 1 year ago

I also started a pull request, but it does not yet show up in the discussion, so here's the link.

svetakvsundhar commented 1 year ago

Thanks. I will leave comments both on your questions and on the PR.

For starters, could we break down the changes into multiple PRs? E.g., can we start with the function renaming PR?

jubebo commented 1 year ago

Sure, I will break down the changes into multiple PRs. By "renaming functions PR" you refer to what I have described with "suggested to rename dict_of_tuples to field_names_and_types", correct?

jubebo commented 1 year ago

Have created the first PR as discussed: link

jubebo commented 1 year ago

First PR got merged into master - have just created the second PR as suggested.

jubebo commented 1 year ago

As discussed in the second PR, implementing these suggestions requires changes to Beam core, which is why I investigated alternative solutions to my problem. I have found that a simple call to

beam.io.ReadFromBigQuery(
    project="your-project",
    dataset="your-dataset",
    table="your-table",
    method=beam.io.ReadFromBigQuery.Method.DIRECT_READ
)

will solve the problems that I am currently facing, while still enabling me to parse nested data directly from BigQuery into Beam. From the Google Cloud Dataflow documentation I also understand that manually parsing the output format can yield performance improvements compared to reading TableRow (or beam.Row) data types.
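
As a rough illustration of what the manual parsing could look like, here is a sketch that assumes each element arrives as a plain Python dict, with REPEATED RECORD fields as lists of dicts. The dataclass names and parse_row are hypothetical; the field names are taken from the example schema earlier in this thread.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Lv2:
    nested_lv2_1: Optional[str] = None
    nested_lv2_2: Optional[float] = None


@dataclass
class Lv1:
    nested_lv1_1: Optional[str] = None
    nested_lv1_2: Optional[str] = None
    nested_lv1_3: List[Lv2] = field(default_factory=list)


@dataclass
class ParsedRow:
    primitive_1: Optional[int] = None
    primitive_2: Optional[str] = None
    nested: List[Lv1] = field(default_factory=list)


def parse_row(row: Dict[str, Any]) -> ParsedRow:
    """Convert one dict-shaped row into nested dataclasses."""
    nested = [
        Lv1(
            nested_lv1_1=lv1.get("nested_lv1_1"),
            nested_lv1_2=lv1.get("nested_lv1_2"),
            # A missing or null repeated field is treated as an empty list.
            nested_lv1_3=[Lv2(**lv2) for lv2 in (lv1.get("nested_lv1_3") or [])],
        )
        for lv1 in (row.get("nested") or [])
    ]
    return ParsedRow(
        primitive_1=row.get("primitive_1"),
        primitive_2=row.get("primitive_2"),
        nested=nested,
    )


sample = {
    "primitive_1": 1,
    "primitive_2": "x",
    "nested": [
        {
            "nested_lv1_1": "a",
            "nested_lv1_2": "b",
            "nested_lv1_3": [{"nested_lv2_1": "c", "nested_lv2_2": 1.5}],
        }
    ],
}
parsed = parse_row(sample)
print(parsed)
```

In a pipeline, a function like this could be applied with beam.Map(parse_row) directly after the read step.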

Please let me know if there are any major concerns regarding this alternative approach, or if you see additional benefits in following the approach of this feature request as discussed at the beginning of this conversation.