denisenkom / pytds

Python DBAPI driver for MSSQL using pure Python TDS (Tabular Data Stream) protocol implementation
MIT License
190 stars 52 forks source link

Cannot connect to GCP CloudSQL MSSQL using sqlalchemy-pytds #141

Closed yaraslaublonski closed 1 year ago

yaraslaublonski commented 1 year ago

I have the following code, which saves a dataframe to the location that the connector is watching:

def fill_test_data(connector_class, config) -> tuple[AbstractConnector, pd.DataFrame]:
  """Write the prepared test dataframe to the database as a table.

  The connector and dataframe come from ``prepare_test_data``. The dataframe
  is written to the table named by ``config["db_table"]`` without its index,
  replacing the table if it already exists.

  Returns the ``(connector, dataframe)`` pair that was used.
  """
  prepared = prepare_test_data(connector_class, config)
  connector, frame = prepared

  # The connector is used as a context manager; whatever it yields is
  # handed to pandas as the connection to write through.
  with connector as connection:
    table_name = config.get("db_table")
    frame.to_sql(
      name=table_name,
      con=connection,
      index=False,
      if_exists="replace",
    )

  return connector, frame

The connector has these methods for connecting to the Cloud SQL database:

  def __get_connection(self) -> Any:
    """Open a connection to the GCP Cloud SQL database.

    The service-account JSON is read from the base64-encoded
    ``db_creds_base64`` entry of ``self.params``. The instance, driver, user,
    password and database name are taken from ``self.params`` as well, and the
    connection is requested over the public IP type.
    """
    encoded_creds = self.params["db_creds_base64"]
    service_account_info = json.loads(
      base64.b64decode(encoded_creds.encode()).decode()
    )
    creds = service_account.Credentials.from_service_account_info(service_account_info)

    # Build the connector before reading the remaining params so the order of
    # operations matches a direct ``Connector(...).connect(...)`` call.
    cloud_sql = Connector(credentials=creds)
    connect_kwargs = {
      "instance_connection_string": self.params["db_instance"],
      "driver": self.params["db_driver"],
      "user": self.params["db_user"],
      "password": self.params["db_pass"],
      "db": self.params["db_name"],
      "ip_type": IPTypes.PUBLIC,
    }
    return cloud_sql.connect(**connect_kwargs)

  def _make_engine(self) -> sqlalchemy.engine.Engine:
    """Build the SQLAlchemy engine for the configured database.

    The URL is assembled from the ``db_type`` and ``db_driver`` entries of
    ``self.params``; actual connections are produced by
    ``self.__get_connection``, which is passed to SQLAlchemy as ``creator``.
    """
    dialect = self.params["db_type"]
    driver = self.params["db_driver"]

    # The SQL Server driver is the exceptional case: it gets "localhost" as the
    # host part of the URL, while every other driver gets an empty host.
    host: str = "localhost" if driver == CloudSQLDriverEnum.SQLSERVER else ""

    engine_url = f"{dialect}+{driver}://{host}"
    return sqlalchemy.create_engine(url=engine_url, creator=self.__get_connection)
