treasure-data / luigi-td

Luigi Workflow Engine integration for Treasure Data
http://docs.treasuredata.com/articles/luigi
Apache License 2.0

Feature Request: Allow unspecified schema for luigi_td.TableTarget #2

Open kevhill opened 8 years ago

kevhill commented 8 years ago

With Treasure Data's flexible schemas, you generally don't need to specify a schema in code; you can rely on the automatic schema updates that happen on import. This is a great feature that keeps our Luigi code small and more easily maintainable. The notable exception is luigi_td.TableTask with empty=False, where you must specify the exact schema of the table. This is a shame, and it limits the usefulness of TableTask.

It looks like this behavior is inherited from TableTarget.exists(), where a schema of None gets mapped to [], so the check only passes when the table's schema is empty.

The desired behavior would be to check the schema only if TableTarget.schema is not None. A user who passes schema=[] to TableTask would still get an explicit check for an empty schema, while the default schema=None would check only that the table exists, regardless of its schema.
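A minimal sketch of the proposed check, to make the three cases concrete. This is not the luigi_td source: TableTargetSketch, fetch_schema, and the in-memory TABLES mapping (including its column strings) are hypothetical stand-ins so the logic can run without a Treasure Data account.

```python
class SchemaError(Exception):
    """Raised when a table exists but its schema differs from the expected one."""


class TableTargetSketch:
    """Hypothetical stand-in for luigi_td.TableTarget; not the library's code."""

    def __init__(self, fetch_schema, database_name, table_name, schema=None):
        # fetch_schema is an assumed callable: (database, table) -> list of
        # column definitions, or None when the table does not exist.
        self.fetch_schema = fetch_schema
        self.database_name = database_name
        self.table_name = table_name
        self.schema = schema  # keep None as None instead of mapping it to []

    def exists(self):
        actual = self.fetch_schema(self.database_name, self.table_name)
        if actual is None:
            return False
        # Compare schemas only when the caller asked for a specific one.
        if self.schema is not None and list(actual) != list(self.schema):
            raise SchemaError(
                'table schema for {0}.{1} does not match'.format(
                    self.database_name, self.table_name))
        return True


# In-memory stand-in for the remote service; the columns are made up.
TABLES = {('kh_dev', 'stage_user_last_location'): ['user_id:string', 'lat:double']}


def fetch_schema(database_name, table_name):
    return TABLES.get((database_name, table_name))
```

With this shape, schema=None only asks whether the table exists, schema=[] still insists on an empty schema, and any other list must match exactly.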

k24d commented 8 years ago

That makes sense. I can't remember the reason why I designed TableTarget (and TableTask) like this, but apparently I always set schema explicitly for all tables I created at that time.

I totally agree with your proposal, but the real solution would probably be not to use TableTask at all. After writing some data pipelines with Luigi, I concluded that defining table operations as separate tasks makes things too complex. Currently I always create a table and run a query in a single task like this:

import tdclient

class MyTask(...):
    ...

    def run(self):
        # create a table
        client = tdclient.Client()
        client.create_log_table('mydb', 'mytable')
        # run a query
        result = self.run_query(self.query)
        self.output().save_result_state(result)

Can this solve your issue?

kevhill commented 8 years ago

The use case is a table that is updated periodically with new data. Before the first job runs, we need to create the table, but before the second job runs, we certainly don't want the table to be emptied. The solution I've used is to create the tables either through the GUI or with a separate task list. During development, when things get wiped regularly, this becomes a bit of a pain. It is also confusing to other team members who want to help with development: "Oh yeah, the pipeline works, you just need to do something outside the pipeline first to set it up." And then you need to keep that list of "tables to make" in sync with code changes, etc.

So there needs to be at least a bit of conditional logic in there, but maybe I can do this outside of a task as well. I only want to create the table if one doesn't already exist, and because of the strengths of Treasure Data, I don't want to have to specify the schema of a table that already exists.

Looking at the td_client TableAPI I could use list_tables and my own conditional logic, but it seems like I would be reinventing the wheel, since TableTask is so close to what I want.
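For reference, the hand-rolled "create only if missing" logic described above might look roughly like the sketch below. It assumes the client object exposes tables(database_name), returning objects with a table_name attribute, plus the create_log_table call from the snippet earlier in the thread; check both against the client version you have installed. ensure_log_table, FakeClient, and FakeTable are hypothetical names used only so the sketch runs without credentials.

```python
from collections import namedtuple


def ensure_log_table(client, database_name, table_name):
    """Create the table only if it is missing; return True if it was created."""
    # Assumption: client.tables() yields objects with a table_name attribute.
    existing = {t.table_name for t in client.tables(database_name)}
    if table_name in existing:
        return False  # leave the existing table and its data untouched
    client.create_log_table(database_name, table_name)
    return True


# Hypothetical in-memory client, standing in for a real one in this sketch.
FakeTable = namedtuple('FakeTable', ['table_name'])


class FakeClient:
    def __init__(self):
        self._tables = {}  # database name -> set of table names

    def tables(self, database_name):
        return [FakeTable(name) for name in self._tables.get(database_name, set())]

    def create_log_table(self, database_name, table_name):
        self._tables.setdefault(database_name, set()).add(table_name)
```

The first call creates the table and later calls are no-ops, which is the behavior wanted for a periodically updated table. It works, but it duplicates what TableTask with a schema-agnostic existence check would already provide.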

k24d commented 8 years ago

I see. Give me a few days to fix the code and make a release.

kevhill commented 8 years ago

:+1:

k24d commented 8 years ago

Released 0.6.6.

kevhill commented 8 years ago

Thanks for the quick response. Unfortunately this doesn't seem to fix the issue. The TableTarget needs to be modified too.

ERROR: [pid 75] Worker Worker(salt=213001358, workers=4, host=9db28d3f3901, username=etl, pid=62) failed    TDPartialDeleteTask(destination_database=kh_dev, destination_schema=None, start_date_utc=2015-10-01, end_date_utc=2015-11-01, destination_table=stage_user_last_location)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 148, in run
    missing = [dep.task_id for dep in self.task.deps() if not dep.complete()]
  File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 347, in complete
    return all(map(lambda output: output.exists(), outputs))
  File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 347, in <lambda>
    return all(map(lambda output: output.exists(), outputs))
  File "/usr/local/lib/python2.7/dist-packages/luigi_td/targets/td.py", line 48, in exists
    raise SchemaError('table schema for {0}.{1} does not match'.format(self.database_name, self.table_name))
SchemaError: table schema for kh_dev.stage_user_last_location does not match

See the first comment for what I'm pretty sure are the problem lines.