wintoncode / winton-kafka-streams

A Python implementation of Apache Kafka Streams
Apache License 2.0

Wordcount example: TypeError: an integer is required (got type tuple) #61

Open dqii opened 4 years ago

dqii commented 4 years ago

I'm getting an error when I run the wordcount example. I cloned the git repo and did not make any changes. The issue seems to be that the timestamp is passed as a tuple, but producer.produce() expects an integer.

I ran zookeeper-server-start.sh, kafka-server-start.sh, example.py, and source_client.py.

Source client output:

producing a b c to wks-wordcount-example-topic
source_client.py:9: DeprecationWarning: PY_SSIZE_T_CLEAN will be required for '#' formats
  p.produce(topic, data.encode('utf-8'))
producing a b to wks-wordcount-example-topic
producing a to wks-wordcount-example-topic

example.py output:

WARNING:winton_kafka_streams.processor._stream_thread(Thread-1):Unexpected state transition from RUNNING to ASSIGNING_PARTITIONS.
WARNING:winton_kafka_streams.processor._stream_thread(Thread-1):Unexpected state transition from RUNNING to NOT_RUNNING.
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_stream_thread.py", line 140, in run
    self.process_and_punctuate()
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_stream_thread.py", line 181, in process_and_punctuate
    if task.process():
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_stream_task.py", line 122, in process
    self.topology.sources[topic].process(key, value)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/topology.py", line 25, in process
    self.processor.process(key, value)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/processor.py", line 35, in process
    self.context.forward(key, value)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/processor_context.py", line 49, in forward
    child.process(key, value)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/topology.py", line 25, in process
    self.processor.process(key, value)
  File "example.py", line 37, in process
    self.word_count_store[word] = count + 1
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/state/logging/change_logging_state_store.py", line 46, in __setitem__
    self.change_logger.log_change(key_bytes, value_bytes)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/state/logging/store_change_logger.py", line 10, in log_change
    self.record_collector.send(self.topic, key, value, self.context.timestamp, partition=self.partition)
  File "/home/diqi/venv/lib/python3.8/site-packages/winton_kafka_streams/processor/_record_collector.py", line 38, in send
    self.producer.produce(topic, ser_value, ser_key, partition, self.on_delivery, partitioner, timestamp)
TypeError: an integer is required (got type tuple)

seanrmurphy commented 4 years ago

I experienced the same issue. It seems to be a problem with the timestamp handling (not sure why). Try changing the offending line (the producer.produce call in _record_collector.py, line 38 in the traceback above) to:

    self.producer.produce(topic, ser_value, ser_key, partition, self.on_delivery, partitioner, timestamp[1])

MaximWolpher commented 4 years ago

You can try this fix:
https://github.com/wintoncode/winton-kafka-streams/pull/58/files

I had issues with timestamp[1] because the timestamp is sometimes a float rather than a tuple.
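
For anyone who wants a local workaround covering both cases reported in this thread (a tuple in one run, a float in another), here is a minimal sketch. The helper name normalize_timestamp is hypothetical and is not part of winton-kafka-streams. It assumes the tuple has the (timestamp_type, timestamp) layout, which is why timestamp[1] worked above, and that a float is already in the unit the producer expects, so it is only truncated to an int. The linked PR may solve this differently.

```python
from numbers import Real


def normalize_timestamp(timestamp):
    """Coerce a timestamp into the int that producer.produce() expects.

    Hypothetical helper, not part of winton-kafka-streams.
    Assumption: a tuple has the layout (timestamp_type, timestamp_value).
    Assumption: a float is already in the right unit and only needs truncating.
    """
    if isinstance(timestamp, tuple):
        # Keep only the value; drop the timestamp type.
        timestamp = timestamp[1]
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(timestamp, bool) or not isinstance(timestamp, Real):
        raise TypeError(
            "unsupported timestamp type: {}".format(type(timestamp).__name__)
        )
    return int(timestamp)


if __name__ == "__main__":
    print(normalize_timestamp((1, 1590000000000)))  # tuple case from this issue
    print(normalize_timestamp(1590000000000.0))     # float case reported above
```

With a helper like this, the send method in _record_collector.py could pass normalize_timestamp(timestamp) as the last argument to self.producer.produce(...) instead of indexing with timestamp[1] directly.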