bakdata / kpops

Deploy Kafka pipelines to Kubernetes
https://bakdata.github.io/kpops
MIT License

Connector instantiation requires information that may not be present at the moment #413

Closed: sujuka99 closed this issue 6 months ago

sujuka99 commented 6 months ago

Example code:

# Import paths assumed (kpops 2.x); adjust to your installed version.
from kpops.component_handlers.kafka_connect.model import KafkaConnectorConfig
from kpops.components import KafkaSinkConnector, PipelineComponent, StreamsApp


class CustomApp(StreamsApp):

    def inflate(self) -> list[PipelineComponent]:
        # Deploy the app itself followed by one sink connector per topic.
        return [self, *self.derive_connectors()]

    def derive_connectors(self) -> list[PipelineComponent]:
        connectors = []
        for topic_name in [...]:
            connectors.append(self.to_connector(topic_name))
        return connectors

    def to_connector(self, topic_name: str) -> KafkaSinkConnector:
        name = f"{topic_name}"
        return KafkaSinkConnector(
            name=name,
            config=self.config,
            handlers=self.handlers,
            app=KafkaConnectorConfig(
                **{
                    "name": "${pipeline.name}-" + name,  # using `self.full_name` results in an error
                    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",  # Cannot be set in `defaults.yaml`
                }
            ),
        )

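When the connector is initialized in the example above, `self.full_name` evaluates to `pipeline_name-connector_name`, but `${pipeline.name}-connector_name` is expected. In other words, `self.prefix` has already been substituted by the time the connector config is built, so the unresolved `${pipeline.name}` placeholder is no longer available.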
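The timing problem can be reproduced without kpops. The sketch below is a hypothetical stand-in, not the kpops implementation: it assumes a component that replaces the `${pipeline.name}` placeholder in its `prefix` as soon as it is created, and a made-up resolved pipeline name `my-pipeline`. Because substitution happens eagerly, anything derived from `full_name` afterwards only sees the resolved string.

```python
from dataclasses import dataclass

PIPELINE_NAME = "my-pipeline"  # hypothetical resolved pipeline name


@dataclass
class Component:
    """Hypothetical stand-in for a pipeline component (not the kpops class)."""

    name: str
    prefix: str = "${pipeline.name}-"

    def __post_init__(self) -> None:
        # Placeholders are substituted eagerly, at instantiation time.
        self.prefix = self.prefix.replace("${pipeline.name}", PIPELINE_NAME)

    @property
    def full_name(self) -> str:
        return self.prefix + self.name


sink = Component(name="sink")
resolved = sink.full_name                   # prefix already substituted
expected = "${pipeline.name}-" + sink.name  # what the connector config needs

print(resolved)  # my-pipeline-sink
print(expected)  # ${pipeline.name}-sink
```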

This can be circumvented by setting the name in `KafkaConnectorConfig` to `KafkaSinkConnector.model_fields["prefix"].default + name`, but that is only a workaround.
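The workaround relies on a pydantic v2 detail: `Model.model_fields[...]` describes the class-level field definition, so its `default` is still the unsubstituted template even after an instance has resolved its own value. The sketch below assumes pydantic v2 is installed; `SinkConnector` and the `after` validator doing the substitution are hypothetical, chosen only to mimic eager placeholder resolution.

```python
from pydantic import BaseModel, model_validator

PIPELINE_NAME = "my-pipeline"  # hypothetical resolved pipeline name


class SinkConnector(BaseModel):
    """Hypothetical model with a templated prefix (not the kpops class)."""

    name: str
    prefix: str = "${pipeline.name}-"

    @model_validator(mode="after")
    def substitute_prefix(self) -> "SinkConnector":
        # Substitution runs during validation, i.e. when the instance is created.
        self.prefix = self.prefix.replace("${pipeline.name}", PIPELINE_NAME)
        return self


connector = SinkConnector(name="sink")

# The instance field is already resolved.
resolved_name = connector.prefix + connector.name
# The class-level field default still holds the raw template.
template_name = SinkConnector.model_fields["prefix"].default + connector.name

print(resolved_name)  # my-pipeline-sink
print(template_name)  # ${pipeline.name}-sink
```

In this sketch the class-level default ignores any `prefix` passed to the constructor, which is one reason the approach is fragile: it only works while the prefix is left at its default.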

Closely related to #412