faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Message isn't published although logs say it was sent #393

Open eblis opened 2 years ago

eblis commented 2 years ago

Checklist

Steps to reproduce

I have some Faust streaming code that publishes messages to a Kafka topic. Although the logs report that the message was sent, I can't see it in Kafka using Offset Explorer.

import datetime
from typing import Optional

import faust

class Container(faust.Record, serializer='json', coerce=True):
    vin: str
    container_location: str
    kam: str
    container_timestamp: datetime.datetime  # dates should always be in UTC "timezone"
    request_id: Optional[str] = ""

class ContainerPublisher:
    # [..]

    def __init__(self, config):
        self.config = config

        # default_topic_partitions = 16
        # store = "memory://"
        self.faust_app = faust.App(name, broker=broker_url, logging_config=logging_config, topic_partitions=default_topic_partitions, store=store, web_port=web_port)

        # [..]

        # config.kafka.topic.partitions = 64

        self._containers_topic = self.faust_app.topic(
            config.kafka.topic.name,
            value_type=Container,
            partitions=config.kafka.topic.partitions,
            retention=datetime.timedelta(weeks=4 * 6))

    async def publish_containers(self, vehicles: dict, raw_containers: dict):
        logger.info(f"Publishing {len(raw_containers)} containers to Kafka")
        converted: dict[str, Container] = {k: self._convert_container(vehicles=vehicles, key=k, raw_container=v) for k,v in raw_containers.items()}

        for candidate in converted:
            logger.info(f"Publishing {candidate} container to Kafka")
            value = converted[candidate]
            result = await self._containers_topic.send(key=candidate, value=value, timestamp=value.container_timestamp.timestamp())
        logger.info(f"Finished publishing {len(raw_containers)} containers to Kafka")
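
In the loop above, `result` is assigned but never inspected. One possibility worth ruling out (an assumption, not a confirmed diagnosis) is that the producer only queues the record at the point the "send" is logged, and that delivery is confirmed later through the returned handle. The sketch below is a toy model using only `asyncio`; `BufferingProducer` and its methods are hypothetical stand-ins and are not Faust or Kafka APIs. It only illustrates why "queued" and "delivered" can be different moments.

```python
import asyncio


class BufferingProducer:
    """Hypothetical stand-in for a producer that batches records."""

    def __init__(self):
        self.pending = []    # (record, future) pairs queued but not delivered
        self.delivered = []  # records the pretend broker has accepted

    async def send(self, record):
        # The record is only queued here; nothing has reached the broker yet.
        fut = asyncio.get_running_loop().create_future()
        self.pending.append((record, fut))
        return fut

    async def flush(self):
        # Deliver everything queued and resolve each future with a fake offset.
        for record, fut in self.pending:
            self.delivered.append(record)
            fut.set_result(len(self.delivered) - 1)
        self.pending.clear()


async def demo():
    producer = BufferingProducer()
    ack = await producer.send({"key": "vin_20221024_125153"})
    # State right after send() returns: nothing delivered, ack unresolved.
    queued_only = (len(producer.delivered), ack.done())
    await producer.flush()
    offset = await ack  # resolves only once delivery has happened
    return queued_only, len(producer.delivered), offset


result = asyncio.run(demo())
```

In this model, awaiting the returned handle (or flushing before shutdown) is what distinguishes a delivered record from a queued one. Whether the same applies to the code in this issue would need to be checked against the Faust version in use.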

Expected behavior

The message should be available in Kafka if it's reported as being sent.

Actual behavior

I can't see the message in Kafka using Offset Explorer.

Full traceback

The application log below suggests that the message was recorded as sent to Kafka:

2022-10-24 15:08:16,234 - [vin] - {MainThread} - [peds.__main__] - INFO - Publishing 1 containers to Kafka
2022-10-24 15:08:16,249 - [vin] - {MainThread} - [peds.__main__] - INFO - Publishing vin_20221024_125153 container to Kafka
2022-10-24 15:08:16,249 - [vin] - {MainThread} - [faust.topics] - DEBUG - send: topic='containers-production' k=b'vin_20221024_125153' v=b'{"vin":"vin","container_location":"vin_20221024_125153.ZIP","kam":"kam","container_timestamp":"2022-10-24T12:51:53","request_id":"","__faust":{"ns":"models.Container.Container"}}' timestamp=1666608713.0 partition=None
2022-10-24 15:08:16,249 - [vin] - {MainThread} - [peds.__main__] - INFO - Finished publishing 1 containers to Kafka

But in Kafka there is no message with that key: [image]
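
A side observation on the log above, offered as a sketch rather than a diagnosis: the `Container` comment says dates should be in UTC, but the logged `timestamp=1666608713.0` decodes to 10:51:53 UTC, two hours before the serialized `container_timestamp` of 12:51:53. That would be consistent with calling `.timestamp()` on a naive `datetime`, which Python interprets as local time. The check below uses only the values shown in the log; whether this offset has anything to do with the missing message is not established here.

```python
import datetime

# Values copied from the DEBUG log line in this issue.
naive = datetime.datetime(2022, 10, 24, 12, 51, 53)  # container_timestamp
logged = 1666608713.0                                # timestamp=... in the log

# Attach UTC explicitly so the result does not depend on the local timezone.
aware = naive.replace(tzinfo=datetime.timezone.utc)
utc_epoch = aware.timestamp()

# What the logged epoch value means when read as UTC.
logged_as_utc = datetime.datetime.fromtimestamp(logged, tz=datetime.timezone.utc)

# Difference between the intended UTC instant and the logged one, in hours.
offset_hours = (utc_epoch - logged) / 3600
```

If the timestamps are meant to be UTC, attaching `tzinfo=datetime.timezone.utc` before calling `.timestamp()` removes the dependence on the machine's local timezone.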

Versions

nucflash commented 8 months ago

+1, using faust-streaming==0.10.23 with RocksDB; consumer and producer run on macOS (M2 Max) and connect to a Docker Kafka 3.3 (by bitnami) with KRaft.