goccy / bigquery-emulator

BigQuery emulator server implemented in Go

Cannot use Storage API streaming writes due to SSL handshake error #368

Open · MiltiadisKoutsokeras opened 4 days ago

What happened?

I am trying out the emulator in order to include it in our integration testing facilities. One of the scenarios in our codebase streams data via the Storage API's Python BigQueryWriteClient. Setting this up requires creating a write stream with the following code:

from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials
from google.cloud import bigquery_storage

write_client = bigquery_storage.BigQueryWriteClient(
    credentials=AnonymousCredentials(),
    client_options=ClientOptions(api_endpoint='<host>:9060')
)
table_path = write_client.table_path(project=project_id, dataset=dataset_id, table=table_id)
write_stream = bigquery_storage.types.WriteStream()
write_stream.type_ = bigquery_storage.types.WriteStream.Type.PENDING
write_stream = write_client.create_write_stream(
    parent=table_path, write_stream=write_stream
)

The create_write_stream invocation fails with an SSL handshake error:

E0000 00:00:1732552475.870920      24 ssl_transport_security.cc:1659] Handshake failed with fatal error SSL_ERROR_SSL: error:100000f7:SSL routines:OPENSSL_internal:WRONG_VERSION_NUMBER
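For what it's worth, WRONG_VERSION_NUMBER from OpenSSL generally means the client attempted a TLS handshake against a server that answered in plaintext, so I would expect an explicitly insecure gRPC channel to sidestep the handshake entirely. A sketch of that workaround, untested against the emulator (the endpoint placeholder mirrors the snippet above):

import grpc
from google.cloud import bigquery_storage
from google.cloud.bigquery_storage_v1.services.big_query_write.transports.grpc import (
    BigQueryWriteGrpcTransport,
)

# Plaintext channel: no TLS negotiation, which is what a local emulator speaks.
channel = grpc.insecure_channel('<host>:9060')
write_client = bigquery_storage.BigQueryWriteClient(
    transport=BigQueryWriteGrpcTransport(channel=channel)
)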

What did you expect to happen?

I was expecting the write stream to be created so that rows can then be appended via Protocol Buffers.
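For context, once create_write_stream succeeds, the rest of the PENDING-stream flow would look roughly like the sketch below, based on the public google-cloud-bigquery-storage API; row_pb2 stands in for a hypothetical compiled protobuf module whose Row message matches the table schema:

from google.cloud.bigquery_storage_v1 import types, writer
from google.protobuf import descriptor_pb2

# row_pb2 is hypothetical: generated from a .proto mirroring Column_1..Column_3.
import row_pb2

# Template request: names the stream and declares the writer schema once.
request_template = types.AppendRowsRequest()
request_template.write_stream = write_stream.name
proto_descriptor = descriptor_pb2.DescriptorProto()
row_pb2.Row.DESCRIPTOR.CopyToProto(proto_descriptor)
request_template.proto_rows = types.AppendRowsRequest.ProtoData(
    writer_schema=types.ProtoSchema(proto_descriptor=proto_descriptor)
)

# Append serialized rows over the bidirectional stream.
append_stream = writer.AppendRowsStream(write_client, request_template)
rows = types.ProtoRows()
rows.serialized_rows.append(
    row_pb2.Row(Column_1='Test 3', Column_2=True, Column_3=3).SerializeToString()
)
append_stream.send(
    types.AppendRowsRequest(proto_rows=types.AppendRowsRequest.ProtoData(rows=rows))
)
append_stream.close()

# PENDING streams only become visible after finalize + batch commit.
write_client.finalize_write_stream(name=write_stream.name)
write_client.batch_commit_write_streams(
    types.BatchCommitWriteStreamsRequest(
        parent=table_path, write_stream_names=[write_stream.name]
    )
)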

How can we reproduce it (as minimally and precisely as possible)?

I have created a small set of Docker Compose files to replicate it easily. Copy each snippet below into a file with the given name, put all files in a single directory, and launch with:

docker compose up

Dockerfile

FROM python:3.9-bullseye

WORKDIR /app

# Single install layer; protobuf pinned to the version the test was written against.
RUN pip install --no-cache-dir \
    db-dtypes \
    google-cloud-bigquery \
    google-cloud-bigquery-storage \
    pandas \
    protobuf==3.20.2

COPY emulator_test.py /app/emulator_test.py

CMD ["python3", "-m", "emulator_test"]

compose.yaml

services:
  emulator_test:
    build:
      context: .
      dockerfile: Dockerfile
    image: emulator_test:local-testing
    platform: linux/amd64
    environment:
      BIGQUERY_EMULATOR_HOST: "http://google_bigquery:9050"
      BIGQUERY_STORAGE_EMULATOR_HOST: "google_bigquery:9060"
    depends_on:
      google_bigquery:
        required: true
        condition: service_started
        restart: true

  google_bigquery:
    image: ghcr.io/goccy/bigquery-emulator:latest
    platform: linux/amd64
    restart: unless-stopped
    ports:
      - "127.0.0.1:9050:9050/tcp"
      - "127.0.0.1:9060:9060/tcp"
    command:
      [
        "--host=google_bigquery",
        "--port=9050",
        "--grpc-port=9060",
        "--log-level=info",
        "--project=localtesting",
        "--dataset=localtesting_dataset"
      ]

emulator_test.py

"""Test BigQuery Emulator.
"""

from os import environ

from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials
from google.cloud import bigquery, bigquery_storage
from google.cloud.exceptions import Conflict

PROJECT_ID = 'localtesting'
DATASET_ID = 'localtesting_dataset'
TABLE_ID = 'localtesting_table'
TABLE_SCHEMA = {
    "Column_1": {
        "type": "STRING",
        "mode": "NULLABLE"
    },
    "Column_2": {
        "type": "BOOLEAN",
        "mode": "NULLABLE"
    },
    "Column_3": {
        "type": "INTEGER",
        "mode": "NULLABLE"
    }
}
BIGQUERY_EMULATOR_HOST = environ.get('BIGQUERY_EMULATOR_HOST')
BIGQUERY_STORAGE_EMULATOR_HOST = environ.get('BIGQUERY_STORAGE_EMULATOR_HOST')

print(f'BIGQUERY_EMULATOR_HOST: {BIGQUERY_EMULATOR_HOST}')
print(f'BIGQUERY_STORAGE_EMULATOR_HOST: {BIGQUERY_STORAGE_EMULATOR_HOST}')

client = bigquery.Client(
    PROJECT_ID, credentials=AnonymousCredentials(),
    client_options=ClientOptions(api_endpoint=BIGQUERY_EMULATOR_HOST))

tables = client.list_tables(DATASET_ID)
fully_qualified_table = f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}'
table_schema = []
for key, value in TABLE_SCHEMA.items():
    api_repr = {
        "name": key
    }
    api_repr.update(value)
    field = bigquery.SchemaField.from_api_repr(api_repr=api_repr)
    table_schema.append(field)
try:
    client.create_table(table=bigquery.Table(
        fully_qualified_table, schema=table_schema))
    print('Table created')
except Conflict:
    print('Table already exists')

print(f"Tables contained in '{DATASET_ID}':")
for table in tables:
    print(f'{table.project}.{table.dataset_id}.{table.table_id}')

insert = client.query(
    f"INSERT {fully_qualified_table}(Column_1, Column_2, Column_3) "
    "VALUES ('Test 1', TRUE, 1), ('Test 2', FALSE, 2)"
).result()

select = client.query(
    f'SELECT * FROM {fully_qualified_table}'
).result()
print(f'Result total rows: {select.total_rows}')
print('=======================================================================')
for row in select:
    print(f"| {row['Column_1']} | {row['Column_2']} | {row['Column_3']} |")
print('=======================================================================')

read_client = bigquery_storage.BigQueryReadClient(
    credentials=AnonymousCredentials(),
    client_options=ClientOptions(api_endpoint=BIGQUERY_STORAGE_EMULATOR_HOST))
dataframe = client.query(
    f'SELECT * FROM {fully_qualified_table}'
).to_dataframe(bqstorage_client=read_client)
print(f'Result Dataframe rows: {dataframe.shape[0]}')
print('=======================================================================')
print(f'{dataframe}')
print('=======================================================================')

write_client = bigquery_storage.BigQueryWriteClient(
    credentials=AnonymousCredentials(),
    client_options=ClientOptions(api_endpoint=BIGQUERY_STORAGE_EMULATOR_HOST))
print(f'Storage write transport: {write_client.transport.kind}')
write_stream = bigquery_storage.types.WriteStream()
write_stream.type_ = bigquery_storage.types.WriteStream.Type.PENDING
# FAILS: write_client.create_write_stream
# Handshake failed with fatal error SSL_ERROR_SSL: error:100000f7:SSL routines:OPENSSL_internal:WRONG_VERSION_NUMBER
write_stream = write_client.create_write_stream(
    parent=fully_qualified_table, write_stream=write_stream
)
stream_name = write_stream.name
print(f'Storage write stream: {stream_name}')

print('Finished!')

Anything else we need to know?

I am using the following environment:

OS: Debian 11
Python: 3.9.2
Docker: 20.10.5+dfsg1, build 55c4c88
Compose: v2.23.0