Parsely / pykafka

Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
http://pykafka.readthedocs.org/
Apache License 2.0
1.12k stars 232 forks source link

Last two messages are not written to the Topic #994

Open ijbo opened 4 years ago

ijbo commented 4 years ago

Below is my code : Problem : Producer not writing the last two messages to the Topic.

`import os
import asyncio
import websockets
from pykafka import KafkaClient
from websockets.extensions import permessage_deflate

class Server:
    client = None
    kafka_client = None
    topic_name = "tpa_19"
    topic = None
    producer = None

    def get_port(self):
        return os.getenv('WS_PORT', '10015')

    def connect_kafka_client(self):
        client = KafkaClient(hosts="localhost:9092", use_greenlets=True)
        self.client = client
        self.set_topic()
        print("Connection Done with Kafka")

    def set_topic(self):
        self.topic = self.client.topics[self.topic_name]
        self.producer = self.topic.get_producer(min_queued_messages=1,max_queued_messages=0,
                                                linger_ms=500)

    def get_host(self):
        return os.getenv('WS_HOST', 'localhost')

    def start(self):
        return websockets.serve(self.handler, self.get_host(), self.get_port(), ping_interval=None, max_size=None,
                                max_queue=None,close_timeout=None,extensions=[
        permessage_deflate.ServerPerMessageDeflateFactory(
            server_max_window_bits=11,
            client_max_window_bits=11,
            compress_settings={'memLevel': 4},
        ),
    ])

    async def send_message_to_kafka(self, producer, row):
        try:
            # print(row)
            producer.produce(row.encode())
        except Exception as ex:
            print(ex)

    async def handler(self, websocket, path):
        async for row in websocket:
             await self.send_message_to_kafka(self.producer, row)

if __name__ == '__main__':
    ws = Server()
    ws.connect_kafka_client()
    asyncio.get_event_loop().run_until_complete(ws.start())
    asyncio.get_event_loop().run_forever()
`