robinhood / faust

Python Stream Processing

What could be causing inconsistent message processing in a Kafka-based system that synchronizes data between MSSQL and MySQL, even though every message is delivered to the Kafka topic? #780

Open nithinmurali94 opened 4 months ago


Hi, I'm encountering an issue with my application, which uses Kafka to synchronize data between MSSQL and MySQL databases. I use the Kafka JDBC connector to read data from MSSQL into Kafka, and I've written a Faust application that processes the messages from the Kafka topic, which in this case is named 'orderentry'.

In my testing, when I insert 100 records into the MSSQL database, I can confirm by consuming the 'orderentry' topic from the terminal that all 100 messages reach the topic. Faust's processing, however, is inconsistent: sometimes it processes all 100 messages, but at other times it processes only a subset, such as 75 or 80, and no errors are logged.
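
One way to make the 100-versus-75 comparison more informative is to record the partition and offset of every message on both sides (what the terminal consumer sees, and what the Faust agent actually handles) and diff the two sets. The sketch below assumes such records are available as `(partition, offset)` tuples; `missing_by_partition` is a hypothetical helper, not part of Faust or Kafka. If the gaps cluster on particular partitions, that could point at partition assignment (for example, a second worker joined to the same consumer group) rather than at the handler code.

```python
from collections import Counter
from typing import Iterable, Tuple

# (partition, offset) identifies one Kafka record within a topic.
Record = Tuple[int, int]


def missing_by_partition(produced: Iterable[Record],
                         processed: Iterable[Record]) -> Counter:
    """Count, per partition, records that reached the topic but were never handled."""
    missing = set(produced) - set(processed)
    return Counter(partition for partition, _offset in missing)


if __name__ == "__main__":
    # Hypothetical data: 4 partitions x 25 records = 100 produced;
    # nothing from partition 3 was handled, so only 75 were processed.
    produced = [(p, o) for p in range(4) for o in range(25)]
    processed = [r for r in produced if r[0] != 3]
    print(missing_by_partition(produced, processed))  # Counter({3: 25})
```

An empty `Counter()` means every produced record was seen by the consumer; a non-empty one says exactly which partitions the missing messages came from.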

I'm uncertain why this discrepancy occurs. Below, I've included a portion of the relevant code for reference.

```python
import faust

# utils, ariel_inventory, test_order and thread_pool come from the rest of the
# application and are not shown here.

app = faust.App('alpha100', broker='kafka://localhost')


@app.agent(
    app.topic('OrderEntry', key_serializer='json', value_serializer='json'),
    concurrency=10,
)
async def alpha2_OrderEntry_handler(stream):
    async for key, value in stream.items():
        try:
            print("[{}] alpha2 OrderEntry Topic Handler |".format(
                utils.get_current_datetime_in_ms()))
            ariel_inventory.order_detail(value)
            print("Sync using Topic From alpha2 to Ariel4 | alpha2.OrderEntry | "
                  "OrderEntry data is : {}".format(value['payload']))
            if utils.check_alpha2_sync_flag(value):
                # Run the blocking write in a thread pool so it does not block the event loop.
                await app.loop.run_in_executor(thread_pool, test_order, value)
        except Exception as ex:
            print(f"{key} error in processing: {ex}")
```
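
The agent only calls `test_order` when `utils.check_alpha2_sync_flag(value)` is true, and it logs the same lines whether or not that write happens, so "processed" can mean several things. A minimal sketch, assuming plain `asyncio` and `concurrent.futures` and using hypothetical stand-ins `sync_flag_is_set` and `write_order` for the real helpers, tallies each message as written, skipped, or failed so the three counts should always add up to the number received.

```python
import asyncio
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def sync_flag_is_set(value: dict) -> bool:
    # Hypothetical stand-in for utils.check_alpha2_sync_flag.
    return bool(value.get("sync"))


def write_order(value: dict) -> None:
    # Hypothetical stand-in for the blocking test_order write; it raises on
    # bad input so that failures show up in the tally.
    if "payload" not in value:
        raise ValueError("missing payload")


async def handle_all(values, pool: ThreadPoolExecutor) -> Counter:
    """Tally every message as written, skipped, or failed."""
    outcomes: Counter = Counter()
    loop = asyncio.get_running_loop()
    for value in values:
        if not sync_flag_is_set(value):
            outcomes["skipped"] += 1  # flag not set: nothing written, but counted
            continue
        try:
            # Blocking work runs in the thread pool, as in the agent above.
            await loop.run_in_executor(pool, write_order, value)
            outcomes["written"] += 1
        except Exception:
            outcomes["failed"] += 1
    return outcomes


if __name__ == "__main__":
    # Hypothetical batch: 70 writable, 20 with the flag off, 10 missing a payload.
    batch = ([{"sync": True, "payload": i} for i in range(70)]
             + [{"sync": False} for _ in range(20)]
             + [{"sync": True} for _ in range(10)])
    with ThreadPoolExecutor(max_workers=4) as pool:
        print(asyncio.run(handle_all(batch, pool)))
```

For this hypothetical batch the tally comes out as 70 written, 20 skipped and 10 failed. Counting per outcome means a shortfall can be attributed to the flag check or to exceptions; if the three counts sum to fewer than the messages produced, the agent never received the rest.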

Please help! I'm stuck.