Closed danvau7 closed 7 months ago
Hi @danvau7, I can take a look at this for you 😄 Thanks for raising the issue.
Do you have a requirements.txt whose contents you can share? I'm curious which version of SQLAlchemy your job is using.
Also, as you mentioned, the direct connection works for you, which suggests the issue may be in SQLAlchemy or the Python Connector.
Depending on your use case, and since it looks like you are not using IAM DB AuthN (one of the advantages of the Python Connector), you may be fine just using the direct connection like you pointed out (it will be more performant than the Python Connector). If so, you can require SSL for the direct connection and still maintain a secure connection.
pool = create_async_engine(
"postgresql+asyncpg://test:xyzxyzxyzxyz@100.x.y.z/databasename",
connect_args={"ssl": "require"},
)
The only slight downside to the above is that you no longer get an mTLS connection with the client certificate verification that the Python Connector provides. However, most of the time users are okay with this for Private IP connections with SSL set to required.
I'll take a look at finding the cause of the Python Connector bug once I get your SQLAlchemy version to reproduce with.
Thanks again 😄
Hey @jackwotherspoon:
Actually, we do want to use the IAM DB AuthN part (really want to use it, in fact, and are required to at some point lol), but I had to drop it for the time being to figure out a way to get this all working.
When I originally looked at the SQLAlchemy version on the Runner, it was not good ... roughly version 1.4.50 (see below).
I did manually bump it in the requirements.txt file to sqlalchemy==2.0.19, but that seemed to have no effect: the error I got back was the same when I reran it a few minutes ago. Perhaps it is one of the other sqlalchemy libraries I see listed?
I did manage to get a printout of every Python library on the Runner and its version number, though. This might help (heads up: this job ran a couple of hours ago, so the SQLAlchemy version will still show as roughly 1.4.50).
Have a great weekend, and let me know if anything in these version numbers strikes you as odd, or if any insight comes your way. A version issue could definitely be the cause, and that would explain why it works locally but not on Dataflow. Thanks so much 🔥
absl-py==2.1.0
agate==1.6.3
aiodebug==2.3.0
aiofiles==23.2.1
aiohttp==3.9.1
aiosignal==1.3.1
alembic==1.13.1
amqp==5.2.0
anyio==4.2.0
apache-airflow-providers-apache-beam==5.5.0
apache-airflow-providers-cncf-kubernetes==7.13.0
apache-airflow-providers-common-sql==1.10.0
apache-airflow-providers-dbt-cloud==3.5.1
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-google==10.14.0
apache-airflow-providers-hashicorp==3.6.1
apache-airflow-providers-http==4.8.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-mysql==5.5.1
apache-airflow-providers-postgres==5.10.0
apache-airflow-providers-sendgrid==3.4.0
apache-airflow-providers-sqlite==3.7.0
apache-airflow-providers-ssh==3.10.0
apache-airflow==2.6.3+composer
apache-beam==2.53.0
apispec==5.2.2
appdirs==1.4.4
argcomplete==3.2.1
asgiref==3.7.2
asn1crypto==1.5.1
astunparse==1.6.3
async-timeout==4.0.3
asyncio==3.4.3
asyncpg==0.29.0
attrs==23.2.0
babel==2.14.0
backoff==2.2.1
bcrypt==4.1.2
beautifulsoup4==4.12.3
billiard==4.2.0
blinker==1.7.0
cachecontrol==0.13.1
cachelib==0.9.0
cachetools==5.3.2
cattrs==23.2.3
celery==5.3.6
certifi==2023.11.17
cffi==1.16.0
chardet==5.2.0
charset-normalizer==3.3.2
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.3.0
click==8.1.3
clickclick==20.10.2
cloud-sql-python-connector==1.8.0
cloudpickle==2.2.1
colorama==0.4.6
colorlog==4.8.0
configupdater==3.2
connexion==2.14.2
crcmod==1.7
cron-descriptor==1.4.0
croniter==2.0.1
cryptography==42.0.5
db-dtypes==1.2.0
dbt-bigquery==1.5.4
dbt-core==1.5.4
dbt-extractor==0.4.1
decorator==5.1.1
deprecated==1.2.14
diff-cover==8.0.3
dill==0.3.1.1
distlib==0.3.8
dnspython==2.5.0
docopt==0.6.2
docutils==0.20.1
email-validator==1.3.1
fastavro==1.9.3
fasteners==0.19
filelock==3.13.1
firebase-admin==6.3.0
flask-appbuilder==4.3.1
flask-babel==2.0.0
flask-bcrypt==1.0.1
flask-caching==2.1.0
flask-jwt-extended==4.6.0
flask-limiter==3.5.0
flask-login==0.6.3
flask-session==0.5.0
flask-sqlalchemy==2.5.1
flask-wtf==1.2.1
flask==2.2.5
flatbuffers==23.5.26
flower==2.0.1
frozenlist==1.4.1
fsspec==2023.12.2
future==0.18.3
gast==0.5.4
gcloud-aio-auth==4.2.3
gcloud-aio-bigquery==7.0.0
gcloud-aio-storage==9.0.0
gcsfs==2023.12.2.post1
google-ads==22.1.0
google-analytics-admin==0.22.5
google-api-core==2.15.0
google-api-python-client==2.114.0
google-apitools==0.5.32
google-auth-httplib2==0.2.0
google-auth-oauthlib==1.2.0
google-auth==2.26.2
google-cloud-access-context-manager==0.1.16
google-cloud-aiplatform==1.39.0
google-cloud-appengine-logging==1.4.0
google-cloud-asset==3.23.0
google-cloud-audit-log==0.2.5
google-cloud-automl==2.12.0
google-cloud-batch==0.17.8
google-cloud-bigquery-datatransfer==3.13.0
google-cloud-bigquery-storage==2.24.0
google-cloud-bigquery==3.16.0
google-cloud-bigtable==2.22.0
google-cloud-build==3.22.0
google-cloud-common==1.3.0
google-cloud-compute==1.15.0
google-cloud-container==2.38.0
google-cloud-core==2.4.1
google-cloud-datacatalog-lineage-producer-client==0.1.0
google-cloud-datacatalog-lineage==0.3.1
google-cloud-datacatalog==3.17.2
google-cloud-dataflow-client==0.8.6
google-cloud-dataform==0.5.5
google-cloud-dataplex==1.11.0
google-cloud-dataproc-metastore==1.14.0
google-cloud-dataproc==5.8.0
google-cloud-datastore==2.19.0
google-cloud-discoveryengine==0.11.10
google-cloud-dlp==3.14.0
google-cloud-documentai==2.21.1
google-cloud-filestore==1.8.0
google-cloud-firestore==2.14.0
google-cloud-kms==2.20.0
google-cloud-language==2.12.0
google-cloud-logging==3.9.0
google-cloud-memcache==1.8.0
google-cloud-monitoring==2.18.0
google-cloud-orchestration-airflow==1.10.0
google-cloud-org-policy==1.10.0
google-cloud-os-config==1.16.0
google-cloud-os-login==2.13.0
google-cloud-pubsub==2.19.0
google-cloud-pubsublite==0.6.1
google-cloud-redis==2.14.0
google-cloud-resource-manager==1.11.0
google-cloud-run==0.10.1
google-cloud-secret-manager==2.17.0
google-cloud-spanner==3.41.0
google-cloud-speech==2.23.0
google-cloud-storage-transfer==1.10.0
google-cloud-storage==2.14.0
google-cloud-tasks==2.15.1
google-cloud-texttospeech==2.15.1
google-cloud-translate==3.14.0
google-cloud-videointelligence==2.12.0
google-cloud-vision==3.5.0
google-cloud-workflows==1.13.0
google-crc32c==1.5.0
google-pasta==0.2.0
google-re2==1.1
google-resumable-media==2.7.0
googleapis-common-protos==1.62.0
graphviz==0.20.1
greenlet==3.0.3
grpc-google-iam-v1==0.13.0
grpcio-gcp==0.2.2
grpcio-status==1.60.0
grpcio==1.60.0
gunicorn==21.2.0
h11==0.14.0
h5py==3.10.0
hdfs==2.7.3
hologram==0.0.16
httpcore==1.0.2
httplib2==0.22.0
httpx==0.26.0
humanize==4.9.0
hvac==2.1.0
idna==3.6
importlib-metadata==7.0.1
importlib-resources==6.1.1
inflection==0.5.1
iniconfig==2.0.0
isodate==0.6.1
itsdangerous==2.1.2
jaraco.classes==3.3.0
jeepney==0.8.0
jinja2==3.1.2
js2py==0.74
json-merge-patch==0.2
jsonpickle==3.0.2
jsonschema-specifications==2023.12.1
jsonschema==4.21.1
keras==2.15.0
keyring==24.3.0
keyrings.google-artifactregistry-auth==1.1.2
kombu==5.3.5
kubernetes-asyncio==24.2.3
kubernetes==23.6.0
lazy-object-proxy==1.10.0
leather==0.3.4
libclang==16.0.6
limits==3.7.0
linkify-it-py==2.0.2
lockfile==0.12.2
logbook==1.5.3
looker-sdk==23.20.1
mako==1.3.1
markdown-it-py==3.0.0
markdown==3.5.2
markupsafe==2.1.4
marshmallow-enum==1.5.1
marshmallow-oneofschema==3.1.0
marshmallow-sqlalchemy==0.26.1
marshmallow==3.20.2
mashumaro==3.6
mdit-py-plugins==0.4.0
mdurl==0.1.2
minimal-snowplow-tracker==0.0.2
ml-dtypes==0.2.0
more-itertools==10.2.0
msal==1.28.0
msgpack==1.0.7
multidict==6.0.4
mysql-connector-python==8.3.0
mysqlclient==2.2.1
networkx==2.8.8
numpy==1.24.4
oauth2client==4.1.3
oauthlib==3.2.2
objsize==0.6.1
opt-einsum==3.3.0
ordered-set==4.1.0
orjson==3.9.12
overrides==6.5.0
packaging==23.2
pandas-gbq==0.20.0
pandas==2.2.0
paramiko==3.4.0
parsedatetime==2.4
pathspec==0.9.0
pendulum==2.1.2
pg8000==1.31.1
pip==24.0
pipdeptree==2.13.2
platformdirs==4.1.0
pluggy==1.3.0
prison==0.2.1
prometheus-client==0.19.0
prompt-toolkit==3.0.43
proto-plus==1.23.0
protobuf==4.22.5
psutil==5.9.8
psycopg2-binary==2.9.9
pyarrow-hotfix==0.6
pyarrow==14.0.2
pyasn1-modules==0.3.0
pyasn1==0.5.1
pycparser==2.21
pydantic==1.10.14
pydata-google-auth==1.8.2
pydot==1.4.2
pygments==2.17.2
pyjsparser==2.7.1
pyjwt==2.8.0
pymongo==4.6.1
pynacl==1.5.0
pyopenssl==24.0.0
pyparsing==3.1.1
pytest==7.4.4
python-daemon==3.0.1
python-dateutil==2.8.2
python-http-client==3.3.7
python-nvd3==0.15.0
python-slugify==8.0.1
pytimeparse==1.1.8
pytz==2023.3.post1
pytzdata==2020.1
pyyaml==6.0.1
redis==3.5.3
referencing==0.32.1
regex==2023.12.25
requests-oauthlib==1.3.1
requests-toolbelt==1.0.0
requests==2.31.0
rfc3339-validator==0.1.4
rich-argparse==1.4.0
rich==13.7.0
rpds-py==0.17.1
rsa==4.9
scramp==1.4.4
secretstorage==3.3.3
sendgrid==6.11.0
setproctitle==1.3.3
setuptools==69.1.1
shapely==2.0.2
six==1.16.0
sniffio==1.3.0
soupsieve==2.5
sqlalchemy-bigquery==1.9.0
sqlalchemy-jsonfield==1.0.2
sqlalchemy-spanner==1.6.2
sqlalchemy-utils==0.41.1
sqlalchemy==1.4.51
sqlfluff==2.3.5
sqllineage==1.4.9
sqlparse==0.4.4
sshtunnel==0.4.0
starkbank-ecdsa==2.2.0
statsd==4.0.1
tabulate==0.9.0
tblib==3.0.0
tenacity==8.2.3
tensorboard-data-server==0.7.2
tensorboard==2.15.1
tensorflow-estimator==2.15.0
tensorflow-io-gcs-filesystem==0.35.0
tensorflow==2.15.0.post1
termcolor==2.4.0
text-unidecode==1.3
tornado==6.4
tqdm==4.66.1
typing-extensions==4.9.0
tzdata==2023.4
tzlocal==5.2
uc-micro-py==1.0.2
unicodecsv==0.14.1
uritemplate==4.1.1
urllib3==2.1.0
vine==5.1.0
virtualenv==20.25.0
wcwidth==0.2.13
websocket-client==1.7.0
werkzeug==2.2.3
wheel==0.42.0
wrapt==1.14.1
wtforms==3.1.2
yarl==1.9.4
zipp==3.17.0
zstandard==0.22.0
@danvau7 Yes, my first thought was the SQLAlchemy version: I added support for async_creator in v2.0.16 of SQLAlchemy, so the error you are seeing would make sense for 1.x versions of SQLAlchemy.
So v2.0.19 should work with the async driver. Can you verify that the new version was successfully installed? Dataflow can sometimes have dependency constraints that prevent later versions of dependencies from installing. You could try adding print(sqlalchemy.__version__) (which I think should work) to see what version is running.
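A quick way to confirm what the worker actually has installed is to read the installed package metadata directly; this is a stdlib-only sketch (the distribution names are taken from the pip list above), and worker stdout generally shows up in Cloud Logging for Dataflow jobs:

```python
import importlib.metadata

# Print the versions actually installed on the worker; with Dataflow,
# stdout from worker code is forwarded to Cloud Logging.
for pkg in ("sqlalchemy", "cloud-sql-python-connector", "asyncpg"):
    try:
        print(pkg, importlib.metadata.version(pkg))
    except importlib.metadata.PackageNotFoundError:
        print(pkg, "not installed")
```

This avoids relying on whatever a cached import of sqlalchemy reports and shows the distribution metadata pip actually laid down.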
Also, there is a way, either with SQLAlchemy or with asyncpg directly, to get support for IAM DB AuthN without needing the Python Connector. It looks something like this, using the do_connect event listener of SQLAlchemy:
from datetime import datetime
import os
import google.auth
from google.auth.credentials import Credentials
from google.auth.transport.requests import Request
import sqlalchemy
from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine
def create_sqlalchemy_engine(
ip_address: str,
user: str,
db_name: str,
) -> sqlalchemy.ext.asyncio.engine.AsyncEngine:
"""Creates a SQLAlchemy connection pool for a Cloud SQL instance configured
using asyncpg.
Callers are responsible for closing the pool. This implementation uses a
direct TCP connection with IAM database authentication and not
the Cloud SQL Python Connector.
A sample invocation looks like:
engine = create_sqlalchemy_engine(
ip_address,
user,
db,
)
async with engine.connect() as conn:
result = await conn.execute(sqlalchemy.text("SELECT NOW()"))
await conn.commit()
time = result.fetchone()
Args:
ip_address (str):
The IP address of the Cloud SQL instance, e.g., 10.0.0.1
user (str):
The formatted IAM database username.
e.g., my-email@test.com, service-account@project-id.iam
db_name (str):
The name of the database, e.g., mydb
"""
# initialize Google Auth credentials
credentials, _ = google.auth.default(
scopes=["https://www.googleapis.com/auth/sqlservice.login"]
)
def get_authentication_token(credentials: Credentials) -> str:
"""Get OAuth2 access token to be used for IAM database authentication"""
# refresh credentials if expired
if not credentials.valid:
request = Request()
credentials.refresh(request)
return credentials.token
engine = create_async_engine(
# Equivalent URL:
# postgresql+asyncpg://<user>:empty@<host>:5432/<db_name>
sqlalchemy.engine.url.URL.create(
drivername="postgresql+asyncpg",
username=user, # your IAM db user, e.g. service-account@project-id.iam
password="", # placeholder to be replaced with OAuth2 token
host=ip_address, # your Cloud SQL instance IP address
port=5432,
database=db_name, # your database name
),
# Because this connection uses an OAuth2 token as a password, you must
# require SSL, or better, enforce all clients speak SSL on the server
# side. This ensures the OAuth2 token is not inadvertently leaked.
connect_args={"ssl": "require"},
)
# set 'do_connect' event listener to replace password with OAuth2 token
# must use engine.sync_engine as async events are not implemented
@event.listens_for(engine.sync_engine, "do_connect")
def auto_iam_authentication(dialect, conn_rec, cargs, cparams) -> None:
cparams["password"] = get_authentication_token(credentials)
return engine
async def main() -> None:
"""Basic test to get time from database using asyncpg with SQLAlchemy."""
engine = create_sqlalchemy_engine("10.0.0.1", "my-user@gmail.com", "my-db")
# use connection from connection pool to query Cloud SQL database
async with engine.connect() as conn:
result = await conn.execute(sqlalchemy.text("SELECT NOW()"))
time = result.fetchone()
print("Current time is ", time[0])
curr_time = time[0]
assert type(curr_time) is datetime
# cleanup AsyncEngine
await engine.dispose()
This might work for your use case, but I would double-check your SQLAlchemy version first, as I still think that might be the source of your error. I can investigate further if it turns out that the SQLAlchemy version is not the issue.
Hey @jackwotherspoon, you were exactly right. The SQLAlchemy version, which I am exporting from the Dataflow job via Google Cloud Logging, is still coming across as 1.4.50.
This is pretty surprising, as we've had issues with packages before and actually install them twice: once via the Airflow DAG pipeline options, and again via a requirements.txt file that we source in the Beam Pipeline Operator. In both cases I pin it at sqlalchemy==2.0.19, as mentioned above.
Either we are doing something wrong or it's a Dataflow bug. I'll get back with the team, and also escalate with Google Support.
When I find out what the solution is, I'll make sure to post here in case others run into the same issue regarding unexpected versions of SQLAlchemy 😄
@danvau7 Yes, please do update us all here once you find either the bug or how to mitigate the issue! 😄
Cloud SQL + Dataflow is an experience I want to improve and make more seamless in the near future, as I am seeing more and more users adopt Dataflow. So if you have any general friction points or areas of confusion, feel free to drop them here and I'll pass them on to the appropriate teams!
@jackwotherspoon Hey, getting SQLAlchemy to work on Dataflow is more difficult than we anticipated. Both Dataflow and Composer (from which the Dataflow jobs are being kicked off) ship with quite old versions of SQLAlchemy that are troublesome to install over. We might be able to get around this.
In the meantime, I noticed the following couple of lines from the documentation for Cloud SQL Python Connector:
The Connector itself creates connection objects by calling its connect method but does not manage database connection pooling. For this reason, it is recommended to use the connector alongside a library that can create connection pools, such as SQLAlchemy
QUESTION: Do you have any code examples lying around that create this connection while bypassing SQLAlchemy entirely? I am aware we won't have the ability to manage connection pooling, but we are only grabbing data once via a single query that snags about 2k rows. If not, no worries, but I did want to ask.
Will continue to post here as I find out from others about possibilities of making Dataflow -> CloudSQL more seamless.
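For reference, the Connector-with-SQLAlchemy pooling pattern this thread has been working toward looks roughly like the sketch below. The instance connection name, user, database, and password are placeholders, and it assumes SQLAlchemy >= 2.0.16 for the async_creator argument:

```python
async def init_connection_pool():
    # Third-party imports are kept inside the function here, a common
    # workaround for pickling issues when code ships to Dataflow workers.
    from google.cloud.sql.connector import IPTypes, create_async_connector
    from sqlalchemy.ext.asyncio import create_async_engine

    connector = await create_async_connector()

    async def getconn():
        # The Connector creates each asyncpg connection; SQLAlchemy pools them.
        return await connector.connect_async(
            "project:region:instance",  # placeholder instance connection name
            "asyncpg",
            user="test",              # placeholder user
            db="databasename",        # placeholder database
            password="xyzxyzxyzxyz",  # placeholder password
            ip_type=IPTypes.PRIVATE,
        )

    # async_creator was added in SQLAlchemy v2.0.16; older versions reject
    # this keyword, which matches the error seen on Dataflow.
    return create_async_engine("postgresql+asyncpg://", async_creator=getconn)
```

Callers would await init_connection_pool() once, reuse the returned engine, and dispose of it when done.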
Hey @jackwotherspoon, have we thought about adding Cloud SQL (specifically PostgreSQL) as an official Beam I/O connector? I feel like that would make it the most seamless. Note that most other customers I have seen use Cloud SQL leverage the current JdbcIO connector (but it has its own limitations).
@FearTheMonstar I had not given that too much thought, but it sounds like a very good idea!
Do you want to open a new issue on this repo, something along the lines of "Official Beam I/O Connector for Cloud SQL", and explain the current limitations, etc.? That way other users can upvote it, and I can show stakeholders that it is desired, which will help prioritize working on it 😄
Do you have any code examples lying around that create this connection while bypassing SQLAlchemy entirely? I am aware we won't have the ability to manage connection pooling, but we are only grabbing data once via a single query that snags about 2k rows. If not, no worries, but I did want to ask.
@danvau7 Yes, you can use the Python Connector without SQLAlchemy. The connect() or connect_async method of the Connector will just return a database connection object for the given driver you are using. So if you are using asyncpg, say, you can call connector.connect_async as a direct substitute for the asyncpg.connect call shown in the official asyncpg docs.
It would look like the following:
import asyncpg
from google.cloud.sql.connector import IPTypes, create_async_connector

async def runmain():
    connector = await create_async_connector()
conn = await connector.connect_async(
"cs-xxx-adxxxxxxxx-sbxyyy:us-central1:pg-sql-usc1-pri", # Cloud SQL instance connection name
"asyncpg",
user="test",
db="databasename",
password="xyzxyzxyzxyz",
enable_iam_auth=False,
ip_type=IPTypes.PRIVATE,
timeout=10
)
# based on asyncpg docs... could use 'conn.execute', 'conn.fetch' etc.
time = await conn.fetchrow('SELECT NOW()')
print(time)
await conn.close()
await connector.close_async()
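One wrinkle worth noting: Beam pipeline code is synchronous, so a coroutine like runmain above is typically driven with asyncio.run. A minimal sketch of that wiring, with a stand-in coroutine body so the snippet is self-contained:

```python
import asyncio


async def runmain():
    # Stand-in for the Connector coroutine above, so this snippet runs on
    # its own; the real one opens the connection, queries, and closes both.
    return "placeholder result"


# asyncio.run creates an event loop, runs the coroutine to completion, and
# tears the loop down; handy inside otherwise-synchronous Beam code.
result = asyncio.run(runmain())
print(result)
```

Just avoid calling asyncio.run from within an already-running event loop, as it will raise a RuntimeError.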
FYI: it looks like folks are trying to update the pre-installed SQLAlchemy version for Dataflow: https://github.com/apache/beam/pull/30919
@jackwotherspoon The code above worked! Thank you so much. A lot of happy people. Also great to see this request produced a broader improvement to Apache Beam / Cloud Dataflow. That will definitely allow *much* easier connections to Cloud SQL.
I am also really happy this solution is on the internet in case someone else runs into it. If you get a chance, I would consider pasting your "SQLAlchemy-less" code into the official docs. It was a lifesaver.
Have a great upcoming weekend :)
😄 ❤️ 😄
Awesome! Glad we were able to get a working solution. I will definitely think about adding a SQLAlchemy-less sample to our docs (making our samples/docs better is a bigger issue I'm trying to tackle soon).
@FearTheMonstar Let me know if you want to create the issue for "Official Beam/IO Connector for Cloud SQL" on this repo, otherwise I can open one if need be :smile:
Bug Description
While running the provided code on Dataflow, I get the following error:
Interestingly, I do not observe this error when running the same code locally.
I have tried to copy, almost exactly command for command (see the provided example), the two following code examples:
I am not sure why I am getting this error.
Example code (or command)
Stacktrace
No response
Steps to reproduce?
Environment
Additional Details
Some more details. If I replace:
with
Then the code works fine and connects from Dataflow (which also proves it is not a permission / networking issue).
Thus I suspect it is some issue in the object returned by the getconn function. I do not believe I am mixing synchronous and asynchronous code (always a danger), but perhaps I missed something?