def cloud_sql_mssql() -> tuple[CloudSQLConnector, pd.DataFrame]:
app-test_1  |       """Produce cloud sql connector and test df"""
app-test_1  |       table_name = "test_table"
app-test_1  |       creds = json.dumps(json.load(open("cred.json", "r")))
app-test_1  |       config = {
app-test_1  |         "type": "cloudsql",
app-test_1  |         "db_instance": f"{conf.CSQL_PROJECT}:{conf.CSQL_REGION}:{conf.CSQL_MSSQL_INSTANCE}",
app-test_1  |         "db_type": CloudSQLTypeEnum.SQLSERVER,
app-test_1  |         "db_user": conf.CSQL_USER,
app-test_1  |         "db_pass": conf.CSQL_PASS,
app-test_1  |         "db_name": conf.CSQL_DATABASE,
app-test_1  |         "db_table": table_name,
app-test_1  |         "db_driver": CloudSQLDriverEnum.SQLSERVER,
app-test_1  |         "db_creds_base64": base64.b64encode(creds.encode()).decode(),
app-test_1  |       }
app-test_1  | >     connector, df = fill_test_data(CloudSQLConnector, config)
app-test_1  | 
app-test_1  | wand/sneakpeek/test/fixtures/cloud_sql_fixture.py:75: 
app-test_1  | _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
app-test_1  | wand/sneakpeek/test/fixtures/utils.py:12: in fill_test_data
app-test_1  |     with connector as con:
app-test_1  | wand/shared/models/connector.py:39: in __enter__
app-test_1  |     self._connection = engine.connect()
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:3197: in connect
app-test_1  |     return self._connection_cls(self, close_with_result=close_with_result)
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:96: in __init__
app-test_1  |     else engine.raw_connection()
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:3276: in raw_connection
app-test_1  |     return self._wrap_pool_connect(self.pool.connect, _connection)
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:3243: in _wrap_pool_connect
app-test_1  |     return fn()
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/pool/base.py:310: in connect
app-test_1  |     return _ConnectionFairy._checkout(self)
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/pool/base.py:868: in _checkout
app-test_1  |     fairy = _ConnectionRecord.checkout(pool)
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/pool/base.py:476: in checkout
app-test_1  |     rec = pool._do_get()
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/pool/impl.py:146: in _do_get
app-test_1  |     self._dec_overflow()
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__
app-test_1  |     compat.raise_(
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py:207: in raise_
app-test_1  |     raise exception
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/pool/impl.py:143: in _do_get
app-test_1  |     return self._create_connection()
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/pool/base.py:256: in _create_connection
app-test_1  |     return _ConnectionRecord(self)
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/pool/base.py:371: in __init__
app-test_1  |     self.__connect()
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/pool/base.py:666: in __connect
app-test_1  |     pool.logger.debug("Error on connect(): %s", e)
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__
app-test_1  |     compat.raise_(
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py:207: in raise_
app-test_1  |     raise exception
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/pool/base.py:661: in __connect
app-test_1  |     self.dbapi_connection = connection = pool._invoke_creator(self)
app-test_1  | /venv/lib/python3.9/site-packages/sqlalchemy/pool/base.py:241: in <lambda>
app-test_1  |     return lambda crec: creator()
app-test_1  | wand/shared/models/connector.py:100: in __get_connection
app-test_1  |     return Connector(credentials=creds).connect(
app-test_1  | /venv/lib/python3.9/site-packages/google/cloud/sql/connector/connector.py:146: in connect
app-test_1  |     return connect_task.result()
app-test_1  | /usr/local/lib/python3.9/concurrent/futures/_base.py:446: in result
app-test_1  |     return self.__get_result()
app-test_1  | /usr/local/lib/python3.9/concurrent/futures/_base.py:391: in __get_result
app-test_1  |     raise self._exception
app-test_1  | /venv/lib/python3.9/site-packages/google/cloud/sql/connector/connector.py:245: in connect_async
app-test_1  |     raise (e)
app-test_1  | /venv/lib/python3.9/site-packages/google/cloud/sql/connector/connector.py:239: in connect_async
app-test_1  |     return await asyncio.wait_for(get_connection(), timeout)
app-test_1  | /usr/local/lib/python3.9/asyncio/tasks.py:479: in wait_for
app-test_1  |     return fut.result()
app-test_1  | /venv/lib/python3.9/site-packages/google/cloud/sql/connector/connector.py:235: in get_connection
app-test_1  |     return await self._loop.run_in_executor(None, connect_partial)
app-test_1  | /usr/local/lib/python3.9/concurrent/futures/thread.py:58: in run
app-test_1  |     result = self.fn(*self.args, **self.kwargs)
app-test_1  | /venv/lib/python3.9/site-packages/google/cloud/sql/connector/pytds.py:76: in connect
app-test_1  |     return pytds.connect(
app-test_1  | /venv/lib/python3.9/site-packages/pytds/__init__.py:1345: in connect
app-test_1  |     conn._open(sock=sock)
app-test_1  | /venv/lib/python3.9/site-packages/pytds/__init__.py:372: in _open
app-test_1  |     self._try_open(timeout=retry_time, sock=sock)
app-test_1  | /venv/lib/python3.9/site-packages/pytds/__init__.py:354: in _try_open
app-test_1  |     self._connect(host=host, port=port, instance=instance, timeout=timeout, sock=sock)
app-test_1  | _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
app-test_1  | 
app-test_1  | self = <pytds.Connection object at 0xffff98249970>, host = '34.140.48.128'
app-test_1  | port = 1433, instance = '', timeout = 2.4
app-test_1  | sock = <ssl.SSLSocket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>
app-test_1  | 
app-test_1  |     def _connect(self, host, port, instance, timeout, sock=None):
app-test_1  |         login = self._login
app-test_1  |     
app-test_1  |         try:
app-test_1  |             login.server_name = host
app-test_1  |             login.instance_name = instance
app-test_1  |             port = _resolve_instance_port(
app-test_1  |                 host,
app-test_1  |                 port,
app-test_1  |                 instance,
app-test_1  |                 timeout=timeout)
app-test_1  |             if not sock:
app-test_1  |                 logger.info('Opening socket to %s:%d', host, port)
app-test_1  |                 sock = socket.create_connection((host, port), timeout)
app-test_1  |         except Exception as e:
app-test_1  |             raise LoginError("Cannot connect to server '{0}': {1}".format(host, e), e)
app-test_1  |     
app-test_1  | >       sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
app-test_1  | E       OSError: [Errno 9] Bad file descriptor

This gives me the traceback above, and I don't know what to do. I haven't found any information on Google or in sqlalchemy-pytds. It looks like a bug in the pytds library, but I haven't found a solution for it.

yaraslaublonski commented 1 year ago

Fixed. The issue was that the user did not have permission to write to the database. Unfortunately, this cannot be determined from the error message.