alm0ra / mockafka-py

Mockafka-py is a Python library designed for in-memory mocking of Kafka. [aiokafka - confluent-kafka-python]
https://mockafka-py.readthedocs.io
MIT License
44 stars, 11 forks

Fix implementation of repeated single-message fetching #110

Closed: PeterJCLaw closed this 3 months ago

PeterJCLaw commented 3 months ago

Summary

This brings the mock implementations in line with the behaviour observed in the default configurations of confluent-kafka and aiokafka respectively, both of which return subsequent messages on subsequent calls.
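
To illustrate the behaviour described above, here is a minimal sketch of a consumer that returns subsequent messages on subsequent calls. `ToyConsumer` and its message list are hypothetical names made up for this illustration; this is not mockafka's actual implementation. It also simplifies the real libraries: it returns `None` when nothing is left, whereas aiokafka's `getone()` waits for the next message.

```python
import asyncio
from typing import List, Optional


class ToyConsumer:
    """Hypothetical in-memory consumer (illustration only, not mockafka's code)."""

    def __init__(self, messages: List[str]) -> None:
        self._messages = list(messages)
        self._offset = 0  # index of the next message to hand out

    def poll(self) -> Optional[str]:
        """Return the next unread message, or None when none remain."""
        if self._offset >= len(self._messages):
            return None
        msg = self._messages[self._offset]
        self._offset += 1  # advance so the next call yields the next message
        return msg

    async def getone(self) -> Optional[str]:
        """Async counterpart with the same advancing behaviour (toy simplification)."""
        return self.poll()


consumer = ToyConsumer(["first", "second"])
polled = [consumer.poll(), consumer.poll(), consumer.poll()]
print(polled)  # ['first', 'second', None]
```

The key point is the stored offset: each call advances it, so repeated single-message fetches walk through the topic instead of returning the same message again.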

Testing

Tested by running Kafka locally and manually validating the behaviour of the relevant libraries. The commands and demo scripts are below.

$ docker run --detach -p 9092:9092 --rm apache/kafka
$ docker run -it --network=host --rm apache/kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic testing-topic
$ docker run -it --network=host --rm apache/kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic testing-topic
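# Demo script for confluent-kafka: poll one message at a time.
import uuid
import confluent_kafka

def main():
    consumer = confluent_kafka.Consumer({
        'bootstrap.servers': 'localhost:9092',
        # A fresh group id per run, so no previously committed offsets apply.
        'group.id': f'testing-group-{uuid.uuid4()}',
    })
    consumer.subscribe(['testing-topic'])
    print("Running")
    try:
        while True:
            # With no timeout, poll() blocks until a message or event arrives.
            msg = consumer.poll()
            if msg is None:
                continue
            if msg.error():
                print(msg.error())
                continue
            print((msg.topic(), msg.key(), msg.value()))
    finally:
        consumer.close()

main()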
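# Demo script for aiokafka: fetch one message at a time.
import asyncio
import aiokafka

async def main():
    # bootstrap_servers is left at aiokafka's default (localhost).
    consumer = aiokafka.AIOKafkaConsumer('testing-topic')
    await consumer.start()
    print("Running")
    try:
        while True:
            # getone() waits for and returns the next single message.
            print(await consumer.getone())
    finally:
        await consumer.stop()

asyncio.run(main())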