Parsely / pykafka

Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
http://pykafka.readthedocs.org/
Apache License 2.0

Very slow when I use PyKafka with asyncio #1022

Closed. littleningmeng closed this issue 3 years ago.

littleningmeng commented 3 years ago

Here is my code:

    def process_buff(buff, direction):
        completed_buff = parser.parse(buff)
        if completed_buff:
            if G.dumper:
                G.dumper.dump(completed_buff, direction)
        else:
            logger.debug("buff is not an completed buff")

    async def pipe(r, w, direction):
        try:
            while not r.at_eof():
                buff = await r.read(65535)
                process_buff(buff, direction)
                w.write(buff)
                await w.drain()
        finally:
            w.close()

dump is:

    def dump(self, buff, direction):
        with self.topic.get_producer(delivery_reports=True) as producer:
            producer.produce(buff)
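
A possible cause, not confirmed in this thread: dump calls get_producer for every buffer and leaves the with block right away, so each message pays for starting and stopping a producer, and all of that runs synchronously inside the coroutine, which stalls the event loop. Below is a minimal sketch of one way around it, assuming the producer can be created once and reused. FakeTopic and FakeProducer are hypothetical stand-ins so the example runs without a broker, and the Dumper layout is an illustration, not the poster's actual class; with pykafka the producer would come from the real topic's get_producer.

```python
import asyncio


class FakeProducer:
    """Hypothetical stand-in for a pykafka producer (no broker needed)."""

    def __init__(self):
        self.sent = []

    def produce(self, message):
        self.sent.append(message)

    def stop(self):
        pass


class FakeTopic:
    """Hypothetical stand-in for a pykafka topic; counts producers created."""

    def __init__(self):
        self.producers_created = 0
        self.last_producer = None

    def get_producer(self, **kwargs):
        self.producers_created += 1
        self.last_producer = FakeProducer()
        return self.last_producer


class Dumper:
    def __init__(self, topic):
        # Create the producer once, not once per dump() call.
        self.producer = topic.get_producer()

    async def dump(self, buff, direction):
        # Run the potentially blocking produce() call in the default
        # thread pool so the event loop stays free for the pipe() tasks.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.producer.produce, buff)

    def close(self):
        # Stop the shared producer once, at shutdown.
        self.producer.stop()


async def main():
    topic = FakeTopic()
    dumper = Dumper(topic)
    for chunk in (b"a", b"b", b"c"):
        await dumper.dump(chunk, "in")
    dumper.close()
    return topic


topic = asyncio.run(main())
```

With this layout process_buff would have to become a coroutine and await G.dumper.dump(...). Whether the executor hop is needed depends on how the producer is configured; reusing a single producer is the main point of the sketch.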