vmware / versatile-data-kit

One framework to develop, deploy and operate data workflows with Python and SQL.
Apache License 2.0

Instantiate a VDK job_input from code #785

Closed mrdavidlaing closed 1 year ago

mrdavidlaing commented 2 years ago

What is the feature request? What problem does it solve? Currently it is only possible to access an instantiated job_input when the VDK framework executes the def run(job_input) method.

This means that it isn't possible to instantiate a job_input object if the entry point to your data job isn't the def run(job_input) method, and thus a call to vdk run.

This makes it difficult to integrate the VDK with other orchestration frameworks (like Dagster.io) that do not use vdk run as their entry point.

Suggested solution

Add a new public API that facilitates instantiating a VDK job_input object outside the context of vdk run, e.g.:

vdk_datajob = DataJobFactory.new_datajob(...)
vdk_job_input = vdk_datajob.get_jobinput()

Additional context: a PR with a proposed solution will follow.
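For illustration, a minimal sketch of how such a factory might be driven from a Dagster-style entry point. Everything VDK-side here is hypothetical: DataJobFactory and get_jobinput are only the proposed API from the snippet above and do not exist yet; the op body just shows that the regular job_input methods become callable once an instance exists:

from dagster import op

@op
def ingest_with_vdk():
    # Hypothetical: DataJobFactory is the proposed API above, not an existing class.
    vdk_datajob = DataJobFactory.new_datajob(data_job_directory="a-data-job-dir/")
    vdk_job_input = vdk_datajob.get_jobinput()

    # With a job_input in hand, the usual VDK APIs can be used directly.
    vdk_job_input.send_object_for_ingestion(
        payload={"user": "example", "value": 42},
        destination_table="example_table",
    )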

antoniivanov commented 2 years ago

Thanks for helping with this!

Can you explain a bit what the business problem is? I think we've discussed it. I believe the reason for this feature is to enable easier integration with 3rd party tools (in your case Dagster). It also makes the VDK SDK usable as a standalone library, which would be helpful if you just want to use the ingestion/transformation utilities in another runtime.

I am looking forward to the PR. If you need any advice or help beforehand, feel free to ping me.

ivakoleva commented 2 years ago

Welcome, David. Is the API issue with the database connection supplier being a singleton, or with the job input supplier being a singleton?

My understanding is that the relationships currently look like this (all one-to-one): [job] 1<--->1 [job_input] 1<--->1 [managed_connection]. Therefore, one data job receives a single job_input via def run, and can get one database connection via job_input.get_managed_connection.

(A) Is the suggested solution: [job] 1<--->1 [job_input] 1<--->1..* [managed_connection]

(B) or is it rather: [job] 1<--->1..* [job_input] 1<--->1 [managed_connection]
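To make the current one-to-one wiring concrete, here is a minimal data job step, assuming the standard vdk.api.job_input.IJobInput interface: vdk run calls run(job_input) once, and the step obtains the single managed PEP 249 connection from that job_input:

from vdk.api.job_input import IJobInput

def run(job_input: IJobInput) -> None:
    # One job per run, one job_input per job, one managed connection per job_input.
    connection = job_input.get_managed_connection()
    cursor = connection.cursor()
    cursor.execute("SELECT 1")
    print(cursor.fetchall())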

mrdavidlaing commented 2 years ago

@ivakoleva

I'm anticipating (B) being the right way to go, with each process getting its own isolated job_input object.

mrdavidlaing commented 2 years ago

@tozka - yep - the business problem is easier integration with orchestration frameworks like Dagster (which want to own "starting" the job)

mrdavidlaing commented 2 years ago

@ivakoleva / @tozka I've submitted PR #793 as a first stab at implementing the functionality to address the use case described in this issue.

I'd really appreciate any early feedback you have on the implementation: am I going in the right direction, or can you see a better way to approach the problem?

Thanks!

ivakoleva commented 2 years ago

@mrdavidlaing thank you. My impression is that this effort is intended to enable parallel database operations, is that correct? That seems like a valuable effort to me, and I am looking forward to a deep dive into the interface and workflow changes.

The suggestion is to have multiple job_input instances; the job_input interface exposes these operations:

[JobInput]
+get_managed_connection
+get_arguments
+get_property
+get_all_properties
+set_all_properties(properties)
+execute_query(sql)
+send_object_for_ingestion(payload, ...)
+send_tabular_data_for_ingestion(rows, column_names, ...)
+execute_template(template_name,template_args)
+get_name
+get_job_directory
+get_execution_properties

(A) One way is extending the JobInput interface to enable provisioning multiple PEP 249 connections on demand. In that case, no job_input provisioning changes are expected.

(B) Another way is to keep get_managed_connection returning a unique PEP 249 connection (still a singleton, per job_input instance), and multiply the job_input instances instead.

1. What about the other job_input operations: are any changes expected, and are the remaining operations affected by introducing multiple job_input instances? The realisation proposal also includes changing how the data job provisions job_input objects, since def run(job_input) only supports a single job_input so far. Currently vdk run a-data-job-dir/ starts a Python process that evaluates the VDK entry point, the workflow then potentially evaluates hooks, and finally the data job's standard entry point def run(job_input) is evaluated with a pre-initialized job_input wired in.

2. Where are the 1..* job_input objects instantiated, and what does the workflow look like?
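For illustration only, a speculative sketch of what option (B) could look like from the caller's side: several independent job_input instances (obtained via some yet-to-be-defined factory, stood in for here by make_job_input), each holding its own connection, driven in parallel:

from concurrent.futures import ThreadPoolExecutor

def process_partition(job_input, partition: str) -> None:
    # Each job_input still exposes a single managed connection, so parallelism
    # comes from one job_input (and thus one connection) per unit of work.
    job_input.execute_query(f"INSERT INTO results SELECT * FROM staging_{partition}")

def run_partitions_in_parallel(make_job_input, partitions) -> None:
    # make_job_input is a placeholder for whatever factory ends up being introduced.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(process_partition, make_job_input(), p) for p in partitions]
        for future in futures:
            future.result()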

ivakoleva commented 2 years ago

I don't think we have very good visibility into https://github.com/vmware/versatile-data-kit/tree/main/specs yet, especially for API changes that are pretty major.

antoniivanov commented 1 year ago

Fixed by introducing StandaloneDataJob.
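For reference, a minimal sketch of how the StandaloneDataJob can be used to obtain a job_input outside of vdk run; the module path and factory signature are to the best of my understanding of PR #793 and may differ slightly in the released code:

import pathlib

from vdk.internal.builtin_plugins.run.standalone_data_job import (
    StandaloneDataJobFactory,
)

# The factory returns a context manager that yields an initialized job_input.
with StandaloneDataJobFactory.create(
    data_job_directory=pathlib.Path("a-data-job-dir/")
) as job_input:
    job_input.execute_query("SELECT 1")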

ivakoleva commented 1 year ago

Is it true that the introduced approach (StandaloneDataJob) results in multiple execution IDs within the scope of a single execution?