PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Intertask data streaming #867

Closed OliverColeman closed 1 year ago

OliverColeman commented 5 years ago

I have a use-case in which there are two Tasks in a Flow called A and B. A produces a list of items which B consumes. I need B to be able to start consuming items as soon as A has produced the first one. We've no time to waste!
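For illustration, the behavior described here can be sketched in plain Python with a bounded queue between two threads. This is not Prefect's API; `task_a`, `task_b`, `run_streaming`, and the arithmetic standing in for the CPU and GPU work are all hypothetical names and placeholders.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of A's output


def task_a(out: queue.Queue, n_items: int) -> None:
    """Hypothetical producer: emits items one at a time."""
    for i in range(n_items):
        out.put(i * i)  # stand-in for the CPU-intensive work
    out.put(_DONE)


def task_b(inp: queue.Queue, results: list) -> None:
    """Hypothetical consumer: handles each item as soon as it arrives."""
    while True:
        item = inp.get()
        if item is _DONE:
            break
        results.append(item + 1)  # stand-in for the GPU-intensive work


def run_streaming(n_items: int = 5) -> list:
    # A small bounded buffer lets B start early and keeps A from racing ahead.
    channel: queue.Queue = queue.Queue(maxsize=2)
    results: list = []
    producer = threading.Thread(target=task_a, args=(channel, n_items))
    consumer = threading.Thread(target=task_b, args=(channel, results))
    consumer.start()
    producer.start()
    producer.join()
    consumer.join()
    return results
```

In this sketch B begins work after A's first `put`, rather than waiting for A to finish the whole list.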

A and B both have long set-up times and are resource intensive while they're running (but different resources, A is CPU intensive, B is GPU intensive), thus running multiple Flows in parallel does not seem practical.

The component pieces of code in A and B could be combined into a Task C. However, some Flows use A, some use B, and some use both, so this solution would exist only to work around the framework, which appears to be something Prefect takes pains to avoid having to do...

Note: after talking to Chris White about this use case he suggested posting it as an issue. @cicdw

jlowin commented 5 years ago

Hi Oliver,

Thanks for describing this! We're collecting requirements for some streaming / event driven use cases and this is great to hear. We're especially interested in supporting situations where a discrete DAG-like workflow is kicked off by a continuous process. We have a few needs for this internally and it's a common enough "origin" for workflow semantics (processing items off a queue or stream, for example).

One thing I'd like to dig into, though, is that most varieties of the pattern that we're looking at involve a long-running task, A, which kicks off new instances of a downstream task B (followed by C and D) in response to events. For example, A monitors a stream and for each item i, it launches a new set of processing steps B[i] -> C[i] -> D[i].
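As a rough sketch of that pattern in plain Python (not Prefect's API), a monitoring function can submit one B -> C -> D chain per item to a thread pool. The names `monitor`, `process_item`, `step_b`, `step_c`, and `step_d`, and the work each step does, are hypothetical placeholders.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List


def step_b(item: int) -> int:
    return item + 1  # placeholder for B[i]


def step_c(item: int) -> int:
    return item * 2  # placeholder for C[i]


def step_d(item: int) -> str:
    return f"result-{item}"  # placeholder for D[i]


def process_item(item: int) -> str:
    """Run the B -> C -> D chain for a single item."""
    return step_d(step_c(step_b(item)))


def monitor(stream: Iterable[int], max_workers: int = 4) -> List[str]:
    """Stand-in for task A: launch one chain per item as it is read."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Each item is submitted as soon as the stream yields it.
        futures = [pool.submit(process_item, item) for item in stream]
        # Results are collected in submission order.
        return [f.result() for f in futures]
```

Each chain here is an independent unit that runs to completion, which is what lets the upstream-finishes-first assumption hold per item.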

In your situation, A and B sound like they are both always-on due to the resource requirements, and the complexity you'd like to manage is the constant communication from A to B. I'd want to explore a little bit what Prefect's workflow semantics look like in that case, as most of Prefect's state management tools have a baked assumption that upstream tasks finish (or at least enter a known final state) before downstream tasks start, forming a more strict state machine.

I wouldn't shy too far from the solution @cicdw described, though, in which you have two separate tasks A and B that you combine into a task C for this specific Flow, and leave separate for other Flows. It's actually a useful enough pattern that we are actively considering the best way to support it automatically though we haven't found the magic combination of arguments quite yet! The use case we've considered previously is an ETL job that is so resource/data intensive that you want to guarantee that all data is processed on the same node, so you'd combine a source task and a sink task into a new task that would necessarily run in one place.

Similarly, you could have A and B live in your task library, and build a very simple task C that simply instantiated A, instantiated B, and then called them in succession.
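A minimal sketch of such a wrapper, using plain Python classes rather than Prefect's task classes: `TaskA`, `TaskB`, `TaskC`, and their `run` methods are hypothetical stand-ins. Here A is written as a generator, an assumption that goes beyond "called them in succession", so that B can pull items one at a time inside the combined task.

```python
from typing import Iterable, Iterator, List


class TaskA:
    """Hypothetical source task: yields items one at a time."""

    def run(self, n_items: int) -> Iterator[int]:
        for i in range(n_items):
            yield i


class TaskB:
    """Hypothetical sink task: consumes items as they are produced."""

    def run(self, items: Iterable[int]) -> List[int]:
        return [item * 10 for item in items]


class TaskC:
    """Wrapper that instantiates A and B and calls them in succession."""

    def __init__(self) -> None:
        self.a = TaskA()
        self.b = TaskB()

    def run(self, n_items: int) -> List[int]:
        # A's generator is handed to B, so no full list is built in between.
        return self.b.run(self.a.run(n_items))
```

A and B remain usable on their own in other flows; only this wrapper couples them, and it necessarily runs both in one process.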

Not saying that's the best solution for your situation, necessarily, but it's definitely viable for some circumstances!

OliverColeman commented 5 years ago

Hi Jeremiah,

Thanks for the thorough reply (and to Chris, too, in the chat). All the functionality you describe sounds very useful, and potentially applicable to my use-case, e.g. something like the event-driven approach.

Perhaps it would be more useful to provide the actual concrete example of my use-case. We have a homegrown task management system. It currently works on a cluster and handles basic task requirements (different machines have different hardware), a simple concept of task interdependence (and when two tasks should be run on the same machine), and task state handling. I'd like to offload the engineering of this task management to a platform such as Prefect. The tasks are:

Neither of these tasks is always-on; they're set up for some particular parameters, run to completion, then deallocated (freeing up the hardware resources they were claiming).

OliverColeman commented 5 years ago

I should add that in the example above tasks A and B may not run on the same machine, precluding combining them into the same task.

jlowin commented 5 years ago

Thanks Oliver --

This is extremely helpful -- I immediately see why Prefect's semantics are appropriate, but also why the alternative solutions Chris and I proposed are inadequate.

As I mentioned, we're collecting use cases like yours, and some clear patterns are emerging (unsurprisingly). There are a few ways for us to support these modes in Prefect, but there are still a number of questions to answer. I expect that in the near future you'll see some of those designs start to be floated here for discussion, but if you'd be interested in chatting with us directly, we'd love your input -- hello at prefect.io

Thanks!

jlowin commented 5 years ago

Sorry Oliver, didn't mean to close!

lromor commented 3 years ago

Hi, I have an identical problem!

sm-Fifteen commented 2 years ago

It's unclear to me from reading the Orion announcement whether or not it's going to fix this issue. It's mentioned as a case where the current DAG approach doesn't cut it:

As Prefect has matured, so has the modern data stack. The on-demand, dynamic, highly scalable workflows that used to exist principally in the domain of data science and analytics are now prevalent throughout all of data engineering. Few companies have workflows that don’t deal at least in part with streaming data, uncertain timing, runtime logic, upstream dependencies, dynamic mapping, dataflow, complex conditions, versioning, subgraph retries, or custom scheduling, just to name a few.

It's not really mentioned elsewhere in the announcement, but I'd be interested to know if it's a use case that Orion can cover where Prefect couldn't.

cicdw commented 2 years ago

Yes! We have work to do to make streaming a first-class pattern, but Orion was structured in a way that will allow us to do so. In particular, because the separation between orchestration and execution in Orion is cleaner, we have a large number of ways to configure execution that include streaming patterns. I don't expect a full streaming story to be released with the MVP but I do expect us to take this up this year.

mattfysh commented 2 years ago

Hi @cicdw - is streaming still on the agenda for Prefect this year? I am currently working on a flow that produces a large amount of data, and then I'd like to have thousands of other flows that each define a filter on the data and run when new data matching the filter criteria arrives (as close to real time as possible). Thanks!

sapcode commented 1 year ago

Hi Prefect team, we would also like to implement a streaming consumer task that receives server-sent events (SSE) using the requests API. The SSE connection would deliver an endless stream of data, which would be written to a database. Best regards
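One way such a consumer might look, sketched with the standard library only and outside of any Prefect task: a simplified SSE parser that handles just `data:` fields and comment lines, plus a writer that stores each payload in SQLite. The names `iter_sse_data` and `store_events` and the `events` table are hypothetical. The lines could come from something like `response.iter_lines(decode_unicode=True)` on a `requests.get(url, stream=True)` response, which is an assumption about how the stream would be read.

```python
import sqlite3
from typing import Iterable, Iterator, List


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each event (simplified SSE parsing)."""
    buffer: List[str] = []
    for line in lines:
        if line == "":
            # A blank line ends the current event.
            if buffer:
                yield "\n".join(buffer)
                buffer = []
        elif line.startswith(":"):
            continue  # comment or keep-alive line
        elif line.startswith("data:"):
            value = line[len("data:"):]
            if value.startswith(" "):
                value = value[1:]  # drop the single optional leading space
            buffer.append(value)
        # Other fields (event, id, retry) are ignored in this sketch.


def store_events(lines: Iterable[str], conn: sqlite3.Connection) -> int:
    """Write each event payload to the database; return the number stored."""
    conn.execute("CREATE TABLE IF NOT EXISTS events (payload TEXT)")
    count = 0
    for payload in iter_sse_data(lines):
        conn.execute("INSERT INTO events (payload) VALUES (?)", (payload,))
        conn.commit()  # commit per event, since the stream never ends
        count += 1
    return count
```

With an endless stream, `store_events` never returns, which is exactly the point raised earlier in the thread about tasks that do not reach a final state.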

github-actions[bot] commented 1 year ago

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

github-actions[bot] commented 1 year ago

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.