googleapis / python-bigquery-storage

Apache License 2.0

append_rows is not compatible with proto3 optional #818

Closed: darrenwjones closed this issue 3 weeks ago

darrenwjones commented 3 weeks ago

Continuation of this issue, since the fix seems to apply only to AppendRowsStream, which, to my understanding, is deprecated in favor of append_rows.

Environment details

OS type and version: macOS 14.6.1
Python version: 3.12
pip version: 24.2
google-cloud-bigquery-storage version: 2.25.0

Steps to reproduce

  1. Write rows with the append_rows method using a message that declares a field with the proto3 optional label. The server rejects the request with a 400 InvalidArgument error.

Code example

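from google.cloud import bigquery_storage
from google.cloud.bigquery_storage_v1 import types
from google.protobuf import descriptor_pb2

# project_id, dataset_id, table_id, pb2_descriptor (the Descriptor of the
# generated row message class) and pb_rows are defined elsewhere.
write_client = bigquery_storage.BigQueryWriteClient()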
table_path = f'projects/{project_id}/datasets/{dataset_id}/tables/{table_id}'
write_stream = f'{table_path}/streams/_default'

proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
pb2_descriptor.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor

# pb_rows is a list of protobuf message instances, one per row
proto_rows = types.ProtoRows()
proto_rows.serialized_rows = [row.SerializeToString() for row in pb_rows]
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
proto_data.rows = proto_rows
request.proto_rows = proto_data
request.write_stream = write_stream

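# append_rows is a bidirectional streaming RPC that returns an iterator of
# AppendRowsResponse messages; read the first one.
response = next(write_client.append_rows(requests=iter([request])))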

Stack trace

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/streaming_framework/write/writer.py", line 112, in _write_to_bq
    response = writer.append_rows(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/streaming_framework/write/bq_writer.py", line 102, in append_rows
    return next(self.write_client.append_rows(requests=iter([request])))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/client.py", line 935, in append_rows
    response = rpc(
               ^^^^
  File "/usr/local/lib/python3.12/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/usr/local/lib/python3.12/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/usr/local/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
             ^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/google/api_core/timeout.py", line 120, in func_with_timeout
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/google/api_core/grpc_helpers.py", line 174, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.InvalidArgument: 400 Invalid proto schema: BqMessage.proto: Message.messagebody.shipmentadvices.shipmentadvice.ExtnGiftIndicator: The [proto3_optional=true] option may only be set on proto3fields, not Message.messagebody.shipmentadvices.shipmentadvice.ExtnGiftIndicator
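A likely reading of this error, offered as an interpretation rather than a confirmed diagnosis: ProtoSchema.proto_descriptor holds a bare DescriptorProto, and the syntax field lives on FileDescriptorProto, not on DescriptorProto, so the "proto3" marker does not travel with the schema. CopyToProto nevertheless copies the proto3-only proto3_optional flag and the synthetic oneof that protoc generates for each optional field. The sketch below shows both facts locally using only the protobuf package; the file, package, message, and field names (example.proto, example, Row, gift_indicator) are hypothetical, and the message is built by hand instead of being compiled by protoc.

```python
from google.protobuf import descriptor_pb2, descriptor_pool

FieldProto = descriptor_pb2.FieldDescriptorProto

# Hand-built equivalent of a (hypothetical) proto3 file:
#   syntax = "proto3";
#   package example;
#   message Row { optional bool gift_indicator = 1; }
# protoc emits the same synthetic oneof "_gift_indicator" for the optional field.
file_proto = descriptor_pb2.FileDescriptorProto(
    name="example.proto", package="example", syntax="proto3"
)
row = file_proto.message_type.add(name="Row")
row.field.add(
    name="gift_indicator",
    number=1,
    type=FieldProto.TYPE_BOOL,
    label=FieldProto.LABEL_OPTIONAL,
    oneof_index=0,
    proto3_optional=True,
)
row.oneof_decl.add(name="_gift_indicator")

# Load the file into a private pool to get a real Descriptor, like generated code has.
pool = descriptor_pool.DescriptorPool()
pool.AddSerializedFile(file_proto.SerializeToString())
row_descriptor = pool.FindMessageTypeByName("example.Row")

# Mirrors pb2_descriptor.CopyToProto(proto_descriptor) from the code example.
proto_descriptor = descriptor_pb2.DescriptorProto()
row_descriptor.CopyToProto(proto_descriptor)

print(proto_descriptor.field[0].proto3_optional)      # True
print([o.name for o in proto_descriptor.oneof_decl])  # ['_gift_indicator']
# DescriptorProto has no syntax field, so "proto3" is not carried along.
print("syntax" in descriptor_pb2.DescriptorProto.DESCRIPTOR.fields_by_name)  # False
```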
darrenwjones commented 3 weeks ago

Looking at the commit that was merged to fix AppendRowsStream, I tried to apply the same changes by adding the following code between copying the descriptor and assigning it to the schema:

pb2_descriptor.CopyToProto(proto_descriptor)
# ------ new code ------
for field in proto_descriptor.field:
    field.ClearField("oneof_index")
    field.ClearField("proto3_optional")
proto_descriptor.ClearField("oneof_decl")
# ------ end new code ------
proto_schema.proto_descriptor = proto_descriptor
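Why this loop is not enough when the message has nested types: it walks only the top-level field list, while nested message types sit in the separate nested_type list, so optional fields declared inside them keep their flags. A minimal sketch with a hand-built DescriptorProto; the names Message, Shipment, shipment, and gift_indicator are hypothetical, the type_name value is illustrative, and the flags are set by hand to mimic what CopyToProto emits.

```python
from google.protobuf import descriptor_pb2

FieldProto = descriptor_pb2.FieldDescriptorProto

# Hypothetical top-level message with a nested message type that contains
# a proto3 optional field (flags set the way CopyToProto would emit them).
proto_descriptor = descriptor_pb2.DescriptorProto(name="Message")
nested = proto_descriptor.nested_type.add(name="Shipment")
nested.field.add(
    name="gift_indicator",
    number=1,
    type=FieldProto.TYPE_BOOL,
    label=FieldProto.LABEL_OPTIONAL,
    oneof_index=0,
    proto3_optional=True,
)
nested.oneof_decl.add(name="_gift_indicator")
proto_descriptor.field.add(
    name="shipment",
    number=1,
    type=FieldProto.TYPE_MESSAGE,
    label=FieldProto.LABEL_OPTIONAL,
    type_name=".Message.Shipment",
)

# The flat loop from the comment: it only touches top-level fields.
for field in proto_descriptor.field:
    field.ClearField("oneof_index")
    field.ClearField("proto3_optional")
proto_descriptor.ClearField("oneof_decl")

# The nested field is untouched, so the proto3-only flags are still present.
print(proto_descriptor.nested_type[0].field[0].proto3_optional)  # True
print(len(proto_descriptor.nested_type[0].oneof_decl))           # 1
```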

I also tried adding the code after creating the AppendRowsRequest, but neither change fixed the issue.

darrenwjones commented 3 weeks ago

I fixed the issue once I realized my message had nested message types, so I wasn't clearing the proto3 options from all of its fields. After CopyToProto, I cleared the proto3-specific fields recursively with a function like this:

def _clear_proto3_in_descriptor(descriptor):
    for field in descriptor.field:
        field.ClearField("oneof_index")
        field.ClearField("proto3_optional")
    descriptor.ClearField("oneof_decl")
    for nested_type in descriptor.nested_type:
        _clear_proto3_in_descriptor(nested_type)
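The workaround from this comment, assembled into a self-contained sketch. The cleanup function is the one above with a docstring added; has_proto3_optional and add_optional_bool are hypothetical helpers written only for this illustration, and the two-level message is hand-built to loosely mirror the nested path in the traceback. One consequence of the design worth knowing: clearing oneof_decl removes every oneof in a message, not only the synthetic ones protoc creates for optional fields, so a schema that also uses real oneof groups would lose that grouping too.

```python
from google.protobuf import descriptor_pb2

FieldProto = descriptor_pb2.FieldDescriptorProto


def _clear_proto3_in_descriptor(descriptor):
    """Recursively strip proto3_optional flags and oneofs from a DescriptorProto.

    Clearing oneof_decl removes every oneof, not just the synthetic ones.
    """
    for field in descriptor.field:
        field.ClearField("oneof_index")
        field.ClearField("proto3_optional")
    descriptor.ClearField("oneof_decl")
    for nested_type in descriptor.nested_type:
        _clear_proto3_in_descriptor(nested_type)


def has_proto3_optional(descriptor):
    """Hypothetical check: True if any field at any depth still has proto3_optional."""
    return any(f.proto3_optional for f in descriptor.field) or any(
        has_proto3_optional(n) for n in descriptor.nested_type
    )


def add_optional_bool(message, name, number):
    """Hypothetical helper: add a field shaped like CopyToProto output for `optional bool`."""
    message.field.add(
        name=name,
        number=number,
        type=FieldProto.TYPE_BOOL,
        label=FieldProto.LABEL_OPTIONAL,
        oneof_index=len(message.oneof_decl),  # index of the oneof added next
        proto3_optional=True,
    )
    message.oneof_decl.add(name="_" + name)


# Two levels of nesting, loosely modeled on the path in the traceback.
proto_descriptor = descriptor_pb2.DescriptorProto(name="Message")
add_optional_bool(proto_descriptor, "top_flag", 1)
level1 = proto_descriptor.nested_type.add(name="MessageBody")
level2 = level1.nested_type.add(name="ShipmentAdvice")
add_optional_bool(level2, "gift_indicator", 1)

print(has_proto3_optional(proto_descriptor))  # True
_clear_proto3_in_descriptor(proto_descriptor)
print(has_proto3_optional(proto_descriptor))  # False
```

In the original code example, the cleanup call belongs between pb2_descriptor.CopyToProto(proto_descriptor) and proto_schema.proto_descriptor = proto_descriptor, which is where this comment applies it.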