knative / eventing

Event-driven application platform for Kubernetes
https://knative.dev/docs/eventing
Apache License 2.0

Knative broker not able to stream to Event Hubs in Azure. #5858

Closed soaand01 closed 2 years ago

soaand01 commented 2 years ago

Describe the bug: I installed Knative Eventing with the Knative Kafka broker to stream and consume messages from Azure Event Hubs. However, I'm getting Failed to create topic:

$kubectl get broker
NAME         URL   AGE     READY   REASON
lasbroker1         4m14s   False   Failed to create topic: knative-broker-default-lasbroker1

Expected behavior: Produce/stream and consume messages/events on Azure Event Hubs via the Knative Kafka broker.

To Reproduce: I'm following this guide: https://knative.dev/docs/eventing/broker/kafka-broker/.

Install Knative Eventing:

kubectl apply -f https://github.com/knative/eventing/releases/download/v0.26.0/eventing-crds.yaml
kubectl apply -f https://github.com/knative/eventing/releases/download/v0.26.0/eventing-core.yaml
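
Before continuing, I checked that the control plane pods came up (a standard check):

kubectl get pods -n knative-eventing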

No Strimzi is needed here because I have my Event Hub working on "my-cluster-fqdn.servicebus.windows.net:9093". Yes, port 9093.

Install a default Channel (messaging) layer: https://knative.dev/docs/admin/install/eventing/install-eventing-with-yaml/ says to use https://github.com/knative-sandbox/eventing-kafka for this step.

The page says: Note: Replace REPLACE_WITH_CLUSTER_URL in the yaml with the URI to the Kafka Cluster (eg. my-cluster-kafka-bootstrap.kafka:9092/). So I downloaded each YAML file and changed the value inside to my-cluster-fqdn.servicebus.windows.net:9093.
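
For illustration, the substitution I did for each file can be scripted roughly like this (a sketch using sed; the placeholder name is from the note above):

curl -sL https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/source.yaml -o source.yaml
sed -i 's|REPLACE_WITH_CLUSTER_URL|my-cluster-fqdn.servicebus.windows.net:9093|g' source.yaml
kubectl apply -f source.yaml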

# Install the Kafka Source
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/source.yaml

# Install the Kafka "Consolidated" Channel
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/channel-consolidated.yaml

# Install the Kafka "Distributed" Channel
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/channel-distributed.yaml

Then the next step is to create the Kafka broker:

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
  name: lasbroker1
  namespace: default
spec:
  # Configuration specific to this broker.
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing

  # Optional dead letter sink, you can specify either:
  #  - deadLetterSink.ref, which is a reference to a Callable
  #  - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service
        data: 
          default.topic.partitions: "10"
          default.topic.replication.factor: "1"
          bootstrap.servers: "my-cluster-fqdn.servicebus.windows.net:9093"

kubectl apply -f my-broker.yaml

After all this, I get the following error and cannot move forward because of that:

soareslo:~$kubectl get broker
NAME         URL   AGE   READY   REASON
lasbroker1         31m   False   Failed to create topic: knative-broker-default-lasbroker1

Knative release version: latest

Additional context: I really appreciate any support.

pierDipi commented 2 years ago

What's the output of this command?

kubectl describe broker -n default lasbroker1
pierDipi commented 2 years ago

Also, if you're using only the Broker object with broker class Kafka, you don't need a channel installation, so you can skip the "Install a default Channel (messaging) layer" step.

pierDipi commented 2 years ago

Can you also post logs of the kafka-controller pods in the knative-eventing namespace?
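
Something like this should fetch them (assuming the controller runs as the kafka-controller Deployment that the Kafka broker install creates):

kubectl logs -n knative-eventing deployment/kafka-controller --tail=200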

pierDipi commented 2 years ago

@soaand01 can you also show the output of this command?

kubectl get cm -n knative-eventing kafka-broker-config

I'm not familiar with Event Hub but you probably need to configure auth stuff as described here https://knative.dev/docs/eventing/broker/kafka-broker/#security

soaand01 commented 2 years ago

Hi @pierDipi

This is the output:

kubectl describe broker -n default lasbroker1
Name:         lasbroker1
Namespace:    default
Labels:       <none>
Annotations:  eventing.knative.dev/broker.class: Kafka
              eventing.knative.dev/creator: masterclient
              eventing.knative.dev/lastModifier: masterclient
API Version:  eventing.knative.dev/v1
Kind:         Broker
Metadata:
  Creation Timestamp:  2021-10-29T10:48:51Z
  Finalizers:
    brokers.eventing.knative.dev
  Generation:  1
  Managed Fields:
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:eventing.knative.dev/broker.class:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:config:
          .:
          f:apiVersion:
          f:kind:
          f:name:
          f:namespace:
        f:delivery:
          .:
          f:deadLetterSink:
            .:
            f:ref:
              .:
              f:apiVersion:
              f:kind:
              f:name:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2021-10-29T10:48:51Z
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"brokers.eventing.knative.dev":
      f:status:
        .:
        f:address:
        f:conditions:
        f:observedGeneration:
    Manager:         kafka-controller
    Operation:       Update
    Time:            2021-10-29T10:48:53Z
  Resource Version:  77767
  UID:               c309f832-b142-4324-bf85-d8feb8ae4b69
Spec:
  Config:
    API Version:  v1
    Kind:         ConfigMap
    Name:         kafka-broker-config
    Namespace:    knative-eventing
  Delivery:
    Dead Letter Sink:
      Ref:
        API Version:  serving.knative.dev/v1
        Kind:         Service
        Name:         dlq-service
Status:
  Address:
  Conditions:
    Last Transition Time:  2021-10-29T10:48:53Z
    Status:                Unknown
    Type:                  Addressable
    Last Transition Time:  2021-10-29T10:48:53Z
    Status:                Unknown
    Type:                  ConfigMapUpdated
    Last Transition Time:  2021-10-29T10:48:53Z
    Status:                True
    Type:                  ConfigParsed
    Last Transition Time:  2021-10-29T10:48:53Z
    Status:                True
    Type:                  DataPlaneAvailable
    Last Transition Time:  2021-10-29T10:48:53Z
    Message:               failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    Reason:                Failed to create topic: knative-broker-default-lasbroker1
    Status:                False
    Type:                  Ready
    Last Transition Time:  2021-10-29T10:48:53Z
    Message:               failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    Reason:                Failed to create topic: knative-broker-default-lasbroker1
    Status:                False
    Type:                  TopicReady
  Observed Generation:     1
Events:
  Type     Reason           Age                  From               Message
  ----     ------           ----                 ----               -------
  Normal   FinalizerUpdate  61m                  broker-controller  Updated "lasbroker1" finalizers
  Warning  InternalError    6m6s (x21 over 61m)  broker-controller  failed to create topic: knative-broker-default-lasbroker1: failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

Another output:

soareslo:~$ kubectl get cm -n knative-eventing kafka-broker-config
NAME                  DATA   AGE
kafka-broker-config   3      83m

Regarding the kafka-controller and kafka-broker-config logs, how do you get them?

soaand01 commented 2 years ago

Adding @csantanapr , maybe you know something as well.

pierDipi commented 2 years ago

client has run out of available brokers to talk to (Is your cluster reachable?)

This is a signal that your Kafka cluster is expecting auth credentials or that the bootstrap.servers config is not correct. Please see the documentation at https://knative.dev/docs/eventing/broker/kafka-broker/#security

soaand01 commented 2 years ago

@pierDipi Do you think this could also be DNS? How can I test whether my Kubernetes cluster can talk to the bootstrap.servers? Both are in the same network, and the bootstrap.servers host is reachable from the internet.

pierDipi commented 2 years ago

I don't know; you can try to troubleshoot DNS issues with https://kubernetes.io/docs/tasks/administer-cluster/dns-debugging-resolution/
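
For a quick in-cluster check, something like this works (a sketch; busybox nslookup against your Event Hubs FQDN):

kubectl run -it --rm dns-test --image=busybox:1.36 --restart=Never -- \
  nslookup my-cluster-fqdn.servicebus.windows.net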

I believe you need to configure SASL_SSL as described here https://knative.dev/docs/eventing/broker/kafka-broker/#authentication-using-sasl-and-encryption-using-ssl which is supported by Azure https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-for-kafka-ecosystem-overview#shared-access-signature-sas

soaand01 commented 2 years ago

Hi @pierDipi, I added SASL_SSL and now things have changed.

kubectl get broker 
NAME         URL   AGE    READY   REASON
lasbroker1         153m   False   invalid configuration - numPartitions: 0 - replicationFactor: 0 - bootstrapServers:  - ConfigMap data: map[auth.secret.ref.name:mysecret]
soareslo:~$

Got some updates here as well.

kubectl describe broker -n default lasbroker1                                                                     
Name:         lasbroker1
Namespace:    default
Labels:       <none>
Annotations:  eventing.knative.dev/broker.class: Kafka
              eventing.knative.dev/creator: masterclient
              eventing.knative.dev/lastModifier: masterclient
API Version:  eventing.knative.dev/v1
Kind:         Broker
Metadata:
  Creation Timestamp:  2021-10-29T10:48:51Z
  Finalizers:
    brokers.eventing.knative.dev
  Generation:  1
  Managed Fields:
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:eventing.knative.dev/broker.class:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:config:
          .:
          f:apiVersion:
          f:kind:
          f:name:
          f:namespace:
        f:delivery:
          .:
          f:deadLetterSink:
            .:
            f:ref:
              .:
              f:apiVersion:
              f:kind:
              f:name:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2021-10-29T10:48:51Z
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"brokers.eventing.knative.dev":
      f:status:
        .:
        f:address:
        f:conditions:
        f:observedGeneration:
    Manager:         kafka-controller
    Operation:       Update
    Time:            2021-10-29T10:48:53Z
  Resource Version:  270053
  UID:               c309f832-b142-4324-bf85-d8feb8ae4b69
Spec:
  Config:
    API Version:  v1
    Kind:         ConfigMap
    Name:         kafka-broker-config
    Namespace:    knative-eventing
  Delivery:
    Dead Letter Sink:
      Ref:
        API Version:  serving.knative.dev/v1
        Kind:         Service
        Name:         dlq-service
Status:
  Address:
  Conditions:
    Last Transition Time:  2021-10-29T10:48:53Z
    Status:                Unknown
    Type:                  Addressable
    Last Transition Time:  2021-10-29T10:48:53Z
    Status:                Unknown
    Type:                  ConfigMapUpdated
    Last Transition Time:  2021-10-29T13:18:00Z
    Reason:                invalid configuration - numPartitions: 0 - replicationFactor: 0 - bootstrapServers:  - ConfigMap data: map[auth.secret.ref.name:mysecret]
    Status:                False
    Type:                  ConfigParsed
    Last Transition Time:  2021-10-29T12:49:31Z
    Status:                True
    Type:                  DataPlaneAvailable
    Last Transition Time:  2021-10-29T13:18:00Z
    Reason:                invalid configuration - numPartitions: 0 - replicationFactor: 0 - bootstrapServers:  - ConfigMap data: map[auth.secret.ref.name:mysecret]
    Status:                False
    Type:                  Ready
    Last Transition Time:  2021-10-29T10:48:53Z
    Message:               failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    Reason:                Failed to create topic: knative-broker-default-lasbroker1
    Status:                False
    Type:                  TopicReady
  Observed Generation:     1
Events:
  Type     Reason         Age                    From               Message
  ----     ------         ----                   ----               -------
  Warning  InternalError  32m (x13 over 32m)     broker-controller  Data plane not available: Did you install the data plane for this component?
  Warning  InternalError  8m51s (x19 over 30m)   broker-controller  failed to create topic: knative-broker-default-lasbroker1: failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
  Warning  InternalError  2m22s (x2 over 2m22s)  broker-controller  failed to get contract configuration: invalid configuration - numPartitions: 0 - replicationFactor: 0 - bootstrapServers:  - ConfigMap data: map[auth.secret.ref.name:mysecret]
pierDipi commented 2 years ago

@soaand01 make sure you also have these configurations (default.topic.partitions, bootstrap.servers, default.topic.replication.factor) set in the referenced ConfigMap (in this case knative-eventing/kafka-broker-config), in addition to the auth secret (auth.secret.ref.name):

  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "1"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

  # Reference a Secret called mysecret
  auth.secret.ref.name: mysecret
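
To verify what the referenced ConfigMap actually contains, you can dump it (names as above):

kubectl get cm -n knative-eventing kafka-broker-config -o yaml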
soaand01 commented 2 years ago

Hi @pierDipi ,

Yeah, this is very weird, because I have:

cat kafka-broker-object.yaml 
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
  name: lasbroker1
  namespace: default
spec:
  # Configuration specific to this broker.
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing

  # Optional dead letter sink, you can specify either:
  #  - deadLetterSink.ref, which is a reference to a Callable
  #  - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service
        data: 
          default.topic.partitions: "10"
          default.topic.replication.factor: "1"
          bootstrap.servers: "my-cluster-here.servicebus.windows.net:9093"
pierDipi commented 2 years ago
# ...
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service
        data: 
          default.topic.partitions: "10"
          default.topic.replication.factor: "1"
          bootstrap.servers: "my-cluster-here.servicebus.windows.net:9093"

This doesn't look valid

pierDipi commented 2 years ago

https://knative.dev/docs/eventing/broker/kafka-broker/#create-a-kafka-broker

soaand01 commented 2 years ago

In the section below the one you sent, it says that I have to add this block.

spec.config should reference any ConfigMap that looks like the following:

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "1"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
pierDipi commented 2 years ago

broker.yaml

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
  name: lasbroker1
  namespace: default
spec:
  # Configuration specific to this broker.
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing

  # Optional dead letter sink, you can specify either:
  #  - deadLetterSink.ref, which is a reference to a Callable
  #  - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service

kafka-broker-config.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "1"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-here.servicebus.windows.net:9093"
  # Reference a Secret called mysecret
  auth.secret.ref.name: "mysecret"

This is what you want
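
Then apply both (file names as above):

kubectl apply -f kafka-broker-config.yaml
kubectl apply -f broker.yaml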

soaand01 commented 2 years ago

Let me try.

soaand01 commented 2 years ago

@pierDipi and @csantanapr, just sharing the result.

Pretty much the same.

cat kafka-broker-object.yaml 
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
  name: lasbroker1
  namespace: default
spec:
  # Configuration specific to this broker.
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
  # Optional dead letter sink, you can specify either:
  #  - deadLetterSink.ref, which is a reference to a Callable
  #  - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service

This part is fine now:

cat broker-configmap-update.yaml 
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "1"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-here.servicebus.windows.net:9093"
  auth.secret.ref.name: "mysecret"

Now I get something different, but still weird behavior.

kubectl create secret --namespace knative-eventing generic mysecret \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=PLAIN \
  --from-literal=user="$ConnectionString" \
  --from-literal=password="Endpoint=sb:/MY-END-POINT-HERE"
kubectl describe broker -n default lasbroker1 
Name:         lasbroker1
Namespace:    default
Labels:       <none>
Annotations:  eventing.knative.dev/broker.class: Kafka
              eventing.knative.dev/creator: masterclient
              eventing.knative.dev/lastModifier: masterclient
API Version:  eventing.knative.dev/v1
Kind:         Broker
Metadata:
  Creation Timestamp:  2021-10-29T14:03:33Z
  Finalizers:
    brokers.eventing.knative.dev
  Generation:  1
  Managed Fields:
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:eventing.knative.dev/broker.class:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:config:
          .:
          f:apiVersion:
          f:kind:
          f:name:
          f:namespace:
        f:delivery:
          .:
          f:deadLetterSink:
            .:
            f:ref:
              .:
              f:apiVersion:
              f:kind:
              f:name:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2021-10-29T14:03:33Z
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"brokers.eventing.knative.dev":
      f:status:
        .:
        f:address:
        f:conditions:
        f:observedGeneration:
    Manager:         kafka-controller
    Operation:       Update
    Time:            2021-10-29T14:03:34Z
  Resource Version:  10128
  UID:               32f681df-e5f2-4bcf-b306-e7e636dfeab7
Spec:
  Config:
    API Version:  v1
    Kind:         ConfigMap
    Name:         kafka-broker-config
    Namespace:    knative-eventing
  Delivery:
    Dead Letter Sink:
      Ref:
        API Version:  serving.knative.dev/v1
        Kind:         Service
        Name:         dlq-service
Status:
  Address:
  Conditions:
    Last Transition Time:  2021-10-29T14:03:34Z
    Status:                Unknown
    Type:                  Addressable
    Last Transition Time:  2021-10-29T14:03:34Z
    Status:                Unknown
    Type:                  ConfigMapUpdated
    Last Transition Time:  2021-10-29T14:14:02Z
    Status:                True
    Type:                  ConfigParsed
    Last Transition Time:  2021-10-29T14:03:34Z
    Status:                True
    Type:                  DataPlaneAvailable
    Last Transition Time:  2021-10-29T14:14:02Z
    Message:               [protocol SASL_PLAINTEXT] SASL user required (key: user)
    Reason:                Failed to create topic: knative-broker-default-lasbroker1
    Status:                False
    Type:                  Ready
    Last Transition Time:  2021-10-29T14:14:02Z
    Message:               [protocol SASL_PLAINTEXT] SASL user required (key: user)
    Reason:                Failed to create topic: knative-broker-default-lasbroker1
    Status:                False
    Type:                  TopicReady
  Observed Generation:     1
Events:
  Type     Reason           Age                    From               Message
  ----     ------           ----                   ----               -------
  Normal   FinalizerUpdate  19m                    broker-controller  Updated "lasbroker1" finalizers
  Warning  InternalError    19m (x12 over 19m)     broker-controller  failed to create topic: knative-broker-default-lasbroker1: failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
  Warning  InternalError    16m (x4 over 19m)      broker-controller  failed to get secret: failed to get secret knative-eventing/mysecret: secrets "mysecret" not found
  Warning  InternalError    14m (x3 over 16m)      broker-controller  failed to get contract configuration: invalid configuration - numPartitions: 0 - replicationFactor: 0 - bootstrapServers:  - ConfigMap data: map[auth.secret.ref.name:mysecret]
  Warning  InternalError    9m40s (x2 over 9m40s)  broker-controller  failed to get contract configuration: failed to get configmap knative-eventing/kafka-broker-config: configmap "kafka-broker-config" not found
  Warning  InternalError    8m53s (x3 over 8m53s)  broker-controller  failed to create topic: knative-broker-default-lasbroker1: [protocol SASL_PLAINTEXT] SASL user required (key: user)

Which is weird, because it says secrets "mysecret" not found.

Actually it is there: kubectl get secrets --namespace knative-eventing

NAME                                   TYPE                                  DATA   AGE
default-token-k2l6w                    kubernetes.io/service-account-token   3      6m16s
eventing-controller-token-dqlpt        kubernetes.io/service-account-token   3      6m16s
eventing-webhook-certs                 Opaque                                3      6m14s
eventing-webhook-token-nvcvx           kubernetes.io/service-account-token   3      6m16s
kafka-controller-token-lw8dv           kubernetes.io/service-account-token   3      6m
kafka-webhook-eventing-certs           Opaque                                3      6m
kafka-webhook-eventing-token-tvvh8     kubernetes.io/service-account-token   3      6m
knative-kafka-data-plane-token-25bqt   kubernetes.io/service-account-token   3      5m56s
mysecret                               Opaque                                4      2m33s
pingsource-mt-adapter-token-pz9sj      kubernetes.io/service-account-token   3      6m16s

Even with SASL I get the error:

1: [protocol SASL_PLAINTEXT] SASL user required (key: user)

pierDipi commented 2 years ago

kubectl create secret --namespace knative-eventing generic mysecret \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=PLAIN \
  --from-literal=user="$ConnectionString" \
  --from-literal=password="Endpoint=sb:/MY-END-POINT-HERE"

Event Hub supports only SASL_SSL for the protocol and PLAIN for the mechanism.

ref: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-for-kafka-ecosystem-overview#shared-access-signature-sas

protocol=SASL_PLAINTEXT != protocol=SASL_SSL + sasl.mechanism=PLAIN

You need to follow this: https://knative.dev/docs/eventing/broker/kafka-broker/#authentication-using-sasl-and-encryption-using-ssl "Authentication using SASL and encryption using SSL"

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=PLAIN \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>
soaand01 commented 2 years ago

Updating the thread. Slowly I'm getting there; for some reason it does not recognize mysecret.

kubectl describe broker -n default lasbroker1 
Name:         lasbroker1
Namespace:    default
Labels:       <none>
Annotations:  eventing.knative.dev/broker.class: Kafka
              eventing.knative.dev/creator: masterclient
              eventing.knative.dev/lastModifier: masterclient
API Version:  eventing.knative.dev/v1
Kind:         Broker
Metadata:
  Creation Timestamp:  2021-10-29T15:52:17Z
  Finalizers:
    brokers.eventing.knative.dev
  Generation:  1
  Managed Fields:
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:eventing.knative.dev/broker.class:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:config:
          .:
          f:apiVersion:
          f:kind:
          f:name:
          f:namespace:
        f:delivery:
          .:
          f:deadLetterSink:
            .:
            f:ref:
              .:
              f:apiVersion:
              f:kind:
              f:name:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2021-10-29T15:52:17Z
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"brokers.eventing.knative.dev":
      f:status:
        .:
        f:address:
        f:conditions:
        f:observedGeneration:
    Manager:         kafka-controller
    Operation:       Update
    Time:            2021-10-29T15:52:18Z
  Resource Version:  2660
  UID:               55b133e5-92dd-4975-b3f9-f6459e508630
Spec:
  Config:
    API Version:  v1
    Kind:         ConfigMap
    Name:         kafka-broker-config
    Namespace:    knative-eventing
  Delivery:
    Dead Letter Sink:
      Ref:
        API Version:  serving.knative.dev/v1
        Kind:         Service
        Name:         dlq-service
Status:
  Address:
  Conditions:
    Last Transition Time:  2021-10-29T15:52:18Z
    Status:                Unknown
    Type:                  Addressable
    Last Transition Time:  2021-10-29T15:52:18Z
    Status:                Unknown
    Type:                  ConfigMapUpdated
    Last Transition Time:  2021-10-29T15:52:18Z
    Status:                True
    Type:                  ConfigParsed
    Last Transition Time:  2021-10-29T15:52:18Z
    Status:                True
    Type:                  DataPlaneAvailable
    Last Transition Time:  2021-10-29T15:52:18Z
    Message:               failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    Reason:                Failed to create topic: knative-broker-default-lasbroker1
    Status:                False
    Type:                  Ready
    Last Transition Time:  2021-10-29T15:52:18Z
    Message:               failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    Reason:                Failed to create topic: knative-broker-default-lasbroker1
    Status:                False
    Type:                  TopicReady
  Observed Generation:     1
Events:
  Type     Reason           Age                     From               Message
  ----     ------           ----                    ----               -------
  Normal   FinalizerUpdate  3m22s                   broker-controller  Updated "lasbroker1" finalizers
  Warning  InternalError    2m34s (x14 over 3m21s)  broker-controller  failed to create topic: knative-broker-default-lasbroker1: failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
  Warning  InternalError    113s (x2 over 2m25s)    broker-controller  failed to get secret: failed to get secret knative-eventing/mysecret: secrets "mysecret" not found

The secret is there; it shows as Opaque, and it was applied. No SASL_SSL issue, it just complains now about the secret.

soareslo:~$kubectl get secrets -n knative-eventing
NAME                                   TYPE                                  DATA   AGE
default-token-btphv                    kubernetes.io/service-account-token   3      4m58s
eventing-controller-token-vhv54        kubernetes.io/service-account-token   3      4m58s
eventing-webhook-certs                 Opaque                                3      4m54s
eventing-webhook-token-shcrx           kubernetes.io/service-account-token   3      4m58s
kafka-controller-token-jmmbg           kubernetes.io/service-account-token   3      4m34s
kafka-webhook-eventing-certs           Opaque                                3      4m34s
kafka-webhook-eventing-token-ffmfw     kubernetes.io/service-account-token   3      4m34s
knative-kafka-data-plane-token-q8tjz   kubernetes.io/service-account-token   3      4m30s
mysecret                               Opaque                                4      103s
pingsource-mt-adapter-token-868ns      kubernetes.io/service-account-token   3      4m58s
pierDipi commented 2 years ago

The secret was created after the failure.

103s ago:

NAME                                   TYPE                                  DATA   AGE
mysecret                               Opaque                                4      103s

But the event was generated 113s ago.

 Warning  InternalError    113s (x2 over 2m25s)    broker-controller  failed to get secret: failed to get secret knative-eventing/mysecret: secrets "mysecret" not found

We will eventually retry but since it failed many times it might take some time to do it again (exponential backoff).

To speed up the process you can add a placeholder annotation to your broker:

kubectl patch broker -n default lasbroker1 -p '{"metadata": { "annotations": { "app": "1" } }}'
soaand01 commented 2 years ago

@pierDipi I destroyed everything and redeployed again. The password issue seems to be fine now, but for some reason I'm still getting this. I have no idea why it states Is your cluster reachable? I know it's working because I created a Python script to produce and consume messages; I run it from my laptop and it works, so the service is reachable, but for some reason I get this from the Knative Kafka broker.

Normal   FinalizerUpdate  6m12s                 broker-controller  Updated "newbroker" finalizers
Warning  InternalError    35s (x17 over 6m11s)  broker-controller  failed to create topic: knative-broker-default-newbroker: failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
kubectl describe broker -n default newbroker
Name:         newbroker
Namespace:    default
Labels:       <none>
Annotations:  eventing.knative.dev/broker.class: Kafka
              eventing.knative.dev/creator: soaresanderson221@gmail.com
              eventing.knative.dev/lastModifier: soaresanderson221@gmail.com
API Version:  eventing.knative.dev/v1
Kind:         Broker
Metadata:
  Creation Timestamp:  2021-10-31T17:45:28Z
  Finalizers:
    brokers.eventing.knative.dev
  Generation:  1
  Managed Fields:
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:eventing.knative.dev/broker.class:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:config:
          .:
          f:apiVersion:
          f:kind:
          f:name:
          f:namespace:
        f:delivery:
          .:
          f:deadLetterSink:
            .:
            f:ref:
              .:
              f:apiVersion:
              f:kind:
              f:name:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2021-10-31T17:45:28Z
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"brokers.eventing.knative.dev":
      f:status:
        .:
        f:address:
        f:conditions:
        f:observedGeneration:
    Manager:         kafka-controller
    Operation:       Update
    Time:            2021-10-31T17:45:29Z
  Resource Version:  11909938
  UID:               21422643-6f90-426e-807e-fe2b9b9e565a
Spec:
  Config:
    API Version:  v1
    Kind:         ConfigMap
    Name:         kafka-broker-config
    Namespace:    knative-eventing
  Delivery:
    Dead Letter Sink:
      Ref:
        API Version:  serving.knative.dev/v1
        Kind:         Service
        Name:         dlq-service
Status:
  Address:
  Conditions:
    Last Transition Time:  2021-10-31T17:45:29Z
    Status:                Unknown
    Type:                  Addressable
    Last Transition Time:  2021-10-31T17:45:29Z
    Status:                Unknown
    Type:                  ConfigMapUpdated
    Last Transition Time:  2021-10-31T17:45:29Z
    Status:                True
    Type:                  ConfigParsed
    Last Transition Time:  2021-10-31T17:45:29Z
    Status:                True
    Type:                  DataPlaneAvailable
    Last Transition Time:  2021-10-31T17:45:29Z
    Message:               failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    Reason:                Failed to create topic: knative-broker-default-newbroker
    Status:                False
    Type:                  Ready
    Last Transition Time:  2021-10-31T17:45:29Z
    Message:               failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    Reason:                Failed to create topic: knative-broker-default-newbroker
    Status:                False
    Type:                  TopicReady
  Observed Generation:     1
Events:
  Type     Reason           Age                   From               Message
  ----     ------           ----                  ----               -------
  Normal   FinalizerUpdate  6m12s                 broker-controller  Updated "newbroker" finalizers
  Warning  InternalError    35s (x17 over 6m11s)  broker-controller  failed to create topic: knative-broker-default-newbroker: failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
pierDipi commented 2 years ago

Hi @soaand01, that error means that we can't connect to your Kafka cluster correctly with SASL_SSL.

Can you post your Python program here?

soaand01 commented 2 years ago

Hi @pierDipi, hmmm, weird, because I get no authentication error messages. Sure, below you have both the sender and the receiver.

You can even try telnet and you will see it's fine: telnet streaming-demo.servicebus.windows.net 9093. You can use netcat as well: nc -v streaming-demo.servicebus.windows.net 9093

sender.py

import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

evh_namespace_string = 'Endpoint=sb://streaming-demo.servicebus.windows.net/;SharedAccessKeyName=streaming-authorization-demo;SharedAccessKey=<Just-removed-my-key-from-here'
evh_name = 'straming-hub-demo'
async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(conn_str=evh_namespace_string, eventhub_name=evh_name)
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('First event '))
        event_data_batch.add(EventData('Second event'))
        event_data_batch.add(EventData('Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

In the script below you will see the saaccount_name; this key is used by Event Hub to store the messages in a folder, nothing related to the Kafka connection itself. (Just FYI.)

receiver.py

import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

evh_namespace_string = 'Endpoint=sb://streaming-demo.servicebus.windows.net/;SharedAccessKeyName=streaming-authorization-demo;SharedAccessKey=<Just-removed-my-key-from-here'
evh_name = 'straming-hub-demo'
saaccount_name = 'DefaultEndpointsProtocol=https;AccountName=sastreaming;AccountKey=<Just-removed-my-key-from-here==;EndpointSuffix=core.windows.net'
sablobcontainer_name = 'streamingcontainer'

async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(saaccount_name, sablobcontainer_name)

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(evh_namespace_string, consumer_group="$Default", eventhub_name=evh_name, checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event,  starting_position="-1")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main())
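
For completeness, a sketch of the dependencies and how I run the scripts (package names matching the imports above):

pip install azure-eventhub azure-eventhub-checkpointstoreblob-aio
python sender.py
python receiver.py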
soaand01 commented 2 years ago

Hi @pierDipi ,

I refreshed and I'm getting this.

Warning InternalError 50s (x2 over 50s) broker-controller failed to create topic: knative-broker-default-newbroker: [protocol SASL_SSL] SASL user required (key: user)

pierDipi commented 2 years ago

Ok, I see you're using their library which hides the weird stuff they're doing.

I'm reading this article https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quickstart-kafka-enabled-event-hubs and I think by creating a secret like the following we should be able to have a proper secret to connect to their service.

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=PLAIN \
  --from-literal=user='$ConnectionString' \
  --from-literal=password=<put evh_namespace_string of your python script>

Note: $ConnectionString is a constant.
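
To double-check what actually landed in the secret (assuming the mysecret name from earlier), the stored user value should be the literal string $ConnectionString:

kubectl get secret -n knative-eventing mysecret -o jsonpath='{.data.user}' | base64 -d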

pierDipi commented 2 years ago

I was also reading this example code, and what I'm worried about next is that there is no way to change Version and InsecureSkipVerify via a configuration.

Version:

config.Version = sarama.V1_0_0_0

and InsecureSkipVerify set to true:

InsecureSkipVerify: true
soaand01 commented 2 years ago

Hi @pierDipi ,

Wow, the topics have been created, that's amazing. Thank you very much. I used your example:

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=PLAIN \
  --from-literal=user='$ConnectionString' \
  --from-literal=password=<put evh_namespace_string of your python script>

The only difference between yours and mine is that I used --from-literal=user="$ConnectionString" (double quotes) and you used --from-literal=user='$ConnectionString' (single quotes).

I didn't know that '' and "" could make a difference.

Below is the result.

kubectl describe broker -n default newbroker 
Name:         newbroker
Namespace:    default
Labels:       <none>
Annotations:  eventing.knative.dev/broker.class: Kafka
              eventing.knative.dev/creator: soaresanderson221@gmail.com
              eventing.knative.dev/lastModifier: soaresanderson221@gmail.com
API Version:  eventing.knative.dev/v1
Kind:         Broker
Metadata:
  Creation Timestamp:  2021-10-31T17:45:28Z
  Finalizers:
    brokers.eventing.knative.dev
  Generation:  1
  Managed Fields:
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:eventing.knative.dev/broker.class:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:config:
          .:
          f:apiVersion:
          f:kind:
          f:name:
          f:namespace:
        f:delivery:
          .:
          f:deadLetterSink:
            .:
            f:ref:
              .:
              f:apiVersion:
              f:kind:
              f:name:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2021-10-31T17:45:28Z
    API Version:  eventing.knative.dev/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"brokers.eventing.knative.dev":
      f:status:
        .:
        f:address:
        f:conditions:
        f:observedGeneration:
    Manager:         kafka-controller
    Operation:       Update
    Time:            2021-10-31T17:45:29Z
  Resource Version:  11975897
  UID:               21422643-6f90-426e-807e-fe2b9b9e565a
Spec:
  Config:
    API Version:  v1
    Kind:         ConfigMap
    Name:         kafka-broker-config
    Namespace:    knative-eventing
  Delivery:
    Dead Letter Sink:
      Ref:
        API Version:  serving.knative.dev/v1
        Kind:         Service
        Name:         dlq-service
Status:
  Address:
  Conditions:
    Last Transition Time:  2021-10-31T17:45:29Z
    Status:                Unknown
    Type:                  Addressable
    Last Transition Time:  2021-10-31T17:45:29Z
    Status:                Unknown
    Type:                  ConfigMapUpdated
    Last Transition Time:  2021-10-31T17:45:29Z
    Status:                True
    Type:                  ConfigParsed
    Last Transition Time:  2021-10-31T17:45:29Z
    Status:                True
    Type:                  DataPlaneAvailable
    Last Transition Time:  2021-10-31T19:14:13Z
    Status:                Unknown
    Type:                  Ready
    Last Transition Time:  2021-10-31T19:14:13Z
    Reason:                Topic knative-broker-default-newbroker created
    Status:                True
    Type:                  TopicReady
  Observed Generation:     1
Events:
  Type     Reason         Age                 From               Message
  ----     ------         ----                ----               -------
  Warning  InternalError  50m (x21 over 89m)  broker-controller  failed to create topic: knative-broker-default-newbroker: failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
  Warning  InternalError  16m (x4 over 48m)   broker-controller  failed to create topic: knative-broker-default-newbroker: [protocol SASL_SSL] SASL user required (key: user)
  Warning  InternalError  15s (x3 over 16s)   broker-controller  failed to get contract configuration: failed to resolve Spec.Delivery.DeadLetterSink: services.serving.knative.dev "dlq-service" not found

Now I'm getting a dlq-service error; not sure if this is something I should be concerned about. There are these comments during the broker creation:

  # Optional dead letter sink, you can specify either:
  #  - deadLetterSink.ref, which is a reference to a Callable
  #  - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)

But also there is this block underneath it.

        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service

The documentation doesn't say anything about creating this dlq-service. Any idea?

Below I'm just showing the example given at https://knative.dev/docs/eventing/broker/kafka-broker/. You saw that one already.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
  name: default
  namespace: default
spec:
  # Configuration specific to this broker.
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
  # Optional dead letter sink, you can specify either:
  #  - deadLetterSink.ref, which is a reference to a Callable
  #  - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service
soaand01 commented 2 years ago

@pierDipi and @csantanapr .

Now that I have the broker working (the topic has been successfully created in my Azure Event Hub): should I create a trigger to produce messages and another trigger to consume them? It's not that clear to me; maybe you have tried producing and consuming messages from the broker.

csantanapr commented 2 years ago

@soaand01 You can now use examples from the docs, like this one, to produce an event with curl and use event-display to print the message:

https://knative.dev/development/eventing/getting-started/#creating-event-consumers
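
For reference, a minimal sketch of that flow against the broker from this thread (the broker name newbroker is assumed from earlier messages; the event_display image is the one used in the Knative docs):

# Event consumer: a plain Deployment + Service running event-display,
# plus a Trigger subscribing it to the broker.
kubectl apply -f - <<EOF
apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-display
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: event-display
  template:
    metadata:
      labels:
        app: event-display
    spec:
      containers:
        - name: event-display
          image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
---
apiVersion: v1
kind: Service
metadata:
  name: event-display
  namespace: default
spec:
  selector:
    app: event-display
  ports:
    - port: 80
      targetPort: 8080
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: event-display-trigger
  namespace: default
spec:
  broker: newbroker
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: event-display
EOF

# Produce a CloudEvent by POSTing to the Kafka broker ingress from inside the cluster.
kubectl run curl --image=curlimages/curl --rm -it --restart=Never -- \
  -v "http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/newbroker" \
  -X POST \
  -H "Ce-Id: 1" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: greeting" \
  -H "Ce-Source: curl" \
  -H "Content-Type: application/json" \
  -d '{"msg":"Hello Event Hubs!"}'

# Then check the consumer logs:
kubectl logs -n default -l app=event-display --tail=20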

pierDipi commented 2 years ago

The only one difference between yours and mine is that I have used --from-literal=user="$ConnectionString" ("") and you have used --from-literal=user='$ConnectionString' ('').

Ah, ok, bash interprets $ConnectionString as a variable inside double-quotes.
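
A quick way to see the difference in a shell (assuming ConnectionString is not set as an environment variable):

echo "user=$ConnectionString"   # double quotes: bash expands the variable -> user=
echo 'user=$ConnectionString'   # single quotes: kept literal -> user=$ConnectionString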

pierDipi commented 2 years ago

The deadLetterSink service should be created before you can reference it. If you don't have it, remove this block from your broker:

  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service
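
Alternatively, if you do want a dead letter sink, create the service first; a minimal sketch (assuming Knative Serving is installed; event_display is the image used in the Knative docs):

kubectl apply -f - <<EOF
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: dlq-service
  namespace: default
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
EOF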
soaand01 commented 2 years ago

Hello @pierDipi ,

Thank you again. I just removed it and now it's working; I can produce messages to the broker. Now I will find out how to consume them.

soaand01 commented 2 years ago

@csantanapr Thanks, I followed the steps.

Everything goes fine, except when I try to get the logs.

soareslo:~$kubectl -n default logs -l app=hello-display --tail=100
2021/11/02 08:11:04 Failed to read tracing config, using the no-op default: empty json tracing config

Something weird with the tracing, unfortunately. Any idea?

soaand01 commented 2 years ago

@csantanapr Since this is a different issue, I raised a new one.

https://github.com/knative/eventing/issues/5866

pierDipi commented 2 years ago

Thanks, closing this for now.