python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Multi-kafka for high availability #308

Open jsmaupin opened 4 years ago

jsmaupin commented 4 years ago

We are setting up Kafka in multiple data centers to achieve high availability. I expected that Streamz could take the union of streams from both clusters. Everything works as expected until I stop one of the Kafka clusters, at which point the whole stream freezes. I suspect there is a callback somewhere that is blocking operations.

import asyncio

import confluent_kafka as ck
from streamz import from_kafka_batched

async def produce(broker, cluster_id, topic):
    # Produce a slow trickle of messages to one cluster so the union output
    # is easy to follow.
    p = ck.Producer({'bootstrap.servers': broker})

    for i in range(100000):
        p.poll(0)
        p.produce(topic, 'cluster-{} value-{}'.format(cluster_id, i))
        p.flush(timeout=0.5)
        await asyncio.sleep(1)

async def main():
    topic = 'test-topic'
    # One consumer configuration per Kafka cluster (two local brokers).
    args1 = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test-group'}
    args2 = {'bootstrap.servers': 'localhost:9093', 'group.id': 'test-group'}
    a = from_kafka_batched(topic, args1, npartitions=1, start=True,
                           asynchronous=True)
    b = from_kafka_batched(topic, args2, npartitions=1, start=True,
                           asynchronous=True)

    # Merge both Kafka sources into a single stream and print each batch.
    a.union(b).sink(print)

    # Keep references to the producer tasks so they are not garbage collected.
    tasks = [asyncio.create_task(produce('localhost:9092', 1, topic)),
             asyncio.create_task(produce('localhost:9093', 2, topic))]

    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())

satishvarmadandu commented 4 years ago

Thanks @jsmaupin for opening the issue. Ideally, we would like a union of multiple streams with support for stream failure: if any stream times out or cannot fetch messages, there should be an option for it to return an empty list. By default we could match other streaming solutions and fail the union when any one of the streams fails, but we should be able to pass a flag so that a failed stream returns an empty batch and the union keeps working (see the sketch after this comment).

There are several advantages to this approach. We have a primary Kafka and a secondary Kafka (which could be AWS MSK); the secondary Kafka receives messages only when the primary is in a failed state. Having Streamz perform a seamless union would make the downstream systems' job easier and give us better high availability.
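
To make the proposal concrete, here is a minimal sketch of that fallback behaviour using plain Stream objects instead of the Kafka source. fetch_batch and poll_into are hypothetical helpers invented for this illustration (they are not streamz or confluent-kafka APIs); the point is only that an input which swallows its errors and emits empty batches keeps the union flowing.

import asyncio
from streamz import Stream

async def fetch_batch(cluster_id):
    # Hypothetical stand-in for a Kafka batch poll; cluster 2 is "down".
    if cluster_id == 2:
        raise ConnectionError('cluster {} is unreachable'.format(cluster_id))
    return ['cluster-{} value-{}'.format(cluster_id, i) for i in range(3)]

async def poll_into(stream, cluster_id):
    # On failure, emit an empty batch instead of raising, so the downstream
    # union node keeps receiving input from this side.
    for _ in range(3):
        try:
            batch = await fetch_batch(cluster_id)
        except Exception:
            batch = []
        stream.emit(batch)
        await asyncio.sleep(0.1)

async def main():
    a = Stream()
    b = Stream()
    a.union(b).sink(print)
    await asyncio.gather(poll_into(a, 1), poll_into(b, 2))

asyncio.run(main())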

martindurant commented 4 years ago

Agreed that an exception upstream should not affect the functioning of a union node.

Conversely, an exception downstream (even if caused by a reference counter callback) should stop all earlier parts of the stream, unless we have a way of working out which input caused the failure.
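
As a small illustration of the downstream case with a plain, synchronous Stream (not the Kafka source): because emit() drives the pipeline synchronously here, an exception in a later node propagates back to the caller of emit(), so everything upstream of the failing node stops at that point.

from streamz import Stream

source = Stream()
source.map(lambda x: 1 / x).sink(print)

source.emit(2)   # prints 0.5
source.emit(0)   # ZeroDivisionError propagates back to this emit() call,
                 # so whatever is driving the source sees the failure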

jsmaupin commented 4 years ago

We have two inputs into the stream: stream a and stream b. Would it make sense for the pipeline to keep processing stream b if, for example, stream a stopped working due to an issue with Kafka? Then, when the Confluent library is able to connect again, everything would continue working as before.

martindurant commented 4 years ago

Yes, I believe it would make sense for the stream to continue if one of the inputs to a union node fails, whether or not it comes back. For other multi-input nodes like zip, you need all the upstreams to be live.
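
For reference, the difference in semantics is easy to see with plain Stream objects:

from streamz import Stream

a = Stream()
b = Stream()

# union passes each element straight through as soon as either upstream
# produces one, so it can keep flowing if the other upstream goes silent
a.union(b).sink(lambda x: print('union:', x))

# zip buffers per upstream and only emits once every upstream has produced
# an element, so a stalled or failed upstream stalls the zip node
a.zip(b).sink(lambda x: print('zip:', x))

a.emit(1)    # union: 1   (zip is still waiting on b)
a.emit(2)    # union: 2
b.emit(10)   # union: 10, and zip: (1, 10)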

jsmaupin commented 4 years ago

Note that this has been root caused to this existing issue: https://github.com/confluentinc/confluent-kafka-python/issues/413