meltano / sdk

Write 70% less code by using the SDK to build custom extractors and loaders that adhere to the Singer standard: https://sdk.meltano.com
Apache License 2.0
88 stars, 64 forks

Parallelize taps by partition (configurable in meltano.yml) #2121

Open melgazar9 opened 7 months ago

melgazar9 commented 7 months ago

Feature scope

Taps (catalog, state, stream maps, tests, etc.)

Description

I'm considering writing my own parallelized task for a Meltano tap. I'm submitting this as a feature request because it can be generalized to run taps in parallel by partition (or even by state, though it's less obvious how to do that, IMO). I think we can use the multiprocessing library to run multiple processes, split by partition. The default behavior of Meltano taps would apply unless both of these conditions are set:

Something like this:

from multiprocessing import Process

# Placeholders: n_processes is the process count set in meltano.yml;
# run_partition is the meltano run command for a specific partition.
processes = []
for i in range(n_processes):
    p = Process(target=run_partition, args=args)
    p.start()
    processes.append(p)

# Wait for every process to finish.
for process in processes:
    process.join()

or similarly:

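import concurrent.futures

# Placeholders: run_partition is the meltano run command; n_processes comes from meltano.yml.
with concurrent.futures.ProcessPoolExecutor() as executor:
    # submit() schedules each call and returns a Future that as_completed() can wait on.
    futures = [executor.submit(run_partition) for _ in range(n_processes)]

    for future in concurrent.futures.as_completed(futures):
        future.result()  # re-raises any exception raised in the worker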

I think you can also do something like this:

executor.map(run_partition, partition_args)  # placeholders: the run function and one argument per partition
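
The snippets above are pseudocode, so here is a minimal runnable sketch of the same fan-out idea. It is not part of the SDK: `sync_partition`, `run_partitions`, and the partition names are hypothetical stand-ins, and the process count is passed in directly rather than read from meltano.yml.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, as_completed


def sync_partition(partition: str) -> str:
    """Hypothetical stand-in for the work a tap would do for one partition."""
    return f"synced {partition}"


def run_partitions(
    partitions: list[str],
    max_workers: int = 2,
    executor_cls: type[Executor] = ProcessPoolExecutor,
) -> dict[str, str]:
    """Run sync_partition once per partition and collect the results."""
    results: dict[str, str] = {}
    with executor_cls(max_workers=max_workers) as executor:
        # submit() returns one Future per partition; remember which is which.
        futures = {executor.submit(sync_partition, p): p for p in partitions}
        for future in as_completed(futures):
            # result() re-raises any exception raised inside the worker.
            results[futures[future]] = future.result()
    return results
```

With the default `ProcessPoolExecutor`, the call should sit under an `if __name__ == "__main__":` guard, because worker processes may re-import the main module. Passing `executor_cls=ThreadPoolExecutor` swaps in threads, which can be enough when the per-partition work is mostly I/O.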

SidduHussain commented 1 month ago

@melgazar9 - were you able to implement this?

melgazar9 commented 1 month ago

Hey @iSidduHussain, I haven't attempted to generalize this process and integrate it into Meltano, but I did write something rough in one of my side projects. I haven't fully tested it, and I don't know whether what I did follows Meltano best practices. One thing I noticed is that it references the same .meltano/meltano.db database even while running in parallel with different state IDs. I think this is OK since I'm auto-generating different state IDs, but I doubt it's a good idea when using the same state ID.

https://github.com/melgazar9/ds-mono-repo/blob/dev/projects/financial_elt/routes.py#L65-L131
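
As a rough illustration of the approach described in this comment (not the code in the linked file), the sketch below launches one `meltano run` subprocess per partition, each with its own state ID. It rests on several assumptions: that the installed Meltano version supports `--state-id-suffix` on `meltano run`, that the tap reads its partition from a hypothetical `TAP_PARTITION` environment variable, and that `tap-example` and `target-example` are placeholder plugin names. It does nothing about concurrent access to the shared .meltano/meltano.db.

```python
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor


def build_command(tap: str, target: str, partition: str) -> list[str]:
    """Build one `meltano run` invocation with a partition-specific state ID."""
    # Assumption: `--state-id-suffix` is available on `meltano run`.
    return ["meltano", "run", f"--state-id-suffix={partition}", tap, target]


def run_partition(tap: str, target: str, partition: str, runner=subprocess.run) -> int:
    """Run the pipeline for one partition and return its exit code."""
    # Hypothetical convention: the tap picks up its partition from TAP_PARTITION.
    env = {**os.environ, "TAP_PARTITION": partition}
    completed = runner(build_command(tap, target, partition), env=env, check=False)
    return completed.returncode


def run_all(
    tap: str,
    target: str,
    partitions: list[str],
    max_workers: int = 4,
    runner=subprocess.run,
) -> dict[str, int]:
    """Run all partitions concurrently; threads suffice since each only waits on a subprocess."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        codes = pool.map(lambda p: run_partition(tap, target, p, runner), partitions)
        return dict(zip(partitions, codes))
```

A call such as `run_all("tap-example", "target-example", ["p1", "p2"])` would return a mapping from partition to exit code, so a non-zero value shows which partition failed. The `runner` parameter exists only so the subprocess call can be replaced in tests.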