Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.
Apache License 2.0
17.71k
stars
2.39k
forks
source link
sqla.SQLAlchemyTarget.exist() won't work with SqlAlchemy 2.x version #3278
from luigi import build
from luigi.contrib.sqla import CopyToTable
from sqlalchemy import create_engine
engine = create_engine('sqlite:///test.db')
class TaskCopy(CopyToTable):
connection_string = 'sqlite:///test.db'
table = 'my_table'
if __name__ == '__main__':
build([TaskCopy()], local_scheduler=True)
Error:
sqlalchemy.exc.ArgumentError: Column expression, FROM clause, or other columns clause element expected, got [Table('table_updates', MetaData(), Column('update_id', String(length=128), table=<table_updates>, primary_key=True, nullable=False), Column('target_table', String(length=128), table=<table_updates>), Column('inserted', DateTime(), table=<table_updates>, default=ScalarElementColumnDefault(datetime.datetime(2024, 2, 16, 11, 39, 6, 28774))), schema=None)]. Did you mean to say select(Table('table_updates', MetaData(), Column('update_id', String(length=128), table=<table_updates>, primary_key=True, nullable=False), Column('target_table', String(length=128), table=<table_updates>), Column('inserted', DateTime(), table=<table_updates>, default=ScalarElementColumnDefault(datetime.datetime(2024, 2, 16, 11, 39, 6, 28774))), schema=None))?
Description
It seems sqla.py module is using old SqlAlchemy select interface in: https://github.com/spotify/luigi/blob/25d179b70cbfaf8d78f6cee6c153fcf098babeec/luigi/contrib/sqla.py#L242
which seems to be incompatible with SqlAlchemy 2.x according to: https://docs.sqlalchemy.org/en/20/changelog/migration_14.html#change-5284
Details
Python version:
3.10.11
Package versions:
Snippet to reproduce:
Error: