TavenYin / Blog2

Github Issues Blog
1 stars 0 forks source link

Kafka Connect 实战:入门 #5

Open TavenYin opened 2 years ago

TavenYin commented 2 years ago

前提

首先你需要了解MQ / Kafka相关的知识

本文目标

了解 Kafka Connect 基本概念与功能

什么是Kafka Connect

Kafka Connect 是一款可扩展并且可靠地在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以很简单的定义 connectors(连接器) 将大量数据迁入、迁出Kafka。

例如我现在想要把数据从MySQL迁移到ElasticSearch,为了保证高效和数据不会丢失,我们选择MQ作为中间件保存数据。这时候我们需要一个生产者线程,不断的从MySQL中读取数据并发送到MQ,还需要一个消费者线程消费MQ的数据写到ElasticSearch,这件事情似乎很简单,不需要任何框架。

但是如果我们想要保证生产者和消费者服务的高可用性,例如重启后生产者恢复到之前读取的位置,分布式部署并且节点宕机后将任务转移到其他节点。如果要加上这些的话,这件事就变得复杂起来了,而Kafka Connect 已经为我们造好这些轮子。

Kafka Connect 如何工作?

image.png

Kafka Connect 特性如下:

Kafka Connect Concepts

这里简单介绍下Kafka Connect 的概念与组成 更多细节请参考 👉 https://docs.confluent.io/platform/current/connect/concepts.html

Connectors

连接器,分为两种 Source(从源数据库拉取数据写入Kafka),Sink(从Kafka消费数据写入目标数据)

连接器其实并不参与实际的数据copy,连接器负责管理Task。连接器中定义了对应Task的类型,对外提供配置选项(用户创建连接器时需要提供对应的配置信息)。并且连接器还可以决定启动多少个Task线程。

用户可以通过Rest API 启停连接器,查看连接器状态

Confluent 已经提供了许多成熟的连接器,传送门👉 https://www.confluent.io/product/connectors/

Task

实际进行数据传输的单元,和连接器一样同样分为 Source和Sink

Task的配置和状态存储在Kafka的Topic中,config.storage.topicstatus.storage.topic。我们可以随时启动,停止任务,以提供弹性、可扩展的数据管道

Worker

刚刚我们讲的Connectors 和Task 属于逻辑单元,而Worker 是实际运行逻辑单元的进程,Worker 分为两种模式,单机模式和分布式模式

单机模式:比较简单,但是功能也受限,只有一些特殊的场景会使用到,例如收集主机的日志,通常来说更多的是使用分布式模式

分布式模式:为Kafka Connect提供了可扩展和故障转移。相同group.id的Worker,会自动组成集群。当新增Worker,或者有Worker挂掉时,集群会自动协调分配所有的Connector 和 Task(这个过程称为Rebalance)

Worker 集群

当使用Worker集群时,创建连接器,或者连接器Task数量变动时,都会触发Rebalance 以保证集群各个Worker节点负载均衡。但是当Task 进入Fail状态的时候并不会触发 Rebalance,只能通过Rest Api 对Task进行重启

Converters

Kafka Connect 通过 Converter 将数据在Kafka(字节数组)与Task(Object)之间进行转换

默认支持以下Converter

Converters 与 Connector 是解耦的,下图展示了在Kafka Connect中,Converter 在何时进行数据转换

image.png

Transforms

连接器可以通过配置Transform 实现对单个消息(对应代码中的Record)的转换和修改,可以配置多个Transform 组成一个。例如让所有消息的topic加一个前缀、sink无法消费source 写入的数据格式,这些场景都可以使用Transform 解决

Transform 如果配置在Source 则在Task之后执行,如果配置在Sink 则在Task之前执行

Dead Letter Queue

与其他MQ不同,Kafka 并没有死信队列这个功能。但是Kafka Connect提供了这一功能。

当Sink Task遇到无法处理的消息,会根据errors.tolerance配置项决定如何处理,默认情况下(errors.tolerance=none) Sink 遇到无法处理的记录会直接抛出异常,Task进入Fail 状态。开发人员需要根据Worker的错误日志解决问题,然后重启Task,才能继续消费数据

设置 errors.tolerance=all,Sink Task 会忽略所有的错误,继续处理。Worker中不会有任何错误日志。可以通过配置errors.deadletterqueue.topic.name = <dead-letter-topic-name> 让无法处理的消息路由到 Dead Letter Topic

快速上手

下面我来实战一下,如何使用Kafka Connect,我们先定一个小目标 将MySQL中的全量数据同步到Redis


  1. 新建文件 docker-compose.yaml

    version: '3.7'
    services:
    zookeeper:
    image: wurstmeister/zookeeper
    container_name: zk
    ports:
      - 2182:2181
    
    kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      # 宿主机ip
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.21:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    depends_on:
      - zookeeper

    在终端上执行 docker-compose -f docker-compose.yaml up -d 启动docker容器

准备连接器,这里我是自己写了一个简单的连接器😄。下载地址:https://github.com/TavenYin/kafka-connect-example/blob/master/release/kafka-connector-example-bin.jar

# 将连接器上传到kafka 容器中
docker cp kafka-connector-example-bin.jar kafka:/opt/connectors
  1. 修改配置并启动Worker
    
    #在配置文件末尾追加 plugin.path=/opt/connectors
    vi /opt/kafka/config/connect-distributed.properties

