apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[Python][BigQuery][Parquet] BigQuery Interprets Simple List Fields in Parquet Files as RECORD Types with REPEATED Mode #44683

Closed kcullimore closed 1 week ago

kcullimore commented 2 weeks ago

Describe the enhancement requested

I'm not sure whether the behavior described below is expected and I'm just missing something, or whether it is a bug.

When uploading a Parquet file created with PyArrow to Google BigQuery, columns containing simple lists (e.g., List[str], List[int], List[float]) are interpreted by BigQuery as RECORD types with REPEATED mode instead of the expected primitive types (STRING, INTEGER, FLOAT) with REPEATED mode.

The example input schema is:

id: int64
int_column: list<item: int64>
  child 0, item: int64
str_column: list<item: string>
  child 0, item: string
float_column: list<item: double>
  child 0, item: double

After loading the Parquet file into a BigQuery table, then querying the table and converting the result back to an Arrow table, the schema comes back as:

id: int64
int_column: struct<list: list<item: struct<element: int64>> not null>
  child 0, list: list<item: struct<element: int64>> not null
      child 0, item: struct<element: int64>
          child 0, element: int64
str_column: struct<list: list<item: struct<element: string>> not null>
  child 0, list: list<item: struct<element: string>> not null
      child 0, item: struct<element: string>
          child 0, element: string
float_column: struct<list: list<item: struct<element: double>> not null>
  child 0, list: list<item: struct<element: double>> not null
      child 0, item: struct<element: double>
          child 0, element: double
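
As a stopgap on the read side, these doubly-nested structs can be unwrapped back into plain lists after querying. A rough sketch of what I mean (the helper is purely illustrative and just round-trips through Python objects, nothing optimized):

import pyarrow as pa

def unwrap_repeated_record(column, value_type):
    # Rebuild list<T> from the struct<list: list<struct<element: T>>> shape
    # that comes back from BigQuery, via Python objects.
    rows = column.to_pylist()
    plain = [
        None if row is None else [item['element'] for item in row['list']]
        for row in rows
    ]
    return pa.array(plain, type=pa.list_(value_type))

# e.g. (with the query result from the script below):
# unwrap_repeated_record(bq_table.column('int_column'), pa.int64())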

I've tried explicitly defining the schema in BigQuery and ensuring that the Parquet file's schema matches, but the behavior persists.

I have an alternative workaround in mind (via JSON) but would prefer to continue using PyArrow and parquet.
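
Roughly what I have in mind for the JSON route, reusing the names from the repro script below (an untested sketch, not something I'd prefer long term):

from google.cloud import bigquery

# Sketch: load the rows as JSON with an explicit schema so the array columns
# keep primitive element types. `table`, `bq_client`, and `table_ref` are the
# objects defined in the example script below.
bq_schema = [
    bigquery.SchemaField('id', 'INTEGER', mode='NULLABLE'),
    bigquery.SchemaField('int_column', 'INTEGER', mode='REPEATED'),
    bigquery.SchemaField('str_column', 'STRING', mode='REPEATED'),
    bigquery.SchemaField('float_column', 'FLOAT', mode='REPEATED'),
]
job_config = bigquery.LoadJobConfig(schema=bq_schema)
rows = table.to_pylist()  # pyarrow.Table -> list of plain Python dicts
bq_client.load_table_from_json(rows, table_ref, job_config=job_config).result()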

Example Code

To reproduce, create a Parquet file using PyArrow that includes columns with lists of integers, strings, and floats. Upload this Parquet file to BigQuery via a Cloud Storage bucket and inspect the resulting table schema and field values.

I would expect BigQuery to recognize int_column, str_column, and float_column as arrays of integers, strings, and floats respectively (with REPEATED mode). Instead, it interprets these columns as RECORD types with REPEATED mode, which complicates downstream data handling.

import os
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import bigquery, storage

# Local file path
sample_filename = 'sample_data.parquet'
sample_filepath = sample_filename

# Create mock data
data = {
    'id': [0, 1, 2, 3],
    'int_column': [[1], [2, 2], [], [999]],
    'str_column': [['a', 'aa'], ['b'], [], ['alpha']],
    'float_column': [[1.1], [2.2, 3.30], [], [9.029]]
}
schema = pa.schema([
    pa.field('id', pa.int64()),
    pa.field('int_column', pa.list_(pa.int64())),
    pa.field('str_column', pa.list_(pa.string())),
    pa.field('float_column', pa.list_(pa.float64())),
])
table = pa.Table.from_pydict(data, schema=schema)
print(table.schema)
"""
id: int64
int_column: list<item: int64>
  child 0, item: int64
str_column: list<item: string>
  child 0, item: string
float_column: list<item: double>
  child 0, item: double
"""

# Write and read from parquet file
pq.write_table(table, sample_filepath)
imported_table = pq.read_table(sample_filepath)
print(imported_table.schema)
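
# Not required for the repro, but inspecting the physical Parquet schema shows the
# group layout BigQuery sees on load. With current PyArrow defaults I would expect
# the list columns to be written in the three-level LIST layout (a repeated group
# "list" containing an "element" field), matching the struct/field names that show
# up in the BigQuery schema further below.
print(pq.ParquetFile(sample_filepath).schema)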

# Upload to bucket
bucket_name = 'bucket_name'
blob_uri = f'gs://{bucket_name}/{sample_filename}'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(sample_filename)
blob.upload_from_filename(sample_filepath)

# Upload to BigQuery table
dataset_id = 'dataset_id'
table_id = 'table_id'
bq_client = bigquery.Client()
table_ref = bq_client.dataset(dataset_id).table(table_id)
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
load_job = bq_client.load_table_from_uri(
    blob_uri,
    table_ref,
    job_config=job_config,
)
load_job.result()

# Review BQ table schema
loaded_table = bq_client.get_table(table_ref)
print(loaded_table.schema)
"""
[SchemaField('id', 'INTEGER', 'NULLABLE', None, None, (), None),
 SchemaField('int_column', 'RECORD', 'NULLABLE', None, None, (SchemaField('list', 'RECORD', 'REPEATED', None, None, (SchemaField('element', 'INTEGER', 'NULLABLE', None, None, (), None),), None),), None),
 SchemaField('str_column', 'RECORD', 'NULLABLE', None, None, (SchemaField('list', 'RECORD', 'REPEATED', None, None, (SchemaField('element', 'STRING', 'NULLABLE', None, None, (), None),), None),), None),
 SchemaField('float_column', 'RECORD', 'NULLABLE', None, None, (SchemaField('list', 'RECORD', 'REPEATED', None, None, (SchemaField('element', 'FLOAT', 'NULLABLE', None, None, (), None),), None),), None)]
"""

# Review BQ table data
query = f'SELECT * FROM `{dataset_id}.{table_id}`'
query_job = bq_client.query(query)
bq_table = query_job.result().to_arrow()
print(bq_table.schema)
"""
id: int64
int_column: struct<list: list<item: struct<element: int64>> not null>
  child 0, list: list<item: struct<element: int64>> not null
      child 0, item: struct<element: int64>
          child 0, element: int64
str_column: struct<list: list<item: struct<element: string>> not null>
  child 0, list: list<item: struct<element: string>> not null
      child 0, item: struct<element: string>
          child 0, element: string
float_column: struct<list: list<item: struct<element: double>> not null>
  child 0, list: list<item: struct<element: double>> not null
      child 0, item: struct<element: double>
          child 0, element: double
"""

# Optional: verify that enforcing an explicit schema on the load job does not help
bq_client.delete_table(table_ref)
bq_schema = [
    bigquery.SchemaField('id', 'INTEGER', mode='NULLABLE'),
    bigquery.SchemaField('int_column', 'INTEGER', mode='REPEATED'),
    bigquery.SchemaField('str_column', 'STRING', mode='REPEATED'),
    bigquery.SchemaField('float_column', 'FLOAT', mode='REPEATED'),
]
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema=bq_schema,
    autodetect=False,
)
load_job = bq_client.load_table_from_uri(
    blob_uri,
    table_ref,
    job_config=job_config,
)
load_job.result()
loaded_table = bq_client.get_table(table_ref)
print(loaded_table.schema)
"""
[SchemaField('id', 'INTEGER', 'NULLABLE', None, None, (), None),
 SchemaField('int_column', 'INTEGER', 'REPEATED', None, None, (), None),
 SchemaField('str_column', 'STRING', 'REPEATED', None, None, (), None),
 SchemaField('float_column', 'FLOAT', 'REPEATED', None, None, (), None)]
"""

query = f'SELECT * FROM `{dataset_id}.{table_id}`'
query_job = bq_client.query(query)
print(query_job.result().to_arrow())
"""
pyarrow.Table
id: int64
int_column: list<item: string> not null
  child 0, item: string
str_column: list<item: string> not null
  child 0, item: string
float_column: list<item: double> not null
  child 0, item: double
----
id: [[0,1,2,3]]
int_column: [[[],[],[],[]]]
str_column: [[[],[],[],[]]]
float_column: [[[],[],[],[]]]
"""

# Clear data
os.remove(sample_filepath)
blob.delete()
bq_client.delete_table(table_ref)

Environment:

• Python 3.11.10
• Ubuntu 22.04.5
• pyarrow==18.0.0
• google-cloud-bigquery==3.26.0
• google-cloud-storage==2.18.2

Component(s)

Python

mapleFU commented 1 week ago

I think this is the same as: https://github.com/apache/arrow/issues/43908
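
If it is the same root cause, enabling Parquet list inference on the BigQuery load job may be worth trying. A rough, untested sketch (the option name is as I remember it from the google-cloud-bigquery docs, so please double-check):

from google.cloud import bigquery
from google.cloud.bigquery.format_options import ParquetOptions

# Ask BigQuery to infer Parquet LIST groups as repeated primitive fields
# instead of nested RECORDs.
parquet_options = ParquetOptions()
parquet_options.enable_list_inference = True

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    parquet_options=parquet_options,
)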

kcullimore commented 1 week ago

Thanks @mapleFU,

I do see a similar issue being discussed further down in that issue's comments. I've switched to using JSON for my immediate needs.

Closing this since it's a duplicate of the evolving #43908 issue and the python-bigquery #2008 issue.