kgneng2 / blokg

blog
MIT License
0 stars 0 forks source link

kafka connector test 및 모니터링 #67

Open kgneng2 opened 2 years ago

kgneng2 commented 2 years ago

주요 config 정리

partition 별로 생성하는 단위 및 lag 사이즈 조절 flush에 관련된 config 테스트 필요합니다. 또한 파일 갯수가 많아짐을 줄여주는 작업이 필요합니다.( 한번에 memory에 들고있다가 flush 작업을 통해서 진행)

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
partition.duration.ms=600000 -> hdfs parition 단위.(10분)
topics.dir=/data/log -> hdfs path
tasks.max=6 -> executor 갯수
topics=irene-shopping-log -> consume 대상 topic
format.class=io.confluent.connect.hdfs.avro.AvroFormat -> data format
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner -> hdfs partition class
path.format=yyyyMMdd/HH/mm/yyyyMMddHHmm 
timestamp.field=timestamp

flush.size=600000 -> record size 갯수가 지정 단위가 되면 flush
rotate.schedule.interval.ms=600000

Reference

https://docs.confluent.io/current/connect/kafka-connect-hdfs/configuration_options.html

error logging config

errors.log.enable=true  // (1) 에러 발생시 관련정보를 로깅할지
errors.log.include.messages=true  // (2) 에러발생시 consume한 record를 파일에 로깅할지

Dead Letter Queue( Sink connector만 가능)

config option description default value domain
errors.deadletterqueue.topic.name The name of the dead letter queue topic. If not set, this feature will be disabled. "" A valid Kafka topic name
errors.deadletterqueue.topic.replication.factor Replication factor used to create the dead letter queue topic when it doesn't already exist. 3 [1 ... Short.MAX_VALUE]
errors.deadletterqueue.context.headers.enable If true, multiple headers will be added to annotate the record with the error context false Boolean

kafka connector offset 관리

HDFS 임시 보관 directory path : /data/log/+tmp/irene-shopping-log/20190314/16/30/201903141630

file commit 이후에는 해당 디렉토리 내용은 삭제된다.

distributed topic 3개

connect-status

connect-offset

connect-config

{"state":"STARTED"}
{"properties":{"file":"/home1/irteam/jy-test-txt.txt","task.class":"org.apache.kafka.connect.file.FileStreamSinkTask","topics":"test1"}}
{"tasks":3}

monitoring API

지속적으로 status 체크를 진행해서, conenctor의 state를 확인하면 될꺼같은데, 다른방안이 더 있느지 확ㅇ니해본다

file 저장 형식을 다른 형식으로 되는지 확인하기

error handling test

configuration

errors.deadletterqueue.topic.name=test
errors.tolerance=all //에러 발생시 관련 정보를 로깅할지에 대한 여부
errors.retry.delay.max.ms=30000  //에러 발생시 재시도 딜레이 시간 (밀리초 단위), 30000이면 30초 마다 재시도 수행 
errors.log.enable=true 
errors.deadletterqueue.context.headers.enable=true
errors.log.include.messages=true // Record를 로그파일에 로깅할지 여부
errors.retry.timeout=150000

결론

Code 분석

kgneng2 commented 2 years ago

kafka connector fail over test

test prepare

value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false

key.converter.schema.registry.url=http://dev.schema.navercorp.com:8081/

value.converter.schema.registry.url=http://dev.schema.navercorp.com:8081 group.id=connect-cluster

offset.storage.topic=connect-offsets offset.storage.replication.factor=1

config.storage.topic=connect-configs config.storage.replication.factor=1

status.storage.topic=connect-status status.storage.replication.factor=1

offset.flush.interval.ms=10000

plugin.path=/home1/irteam/apps/kafka-connector-hdfs/plugins

plugin.path=/home1/irteam/apps/confluent-5.1.2/share/java

access.control.allow.methods=GET,POST,PUT,OPTIONS access.control.allow.origin=*

- **hdfs 는 붙을수가 없어서, file sink 로 테스트 진행**
- ui : http://:8080/#/cluster/test/connector/FileStreamSinkConnector

- connector task configuration 

connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector file=/home1/irteam/jy-test-txt.txt topics=test1 tasks.max=3


## 테스트 진행 및 결과
### kafka connector node down 시켰을때
1. kafka message 를 지속적으로 발생시킴
2. kafka connector는 task.max 가 3이기 때문에, task 가 각기 다른 서버 3대에서 떠서 consume 후 파일 write하기 시작함.
3. 10.106.144.78 (2번서버) 에 kafka connector를 죽임.
4. 

[2019-03-12 17:30:34,179] INFO [Worker clientId=connect-1, groupId=connect-cluster] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:831) [2019-03-12 17:30:34,182] INFO [Consumer clientId=consumer-8, groupId=connect-FileStreamSinkConnector] Sending LeaveGroup request to coordinator dev-kafkaconnector003-ncl.nfra.io:9092 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782) [2019-03-12 17:30:34,186] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1261) [2019-03-12 17:30:34,186] INFO [Worker clientId=connect-1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:486) [2019-03-12 17:30:34,329] INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully joined group with generation 39 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:450

다음과 같은 로그를 dev-kafkaconnector003-ncl(10.106.145.65) 에서 남기면서 UI 상에서 테스크가 다시할당됨을 확인함

<img width="718" alt="image" src="https://user-images.githubusercontent.com/7286378/164999111-d4d06f23-c2dd-4c96-bc71-3169e8729d68.png">

5. 데이터를 확인해보니 
- 2번 서버에서 {test=147 hello} 까지 consume해서 파일에 쓰고 죽었으나
- 3번 서버에서  

{test=141 hello} {test=142 hello} {test=150 hello} {test=152 hello} {test=153 hello} {test=147 hello} {test=149 hello} {test=151 hello} {test=154 hello} {test=155 hello} {test=156 hello}


이어 받아서 task를 진행함.
- {test=147 hello} 메시지를 중복해서 저장하긴하나 손실은 일어나지 않았음. (file sink임을 인지..hdfs sink일때는 어떻게 될지 모름)

6. 002 서버를 다시 재시작 하니 다음과 같이 task를 분배하였습니다

## reference
- https://docs.confluent.io/current/connect/concepts.html#task-rebalancing
- https://docs.confluent.io/current/connect/concepts.html#distributed-workers