tirelibirefe closed this issue 4 years ago
Do you have Connector Operator enabled in your Kafka Connect? If yes, did you add network policy to allow access to your application?
I don't know what Connector Operator is... Sorry.
I use only Strimzi Cluster Operator just as before.
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: kafka-beytepe
spec:
  kafka:
    version: 2.5.0
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      ...
and
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: custom-kafka-connect-cluster
  namespace: kafka
  labels:
    app: custom-kafka-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 2.5.0
  image: harbor.dev1.kik.io/kafka/kc:v0.1
  replicas: 1
  bootstrapServers: kafka-beytepe-kafka-bootstrap:9093
  externalConfiguration:
    ...
The Connector Operator is a process inside the Cluster Operator. It enables the use of the KafkaConnector resource to manage connectors in Connect. It is enabled by this annotation, which you have in your KafkaConnect resource:
annotations:
  strimzi.io/use-connector-resources: "true"
As part of enabling it, some parts of the REST API should not be used (e.g. creating, editing, or pausing connectors), since anything you do that way will be reverted by the operator. Because the Connector Operator needs access to the Connect API, it creates a network policy for it. If you want to use the Connector Operator and still access the API yourself, you will need to create your own policy to give your applications access to the REST API. If you want to use only the REST API, and not the KafkaConnector resources, you should just remove the annotation above.
For an example of what the network policy should look like, have a look at https://github.com/strimzi/strimzi-kafka-operator/issues/3143. You will need to adjust it to your own names etc. Just make sure not to edit the existing policy created by the operator, and give your policy its own name.
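For readers unfamiliar with the KafkaConnector resources mentioned above, here is a minimal sketch of one. It is an illustration only: the connector name, class, and config values are hypothetical, and the `v1alpha1` API version is an assumption for the Strimzi release that ships Kafka 2.5.0. The `strimzi.io/cluster` label is what ties the connector to the KafkaConnect resource from this thread.

```yaml
# Illustrative sketch only; names and config values are hypothetical.
apiVersion: kafka.strimzi.io/v1alpha1   # assumed API version for this Strimzi release
kind: KafkaConnector
metadata:
  name: my-source-connector             # hypothetical connector name
  namespace: kafka
  labels:
    # Must match the name of the KafkaConnect resource that runs the connector
    strimzi.io/cluster: custom-kafka-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector  # example connector class
  tasksMax: 1
  config:
    file: "/opt/kafka/LICENSE"          # hypothetical input file
    topic: my-topic                     # hypothetical target topic
```

With the annotation in place, changes made to this connector through the REST API would be reverted to whatever the resource declares.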
...but my Connector Operator is enabled. My broker and Connect pods and services are all in the same namespace, so they should not need a network policy to communicate with each other. #3143 is an example for ingress, and I am not trying to reach Kafka Connect through an ingress.
The Ingress in the network policy means that it controls who can connect into the Kafka Connect pod, not where the Kafka Connect pod can connect to (that would be Egress). The other components also have network policies; for example, they would prevent you from accessing Zookeeper as well (among other things, Zookeeper also has TLS authentication).
I've just noticed that the Cluster Operator / Connector Operator has already created a network policy, and there isn't any restriction:
devadmin@vdi-mk2-ubn:~$ kubectl get networkpolicy -n kafka
NAME                                     POD-SELECTOR                                                                                                                        AGE
custom-kafka-connect-cluster-connect     strimzi.io/cluster=custom-kafka-connect-cluster,strimzi.io/kind=KafkaConnect,strimzi.io/name=custom-kafka-connect-cluster-connect   159m
kafka-beytepe-network-policy-kafka       strimzi.io/name=kafka-beytepe-kafka                                                                                                 2d3h
kafka-beytepe-network-policy-zookeeper   strimzi.io/name=kafka-beytepe-zookeeper                                                                                             2d3h
devadmin@vdi-mk2-ubn:~$ kubectl describe networkpolicy custom-kafka-connect-cluster-connect -n kafka
Name:         custom-kafka-connect-cluster-connect
Namespace:    kafka
Created on:   2020-07-08 20:37:07 +0300 +03
Labels:       app=custom-kafka-connect-cluster
              app.kubernetes.io/instance=custom-kafka-connect-cluster
              app.kubernetes.io/managed-by=strimzi-cluster-operator
              app.kubernetes.io/name=kafka-connect
              app.kubernetes.io/part-of=strimzi-custom-kafka-connect-cluster
              strimzi.io/cluster=custom-kafka-connect-cluster
              strimzi.io/kind=KafkaConnect
              strimzi.io/name=strimzi
Annotations:  <none>
Spec:
  PodSelector:     strimzi.io/cluster=custom-kafka-connect-cluster,strimzi.io/kind=KafkaConnect,strimzi.io/name=custom-kafka-connect-cluster-connect
  Allowing ingress traffic:
    To Port: 8083/TCP
    From:
      PodSelector: strimzi.io/cluster=custom-kafka-connect-cluster,strimzi.io/kind=KafkaConnect,strimzi.io/name=custom-kafka-connect-cluster-connect
    From:
      NamespaceSelector: <none>
      PodSelector: strimzi.io/kind=cluster-operator
    ----------
    To Port: 9404/TCP
    From: <any> (traffic not restricted by source)
  Not affecting egress traffic
  Policy Types: Ingress
devadmin@vdi-mk2-ubn:~$ kubectl get pods -n kafka -l strimzi.io/cluster=custom-kafka-connect-cluster
NAME                                                    READY   STATUS    RESTARTS   AGE
custom-kafka-connect-cluster-connect-6595dd78fd-55r6z   1/1     Running   0          161m
devadmin@vdi-mk2-ubn:~$
This is the restriction:
To Port: 8083/TCP
From:
  PodSelector: strimzi.io/cluster=custom-kafka-connect-cluster,strimzi.io/kind=KafkaConnect,strimzi.io/name=custom-kafka-connect-cluster-connect
From:
  NamespaceSelector: <none>
  PodSelector: strimzi.io/kind=cluster-operator
This says that only pods matching the selector strimzi.io/kind=cluster-operator or strimzi.io/cluster=custom-kafka-connect-cluster,strimzi.io/kind=KafkaConnect,strimzi.io/name=custom-kafka-connect-cluster-connect can connect to port 8083. That is why you need to create your own network policy that also allows the pods which need access.
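As a rough illustration of the extra policy described above, the following sketch allows an additional set of client pods to reach the Connect REST API on port 8083. The policy name and the `app: my-client-app` label are hypothetical placeholders; the pod selector labels are taken from the `kubectl describe` output in this thread. Kubernetes network policies are additive, so this policy sits alongside the operator-managed one rather than replacing it.

```yaml
# Sketch under assumptions: policy name and client label are hypothetical.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  # Use your own name; do not reuse or edit the operator-managed policy
  name: my-app-connect-rest-access
  namespace: kafka
spec:
  # Select the Kafka Connect pods (labels copied from the operator-created policy)
  podSelector:
    matchLabels:
      strimzi.io/cluster: custom-kafka-connect-cluster
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: custom-kafka-connect-cluster-connect
  policyTypes:
    - Ingress
  ingress:
    - from:
        # Hypothetical label carried by the pods that need REST access
        - podSelector:
            matchLabels:
              app: my-client-app
      ports:
        - protocol: TCP
          port: 8083
```

A `podSelector` on its own only matches pods in the policy's namespace; clients in another namespace would also need a `namespaceSelector` in the same `from` entry.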
It worked! Thank you very much @scholzj, you helped me a lot as usual! I really appreciate it.
Hey @scholzj! Sorry for bothering you so much these days as I get stuck on different issues. Now I am getting a similar issue. When I deploy Kafka Connect, it fails with the following exception.
2023-03-29 19:34:46,297 INFO [Worker clientId=connect-1, groupId=my-connect] Successfully joined group with generation Generation{generationId=39658, memberId='connect-1-641a814a-0e5f-4389-b0c6-cbbcab847518', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2023-03-29 19:34:46,300 INFO [Worker clientId=connect-1, groupId=my-connect] Successfully synced group in generation Generation{generationId=39658, memberId='connect-1-641a814a-0e5f-4389-b0c6-cbbcab847518', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2023-03-29 19:34:46,300 INFO [Worker clientId=connect-1, groupId=my-connect] Joined group at generation 39658 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-8a3b58e7-217b-4414-9105-ab870c9937e6', leaderUrl='http://10.33.155.47:8083/', offset=1, connectorIds=[], taskIds=[s3-sink-connector-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2023-03-29 19:34:46,300 INFO [Worker clientId=connect-1, groupId=my-connect] Starting connectors and tasks using config offset 1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2023-03-29 19:34:46,300 INFO [Worker clientId=connect-1, groupId=my-connect] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2023-03-29 19:34:47,824 ERROR IO error forwarding REST request: (org.apache.kafka.connect.runtime.rest.RestClient) [qtp1239728853-20]
java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Connect Timeout
I even added a network policy with fully open ingress/egress rules. I am deploying it on EKS inside a private subnet. Do you have any suggestions for me? Thanks.
If you have multiple Kafka Connect clusters deployed, make sure each of them uses different topics and a different group. Otherwise they merge into one big cluster. That is all that comes to mind based on this snippet.
Thanks, that was indeed the problem.
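The advice above (separate group and topics for each Connect cluster) might look like the following in a KafkaConnect resource. This is a sketch under assumptions: the resource name, bootstrap address, and topic names are hypothetical, and `v1beta2` is assumed as the API version for a 2023-era Strimzi. The `group.id` value reuses the `my-connect` group seen in the log. The four keys under `config` are standard Kafka Connect distributed-worker settings.

```yaml
# Sketch only; resource name, bootstrap address, and topic names are hypothetical.
apiVersion: kafka.strimzi.io/v1beta2     # assumed API version
kind: KafkaConnect
metadata:
  name: my-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092   # hypothetical address
  config:
    # Each Connect cluster sharing the same Kafka cluster needs its own
    # group ID and its own three internal topics.
    group.id: my-connect
    offset.storage.topic: my-connect-offsets
    config.storage.topic: my-connect-configs
    status.storage.topic: my-connect-status
```

A second Connect cluster on the same Kafka cluster would use a different value for each of the four settings.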
Hello, I need your help again. I've just installed Kafka and Kafka Connect via Strimzi, but the Kafka Connect worker cannot be reached over REST on port 8083, either by name or by IP. If I set rest.advertised.host.name in the Kafka Connect config, the operator says: WARN AbstractConfiguration:129 - Configuration option "rest.advertised.host.name" is forbidden and will be ignored
The problem is described below:
I am not able to find what I'm doing wrong. Could you please advise?