启动Worker

bin/connect-distributed.sh -daemon config/connect-distributed.properties


3. 准备MySQL

由于我宿主机里已经安装了MySQL,我就直接使用了,使用如下Sql创建表。创建之后随便造几条数据

CREATE TABLE test_user ( id bigint(20) NOT NULL AUTO_INCREMENT, name varchar(255) DEFAULT NULL, PRIMARY KEY (id) ) ;


4. 创建连接器

新建 `source.json`

{ "name" : "example-source", "config" : { "connector.class" : "com.github.taven.source.ExampleSourceConnector", "tasks.max" : "1", "database.url" : "jdbc:mysql://192.168.3.21:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=UTC&rewriteBatchedStatements=true", "database.username" : "root", "database.password" : "root", "database.tables" : "test_user" } }


向Worker 发送请求,创建连接器 
`curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json`

> `source.json` 中,有一些属性是Kafka Connect 提供的,例如上述文件中 `name`, `connector.class`, `tasks.max`,剩下的属性可以在开发Connector 时自定义。关于Kafka Connect Configuration 相关请阅读这里 👉 https://docs.confluent.io/platform/current/installation/configuration/connect/index.html

5. 确认数据是否写入Kafka

首先查看一下Worker中的运行状态,如果Task的state = RUNNING,代表Task没有抛出任何异常,平稳运行

bash-4.4# curl -X GET localhost:8083/connectors/example-source/status {"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"}, "tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"source"}


查看kafka 中Topic 是否创建

bash-4.4# bin/kafka-topics.sh --list --zookeeper zookeeper:2181 __consumer_offsets connect-configs connect-offsets connect-status test_user

这些Topic 都存储了什么?

-  __consumer_offsets: 记录所有Kafka Consumer Group的Offset
- connect-configs: 存储连接器的配置,对应Connect 配置文件中`config.storage.topic`
- connect-offsets: 存储Source 的Offset,对应Connect 配置文件中`offset.storage.topic`
- connect-status: 连接器与Task的状态,对应Connect 配置文件中`status.storage.topic`

---

查看topic中数据,此时说明MySQL数据已经成功写入Kafka

bash-4.4# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_user --from-beginning {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":1,"name":"yyyyyy"}} {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":2,"name":"qqqq"}} {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":3,"name":"eeee"}}

数据结构为Json,可以回顾一下上面我们修改的`connect-distributed.properties`,默认提供的Converter 为JsonConverter,所有的数据包含schema 和 payload 两项是因为配置文件中默认启动了`key.converter.schemas.enable=true`和`value.converter.schemas.enable=true`两个选项

6. 启动 Sink

新建sink.json

{ "name" : "example-sink", "config" : { "connector.class" : "com.github.taven.sink.ExampleSinkConnector", "topics" : "test_user, test_order", "tasks.max" : "1", "redis.host" : "192.168.3.21", "redis.port" : "6379", "redis.password" : "", "redis.database" : "0" } }


创建Sink Connector
` curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @sink.json`

然后查看Sink Connector Status,这里我发现由于我的Redis端口只对localhost开发,所以这里我的Task Fail了,修改了Redis配置之后,重启Task `curl -X POST localhost:8083/connectors/example-sink/tasks/0/restart`

在确认了Sink Status 为RUNNING 后,可以确认下Redis中是否有数据

> 关于Kafka Connect Rest api 文档,请参考👉https://docs.confluent.io/platform/current/connect/references/restapi.html

7. 如何查看Sink Offset消费情况

使用命令
`bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-example-sink` 

下图代表 `test_user` topic 三条数据已经全部消费

![](https://upload-images.jianshu.io/upload_images/9949918-314c70a70aaa7a16.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

# Kafka Connect 高级功能

我们的小目标已经达成了。现在两个Task无事可做,正好借此机会我们来体验一下可扩展和故障转移

## 集群扩展
我启动了开发环境中的Kafka Connect Worker,根据官方文档所示通过注册同一个Kafka 并且使用相同的 `group.id=connect-cluster` 可以自动组成集群

启动我开发环境中的Kafka Connect,之后检查两个连接器状态

bash-4.4# curl -X GET localhost:8083/connectors/example-source/status {"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.23.176.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.23.176.1:8083"}],"type":"source"}bash-4.4#

bash-4.4# curl -X GET localhost:8083/connectors/example-sink/status {"name":"example-sink","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"sink"}


观察worker_id 可以发现,两个Connectors 已经分别运行在两个Worker上了

## 故障转移

此时我们通过`kill pid`结束docker中的Worker进程观察是否宕机之后自动转移,但是发现Task并没有转移到仅存的Worker中,Task 状态变为UNASSIGNED,这是为啥呢?难道是有什么操作错了?

在网上查阅了一番得知,Kafka Connect 的集群扩展与故障转移机制是通过Kafka Rebalance 协议实现的(Consumer也是该协议),当Worker节点宕机时间超过 `scheduled.rebalance.max.delay.ms` 时,Kafka才会将其踢出集群。踢出后将该节点的连接器和任务分配给其他Worker,`scheduled.rebalance.max.delay.ms`默认值为五分钟。

后来经测试发现,五分钟之后查看连接器信息,已经转移到存活的Worker节点了

本来还计划写一下如何开发连接器和Kafka Rebalance,但是这篇已经够长了,所以计划后续更新这两篇文章