robinhood / faust

Python Stream Processing

[Questions] - Deployment #187

Open emirot opened 6 years ago

emirot commented 6 years ago

I was wondering how you deploy/scale Faust in production? Using Docker + k8s? Happy to add that to the documentation.

vineetgoel commented 6 years ago

We currently simply run instances of the app using supervisord. In the future though, we plan on moving to k8s.

ask commented 6 years ago

We definitely want documentation for deployment with docker/k8s/etc. though!

zamai commented 5 years ago

I've seen a nice MR by @omarrayward which sets up the consumer and producer; it's a good starting point. The only thing I'm missing from the example is a way of running multiple agents that consume the same topic. If somebody has an example I'd love to see it / add it to the docs. Thanks!
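For anyone else looking for this, here is a minimal sketch of one app with two agents subscribed to the same topic (module name, broker URL and topic names are made up); as far as I understand, the worker delivers each message on that topic to every agent subscribed to it:

# myapp.py -- run with: faust -A myapp worker -l info
import faust

app = faust.App('myapp', broker='kafka://localhost:9092')

orders = app.topic('orders', value_serializer='json')

@app.agent(orders)
async def count_orders(stream):
    # first agent: e.g. aggregate the events
    async for order in stream:
        print(f'counting: {order}')

@app.agent(orders)
async def audit_orders(stream):
    # second agent: e.g. forward the same events elsewhere
    async for order in stream:
        print(f'auditing: {order}')

if __name__ == '__main__':
    app.main()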

AhmedBytesBits commented 5 years ago

@zamai Here is a Dockerfile example

FROM python:3.6.8
WORKDIR /usr/share/app
COPY . /usr/share/app

RUN chmod -R 777 /usr/share/app
RUN pip3 install --upgrade pip
RUN pip3 install -r requirements
RUN python3 db_migrate.py
ENTRYPOINT faust -A aggregator worker -l info

The last line is changed for each ReplicaSet/Deployment, so one container could use: ENTRYPOINT faust -A aggregator worker -l info

While another could be:

ENTRYPOINT faust -A upload worker -l info

So we use the same code base while running different workers with different configurations.
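In case it helps to see it in code, here is a rough sketch of what the two modules behind those ENTRYPOINTs might look like (module names taken from the Dockerfile above; broker URL, topics and logic are placeholders):

# aggregator.py -- started with: faust -A aggregator worker -l info
import faust

app = faust.App('aggregator', broker='kafka://localhost:9092')
events = app.topic('events')

@app.agent(events)
async def aggregate(stream):
    async for event in stream:
        ...  # aggregation logic


# upload.py -- started with: faust -A upload worker -l info
import faust

app = faust.App('upload', broker='kafka://localhost:9092')
uploads = app.topic('uploads')

@app.agent(uploads)
async def handle_upload(stream):
    async for upload in stream:
        ...  # upload handling logic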

zamai commented 5 years ago

Hey @ahmedrshdy Thanks for the example and explanation!

Following up on your example, I was trying to run multiple aggregator workers to scale up throughput. If I want Kubernetes to scale up the number of pods with the same configuration (all consuming from the same topic), I can add an additional unique identifier to the command: part of the manifest.

metadata.name would be a good candidate for the unique id.

ENTRYPOINT faust -A aggregator-{pod1.name} worker -l info
ENTRYPOINT faust -A aggregator-{pod2.name} worker -l info
ENTRYPOINT faust -A aggregator-{pod3.name} worker -l info

I'll test this idea in my cluster and, if it works, will add a PR describing this in the docs.
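Here is a sketch of how that could be wired up on the Python side, assuming the per-pod suffix goes into the Faust app id (the first argument to faust.App) rather than the module name passed to -A, and that the manifest injects the pod name as a POD_NAME environment variable (e.g. via the Downward API from metadata.name):

# aggregator.py -- sketch: unique app id per pod
# POD_NAME is assumed to be injected by the manifest (metadata.name)
import os
import faust

pod_name = os.environ.get('POD_NAME', 'local')

app = faust.App(
    f'aggregator-{pod_name}',          # unique app id per pod
    broker='kafka://localhost:9092',   # placeholder broker URL
)

events = app.topic('events')

@app.agent(events)
async def aggregate(stream):
    async for event in stream:
        ...  # aggregation logic

One caveat, as far as I understand Faust: the app id is also used as the Kafka consumer group id, so pods with distinct ids would each receive every partition of the topic rather than sharing them. Workers that should split the load of one topic normally keep the same app id and let the consumer group rebalance spread the partitions across them.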

Sturgelose commented 5 years ago

Hey, I just found this issue and I'm also interested in a how-to for K8s with multiple agents.

I'm halfway through testing Faust (I still have to find some time to finish checking how it works). Anyway, as I understand it, each agent has its own RocksDB and will get a set of partitions from Kafka through Faust, which means they need to keep their id at the same value (at least this is what I got from the two previous comments).

If so, instead of using the pod name and Deployments, we should (well, at least I'd suggest) use StatefulSets (https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#pod-identity), which give an ID to each of the workers and, if a worker dies, assign it the same ID again. Otherwise the pod would die and its name would change (the hash at the end of the pod name changes when the pod is recreated).

Then my worries are: what happens if I scale the number of agents up or down? I haven't had enough time to test this with Faust... When scaling up, I guess the partitions get reassigned? When scaling down, do we recompute all the history to get the aggregations? Or is the data in RocksDB sent to the leader, which then redistributes everything?

To sum it up, I'd like to build an autoscaling set of Faust workers so we can scale the resources needed to process all the events up or down in case of spikes; the K8s Horizontal Pod Autoscaler (which works for both Deployments and StatefulSets) mostly takes care of that :D
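This doesn't answer the scale-down question, but one way to pair the StatefulSet idea with Faust's local state is to point the worker's data directory at a persistent volume, so each pod keeps its RocksDB across restarts and (as far as I understand recovery) only has to catch up on the changelog topic instead of rebuilding from scratch. A rough sketch, where the path, names and broker URL are assumptions:

# aggregator.py -- sketch: keep RocksDB state on a persistent volume
# assumes /var/lib/faust is a PVC mount in the StatefulSet pod spec
import faust

app = faust.App(
    'aggregator',
    broker='kafka://localhost:9092',  # placeholder broker URL
    store='rocksdb://',               # keep table state in RocksDB
    datadir='/var/lib/faust',         # on the PVC, so it survives restarts
)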

riccardolorenzon commented 4 years ago

@Sturgelose I'm interested in the outcome of your investigation. Did you deploy Faust workers as a StatefulSet in k8s? What are your takes on that?

Sturgelose commented 4 years ago

@riccardolorenzon Sorry for answering late, but since then I haven't been able to do any tests (too damn busy to find some time to check the logic). Mostly the same questions I had in February still exist: what happens if I scale up or down?

This problem only happens when using tables/windowing/RocksDB. If you do not store any state locally, you won't have any problems scaling up or down.
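For context, this is the kind of state in question; a sketch with placeholder names: the table below lives in RocksDB locally and is backed by a changelog topic in Kafka, and that changelog is what gets replayed when a partition moves to a worker that doesn't already hold the data.

# counter.py -- sketch: a changelog-backed table whose partitions move on rebalance
import faust

app = faust.App(
    'counter',
    broker='kafka://localhost:9092',  # placeholder broker URL
    store='rocksdb://',               # local state in RocksDB
)

events = app.topic('events', value_serializer='json')

# each partition of this table is owned by exactly one worker at a time
counts = app.Table('event_counts', default=int, partitions=8)

@app.agent(events)
async def count(stream):
    # count events per key of the source topic
    async for key, value in stream.items():
        counts[key] += 1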

I did some tests locally on my computer, creating and deleting agents. What I saw is the following (though it may have changed in the latest versions):

Scale UP (Easy)

Scale DOWN (the tricky part)

So the main problem here is that if we are autoscaling we are definitely removing the node and killing it, which will force the other agent nodes to reprocess everything (a waste of time, resources and CPU power, and it makes things slow).

What could make sense is that if the node is closed/shut down properly (no SIGKILL), it could run a shutdown procedure to forward all its data to the leader, and the leader would then redistribute it, or maybe just forward it directly to the new owner (this could be a nice feature :D ). However, let's imagine a worst case: what happens if there is a failure while forwarding this data to the leader, or while forwarding it to the new responsible owner? What do we do here? I'm not quite sure... Maybe another idea would be to do a "backup" or a "diff backup" every X amount of time to the leader and other places, in order to make sure we do not lose anything. But... maybe we can use Kafka for this? Though the size limit for a message is pretty low...

Anyway, those are lots of thoughts and ideas, but one thing is clear: we need a way to migrate data to other agents whenever an agent starts shutting down, otherwise we get a painful reprocessing of data. I guess this is a must if you are using tables and windowed operations. Otherwise you can have fault tolerance and scaling up, but downscaling and autoscaling become painful.

Not sure if we can come up with a plan, but if there are ideas and suggestions we can surely put together a PR to include all the extra logic for handling scale-down of agents :)

Also, if everything I just wrote already exists, please tell me because I'll start testing it right away :D

What do you think?

gcemaj commented 4 years ago

Also curious about this. What's the correct way to deploy a Faust app such that different stages have different numbers of workers? How does that work with internal topics?