dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

mysql.connector Error: Python 'multipartitionkey' cannot be converted to a MySQL type #15273

Open bkozura opened 1 year ago

bkozura commented 1 year ago

Dagster version

version 1.3.12

What's the issue?

Good morning. When I try to run an asset materialization with multi-partitions (one monthly, one static), I run into this unexpected error:

sqlalchemy.exc.ProgrammingError: (mysql.connector.errors.ProgrammingError) Failed processing pyformat-parameters; Python 'multipartitionkey' cannot be converted to a MySQL type
[SQL: INSERT INTO event_logs (run_id, event, dagster_event_type, timestamp, step_key, asset_key, `partition`) VALUES (%(run_id)s, %(event)s, %(dagster_event_type)s, %(timestamp)s, %(step_key)s, %(asset_key)s, %(partition)s)]
[parameters: {'run_id': 'f64ebdde-035f-4141-8c80-618bed14d339', 'event': '{"__class__": "EventLogEntry", "dagster_event": {"__class__": "DagsterEvent", "event_specific_data": {"__class__": "StepMaterializationData", "asset_ ... (2069 characters truncated) ... -8c80-618bed14d339", "step_key": "srids_from_requester", "timestamp": 1689251990.9016986, "user_message": "Materialized value srids_from_requester."}', 'dagster_event_type': 'ASSET_MATERIALIZATION', 'timestamp': datetime.datetime(2023, 7, 13, 12, 39, 50, 901699), 'step_key': 'srids_from_requester', 'asset_key': '["srids_from_requester"]', 'partition': 'tiktok_data_export|2022-01-01'}]
(Background on this error at: https://sqlalche.me/e/20/f405)
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 375, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event):
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 428, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output):
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 673, in _store_output
    yield DagsterEvent.asset_materialization(step_context, materialization)
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/events/__init__.py", line 895, in asset_materialization
    return DagsterEvent.from_step(
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/events/__init__.py", line 413, in from_step
    log_step_event(step_context, event)
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/events/__init__.py", line 292, in log_step_event
    step_context.log.log_dagster_event(
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/log_manager.py", line 409, in log_dagster_event
    self.log(level=level, msg=msg, extra={DAGSTER_META_KEY: dagster_event})
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/log_manager.py", line 424, in log
    self._log(level, msg, args, **kwargs)
  File "/usr/lib/python3.10/logging/__init__.py", line 1624, in _log
    self.handle(record)
  File "/usr/lib/python3.10/logging/__init__.py", line 1634, in handle
    self.callHandlers(record)
  File "/usr/lib/python3.10/logging/__init__.py", line 1696, in callHandlers
    hdlr.handle(record)
  File "/usr/lib/python3.10/logging/__init__.py", line 968, in handle
    self.emit(record)
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/log_manager.py", line 290, in emit
    handler.handle(dagster_record)
  File "/usr/lib/python3.10/logging/__init__.py", line 968, in handle
    self.emit(record)
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/instance/__init__.py", line 199, in emit
    self._instance.handle_new_event(event)
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/instance/__init__.py", line 1926, in handle_new_event
    self._event_storage.store_event(event)
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/dagster/_core/storage/event_log/sql_event_log.py", line 387, in store_event
    result = conn.execute(insert_event_statement)
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1412, in execute
    return meth(
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 483, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1635, in _execute_clauseelement
    ret = self._execute_context(
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1844, in _execute_context
    return self._exec_single_context(
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1984, in _exec_single_context
    self._handle_dbapi_exception(
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2339, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1965, in _exec_single_context
    self.dialect.do_execute(
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 921, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/mysql_connector_python-8.0.33-py3.10.egg/mysql/connector/cursor.py", line 595, in execute
    stmt = _bytestr_format_dict(stmt, self._process_params_dict(params))
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/mysql_connector_python-8.0.33-py3.10.egg/mysql/connector/cursor.py", line 451, in _process_params_dict
    raise ProgrammingError(

The above exception was caused by the following exception:
mysql.connector.errors.ProgrammingError: Failed processing pyformat-parameters; Python 'multipartitionkey' cannot be converted to a MySQL type
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1965, in _exec_single_context
    self.dialect.do_execute(
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 921, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/mysql_connector_python-8.0.33-py3.10.egg/mysql/connector/cursor.py", line 595, in execute
    stmt = _bytestr_format_dict(stmt, self._process_params_dict(params))
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/mysql_connector_python-8.0.33-py3.10.egg/mysql/connector/cursor.py", line 451, in _process_params_dict
    raise ProgrammingError(

The above exception was caused by the following exception:
TypeError: Python 'multipartitionkey' cannot be converted to a MySQL type
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/mysql_connector_python-8.0.33-py3.10.egg/mysql/connector/cursor.py", line 445, in _process_params_dict
    conv = to_mysql(conv)
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/mysql_connector_python-8.0.33-py3.10.egg/mysql/connector/conversion.py", line 230, in to_mysql
    raise TypeError(

The above exception occurred during handling of the following exception:
AttributeError: 'MySQLConverter' object has no attribute '_multipartitionkey_to_mysql'
  File "/opt/measure/env/measure-analytics/lib/python3.10/site-packages/mysql_connector_python-8.0.33-py3.10.egg/mysql/connector/conversion.py", line 223, in to_mysql
    converted: ToMysqlOutputTypes = getattr(self, f"_{type_name}_to_mysql")(

This happens on both the dev and prod envs. It does not happen on my local environment.

What did you expect to happen?

Asset materialization runs

How to reproduce?

package versions

dagster==1.3.13
dagit==1.3.13
dagster-aws==0.19.13
dagster-cloud==1.3.13
dagster-mysql==0.19.13
mysql-connector-python 8.0.33

Deployment type

Other

Deployment details

storage:
  mysql:
    mysql_db:
      username: dagsteruser
      password: dagsteruser
      hostname:
        env: DAGSTER_MYSQL_HOSTNAME
      db_name: dagster
      port: 3306

compute_logs:
  module: dagster.core.storage.local_compute_log_manager
  class: LocalComputeLogManager
  config:
    base_dir: 
      env: DAGSTER_HOME

local_artifact_storage:
  module: dagster.core.storage.root
  class: LocalArtifactStorage
  config:
    base_dir: 
      env: DAGSTER_HOME

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

clairelin135 commented 1 year ago

@bkozura -- thanks for all the details here. Oddly, when I run an asset with mysql storage on the versions you mentioned, I don't see an error. This error is also surprising given that the multi partition key class is a string subclass, so it should be able to serialize...

Would you mind also sharing a code sample of the asset you're running, and validating that those versions above apply to your deployment?
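One way to validate the versions in each environment is to print what is actually installed there. The following is a minimal sketch using only the standard library; the package list is copied from the report above, and the helper name `installed_versions` is made up for illustration.

```python
from importlib import metadata

# Distribution names taken from the "package versions" list in the report.
PACKAGES = [
    "dagster",
    "dagit",
    "dagster-aws",
    "dagster-cloud",
    "dagster-mysql",
    "mysql-connector-python",
]


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions(PACKAGES).items():
        print(f"{name}=={version}" if version else f"{name}: not installed")
```

Running this in the dev, prod, and local environments and comparing the output should show whether the environment that works differs in any of these packages.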

pirackr commented 1 year ago

@clairelin135 I hit the same issue, and I can reproduce it with basically any multi-partition key. Defining something like this and running it as an asset triggers the error:

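from dagster import (
    DefaultScheduleStatus,
    DynamicPartitionsDefinition,
    MonthlyPartitionsDefinition,
    MultiPartitionsDefinition,
    OpExecutionContext,
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

monthly = MonthlyPartitionsDefinition(start_date="2023-01-01")
# "other" can also be a static partitions definition
other_partitions = DynamicPartitionsDefinition(name="other")
partition = MultiPartitionsDefinition({"date": monthly, "other": other_partitions})

@asset(partitions_def=partition, name="asset")
def my_asset(context: OpExecutionContext):
    return []

# build_schedule_from_partitioned_job expects a job rather than an asset,
# so the asset is wrapped in an asset job first (the job name is illustrative).
my_asset_job = define_asset_job("my_asset_job", selection=[my_asset], partitions_def=partition)
schedule = build_schedule_from_partitioned_job(
    job=my_asset_job, default_status=DefaultScheduleStatus.RUNNING
)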

I think SQLAlchemy doesn't understand MultiPartitionKey as a string, even though it is a subclass of str. This could be a compatibility issue between mysql-connector and SQLAlchemy: I read the SQLAlchemy docs, and they don't recommend using mysql-connector as the driver. It is not even included in their integration test pipeline.

A quick workaround lets me bypass this:

--- a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py
+++ b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py
             if event.dagster_event.partition:
                 partition = event.dagster_event.partition
+        partition = str(partition) if partition is not None else None
         # https://stackoverflow.com/a/54386260/324449
         return SqlEventLogStorageTable.insert().values(
             run_id=event.run_id,
             timestamp=datetime.utcfromtimestamp(event.timestamp),
             step_key=step_key,
             asset_key=asset_key_str,
             partition=partition,
         )

But I assume it's just a workaround.
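Judging from the last frame of the traceback (`getattr(self, f"_{type_name}_to_mysql")`), the converter appears to pick its conversion method from the class name of the value, so a str subclass with a different name finds no method. The following is a rough sketch of that behavior and of what the `str()` coercion changes. `MultiPartitionKeySketch`, `ToyConverter`, and `coerce_partition` are hypothetical stand-ins written for illustration, not the real Dagster or mysql-connector classes.

```python
class MultiPartitionKeySketch(str):
    """Stand-in for a multi-partition key: a plain subclass of str."""


class ToyConverter:
    """Hypothetical converter that dispatches on the lowercased class name."""

    def _str_to_mysql(self, value):
        # Only exact "str" has a conversion method in this toy converter.
        return value.encode("utf-8")

    def to_mysql(self, value):
        type_name = type(value).__name__.lower()
        method = getattr(self, f"_{type_name}_to_mysql", None)
        if method is None:
            raise TypeError(
                f"Python '{type_name}' cannot be converted to a MySQL type"
            )
        return method(value)


def coerce_partition(partition):
    """Same expression as the patch: collapse a str subclass to an exact str."""
    return str(partition) if partition is not None else None


key = MultiPartitionKeySketch("tiktok_data_export|2022-01-01")
converter = ToyConverter()

try:
    converter.to_mysql(key)  # the class name is not "str", so the lookup fails
except TypeError as exc:
    print(exc)

# After coercion the value is an exact str and the lookup succeeds.
print(converter.to_mysql(coerce_partition(key)))
```

The subclass still passes `isinstance(key, str)`, which would explain why the key looks like an ordinary string everywhere except in a lookup keyed on the class name.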

clairelin135 commented 1 year ago

@pirackr got it, thanks for the detailed explanation. Glad you found a workaround--I'll update the code when I get the chance to handle this, likely with the same workaround you mentioned :)