qafro1 opened this issue 3 years ago
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.0.0
  replicas: 1
  bootstrapServers: kafka1-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: kafka1-cluster-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: kafka-int-tls
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    # key.converter.schemas.enable: true
    # value.converter.schemas.enable: true
    config.providers: directory
    config.providers.directory.class: org.apache.kafka.common.config.provider.DirectoryConfigProvider
  build:
    output:
      type: docker
      image: asia.gcr.io/my-gcp-project-name/kafka-connect:latest # Strimzi will automatically build and push the image to GCR under this name
      pushSecret: docker-credentials
    plugins:
      - name: gcp-bigquery
        artifacts:
          - type: jar
            url: https://github.com/GoogleCloudPlatform/pubsub/releases/download/v0.11-alpha/pubsub-kafka-connector.jar
            sha512sum: ee07f6334f5ea77d39689b0e764aaf32f9324ad90811bfe41c5039cd212bbd715e913bc7c9a8c3157b0f91726e1b2aba6b7e53bfacb5e558eeab870c6dc6f60e
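Strimzi verifies the downloaded artifact against the sha512sum above before baking it into the image. If the build fails with a checksum mismatch, you can recompute the digest locally after downloading the jar yourself; a minimal sketch (the self-check file is a stand-in, not the actual jar):

```python
import hashlib
import tempfile

def sha512_of(path: str) -> str:
    """Compute the hex SHA-512 digest of a file, streaming in chunks."""
    h = hashlib.sha512()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()

# Quick self-check on a throwaway file; replace tmp.name with the path to
# pubsub-kafka-connector.jar and compare against the sha512sum field above.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"example bytes")
digest = sha512_of(tmp.name)
print(digest)
```

The digest printed for the real jar should match the 128-character hex value in the manifest exactly.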
---
# gcloud auth print-access-token | docker login -u oauth2accesstoken --password-stdin https://asia.gcr.io
# OR
# cat keyfile.json | docker login -u _json_key --password-stdin https://asia.gcr.io
# kubectl create secret generic docker-credentials --from-file=.dockerconfigjson=~/.docker/config.json --type=kubernetes.io/dockerconfigjson
apiVersion: v1
kind: Secret
metadata:
  name: docker-credentials
  labels:
    app: connect
  namespace: kafka
type: kubernetes.io/dockerconfigjson
data:
  .dockerconfigjson: ewogICJ0eXBlIjogInNlcnZpY2Vf # your Docker config file, base64-encoded; either pair of commands above will generate this secret. Strimzi docs: https://strimzi.io/docs/operators/latest/deploying.html#creating-new-image-using-kafka-connect-build-str
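As an alternative to the kubectl command above, the .dockerconfigjson value can be generated programmatically. A minimal sketch, assuming a GCR OAuth access token (the token here is a placeholder, not a real credential):

```python
import base64
import json

# Placeholder; in practice use the output of `gcloud auth print-access-token`.
token = "PLACEHOLDER_TOKEN"

docker_config = {
    "auths": {
        "asia.gcr.io": {
            # Docker stores "user:password" base64-encoded in the "auth" field.
            "auth": base64.b64encode(f"oauth2accesstoken:{token}".encode()).decode()
        }
    }
}

# This is the value that goes under data/.dockerconfigjson in the Secret.
dockerconfigjson = base64.b64encode(json.dumps(docker_config).encode()).decode()
print(dockerconfigjson)
```

Note that access tokens from gcloud are short-lived; the service-account keyfile login (the second command above) is the usual choice for a long-lived secret.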
Strimzi KafkaConnector resource:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: source-pubsub
  labels:
    strimzi.io/cluster: kafka-connect
spec:
  class: com.google.pubsub.kafka.source.CloudPubSubSourceConnector
  tasksMax: 2
  config:
    cps.project: my-gcp-project-name
    cps.subscription: test-topic-sub # Pub/Sub subscription name
    kafka.topic: topic2 # Kafka topic the connector writes messages to
Now when I consume messages from the Kafka cluster, they look like this:
$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka1-cluster-kafka-bootstrap:9097 --topic topic2 --from-beginning --consumer.config /tmp/config.properties
{"schema":{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"message"},{"type":"string","optional":false,"field":"_stream"},{"type":"string","optional":false,"field":"_namespace"}],"optional":false},"payload":{"message":"eyJfYWlyYnl0ZV9hYl9pZCI6IjA1YjA3MzgxLWE2NmEtNDMwOC05Mjg0LWU1ZTBiZjk4NmYxNyIsIl9haXJieXRlX2RhdGEiOnsic2Vzc2lvbl9kYXRlIjoiMjAyMS0xMi0xM1QwMDowMDowMFoiLCJjb3VudHJ5IjoiSW5kaWEiLCJjaXR5IjoiSmFpcHVyIiwic2Vzc2lvbl9jb3VudCI6MzI2NTM3fSwiX2FpcmJ5dGVfZW1pdHRlZF9hdCI6MTY0MzIxNDY5NzU1NX0","_stream":"pb_test_table","_namespace":"behaviour_us"}}
The next challenge is to decode the base64-encoded payload received from Pub/Sub. Or is there any way to receive the JSON payload already decoded? On the plus side, messages are arriving in Kafka from Pub/Sub.
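On the base64 question: JsonConverter serializes the connector's bytes-typed message field as a base64 string, which is why the Pub/Sub body arrives encoded. Decoding it on the consumer side is straightforward; a minimal Python sketch using the message field from the record above (re-padding defensively, since the printed value has its trailing "=" stripped):

```python
import base64
import json

# The "message" field from the console-consumer output above.
message_b64 = "eyJfYWlyYnl0ZV9hYl9pZCI6IjA1YjA3MzgxLWE2NmEtNDMwOC05Mjg0LWU1ZTBiZjk4NmYxNyIsIl9haXJieXRlX2RhdGEiOnsic2Vzc2lvbl9kYXRlIjoiMjAyMS0xMi0xM1QwMDowMDowMFoiLCJjb3VudHJ5IjoiSW5kaWEiLCJjaXR5IjoiSmFpcHVyIiwic2Vzc2lvbl9jb3VudCI6MzI2NTM3fSwiX2FpcmJ5dGVfZW1pdHRlZF9hdCI6MTY0MzIxNDY5NzU1NX0"

def decode_pubsub_message(b64: str) -> dict:
    # Restore any stripped base64 padding, then parse the decoded JSON body.
    padded = b64 + "=" * (-len(b64) % 4)
    return json.loads(base64.b64decode(padded))

doc = decode_pubsub_message(message_b64)
print(doc["_airbyte_data"]["city"])  # -> Jaipur
```

This runs in whatever application consumes topic2; it does not remove the encoding on the Kafka side, so the topic itself still carries the base64 string inside the JSON envelope.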
Was there ever a solution to this?
I'm interested. Can you provide a Docker image and guidance to support Strimzi (strimzi.io) Kafka Connect?