robinhood / faust

Python Stream Processing

Creation of stream dynamically not working as expected #512

Open krejcmat opened 4 years ago

krejcmat commented 4 years ago

Hello!

Checklist

Steps to reproduce

App worker app.py

import faust
from faust import current_event
from mode.utils.aiter import aiter

app = faust.App('app16',
                broker='kafka://localhost:9092',
                store='memory://',
                consumer_auto_offset_reset='earliest',
                stream_wait_empty=False,
                autodiscover=False)


class CommandSubscribe(faust.Record):
    topic: str


command_topic = app.topic('command_topic', value_type=CommandSubscribe)

@app.agent(command_topic, concurrency=2)
async def command_consumer(stream):
    async for record in stream:
        event = current_event()
        print(f'Got {record}')
        # Declare the topic named in the command and register it at runtime.
        topic = app.topic(record.topic, value_type=bytes, value_serializer='raw')
        topic_iterator = aiter(topic)
        app.topics.add(topic)
        # Wrap the topic in a stream built by hand, outside any agent.
        new_stream = faust.Stream(topic_iterator, app=app)
        event.ack()
        # Blocks this agent instance until data_consumer returns.
        await data_consumer(new_stream)
        print('awaited')

async def data_consumer(stream):
    print('consumer called')
    counter = 0
    async for record in stream:
        event = current_event()
        if counter == 30:
            print('exiting')
            break
        print(f'Consumed {record}')
        counter += 1
        event.ack()
    print('exited')

if __name__ == '__main__':
    app.main()

Data producer worker data_producer.py

import faust
import asyncio

app = faust.App('app17',
                broker='kafka://localhost:9092',
                store='memory://',
                web_port=6067,
                autodiscover=False)


class CommandSubscribe(faust.Record):
    topic: str

command_topic = app.topic('command_topic', value_type=CommandSubscribe)

@app.task()
async def task():
    print('sending command')
    data_topic_name = 'data_topic'
    data_topic = app.topic(data_topic_name)
    await command_topic.send(value=CommandSubscribe(topic=data_topic_name))

    for idx in range(20):
        print('task sending data..')
        idx1 = idx + 5000
        await data_topic.send(value=f'message {idx1}_1')
        await data_topic.send(value=f'message {idx1}_2')
        await data_topic.send(value=f'message {idx1}_3')
        await data_topic.send(value=f'message {idx1}_4')
        await asyncio.sleep(4)

@app.task()
async def task1():
    print('sending command')
    data_topic_name1 = 'data_topic1'
    data_topic1 = app.topic(data_topic_name1)
    await command_topic.send(value=CommandSubscribe(topic=data_topic_name1))

    for idx in range(20):
        print('task1 sending data..')
        idx1 = idx + 4000
        await data_topic1.send(value=f'message {idx1}_1')
        await data_topic1.send(value=f'message {idx1}_2')
        await data_topic1.send(value=f'message {idx1}_3')
        await data_topic1.send(value=f'message {idx1}_4')
        await asyncio.sleep(4)

if __name__ == '__main__':
    app.main()

Expected behavior

When a stream is created dynamically, it should (1) start consuming within a reasonable time and (2) start from the earliest offset, as configured by consumer_auto_offset_reset='earliest'.

Actual behavior

Creating the stream dynamically at runtime leads to two unexpected behaviours: (1) after the new stream is created, it takes approximately 45 seconds before it starts consuming; (2) it ignores the setting

consumer_auto_offset_reset='earliest'

and consumes only the messages that arrive from that point on.

Question: I wonder if this is the right way to create the topic dynamically or if there is a better approach.

Full traceback

app.py

┌ƒaµS† v1.10.0┬─────────────────────────────────────────────┐
│ id          │ app16                                       │
│ transport   │ [URL('kafka://localhost:9092')]             │
│ store       │ memory:                                     │
│ web         │ http://localhost:6066/                      │
│ log         │ -stderr- (info)                             │
│ pid         │ 21034                                       │
│ hostname    │ localhost.localdomain                       │
│ platform    │ CPython 3.7.3 (Linux x86_64)                │
│ drivers     │                                             │
│   transport │ aiokafka=1.1.3                              │
│   web       │ aiohttp=3.6.2                               │
│ datadir     │ /home/matt/PycharmProjects/kx/app16-data    │
│ appdir      │ /home/matt/PycharmProjects/kx/app16-data/v1 │
└─────────────┴─────────────────────────────────────────────┘
[2020-01-17 10:27:22,479] [21034] [INFO] [^Worker]: Starting... 
[2020-01-17 10:27:22,512] [21034] [INFO] [^-App]: Starting... 
[2020-01-17 10:27:22,513] [21034] [INFO] [^--Monitor]: Starting... 
[2020-01-17 10:27:22,514] [21034] [INFO] [^--Producer]: Starting... 
[2020-01-17 10:27:22,515] [21034] [INFO] [^---ProducerBuffer]: Starting... 
[2020-01-17 10:27:22,558] [21034] [INFO] [^--CacheBackend]: Starting... 
[2020-01-17 10:27:22,558] [21034] [INFO] [^--Web]: Starting... 
[2020-01-17 10:27:22,559] [21034] [INFO] [^---Server]: Starting... 
[2020-01-17 10:27:22,560] [21034] [INFO] [^--Consumer]: Starting... 
[2020-01-17 10:27:22,561] [21034] [INFO] [^---AIOKafkaConsumerThread]: Starting... 
[2020-01-17 10:27:22,608] [21034] [INFO] [^--LeaderAssignor]: Starting... 
[2020-01-17 10:27:22,609] [21034] [INFO] [^--Producer]: Creating topic 'app16-__assignor-__leader' 
[2020-01-17 10:27:22,722] [21034] [INFO] [^--Producer]: Topic 'app16-__assignor-__leader' created. 
[2020-01-17 10:27:22,736] [21034] [INFO] [^--ReplyConsumer]: Starting... 
[2020-01-17 10:27:22,738] [21034] [INFO] [^--AgentManager]: Starting... 
[2020-01-17 10:27:22,739] [21034] [INFO] [^---Agent: __main__.command_consumer]: Starting... 
[2020-01-17 10:27:22,746] [21034] [INFO] [^----OneForOneSupervisor: (2@0x7f4b6ee5dd30)]: Starting... 
[2020-01-17 10:27:22,748] [21034] [INFO] [^---Conductor]: Starting... 
[2020-01-17 10:27:22,748] [21034] [INFO] [^--TableManager]: Starting... 
[2020-01-17 10:27:22,750] [21034] [INFO] [^---Conductor]: Waiting for agents to start... 
[2020-01-17 10:27:22,752] [21034] [INFO] [^---Conductor]: Waiting for tables to be registered... 
[2020-01-17 10:27:23,752] [21034] [INFO] [^---Recovery]: Starting... 
[2020-01-17 10:27:23,756] [21034] [INFO] [^--Producer]: Creating topic 'app16-__assignor-__leader' 
[2020-01-17 10:27:24,611] [21034] [INFO] Updating subscribed topics to: frozenset({'command_topic', 'app16-__assignor-__leader'}) 
[2020-01-17 10:27:24,614] [21034] [INFO] Subscribed to topic(s): {'command_topic', 'app16-__assignor-__leader'} 
[2020-01-17 10:27:24,659] [21034] [INFO] Discovered coordinator 1 for group app16 
[2020-01-17 10:27:24,669] [21034] [INFO] Revoking previously assigned partitions set() for group app16 
[2020-01-17 10:27:24,692] [21034] [WARNING] Topic command_topic is not available during auto-create initialization 
[2020-01-17 10:27:25,612] [21034] [INFO] (Re-)joining group app16 
[2020-01-17 10:27:25,638] [21034] [INFO] Joined group 'app16' (generation 1) with member_id faust-1.10.0-c100be8c-252b-46c9-8b09-e17bd787cc0f 
[2020-01-17 10:27:25,639] [21034] [INFO] Elected group leader -- performing partition assignments using faust 
[2020-01-17 10:27:25,643] [21034] [WARNING] Ignoring missing topic: 'command_topic' 
[2020-01-17 10:27:25,668] [21034] [INFO] Successfully synced group app16 with generation 1 
[2020-01-17 10:27:25,670] [21034] [ERROR] Rejoining group -- Need to rejoin! -- Topics not yet created: {'command_topic'} 
[2020-01-17 10:27:25,686] [21034] [INFO] (Re-)joining group app16 
[2020-01-17 10:27:25,717] [21034] [INFO] Joined group 'app16' (generation 2) with member_id faust-1.10.0-c100be8c-252b-46c9-8b09-e17bd787cc0f 
[2020-01-17 10:27:25,718] [21034] [INFO] Elected group leader -- performing partition assignments using faust 
[2020-01-17 10:27:25,734] [21034] [INFO] Successfully synced group app16 with generation 2 
[2020-01-17 10:27:25,735] [21034] [INFO] Setting newly assigned partitions {TopicPartition(topic='app16-__assignor-__leader', partition=0), TopicPartition(topic='command_topic', partition=0)} for group app16 
[2020-01-17 10:27:26,532] [21034] [INFO] [^---Recovery]: Resuming flow... 
[2020-01-17 10:27:26,533] [21034] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2020-01-17 10:27:27,526] [21034] [INFO] [^---Fetcher]: Starting... 
[2020-01-17 10:27:27,528] [21034] [INFO] [^---Recovery]: Worker ready 
[2020-01-17 10:27:27,530] [21034] [INFO] [^Worker]: Ready 
[2020-01-17 10:27:37,535] [21034] [WARNING] Got <CommandSubscribe: topic='data_topic1'> 
[2020-01-17 10:27:37,536] [21034] [WARNING] consumer called 
[2020-01-17 10:27:37,536] [21034] [WARNING] Got <CommandSubscribe: topic='data_topic'> 
[2020-01-17 10:27:37,539] [21034] [WARNING] consumer called 
[2020-01-17 10:28:22,544] [21034] [INFO] [^--Producer]: Creating topic 'app16-__assignor-__leader' 
[2020-01-17 10:28:22,566] [21034] [INFO] Updating subscribed topics to: frozenset({'data_topic', 'command_topic', 'data_topic1', 'app16-__assignor-__leader'}) 
[2020-01-17 10:28:22,568] [21034] [INFO] Subscribed to topic(s): {'data_topic', 'command_topic', 'data_topic1', 'app16-__assignor-__leader'} 
[2020-01-17 10:28:22,572] [21034] [INFO] Revoking previously assigned partitions frozenset({TopicPartition(topic='app16-__assignor-__leader', partition=0), TopicPartition(topic='command_topic', partition=0)}) for group app16 
[2020-01-17 10:28:22,659] [21034] [INFO] (Re-)joining group app16 
[2020-01-17 10:28:22,666] [21034] [INFO] Joined group 'app16' (generation 3) with member_id faust-1.10.0-c100be8c-252b-46c9-8b09-e17bd787cc0f 
[2020-01-17 10:28:22,666] [21034] [INFO] Elected group leader -- performing partition assignments using faust 
[2020-01-17 10:28:22,674] [21034] [INFO] Successfully synced group app16 with generation 3 
[2020-01-17 10:28:22,675] [21034] [INFO] Setting newly assigned partitions {TopicPartition(topic='app16-__assignor-__leader', partition=0), TopicPartition(topic='data_topic', partition=0), TopicPartition(topic='command_topic', partition=0), TopicPartition(topic='data_topic1', partition=0)} for group app16 
[2020-01-17 10:28:22,758] [21034] [INFO] [^---Recovery]: Resuming flow... 
[2020-01-17 10:28:22,758] [21034] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2020-01-17 10:28:23,593] [21034] [INFO] [^---Recovery]: Worker ready 
[2020-01-17 10:28:26,597] [21034] [WARNING] Consumed b'"message 4012_1"' 
[2020-01-17 10:28:27,163] [21034] [WARNING] Consumed b'"message 4012_2"' 
[2020-01-17 10:28:27,165] [21034] [WARNING] Consumed b'"message 5012_1"' 
[2020-01-17 10:28:27,166] [21034] [WARNING] Consumed b'"message 4012_3"' 
[2020-01-17 10:28:27,167] [21034] [WARNING] Consumed b'"message 5012_2"' 
[2020-01-17 10:28:27,169] [21034] [WARNING] Consumed b'"message 4012_4"' 
[2020-01-17 10:28:27,170] [21034] [WARNING] Consumed b'"message 5012_3"' 
[2020-01-17 10:28:27,171] [21034] [WARNING] Consumed b'"message 5012_4"' 
[2020-01-17 10:28:30,343] [21034] [WARNING] Consumed b'"message 4013_1"' 
[2020-01-17 10:28:30,606] [21034] [WARNING] Consumed b'"message 4013_2"' 
....
....
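The delay described above can be read off the log timestamps. The following is a small sketch of that arithmetic, using three timestamps copied from the log: the second "consumer called" line, the "Updating subscribed topics" line, and the first "Consumed" line. The variable names are only illustrative, and the skipped-message count assumes that nothing earlier than "message 4012_1" was consumed, which is what the log suggests.

```python
from datetime import datetime

# Timestamp layout used by the worker log, e.g. "2020-01-17 10:27:37,539".
LOG_TIME_FORMAT = '%Y-%m-%d %H:%M:%S,%f'


def parse(timestamp):
    """Parse one log timestamp into a datetime."""
    return datetime.strptime(timestamp, LOG_TIME_FORMAT)


stream_created = parse('2020-01-17 10:27:37,539')  # second "consumer called"
resubscribed = parse('2020-01-17 10:28:22,566')    # "Updating subscribed topics to: ..."
first_consumed = parse('2020-01-17 10:28:26,597')  # first "Consumed ..." line

# Seconds between creating the streams and the consumer re-subscribing.
resubscribe_delay = (resubscribed - stream_created).total_seconds()
# Seconds between creating the streams and the first consumed message.
consume_delay = (first_consumed - stream_created).total_seconds()

# The first value seen on data_topic1 is "message 4012_1". The producer
# numbers its loop iterations from 4000 and sends 4 messages per iteration,
# so iterations 0..11 were not consumed (assumption stated above).
first_seen_iteration = 4012 - 4000
messages_per_iteration = 4
skipped_per_topic = first_seen_iteration * messages_per_iteration

print(f'resubscribe delay: {resubscribe_delay:.3f} s')
print(f'first message delay: {consume_delay:.3f} s')
print(f'messages skipped per topic: {skipped_per_topic}')
```

The gap between stream creation and re-subscription is about 45 seconds, which matches the delay reported in the issue, and the first message arrives roughly 4 seconds after that.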

Versions

Many thanks for any insights!

krejcmat commented 4 years ago

Could anybody give me a hint on how to deal with it? Thank you.

luinnar commented 4 years ago

Keyed streams are the solution to your problem. Use different keys in a single topic instead of creating many topics.
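To illustrate the idea behind this suggestion, here is a minimal, library-free sketch. It is not Faust code: an asyncio.Queue stands in for the single topic, and the function names (produce, consume, main) are hypothetical. Each message carries a key, here the former topic name, and one consumer that is already listening separates the values per key, so no new subscription is needed when a new key appears.

```python
import asyncio
from collections import defaultdict


async def produce(queue, key, start, count):
    """Put `count` (key, value) pairs onto the one shared queue."""
    for idx in range(start, start + count):
        await queue.put((key, f'message {idx}'))
        await asyncio.sleep(0)  # yield so that the producers interleave


async def consume(queue, total):
    """Read `total` pairs from the shared queue and group values by key."""
    per_key = defaultdict(list)
    for _ in range(total):
        key, value = await queue.get()
        per_key[key].append(value)
    return dict(per_key)


async def main():
    queue = asyncio.Queue()  # stands in for the single shared topic
    results = await asyncio.gather(
        produce(queue, 'data_topic', 5000, 3),
        produce(queue, 'data_topic1', 4000, 3),
        consume(queue, 6),
    )
    return results[2]  # the consumer's grouped result


grouped = asyncio.run(main())
for key, values in sorted(grouped.items()):
    print(key, values)
```

In Faust terms, this would presumably mean having the producer send to one topic with a key (the send method accepts a key argument alongside value) and letting a regular agent, subscribed at startup, branch on that key. That reading of the suggestion is an assumption, since the comment does not spell it out.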