Closed by saylorsd 8 years ago
Here are some goals that we probably want to have:
With the current architecture, pipelines are generated in a functional manner (e.g. `pipeline = Pipeline().extract(FileExtractor).schema(MySchema).load(Datapusher)`), which means that you could, for example, `import pipeline; pipeline.schedule()`. I think this would be good because we could keep a central registry that understands which pipelines are currently registered, and then have a pipeline's `schedule` method keep track of everything. These could then be executed as part of a cron script. It would also be possible to have the pipeline's execution code live in the same place where a pipeline is declared and have many cron scripts, one for each pipeline (which might make it harder to knock over the whole system if something is busted).
In terms of the monitoring mixin itself, the problem with using a flat file is that you lose historical information. Instead, I think we should dump the information into a database (another benefit of this is that we can write a pipeline that makes the ETL monitoring data available on the portal!). SQLite is probably sufficient, and would be pretty easy to work with. It would basically be a one-table bit of information. An open question is what schema should be used.
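As a starting point, a one-table status database could look something like the following. The table name and columns here are purely illustrative, since the schema is the open question above; the point is that keeping one row per run preserves the history a flat file would lose.

```python
import sqlite3
import time

# Hypothetical one-table schema for the status database; column names
# are assumptions, since the actual schema is still an open question.
conn = sqlite3.connect(':memory:')  # a real file like 'status.db' in practice
conn.execute('''
    CREATE TABLE runs (
        pipeline    TEXT,     -- pipeline name
        started_at  INTEGER,  -- unix timestamp
        finished_at INTEGER,
        status      TEXT      -- e.g. 'success' or an error message
    )
''')

# record one run (one row per run keeps full history, unlike a flat file)
now = int(time.time())
conn.execute('INSERT INTO runs VALUES (?, ?, ?, ?)',
             ('fatal_od_pipeline', now - 60, now, 'success'))
conn.commit()

rows = conn.execute('SELECT pipeline, status FROM runs').fetchall()
```

Because it is just a SQLite file, a later pipeline could read the same table to publish monitoring data to the portal.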
What do you think?
Hi @saylorsd, I just committed b75ad19, which is an implementation of the above thinking. It uses SQLite via the relevant Python standard library bindings to store metadata about pipeline runs. Let me know what you think.
@bsmithgall Can you expand a bit on what you're thinking with `pipeline.schedule`? Would it be called via a cron script or something to check the statusdb and from there decide whether or not to run the pipeline? Also, I do like the idea of keeping the scheduling decentralized.
Also, I like the status database. Regarding the schema, I like what's there so far as a start. I think as we work through more examples, and different types of extractions and transformations, more types of fields may become apparent.
Good question @saylorsd -- I think there are two possible choices going forward.

1. Stick with the `schedule` method. I think the right way to do this would be to add some sort of `register` method to the overall pipeline class, and then write a top-level script that crawls through the `jobs` directory, finds the registered pipelines, and runs them. This script could be executed by cron, and would look something like this: `python find_registered_jobs.py`
2. Move scheduling into cron itself via a command-line interface. The `statusdb` SQLite file would keep track of whether or not execution should occur (it should only happen if the most recent job was successful and ran more than an hour ago). We could consider adding the click library (written by the creator of the excellent flask and jinja2 libraries) to make it a bit easier to handle. In this case, an example cron line might look like `pipeline jobs.fatal_od.fatal_od_pipeline --schedule --hourly`
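A click-based `pipeline` command matching that cron line might be sketched as follows. The command name, the dotted-path argument handling, the flags, and the `.run()` entry point are all assumptions here; only the shape of the invocation comes from the example above.

```python
# Hypothetical sketch of a click CLI for option 2; flag names mirror the
# example cron line, everything else is assumed for illustration.
import importlib

import click  # third-party: pip install click


@click.command()
@click.argument('pipeline_path')  # e.g. jobs.fatal_od.fatal_od_pipeline
@click.option('--schedule', is_flag=True,
              help='Only run if the status db says a run is due.')
@click.option('--hourly', is_flag=True,
              help='Run at most once per hour.')
def run(pipeline_path, schedule, hourly):
    """Import and execute a single pipeline, e.g. from a cron entry."""
    module_name, attr = pipeline_path.rsplit('.', 1)
    pipeline = getattr(importlib.import_module(module_name), attr)
    pipeline.run()  # assumed execution entry point on the pipeline object


if __name__ == '__main__':
    run()
```

Each crontab line then controls exactly one job, which is what gives the per-job control mentioned below.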
Right now I'm leaning a bit more towards the second one because it gives you a lot more control over each individual job directly at the cron level, and allows us to potentially break out the `pipeline` project into its own standalone Python project that could be pip-installable. This would be a big win because it could be supported independently and possibly adopted elsewhere.
Thoughts?
If you want to jump on a call to talk about this let me know @saylorsd
@bsmithgall That'd be great. If you can call me before 4:30, I'll be free. Otherwise I can call you on Monday or this weekend if that works better for you.
Per offline discussion, we are going to move forward with the second option.
I'm thinking of keeping a schedule record (probably just JSON for now) of all the jobs, then having a Python script that manages what to run and when. This, in turn, could be run as a cron job every 5 minutes or something. e.g.
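A minimal sketch of what that could look like, assuming a JSON record keyed by pipeline path with per-job frequency and last-run timestamps (the file layout, key names, and intervals are all hypothetical):

```python
import json
import time

# Hypothetical JSON schedule record; in practice this would live in a
# file like schedule.json rather than an inline string.
SCHEDULE = json.loads('''
{
    "jobs.fatal_od.fatal_od_pipeline": {"frequency": 3600, "last_ran": 0},
    "jobs.other.other_pipeline": {"frequency": 86400, "last_ran": 0}
}
''')


def jobs_due(schedule, now=None):
    """Return the pipelines whose frequency window has elapsed."""
    now = now if now is not None else time.time()
    return [name for name, job in schedule.items()
            if now - job['last_ran'] >= job['frequency']]


# a cron entry running this script every 5 minutes would execute each
# due job and write the new last_ran timestamp back to the record
due = jobs_due(SCHEDULE, now=7200)
```

The manager script would then run each name in `due` and update its `last_ran` before rewriting the JSON file.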