apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Python JDBC IO Tries to Connect to RDB Before Deploying #23029

Open case-k-git opened 2 years ago

case-k-git commented 2 years ago

What happened?

When I tried to deploy a Python JDBC pipeline to Dataflow from my local environment, the deployment failed with a connection error. The Python JDBC IO seems to try to connect to the database from the local environment, not only from the Dataflow environment.

I checked the connection and found that the attempt is being made from my PC. The database only accepts connections from inside the Dataflow network, so the connection fails.

I also checked the Java JDBC version and it worked fine, so this behavior in the Python version must be a bug.

from apache_beam.options.pipeline_options import DebugOptions

# JdbcToBigQuery is the reporter's own pipeline wrapper; its definition is
# not shown in the issue.


class PostgresToBigQueryDataflow():

    def __init__(self):
        self._username = '<username>'
        self._password = '<password>'
        self._driver_class_name = 'org.postgresql.Driver'
        self._query = "select id from beam_table;"
        self._jdbc_url = 'jdbc:postgresql://<private_IP>:5432/beam'
        self._project = '<project id>'
        self._dataset = '<dataset>'
        self._table = '<table>'
        self._options = DebugOptions([
            "--runner=DataflowRunner",
            "--project=<project id>",
            "--job_name=<job name>",
            "--temp_location=gs://<project id>/tmp/",
            "--region=us-central1",
            "--experiments=use_runner_v2",
            "--subnetwork=regions/us-central1/subnetworks/<subnet>",
        ])

    def test(self):
        JdbcToBigQuery(
            self._username, self._password, self._driver_class_name,
            self._query, self._jdbc_url, self._project, self._dataset,
            self._table, self._options,
        ).run()

[attached screenshot: connection]

Issue Priority

Priority: 2

Issue Component

Component: cross-language

Abacn commented 1 year ago

Could you please share the error message seen when deploying the pipeline to Dataflow?

I did some local tests and saw the following error when the JDBC database cannot be reached:

INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.extensions.schemaio.expansion.ExternalSchemaIOTransformRegistrar$Configuration' has no schema registered. Attempting to construct with setter approach.
Traceback (most recent call last):
  File "jdbcioTest.py", line 180, in <module>
    test_instance.run_read()
  File "jdbcioTest.py", line 157, in run_read
    p
  File "/Users/yathu/dev/virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 1095, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/yathu/dev/virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 617, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Users/yathu/dev/virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/pipeline.py", line 663, in apply
    return self.apply(transform, pvalueish)
  File "/Users/yathu/dev/virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/pipeline.py", line 709, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Users/yathu/dev/virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/Users/yathu/dev/virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/Users/yathu/dev/virtualenv/py38beam/lib/python3.8/site-packages/apache_beam/transforms/external.py", line 526, in expand
    raise RuntimeError(response.error)
RuntimeError: org.apache.beam.sdk.io.jdbc.BeamSchemaInferenceException: Failed to infer Beam schema
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows.inferBeamSchema(JdbcIO.java:696)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows.expand(JdbcIO.java:672)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows.expand(JdbcIO.java:592)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
...

If this is also what you see, what happens is that the external transform tries to infer the schema by connecting to the database at pipeline expansion time, which happens in the external transform expansion service. Will investigate whether and how this can be avoided.
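
To make the timing concrete, here is a minimal sketch (not taken from the issue; hosts, credentials, and table names are placeholders): the cross-language ReadFromJdbc transform is expanded while the pipeline graph is still being built on the submitting machine, and the expansion service opens a JDBC connection at that point to infer the row schema, before any Dataflow job is created.

import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder options; in the reporter's setup these point at Dataflow.
options = PipelineOptions(["--runner=DataflowRunner"])

with beam.Pipeline(options=options) as p:
    # The RuntimeError above is raised here, during expansion of the
    # cross-language transform, i.e. before the job is submitted: the
    # expansion service connects to the database to infer the Beam schema.
    rows = p | "ReadJdbc" >> ReadFromJdbc(
        table_name="beam_table",                              # placeholder
        driver_class_name="org.postgresql.Driver",
        jdbc_url="jdbc:postgresql://<private_IP>:5432/beam",  # placeholder
        username="<username>",
        password="<password>",
    )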

Abacn commented 1 year ago

.remove-labels "awaiting triage"

github-actions[bot] commented 1 year ago

Label "awaiting cannot be managed because it does not exist in the repo. Please check your spelling.

Abacn commented 1 year ago

.remove-labels 'awaiting triage'

Abacn commented 1 year ago

The expansion service tries to get the schema by connecting to the JDBC server. The Java SDK does not go through the expansion service, so it does not have this problem. However, I agree that it would be reasonable to defer this step.

CC: @robertwb There was some discussion of deferring expansion. It could benefit this use case if implemented. Or is there another solution?
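
Until expansion can be deferred, one possible workaround (not confirmed in this thread, sketched here under the assumption that an expansion service has already been started on a host that can reach the database, e.g. inside the same VPC) is to point the transform at that service explicitly via the `expansion_service` argument, so schema inference runs where the JDBC connection is allowed. The address and connection details below are placeholders.

from apache_beam.io.jdbc import ReadFromJdbc

# Hypothetical address of an expansion service running on a host that can
# reach the database; the submitting machine only needs access to this port.
EXPANSION_SERVICE_ADDRESS = "10.0.0.5:8097"  # placeholder

read = ReadFromJdbc(
    table_name="beam_table",                              # placeholder
    driver_class_name="org.postgresql.Driver",
    jdbc_url="jdbc:postgresql://<private_IP>:5432/beam",  # placeholder
    username="<username>",
    password="<password>",
    expansion_service=EXPANSION_SERVICE_ADDRESS,
)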

mataralhawiti commented 3 months ago

I'm facing the same issue: it tries to infer the schema during pipeline submission from my local machine, which doesn't have access to the DB server.