faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Unable to subscribe to topics by pattern #62

Open xgamer4 opened 3 years ago

xgamer4 commented 3 years ago

Steps to reproduce

I created a topic with pattern= set to a valid regex that should have matched multiple topics on the Kafka cluster. Running the worker with -l info shows everything starting up, but the only topic it subscribes to is the base assignor topic.

This is a known, existing issue dating back to at least July 25, 2019, as detailed in robinhood/faust: https://github.com/robinhood/faust/issues/390

I suspect, but have not verified, that the issue is in 'faust/faust/transport/drivers/aiokafka:subscribe', given a comment left there by a previous developer: "XXX pattern does not work :/".

Still occurs on master

Expected behavior

Actual behavior

Full traceback

No traceback - silently fails to do anything

Versions

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('faust-dev', broker='kafka://broker:29092')
topic = app.topic('MyGreatTopic-1', value_type=Greeting)
topic2 = app.topic('MyGreatTopic-2', value_type=Greeting)
topic3 = app.topic('MyGreatTopic-3', value_type=Greeting)

@app.task
async def create_topics():
    await topic.maybe_declare()
    await topic2.maybe_declare()
    await topic3.maybe_declare()

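# Pattern-based topic: should match MyGreatTopic-1, -2 and -3,
# but the worker never subscribes to it (the bug reported here).
regex_topic = app.topic(pattern="^MyGreatTopic-.*$", value_type=Greeting)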

@app.agent(regex_topic)
async def hello(greetings):
    async for event in greetings.events():
        greeting = event.value
        print(f'{event.message.topic} says: Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await topic.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )
    await topic2.send(value=Greeting(from_name='Faust 2', to_name='you'))
    await topic3.send(value=Greeting(from_name='Faust 3', to_name='you'))

if __name__ == '__main__':
    app.main()
xgamer4 commented 3 years ago

I've been doing a bit of research and I think the problem is two-fold:

Faust side: pattern is never linked to anything:

The pattern parameter is passed in and validated when the Topic is created, and then never touched again, which leaves a functionally useless topic.
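To make the expected behavior concrete: the regex in the reproduction script should resolve to the three declared topics. A minimal pure-Python sketch of that matching, assuming re.match semantics as a stand-in for whatever matching a consumer would apply (this is not Faust's code, and the assignor topic name below is a hypothetical example):

```python
import re
from typing import List

# Topic names as they might exist on the cluster after running the
# reproduction script. The assignor topic name is a hypothetical example.
cluster_topics = [
    "MyGreatTopic-1",
    "MyGreatTopic-2",
    "MyGreatTopic-3",
    "faust-dev-__assignor-__leader",
]


def topics_matching(pattern: str, topics: List[str]) -> List[str]:
    """Return the topics a pattern subscription would be expected to cover."""
    regex = re.compile(pattern)
    return sorted(t for t in topics if regex.match(t))


print(topics_matching("^MyGreatTopic-.*$", cluster_topics))
```

With a working pattern subscription, the agent would receive events from all three topics; in practice it receives none.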

Faust <-> aiokafka connections:

Some thoughts, then. Exposing a way to subscribe to topics by regex, then silently failing to actually subscribe, is not good behavior. Things that could be done:

Adding support for pattern= seems like the best solution, ideally by allowing more than one AIOKafkaConsumer. However, that's a fairly large change to the overall architecture, and I'm not sure what it would do to performance, or why the single-consumer architecture was chosen in the first place.
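One way to picture the multiple-consumer idea: put every explicitly named topic on one shared consumer (as Faust does today) and give each pattern its own consumer. The sketch below is hypothetical (TopicSpec, ConsumerPlan and plan_consumers are not Faust APIs); it only plans the split and does not create any Kafka consumers.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TopicSpec:
    """Hypothetical stand-in for a Faust topic declaration."""
    topics: List[str] = field(default_factory=list)
    pattern: Optional[str] = None


@dataclass
class ConsumerPlan:
    """One planned consumer: either a list of named topics or one pattern."""
    topics: List[str] = field(default_factory=list)
    pattern: Optional[str] = None


def plan_consumers(specs: List[TopicSpec]) -> List[ConsumerPlan]:
    # All explicitly named topics share a single consumer.
    explicit = sorted({t for spec in specs for t in spec.topics})
    plans = [ConsumerPlan(topics=explicit)] if explicit else []
    # Each distinct pattern gets its own consumer, because one consumer
    # cannot mix a topic-list subscription with a pattern subscription.
    for pattern in sorted({spec.pattern for spec in specs if spec.pattern}):
        plans.append(ConsumerPlan(pattern=pattern))
    return plans


specs = [
    TopicSpec(topics=["MyGreatTopic-1"]),
    TopicSpec(topics=["MyGreatTopic-2", "MyGreatTopic-1"]),
    TopicSpec(pattern="^MyGreatTopic-.*$"),
]
for plan in plan_consumers(specs):
    print(plan)
```

Merging all patterns into a single consumer with regex alternation (a|b) would be an alternative that caps the count at two consumers, at the cost of sharing one consumer group's rebalancing across every pattern.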

Hamdiovish commented 3 years ago

Any update on this issue, please? Regex-based subscription is a crucial feature in streaming applications. Is there any plan to support it?

wbarnha commented 1 year ago

I just experimented with this. Don't worry, I didn't forget. This is really complicated. If you want to use regex patterns in a Faust application, you'll need to make sure the pattern subscription uses a separate consumer. Otherwise you'll see errors such as:

[^---Conductor]: Crashed reason=IllegalStateError('Subscription to topics, partitions and pattern are mutually exclusive: SubscriptionType.AUTO_TOPICS') 

Note that I added the SubscriptionType to the log statement manually to understand what's going on here. So if your app has any table topics or channels with explicitly defined topic names alongside a pattern topic, it'll crash. I'll have some changes committed soon with a rough demo of what I did.
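The crash comes from a consumer-level rule: once a consumer has subscribed one way (named topics, a pattern, or manually assigned partitions), it cannot switch to another. Below is a toy model of that rule. It relies only on what the log above shows (the SubscriptionType name, its AUTO_TOPICS member, and the error text); the other member names, enum values and class names are assumptions for illustration, not aiokafka's implementation.

```python
from enum import Enum
from typing import List, Optional


class SubscriptionType(Enum):
    NONE = 0
    AUTO_TOPICS = 1
    AUTO_PATTERN = 2
    USER_ASSIGNED = 3


class IllegalStateError(Exception):
    """Stand-in for the error type seen in the crash log."""


class ToySubscriptionState:
    """Simplified model of the mutual-exclusivity rule; not aiokafka's code."""

    def __init__(self) -> None:
        self.subscription_type = SubscriptionType.NONE
        self.topics: List[str] = []
        self.pattern: Optional[str] = None

    def _set_subscription_type(self, new_type: SubscriptionType) -> None:
        # The first subscription type sticks. Reusing it is fine,
        # switching to a different one raises.
        if self.subscription_type in (SubscriptionType.NONE, new_type):
            self.subscription_type = new_type
        else:
            raise IllegalStateError(
                "Subscription to topics, partitions and pattern are mutually "
                f"exclusive: {self.subscription_type}"
            )

    def subscribe_topics(self, topics: List[str]) -> None:
        self._set_subscription_type(SubscriptionType.AUTO_TOPICS)
        self.topics = list(topics)

    def subscribe_pattern(self, pattern: str) -> None:
        self._set_subscription_type(SubscriptionType.AUTO_PATTERN)
        self.pattern = pattern


# A Faust app with named topics (tables, channels, agents) subscribes with
# AUTO_TOPICS first, so adding a pattern on the same consumer fails.
state = ToySubscriptionState()
state.subscribe_topics(["MyGreatTopic-1"])
try:
    state.subscribe_pattern("^MyGreatTopic-.*$")
except IllegalStateError as exc:
    print(exc)
```

This is why a pattern needs its own consumer: a fresh ToySubscriptionState accepts subscribe_pattern without complaint.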

Even if I patch this, I have more bugs to fix:

[^--Consumer]: Drain messages raised: KeyError(TopicPartition(topic='inserttopicnameherelol', partition=0)) 

Edit: Some changes are in the regex-version branch. Hopefully I can resume progress soon.

wbarnha commented 1 year ago

On further examination, since the majority of Faust functions require topic names to be explicitly declared and share a single Kafka Consumer, it's not possible to support topic and pattern specification simultaneously. You will have to create a separate AIOKafkaConsumer instance with a pattern subscription. See https://aiokafka.readthedocs.io/en/stable/consumer.html#topic-subscription-by-pattern for an example.

The good news is that it's possible to run this Consumer inside your Faust application:

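import aiokafka

@app.task
async def consume(app):
    # dict_containing_your_config is a placeholder for your aiokafka consumer settings.
    consumer = aiokafka.AIOKafkaConsumer(**dict_containing_your_config)
    # Apparently you need to subscribe before starting and seeking, contrary to what the aiokafka docs depict in examples?
    consumer.subscribe(topics=[], pattern=r"[0-9a-f]{32}\Z")  # UUID (hex, no dashes) regex pattern as an example
    await consumer.start()
    try:
        await consumer.seek_to_beginning()
        # Keep polling for the lifetime of the Faust worker.
        while True:
            for messages in (await consumer.getmany(timeout_ms=1000)).values():
                for msg in messages:
                    print("Consumed msg %s %s %s" % (msg.topic, msg.partition, msg.value))
    finally:
        # Leave the consumer group cleanly when the task is cancelled.
        await consumer.stop()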

I apologize that I can't provide a more helpful solution in Faust right off the bat. A single application uses a single Kafka Consumer at a time, and I'd need to figure out how to split an application across multiple consumers. You'd think that Faust would already have that functionality, given how it can handle the complexity of simultaneously balancing so many tables, etc., but apparently not.