quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

K8s Service Binding for Kafka #15631

Closed FroMage closed 3 years ago

FroMage commented 3 years ago

Support setting up Kafka via the new Service Binding setup

What keys are required/supported?

quarkus-bot[bot] commented 3 years ago

/cc @cescoffier

FroMage commented 3 years ago

/cc @geoand

geoand commented 3 years ago

cc @wtrocki

lburgazzoli commented 3 years ago

/cc @nicolaferraro @johnpoth

geoand commented 3 years ago

@wtrocki we likely want to place the service binding glue code in either quarkus-smallrye-reactive-messaging-kafka or quarkus-kafka-client. Probably best to do it in the former for now.

In that module you'll have to add an optional dependency to quarkus-kubernetes-service-binding, like so:

        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-kubernetes-service-binding</artifactId>
            <optional>true</optional>
        </dependency>

The code itself should be very simple, something like:

public class KafkaBindingConverter implements ServiceBindingConverter {
    @Override
    public Optional<ServiceBindingConfigSource> convert(List<ServiceBinding> serviceBindings) {
        Optional<ServiceBinding> matchingByType = ServiceBinding.singleMatchingByType("kafka", serviceBindings);
        if (!matchingByType.isPresent()) {
            return Optional.empty();
        }

        Map<String, String> properties = new HashMap<>();
        ServiceBinding binding = matchingByType.get();

        //TODO: add the property mappings

        return Optional.of(new ServiceBindingConfigSource("kafka-k8s-service-binding-source", properties));
    }
}

Much like https://github.com/quarkusio/quarkus/blob/031e5bf4eddb2f6399ef8c726e8df8e63bb8439c/extensions/jdbc/jdbc-mssql/runtime/src/main/java/io/quarkus/jdbc/mssql/runtime/MsSQLServiceBindingConverter.java does for example.

lburgazzoli commented 3 years ago

If it is an option, I'd like to get it in quarkus-kafka-client or a dedicated extension rather than in the smallrye-reactive-messaging because that would require to bring the reactive messaging deps into the mix i.e. to use the service binding in camel-quarkus apps

machi1990 commented 3 years ago

> If it is an option, I'd like to get it in quarkus-kafka-client or a dedicated extension rather than in the smallrye-reactive-messaging because that would require to bring the reactive messaging deps into the mix i.e. to use the service binding in camel-quarkus apps

+1 on adding it to both.

geoand commented 3 years ago

> If it is an option, I'd like to get it in quarkus-kafka-client or a dedicated extension rather than in the smallrye-reactive-messaging because that would require to bring the reactive messaging deps into the mix i.e. to use the service binding in camel-quarkus apps

Good point - I hadn't thought about that

johnpoth commented 3 years ago

K8s Service Binding support in Strimzi is tracked by https://github.com/strimzi/strimzi-kafka-operator/pull/2753

cescoffier commented 3 years ago

It should be added to the quarkus-kafka-client.

That extension registers a bean that provides the Map<String, ?> used to configure the clients.

In the future, when we add multiple broker support, we would need to extend that to produce multiple Maps with the right name.

cescoffier commented 3 years ago

@lburgazzoli you should use the exposed Map actually.

cescoffier commented 3 years ago

I have been thinking about that a bit this morning.

At the moment, the quarkus-kafka-client extension exposes a CDI producer that produces a Map<String, ?> with the name (@Named) kafka-default (or something like that). The extension extracts all the configuration properties that start with the kafka. prefix.

This approach can be used for a single binding. The properties provided by the binding can be pushed to that map.
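To make the single-binding idea concrete, here is a minimal sketch in plain Java, not the actual producer code (the real implementation is the KafkaRuntimeConfigProducer linked in this comment). The class name KafkaPrefixConfig and both method names are hypothetical, and letting binding values win over regular configuration is an assumption rather than something decided in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of quarkus-kafka-client.
final class KafkaPrefixConfig {

    private static final String PREFIX = "kafka.";

    // Keeps only the entries whose key starts with "kafka." and strips that prefix,
    // so "kafka.bootstrap.servers" becomes "bootstrap.servers".
    static Map<String, Object> extractKafkaConfig(Map<String, String> allConfig) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, String> entry : allConfig.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                result.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return result;
    }

    // Pushes the properties coming from a binding into a copy of the map.
    // Assumption: a value from the binding overrides a value from regular configuration.
    static Map<String, Object> overlay(Map<String, Object> base, Map<String, String> fromBinding) {
        Map<String, Object> merged = new HashMap<>(base);
        merged.putAll(fromBinding);
        return merged;
    }
}
```

Calling extractKafkaConfig on the application configuration and then overlay with the binding's values would give the single map that clients are configured from.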

Now, in the case of multiple bindings, we could do the following:

In terms of implementation, it's straightforward, as the mechanism is already in place. It just needs a few more Map producers (unremovable as the lookup is going to be programmatic). See https://github.com/quarkusio/quarkus/blob/master/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java. They will have to be generated (unlike the existing one) and take the binding name as an argument.

@geoand WDYT?

PS: this does not include the work that needs to be done in reactive messaging to consume these bindings.

geoand commented 3 years ago

@cescoffier sounds reasonable, yes

geoand commented 3 years ago

@cescoffier at this point, do we only expect kafka.* properties to be set via the binding, or do we also expect the mp.messaging.* properties to be set as well? I assume only the former, right?

geoand commented 3 years ago

@wtrocki what is the final set of files that will be injected into the container?

cescoffier commented 3 years ago

@geoand only kafka.*, you can't know the mp.messaging ones as they are application-specific (the channel name).

geoand commented 3 years ago

> @geoand only kafka.*, you can't know the mp.messaging ones as they are application-specific (the channel name).

Right, that's what I thought. Thanks

wtrocki commented 3 years ago

> what is the final set of files that will be injected into the container?

Currently, I have added type and provider using the mapping capability of the SBO object, which is not ideal. I am working on adding those fields to the CRs.


mappings:
   - name: provider
     value: upstream
   - name: type
     value: kafka    

This is how it would look, no matter which approach we end up using:

> pwd 
/bindings

> tree 
.
└── kafka
    ├── bootstrapServers
    ├── password
    ├── provider
    ├── saslmechanism
    ├── securityprotocol
    ├── type
    └── user

1 directory, 7 files
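As an illustration of how a directory like the one above could be turned into kafka.* properties, here is a rough sketch in plain Java. It is not the code from the eventual PR: the class name KafkaBindingFiles and the file-to-property mapping are assumptions based only on the file names in the listing. The user and password files are deliberately left out, since how they end up in a JAAS configuration depends on the SASL mechanism.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class KafkaBindingFiles {

    // Assumed mapping from binding file names (see the tree above) to kafka.* properties.
    private static final Map<String, String> FILE_TO_PROPERTY = Map.of(
            "bootstrapServers", "kafka.bootstrap.servers",
            "securityprotocol", "kafka.security.protocol",
            "saslmechanism", "kafka.sasl.mechanism");

    // Reads each known file in the binding directory (for example /bindings/kafka)
    // and stores its trimmed content under the mapped property name.
    // Files that are missing or not listed in the mapping are ignored.
    static Map<String, String> toKafkaProperties(Path bindingDir) throws IOException {
        Map<String, String> properties = new HashMap<>();
        for (Map.Entry<String, String> entry : FILE_TO_PROPERTY.entrySet()) {
            Path file = bindingDir.resolve(entry.getKey());
            if (Files.isRegularFile(file)) {
                properties.put(entry.getValue(), Files.readString(file).trim());
            }
        }
        return properties;
    }
}
```

In a real converter, the resulting map would presumably be what gets passed to the ServiceBindingConfigSource shown earlier in the thread.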

Note that since type and provider are not supported by the SBO operator, they need to be added manually to every CR. When adding them, we sadly cannot hardcode them in the CRD because of this issue: https://github.com/redhat-developer/service-binding-operator/issues/843

For Kafka that is not a big problem, since we can add those fields in all clients that create CRs, but it limits how these patterns can be added in the future.

Possible solutions:

1) We can use mapping on the SBO side so that we do not force anything on source service providers. For example, if we have a database object that was created by Crossplane, we cannot really change it. This was suggested as a workaround for the issue:

https://github.com/redhat-developer/service-binding-operator/issues/843

2) Propose a path for the SBO team to add new CRDBinding objects that add the binding spec to CRDs that we do not control/own.

This way we could enable Quarkus to work automatically with resources created in IBM or Google Cloud, such as databases (typically ones we do not control and cannot add extra requirements to), and also bind Kafka with them.

geoand commented 3 years ago

@wtrocki thanks for the complete rundown.

If you would like to do the Quarkus part of the binding based on those files, then cool :). If you are too swamped to get it done by tomorrow, let me know and I'll code it up based on what you have described above.

geoand commented 3 years ago

I spoke with @wtrocki and due to the time constraints of this, I'll provide the initial PR myself.

geoand commented 3 years ago

https://github.com/quarkusio/quarkus/pull/15733 should (partly) take care of it

geoand commented 3 years ago

From @cescoffier, OAUTH support will need something like:

mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
mp.messaging.connector.smallrye-kafka.sasl.mechanism=OAUTHBEARER
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    oauth.client.id="team-a-client" \
    oauth.client.secret="team-a-client-secret" \
    oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
mp.messaging.connector.smallrye-kafka.sas

kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandle
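If a binding were to supply the OAuth client id, client secret and token endpoint, the sasl.jaas.config value above could be assembled along these lines. This is only a sketch: the class name OauthJaasConfig and the method are hypothetical, nothing in this thread says the binding will expose those three values, and the code does no escaping of quotes inside the values.

```java
// Hypothetical helper, for illustration only.
final class OauthJaasConfig {

    // Builds a single-line JAAS entry in the same shape as the example above.
    // Assumption: the values contain no double quotes, since none are escaped here.
    static String jaasConfig(String clientId, String clientSecret, String tokenEndpointUri) {
        return "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                + " oauth.client.id=\"" + clientId + "\""
                + " oauth.client.secret=\"" + clientSecret + "\""
                + " oauth.token.endpoint.uri=\"" + tokenEndpointUri + "\" ;";
    }
}
```

The returned string would be the value of the sasl.jaas.config property; the security protocol, SASL mechanism and callback handler would still have to be set separately.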