astronomer / airflow-provider-kafka

A provider package for kafka
Apache License 2.0

Unable to choose assign strategy for consumers #21

Closed BrandeeDev closed 1 year ago

BrandeeDev commented 1 year ago

Hello. Sometimes we want to choose a different partition assignment strategy for consumers, for example assigning exactly one partition per consumer. The hook currently offers no way to do this.

Actual behavior

def get_consumer(self) -> Consumer:
    """
    Returns a Consumer that has been subscribed to topics.
    """

    consumer = Consumer({**self.extra_configs, **self.config})
    consumer.subscribe(self.topics)

    return consumer
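
As a possible workaround within the current behavior: the dict passed to `Consumer` is built by unpacking `extra_configs` first and `config` second, so keys in `config` win on conflict. librdkafka has a `partition.assignment.strategy` property (`range`, `roundrobin`, `cooperative-sticky`), so one of the built-in strategies could in principle be selected through either dict. The sketch below only illustrates the merge order. The `merge_consumer_config` helper and the sample values are hypothetical, and whether a given hook configuration lets you set this key is an assumption.

```python
from typing import Any, Dict


def merge_consumer_config(
    extra_configs: Dict[str, Any], config: Dict[str, Any]
) -> Dict[str, Any]:
    # Hypothetical helper mirroring {**self.extra_configs, **self.config}:
    # later unpacking wins, so values in `config` override `extra_configs`.
    return {**extra_configs, **config}


# Sample values are illustrative only.
extra_configs = {
    "partition.assignment.strategy": "range",
    "auto.offset.reset": "earliest",
}
config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "partition.assignment.strategy": "roundrobin",
}

merged = merge_consumer_config(extra_configs, config)
```

This covers only the built-in strategies. A fully custom mapping such as one partition per consumer still needs something like the `on_assign` callback described under "Desired behavior".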

Desired behavior


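from typing import Dict, List

from confluent_kafka import Consumer, TopicPartition
from confluent_kafka.admin import ClusterMetadata, TopicMetadata


def get_consumer(self) -> Consumer:
    """
    Returns a Consumer subscribed to the topic with a custom on_assign callback.
    """
    consumer = Consumer({**self.extra_configs, **self.config})
    # on_assign is invoked with the consumer and the partitions the group assigned to it.
    consumer.subscribe([self.input_topic], on_assign=self.on_assign)
    return consumer

def on_assign(self, consumer: Consumer, partitions: List[TopicPartition]):
    # self.admin is assumed to be an AdminClient, and self.client_id is assumed
    # to be the key for this consumer in the mapping built below.
    cluster_metadata: ClusterMetadata = self.admin.list_topics()
    topic_metadata: TopicMetadata = cluster_metadata.topics[self.input_topic]
    all_partitions = [
        TopicPartition(topic=self.input_topic, partition=partition)
        for partition in topic_metadata.partitions
    ]

    # custom_assignment_strategy is user-supplied: it maps member ids to partitions.
    group_assignment: Dict[str, List[TopicPartition]] = custom_assignment_strategy(
        all_partitions
    )

    # Replace the group-provided assignment, using the consumer passed to the callback.
    consumer.assign(group_assignment[self.client_id])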
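
For illustration, `custom_assignment_strategy` could be something like the round-robin function below. This is only a sketch under assumptions: the name `round_robin_assignment` is hypothetical, and unlike the call above it also takes the list of member ids, since the function needs to know who the consumers are. It is generic over the partition type, so it works with `TopicPartition` objects or plain integers.

```python
from typing import Dict, List, Sequence, TypeVar

P = TypeVar("P")


def round_robin_assignment(
    partitions: Sequence[P], member_ids: Sequence[str]
) -> Dict[str, List[P]]:
    """Distribute partitions over members in round-robin order (hypothetical helper)."""
    if not member_ids:
        raise ValueError("at least one member id is required")

    # Sort the ids so every consumer computes the same mapping independently.
    members = sorted(member_ids)
    assignment: Dict[str, List[P]] = {member: [] for member in members}

    for index, partition in enumerate(partitions):
        assignment[members[index % len(members)]].append(partition)

    return assignment


# Four partitions over two consumers: each consumer gets two partitions.
example = round_robin_assignment([0, 1, 2, 3], ["consumer-b", "consumer-a"])
```

With as many consumers as partitions, this gives exactly one partition per consumer. Every member has to compute the same mapping for the result to be consistent, which is why the ids are sorted before distributing.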

dylanbstorey commented 1 year ago

Please feel free to submit a pull request.