myntra / pipeline

Pipeline is a package to build multi-staged concurrent workflows with a centralized logging output.
MIT License

Data processing pipeline #5

Open sagikazarmark opened 7 years ago

sagikazarmark commented 7 years ago

Hey there,

Thanks for the great library, the Go market really lacked it.

Currently I am working on a data processing pipeline which imports data from various sources. This means that I have a data source with rows of data which I would like to process.

Any ideas how I could do it with this library?

(I know https://github.com/dailyburn/ratchet but I just can't get used to its syntax)

Thanks in advance!

adnaan commented 7 years ago

Hey, Sorry for the delayed response.

Well, establishing data processing pipelines was never a primary goal of this library. For one, it provides only the flow control and leaves the processing to the implementer. I guess you would probably have to write most of the data processing part yourself. At this moment I am not sure how that would work. I am definitely going to think about it, though.

sagikazarmark commented 7 years ago

Thanks for your response.

I've been thinking about some solutions and I basically always got back to the same: make the pipeline stateless (currently it's not).

It would absolutely help to be able to call the same pipeline multiple times, which could be useful not just for a data processing pipeline. Currently you have to create a new pipeline each time you want to run one, even if you want to do the same steps on the context. I think it would be a great addition (and it can probably be done in a BC manner).

One way of doing that would be to pass the initial context/first request to the workflow.Run method. This would require a change in the library.

Another solution (rather a workaround) I have in mind is having a first step which reads from a channel every time I call Run, but it doesn't sound like a safe solution. A somewhat improved version of this would use two channels: one carrying the context, another being just a signal that new data has arrived and that the process should call workflow.Run.
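A minimal sketch of the two-channel workaround, using only plain Go channels. Row, runWorkflow and processAll are hypothetical names made up for illustration; runWorkflow merely stands in for creating a pipeline and calling its Run method, and is not the library's API.

```go
package main

import "fmt"

// Row is a hypothetical unit of input data; it is not a type from the library.
type Row struct {
	ID   int
	Name string
}

// runWorkflow stands in for building a pipeline and calling Run on it.
// Its first "step" pulls the input for this run from the data channel.
func runWorkflow(data <-chan Row) string {
	row := <-data // first step: fetch the context for this run
	return fmt.Sprintf("processed %d:%s", row.ID, row.Name)
}

// processAll pushes rows through the workflow one at a time and
// collects the results in input order.
func processAll(rows []Row) []string {
	data := make(chan Row, 1)     // carries the context
	signal := make(chan struct{}) // announces that new data has arrived
	results := make(chan string)

	// Consumer: every signal triggers exactly one workflow run.
	go func() {
		defer close(results)
		for range signal {
			results <- runWorkflow(data)
		}
	}()

	// Producer: publish the row first, then signal that it is available.
	go func() {
		defer close(signal)
		for _, r := range rows {
			data <- r
			signal <- struct{}{}
		}
	}()

	var out []string
	for res := range results {
		out = append(out, res)
	}
	return out
}

func main() {
	for _, line := range processAll([]Row{{1, "alice"}, {2, "bob"}}) {
		fmt.Println(line)
	}
}
```

The data channel is buffered so the producer can publish a row before signalling. The coordination between the two channels is exactly the part that feels fragile, which is why passing the request to Run directly looks cleaner.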

I am pretty much in favor of option one. What do you think?

adnaan commented 7 years ago

That sounds reasonable. About initial request:

How about we have another API?

func RunRequest(request *Request) *Result 

But you would still need to have a new pipeline, because the following methods depend on it: Out, GetProgessPercent, GetDuration

If you do want to reuse the same steps, one might do this:

newPipeline.AddStage(oldstages...)
result := newPipeline.RunRequest(request)

I definitely think this can be implemented in a much better way, but I don't see how it can be done in a BC manner. Maybe we can have a better implementation in v2.

Please let me know if I am totally missing the point.

Also, could you elaborate more about the pipeline being stateless?

sagikazarmark commented 7 years ago

> How about we have another API?

Yeah, I was thinking about something similar. About the naming: I was thinking about something like RunWithContext, RunWithRequest.

Since the workflow is currently not stateless, it could also be part of the pipeline:

pipeline.InitialRequest(request)
pipeline.Run()

But this is probably less ideal.

> Also, could you elaborate more about the pipeline being stateless?

By stateless I mean there is no internal state in the pipeline, like initial data in a stage or like the ones mentioned (progress and duration). This would mean that subsequent (or even concurrent) calls to the same pipeline would be possible.
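To make the idea concrete, here is a rough sketch of what a stateless pipeline could look like. All names (Request, Result, Step, StatelessPipeline, RunWithRequest) are hypothetical and written from scratch for illustration; this is not the library's current code. The pipeline only holds an immutable list of steps, and everything that belongs to a single run lives in local variables.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Request and Result are hypothetical stand-ins, not the library's types.
type Request struct{ Data string }

type Result struct {
	Data string
	Err  error
}

// Step transforms the data of a run; it must not keep state between calls.
type Step func(data string) (string, error)

// StatelessPipeline holds only the step list, which is never mutated
// after construction.
type StatelessPipeline struct{ steps []Step }

// NewStatelessPipeline copies the steps so later changes by the caller
// cannot affect the pipeline.
func NewStatelessPipeline(steps ...Step) *StatelessPipeline {
	return &StatelessPipeline{steps: append([]Step(nil), steps...)}
}

// RunWithRequest keeps all per-run state in local variables, so
// subsequent and concurrent calls do not interfere with each other.
func (p *StatelessPipeline) RunWithRequest(req Request) Result {
	data := req.Data
	for _, step := range p.steps {
		out, err := step(data)
		if err != nil {
			return Result{Err: err}
		}
		data = out
	}
	return Result{Data: data}
}

func trimStep(s string) (string, error)  { return strings.TrimSpace(s), nil }
func upperStep(s string) (string, error) { return strings.ToUpper(s), nil }

func main() {
	p := NewStatelessPipeline(trimStep, upperStep)
	inputs := []string{" alpha ", "beta", " gamma"}
	results := make([]Result, len(inputs))

	// The same pipeline value is shared by all goroutines.
	var wg sync.WaitGroup
	for i, in := range inputs {
		wg.Add(1)
		go func(i int, in string) {
			defer wg.Done()
			results[i] = p.RunWithRequest(Request{Data: in})
		}(i, in)
	}
	wg.Wait()

	for _, r := range results {
		fmt.Println(r.Data, r.Err)
	}
}
```

Things like progress and duration would then have to be returned per run instead of being read from the pipeline itself.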

If making the pipeline stateless is not possible because of the details you mentioned (for example, we cannot return something else alongside Result from Run), then we could create a PipelineBuilder holding the API for adding stages, and use it to create the stateful pipeline: pipeline := pipelineBuilder.Build()

This way building pipelines for batch use cases becomes a lot easier. The pipeline can stay stateful, and you do not reuse the pipeline itself, but its builder.

Some code presenting the two solutions:

result, pipelineResult := pipeline.Run()

pipelineResult.Out()

This might not be perfect, as it would require returning the result asynchronously.

pipelineBuilder.AddStage(...)

// loop
pipeline := pipelineBuilder.Build()
result := pipeline.Run()
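A slightly fuller sketch of the builder variant follows. The types here (Stage, PipelineBuilder, Pipeline, ProgressPercent) are hypothetical and self-contained, not the library's existing API, and Run takes an input argument purely to keep the example short.

```go
package main

import "fmt"

// Stage is a hypothetical processing step operating on an int.
type Stage func(in int) int

// PipelineBuilder holds the reusable stage configuration.
type PipelineBuilder struct{ stages []Stage }

// AddStage appends stages and returns the builder to allow chaining.
func (b *PipelineBuilder) AddStage(s ...Stage) *PipelineBuilder {
	b.stages = append(b.stages, s...)
	return b
}

// Pipeline is stateful: it tracks the progress of one single run.
type Pipeline struct {
	stages    []Stage
	completed int
}

// Build returns a fresh pipeline with its own copy of the stages.
func (b *PipelineBuilder) Build() *Pipeline {
	return &Pipeline{stages: append([]Stage(nil), b.stages...)}
}

// Run executes the stages in order and records how many have finished.
func (p *Pipeline) Run(input int) int {
	for _, stage := range p.stages {
		input = stage(input)
		p.completed++
	}
	return input
}

// ProgressPercent reports per-run state kept on the pipeline.
func (p *Pipeline) ProgressPercent() int {
	if len(p.stages) == 0 {
		return 100
	}
	return p.completed * 100 / len(p.stages)
}

func double(n int) int { return n * 2 }
func addOne(n int) int { return n + 1 }

func main() {
	builder := &PipelineBuilder{}
	builder.AddStage(double, addOne)

	// One builder, one fresh pipeline per input row.
	for _, row := range []int{1, 2, 3} {
		pipeline := builder.Build()
		result := pipeline.Run(row)
		fmt.Println(row, "->", result, "progress:", pipeline.ProgressPercent())
	}
}
```

Each Build call hands out an independent pipeline, so per-run state such as progress never leaks between rows.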

Note that I haven't looked at the library code in-depth, so I might not be right about everything above, just trying to present the use-case and a few possible directions.

Right now, for v1, the pipeline builder might be the way to go (by deprecating adding stages directly to the pipeline).

adnaan commented 7 years ago

I have a running engagement which is keeping me busy. Allow me some time to think more about this.

sagikazarmark commented 7 years ago

Sure thing.

adnaan commented 7 years ago

Hey there. I have just started looking at this again. I was wondering: do you have an example of a library in another language that suits your purposes?

Also, have you looked at Kasper: https://movio.co/en/blog/Kasper-process-library/

sagikazarmark commented 7 years ago

Yeah, I know Kasper, but it looks a bit too big for my needs ATM.

I know another implementation, written in PHP (which I co-author):

https://github.com/portphp/portphp

Hope that helps.


adnaan commented 7 years ago

portphp explains a lot. Going through it.

sagikazarmark commented 7 years ago

Did you have a chance to give this some thought?

adnaan commented 7 years ago

Sorry about the delay. Went through portphp as far as I could. Would love to connect over voice if possible. Please let me know: badr.adnaan at gmail

sagikazarmark commented 7 years ago

I suggest that you sign on to slack and we talk there: http://slack.portphp.org/

adnaan commented 6 years ago

For the issue log: have started working on this.

adnaan commented 6 years ago

Incomplete PR for early feedback: https://github.com/myntra/pipeline/pull/7

sagikazarmark commented 6 years ago

Sorry, I didn't have too much time to review it so far, I'll try to do so.

In the meantime, here is a library with similar goals, probably with a smaller feature set: https://github.com/mitchellh/multistep