Cannot use Storage API streaming writes due to SSL handshake error #368

Open

MiltiadisKoutsokeras opened this issue Nov 25, 2024 · 0 comments
Labels: bug (Something isn't working)
What happened?

I am testing the emulator in order to include it in our integration testing facilities. One of the scenarios in our codebase streams data via the Storage API's Python BigQueryWriteClient. Setting up the process requires creating a write stream with the following code:

from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials
from google.cloud import bigquery_storage

write_client = bigquery_storage.BigQueryWriteClient(
    credentials=AnonymousCredentials(),
    client_options=ClientOptions(api_endpoint='<host>:9060')
)
# project_id, dataset_id and table_id are defined elsewhere
table_path = write_client.table_path(project=project_id, dataset=dataset_id, table=table_id)
write_stream = bigquery_storage.types.WriteStream()
write_stream.type_ = bigquery_storage.types.WriteStream.Type.PENDING
write_stream = write_client.create_write_stream(
    parent=table_path, write_stream=write_stream
)

The create_write_stream invocation throws an SSL exception:

E0000 00:00:1732552475.870920      24 ssl_transport_security.cc:1659] Handshake failed with fatal error SSL_ERROR_SSL: error:100000f7:SSL routines:OPENSSL_internal:WRONG_VERSION_NUMBER
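
The WRONG_VERSION_NUMBER failure is what a TLS client typically reports when it reaches a plaintext endpoint, so presumably the write client is defaulting to a secure channel when given only host:port. For comparison, here is a sketch of constructing the client over an explicitly insecure gRPC channel, assuming the generated BigQueryWriteGrpcTransport accepts a pre-built channel (as its read-client counterpart does); I have not verified this against the emulator:

import grpc
from google.cloud import bigquery_storage
from google.cloud.bigquery_storage_v1.services.big_query_write.transports import (
    BigQueryWriteGrpcTransport,
)

# Plaintext channel to the emulator's gRPC port; no TLS handshake is attempted.
channel = grpc.insecure_channel('<host>:9060')
write_client = bigquery_storage.BigQueryWriteClient(
    transport=BigQueryWriteGrpcTransport(channel=channel)
)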

What did you expect to happen?

I was expecting the write stream to be created so that rows could then be appended via Protocol Buffers.
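
For context, the PENDING stream was meant to feed the usual append / finalize / commit sequence. A rough sketch of that flow, assuming a hypothetical compiled protobuf module row_pb2 whose Row message matches the table schema:

from google.cloud.bigquery_storage_v1 import types, writer
from google.protobuf import descriptor_pb2

# Template request carrying the stream name and the writer schema.
request_template = types.AppendRowsRequest()
request_template.write_stream = write_stream.name

proto_descriptor = descriptor_pb2.DescriptorProto()
row_pb2.Row.DESCRIPTOR.CopyToProto(proto_descriptor)  # row_pb2 is hypothetical
request_template.proto_rows = types.AppendRowsRequest.ProtoData(
    writer_schema=types.ProtoSchema(proto_descriptor=proto_descriptor)
)

append_stream = writer.AppendRowsStream(write_client, request_template)

# Serialize one row and send it on the stream.
row = row_pb2.Row(Column_1='Test 3', Column_2=True, Column_3=3)
proto_rows = types.ProtoRows(serialized_rows=[row.SerializeToString()])
append_stream.send(types.AppendRowsRequest(
    proto_rows=types.AppendRowsRequest.ProtoData(rows=proto_rows)
))

# Rows on a PENDING stream only become visible after finalize + batch commit.
write_client.finalize_write_stream(name=write_stream.name)
write_client.batch_commit_write_streams(
    parent=table_path, write_stream_names=[write_stream.name]
)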

How can we reproduce it (as minimally and precisely as possible)?

I have created a small set of Docker Compose files to replicate this easily. Copy the contents into files with the same names, place them all in a single directory, and launch with:

docker compose up

Dockerfile

FROM python:3.9-bullseye

RUN mkdir /app
WORKDIR /app

RUN pip install --no-cache-dir db-dtypes
RUN pip install --no-cache-dir google-cloud-bigquery
RUN pip install --no-cache-dir google-cloud-bigquery-storage
RUN pip install --no-cache-dir pandas
RUN pip install --no-cache-dir protobuf==3.20.2

COPY emulator_test.py /app/emulator_test.py

CMD python3 -m emulator_test

compose.yaml

services:
  emulator_test:
    build:
      context: .
      dockerfile: Dockerfile
    image: emulator_test:local-testing
    platform: linux/amd64
    environment:
      BIGQUERY_EMULATOR_HOST: "http://google_bigquery:9050"
      BIGQUERY_STORAGE_EMULATOR_HOST: "google_bigquery:9060"
    depends_on:
      google_bigquery:
        required: true
        condition: service_started
        restart: true

  google_bigquery:
    image: ghcr.io/goccy/bigquery-emulator:latest
    platform: linux/amd64
    restart: unless-stopped
    ports:
      - "127.0.0.1:9050:9050/tcp"
      - "127.0.0.1:9060:9060/tcp"
    command:
      [
        "--host=google_bigquery",
        "--port=9050",
        "--grpc-port=9060",
        "--log-level=info",
        "--project=localtesting",
        "--dataset=localtesting_dataset"
      ]

emulator_test.py

"""Test BigQuery Emulator.
"""

from os import environ

from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials
from google.cloud import bigquery, bigquery_storage
from google.cloud.exceptions import Conflict

PROJECT_ID = 'localtesting'
DATASET_ID = 'localtesting_dataset'
TABLE_ID = 'localtesting_table'
TABLE_SCHEMA = {
    "Column_1": {
        "type": "STRING",
        "mode": "NULLABLE"
    },
    "Column_2": {
        "type": "BOOLEAN",
        "mode": "NULLABLE"
    },
    "Column_3": {
        "type": "INTEGER",
        "mode": "NULLABLE"
    }
}
BIGQUERY_EMULATOR_HOST = environ.get('BIGQUERY_EMULATOR_HOST')
BIGQUERY_STORAGE_EMULATOR_HOST = environ.get('BIGQUERY_STORAGE_EMULATOR_HOST')

print(f'BIGQUERY_EMULATOR_HOST: {BIGQUERY_EMULATOR_HOST}')
print(f'BIGQUERY_STORAGE_EMULATOR_HOST: {BIGQUERY_STORAGE_EMULATOR_HOST}')

client = bigquery.Client(
    PROJECT_ID, credentials=AnonymousCredentials(),
    client_options=ClientOptions(api_endpoint=BIGQUERY_EMULATOR_HOST))

tables = client.list_tables(DATASET_ID)
fully_qualified_table = f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}'
table_schema = []
for key, value in TABLE_SCHEMA.items():
    api_repr = {
        "name": key
    }
    api_repr.update(value)
    field = bigquery.SchemaField.from_api_repr(api_repr=api_repr)
    table_schema.append(field)
try:
    client.create_table(table=bigquery.Table(
        fully_qualified_table, schema=table_schema))
    print('Table created')
except Conflict:
    print('Table already exists')

print(f"Tables contained in '{DATASET_ID}':")
for table in tables:
    print(f'{table.project}.{table.dataset_id}.{table.table_id}')

insert = client.query(
    f"INSERT {fully_qualified_table}(Column_1, Column_2, Column_3) "
    f"VALUES ('Test 1', {True}, {1}), ('Test 2', {False}, {2})"
).result()

select = client.query(
    f'SELECT * FROM {fully_qualified_table}'
).result()
print(f'Result total rows: {select.total_rows}')
print('=======================================================================')
for row in select:
    print(f"| {row['Column_1']} | {row['Column_2']} | {row['Column_3']} |")
print('=======================================================================')

read_client = bigquery_storage.BigQueryReadClient(
    credentials=AnonymousCredentials(),
    client_options=ClientOptions(api_endpoint=BIGQUERY_STORAGE_EMULATOR_HOST))
dataframe = client.query(
    f'SELECT * FROM {fully_qualified_table}'
).to_dataframe(bqstorage_client=read_client)
print(f'Result Dataframe rows: {dataframe.shape[0]}')
print('=======================================================================')
print(f'{dataframe}')
print('=======================================================================')

write_client = bigquery_storage.BigQueryWriteClient(
    credentials=AnonymousCredentials(),
    client_options=ClientOptions(api_endpoint=BIGQUERY_STORAGE_EMULATOR_HOST))
print(f'Storage write transport: {write_client.transport.kind}')
write_stream = bigquery_storage.types.WriteStream()
write_stream.type_ = bigquery_storage.types.WriteStream.Type.PENDING
# FAILS: write_client.create_write_stream
# Handshake failed with fatal error SSL_ERROR_SSL: error:100000f7:SSL routines:OPENSSL_internal:WRONG_VERSION_NUMBER
write_stream = write_client.create_write_stream(
    parent=fully_qualified_table, write_stream=write_stream
)
stream_name = write_stream.name
print(f'Storage write stream: {stream_name}')

print('Finished!')

Anything else we need to know?

I am using the following environment:

OS: Debian 11
Python: 3.9.2
Docker: 20.10.5+dfsg1, build 55c4c88
Compose: v2.23.0
