
Elasticsearch kafka sink connector (installation guide included) #38

Open kgneng2 opened 3 years ago

kgneng2 commented 3 years ago

kafka-connector-elasticsearch

https://github.com/confluentinc/kafka-connect-elasticsearch/blob/master/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnector.java

exactly once

mapping

Reindexing

test server

kafka connector, kafka

es

install guide

Configuration

```properties
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
type.name=es
topics=es_test
tasks.max=1
batch.size=5 // same function as flush.size in the HDFS connector
connection.url=http://10.113.112.154:9200/
key.ignore=true // whether the record key is used to build the ES document id; with true, topic+partition+offset is used instead
schema.ignore=true // ignore the ES mapping (do not derive it from the Connect schema)
```
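
For reference, a minimal sketch of how a config like this could be registered with a Connect worker over its REST API. Port 8083 is the Connect default and the connector name `es-sink-test` is made up for illustration:

```bash
# Register the connector config above with a running Connect worker (name is an example).
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "es-sink-test",
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "type.name": "es",
      "topics": "es_test",
      "tasks.max": "1",
      "batch.size": "5",
      "connection.url": "http://10.113.112.154:9200/",
      "key.ignore": "true",
      "schema.ignore": "true"
    }
  }'
```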

Advantages of setting this up

Things to test and investigate further

When a mapping or parsing error occurs, the worker dies; find a way to restart it or ignore the error.
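
One option to try (a sketch, not something verified in these notes): restart only the dead task or connector through the Connect REST API instead of bouncing the whole worker. The connector name below is hypothetical.

```bash
# Inspect which task failed, then restart just that task (connector name is an example).
curl -s http://localhost:8083/connectors/es-sink-test/status
curl -X POST http://localhost:8083/connectors/es-sink-test/tasks/0/restart

# Or restart the whole connector.
curl -X POST http://localhost:8083/connectors/es-sink-test/restart
```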

```properties
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
type.name=es
topics=eses
tasks.max=3
batch.size=5
drop.invalid.message=true // doesn't seem to work...
key.ignore=true
schema.ignore=true
behavior.on.malformed.documents=ignore
key.converter.schemas.enable=false
value.converter.schemas.enable=false
connection.url=http://10.113.112.154:9200/
errors.tolerance=all // when this is set, exceptions that occur are ignored
```

schema evolution test needed

logTime and timestamp indexing

When drop.invalid.message and behavior.on.malformed.documents are set, no error log is written.
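
If silently dropping bad records is not acceptable, a dead letter queue is one alternative worth testing. A sketch only: the `errors.deadletterqueue.*` properties are standard Kafka Connect sink options (Kafka 2.0+), and the topic name here is made up:

```properties
# Route records that fail conversion/transform to a DLQ topic instead of dropping them silently.
errors.tolerance=all
errors.deadletterqueue.topic.name=es_test_dlq
errors.deadletterqueue.topic.replication.factor=1
# Attach failure-reason headers to the DLQ records.
errors.deadletterqueue.context.headers.enable=true
```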

Kafka Connect cluster configuration for the Elasticsearch connector

```properties
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter.schema.registry.url=http://dev.schema.navercorp.com:8081/
value.converter.schema.registry.url=http://dev.schema.navercorp.com:8081
group.id=test-velvet-connect-cluster

offset.storage.topic=test-velvet-connect-offsets
offset.storage.replication.factor=1
config.storage.topic=test-velvet-connect-configs
config.storage.replication.factor=1
status.storage.topic=test-velvet-connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000

plugin.path=/home1/irteam/apps/kafka-connector-hdfs/plugins
plugin.path=/home1/irteam/apps/confluent/share/java,/home1/irteam/apps/confluent/share/confluent-hub-components

access.control.allow.methods=GET,POST,PUT,OPTIONS
access.control.allow.origin=*

topic.schema.ignore=true
topic.key.ignore=true
drop.invalid.message=true
behavior.on.null.values=ignore
behavior.on.malformed.documents=ignore
```

error

```properties
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
```
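
Once the distributed worker is running, it can be sanity-checked through its REST API. A sketch; port 8083 is the Connect default and the connector name is a placeholder:

```bash
# Is the Elasticsearch sink plugin on plugin.path actually visible to the worker?
curl -s http://localhost:8083/connector-plugins | grep -i elasticsearch

# List registered connectors and check the status of one of them.
curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/<connector-name>/status
```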


# Example
## send_sms
```properties
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
type.name=send_sms // elasticsearch type
topics=send_sms
tasks.max=3
batch.size=10
transforms=Router // transform alias
key.ignore=true
schema.ignore=true // ignore the schema
key.converter.schemas.enable=false
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=http://test-elastic001.ncl.nfra.io:9200
transforms.Router.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.Router.topic.format=${topic}-${timestamp} // index format -> send_sms-yyyyMM
transforms.Router.timestamp.format=yyyyMM // affects the format above
```
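
To confirm the TimestampRouter is actually creating monthly indices, something like this can be run against the cluster (a sketch using the standard _cat/indices API):

```bash
# Lists the monthly indices created by the router, e.g. send_sms-202001.
curl -s 'http://test-elastic001.ncl.nfra.io:9200/_cat/indices/send_sms-*?v'
```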

## vshopping-log-click example (es)

```properties
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
type.name=vshopping-log-click
transforms.Router.topic.format=${topic}-${timestamp}
schema.compatibility=BACKWARD
topics=vshopping-log-click
tasks.max=3
batch.size=10000
transforms=Router
key.ignore=true
max.buffered.records=100000
schema.ignore=true
key.converter.schemas.enable=false
value.converter.schema.registry.url=http://dev.schema.navercorp.com:8081
flush.timeout.ms=60000
transforms.Router.timestamp.format=yyyyMMdd
value.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
connection.url=http://test-elastic001.ncl.nfra.io:9200
transforms.Router.type=org.apache.kafka.connect.transforms.TimestampRouter
```
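
Because schema.ignore=true leaves the mapping to dynamic mapping, an index template is one way to keep settings consistent across the daily vshopping-log-click-yyyyMMdd indices. A sketch using the legacy _template API (Elasticsearch 6.x form; shard/replica numbers are placeholders):

```bash
curl -X PUT 'http://test-elastic001.ncl.nfra.io:9200/_template/vshopping-log-click' \
  -H 'Content-Type: application/json' \
  -d '{
    "index_patterns": ["vshopping-log-click-*"],
    "settings": { "number_of_shards": 3, "number_of_replicas": 1 }
  }'
```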
kgneng2 commented 2 years ago

application.sh

Environment variables

```bash
KAFKA_CONNECT_HOME=/home1/irteam/apps/kafka-connect
BIN_DIRECTORY=${KAFKA_CONNECT_HOME}/bin
LOG_DIRECTORY=${KAFKA_CONNECT_HOME}/log
GC_LOG_DIRECTORY=${KAFKA_CONNECT_HOME}/log/gc
LOG_PROPERTIES=${KAFKA_CONNECT_HOME}/conf/connect-log4j.properties
CONNECT_PROPERTIES=${KAFKA_CONNECT_HOME}/conf/connect-distributed.properties

CONFLUENT_HOME=/home1/irteam/apps/confluent
CONFLUENT_KAFKA_CONNECT_SCRIPT=${CONFLUENT_HOME}/bin/connect-distributed
```

Move to the home directory, then start the work there.

```bash
cd ${KAFKA_CONNECT_HOME}

export KAFKA_HEAP_OPTS="-Xmx2g -Xms2g"
```

The confluent connect-distributed script uses the variables exported below when it starts the JVM, so they are defined up front here.

```bash
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:$GC_LOG_DIRECTORY/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=2M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$GC_LOG_DIRECTORY/heapdump.hprof"
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG_PROPERTIES}"
```

```bash
PIDFile="application.pid"

function check_if_pid_file_exists {
    if [[ ! -f $PIDFile ]]
    then
        echo "PID file not found: $PIDFile"
    fi
}

function check_if_process_is_running {
    if [[ ! -f $PIDFile ]]
    then
        return 1
    else
        if ps -p $(print_process) > /dev/null
        then
            return 0
        else
            return 1
        fi
    fi
}

function print_process {
    echo $(<"$PIDFile")
}

case "$1" in
status)
    check_if_pid_file_exists
    if check_if_process_is_running
    then
        echo "$(print_process) is running"
    else
        echo "Process not running: $(print_process)"
    fi
;;
stop)
    check_if_pid_file_exists
    if ! check_if_process_is_running
    then
        echo "Process $(print_process) already stopped"
        exit 0
    fi
    kill -TERM $(print_process)
    echo "Waiting for process to stop"
    NOT_KILLED=1
    for i in {1..60}; do
        if check_if_process_is_running
        then
            echo "."
            sleep 1
        else
            NOT_KILLED=0
        fi
    done
    echo
    if [[ $NOT_KILLED = 1 ]]
    then
        echo "Cannot kill process $(print_process)"
        exit 1
    fi
    echo "Process stopped"
;;
start)
    if [[ -f $PIDFile ]] && check_if_process_is_running
    then
        echo "Process $(print_process) already running"
        exit 1
    fi

    nohup ${CONFLUENT_KAFKA_CONNECT_SCRIPT} ${CONNECT_PROPERTIES} 2>&1 > /dev/null &

    PID=$!
    echo ${PID} > ${PIDFile}
    echo "Process ${PID} started"
;;
restart)
    $0 stop
    if [[ $? = 1 ]]
    then
        exit 1
    fi
    $0 start
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
    exit 1
esac

exit 0
```
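
Typical usage of the script above, assuming it is saved as ${BIN_DIRECTORY}/application.sh and made executable (a sketch based on the case branches):

```bash
chmod +x /home1/irteam/apps/kafka-connect/bin/application.sh

/home1/irteam/apps/kafka-connect/bin/application.sh start    # writes the PID to ${KAFKA_CONNECT_HOME}/application.pid
/home1/irteam/apps/kafka-connect/bin/application.sh status   # prints "<pid> is running"
/home1/irteam/apps/kafka-connect/bin/application.sh restart
/home1/irteam/apps/kafka-connect/bin/application.sh stop
```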

kgneng2 commented 2 years ago

log4j

stdout appender

```properties
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
```

file appender

```properties
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/home1/irteam/apps/kafka-connect/log/kafka-connect.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=100
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
```

The logging levels for the packages below are the values Confluent ships with.

If the logging level is lowered further, WARN messages flood the output at application startup, so Confluent's approach is to suppress them.

```properties
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR
log4j.logger.org.reflections=ERROR
```

kgneng2 commented 2 years ago

1. download and install

```bash
#!/bin/bash

# Create the kafka-connect directories (under the irteam account)
mkdir -p /home1/irteam/apps/kafka-connect/log/gc
mkdir -p /home1/irteam/apps/kafka-connect/bin
mkdir -p /home1/irteam/apps/kafka-connect/conf

cd /home1/irteam/apps

# confluent zip file download
# https://docs.confluent.io/current/installation/installing_cp/zip-tar.html
curl -O http://packages.confluent.io/archive/5.0/confluent-oss-5.0.1-2.11.zip
unzip confluent-oss-5.0.1-2.11.zip

ln -s confluent-5.0.1 confluent
```

2. hadoop install

```bash
#!/bin/bash

cd /home1/irteam/apps

curl -O http://apache.mirror.cdnetworks.com/hadoop/common/hadoop-3.0.3/hadoop-3.0.3.tar.gz
tar xvfz hadoop-3.0.3.tar.gz

ln -s hadoop-3.0.3 hadoop
```

3. Create the application.sh, connect-distributed.properties, and connect-log4j.properties files shown above.
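
A sketch of where those files land, following the paths defined by the environment variables in application.sh:

```bash
# Paths follow BIN_DIRECTORY / CONNECT_PROPERTIES / LOG_PROPERTIES in application.sh.
cp application.sh                 /home1/irteam/apps/kafka-connect/bin/
cp connect-distributed.properties /home1/irteam/apps/kafka-connect/conf/
cp connect-log4j.properties       /home1/irteam/apps/kafka-connect/conf/
chmod +x /home1/irteam/apps/kafka-connect/bin/application.sh
```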

kgneng2 commented 2 years ago
```properties
type.name=dunkerque
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=biz-platform-dunkerque
tasks.max=1
batch.size=5000
transforms=dailyIndex
max.retries=3
key.ignore=true
max.in.flight.requests=5
transforms.dailyIndex.type=org.apache.kafka.connect.transforms.TimestampRouter
max.buffered.records=20000
schema.ignore=true
transforms.dailyIndex.timestamp.format=YYYYMMdd // note: YYYY is the week-based year in SimpleDateFormat; yyyyMMdd is usually what is intended
transforms.dailyIndex.topic.format=dunkerque-${timestamp}
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=http://test-elastic001.ncl.nfra.io:9200,http://test-elastic002.ncl.nfra.io:9200,http://test-elastic003.ncl.nfra.io:9200,http://test-elastic004.ncl.nfra.io:9200,http://test-elastic005.ncl.nfra.io:9200,http://test-elastic006.ncl.nfra.io:9200,http://test-elastic007.ncl.nfra.io:9200,http://test-elastic011-ncl.nfra.io:9200,http://test-elastic012-ncl.nfra.io:9200,http://test-elastic013-ncl.nfra.io:9200,http://test-elastic014-ncl.nfra.io:9200,http://test-elastic015-ncl.nfra.io:9200,http://test-elastic016-ncl.nfra.io:9200,http://test-elastic017-ncl.nfra.io:9200,http://test-elastic018-ncl.nfra.io:9200,http://test-elastic019-ncl.nfra.io:9200,http://test-elastic020-ncl.nfra.io:9200
read.timeout.ms=9000
```
kgneng2 commented 2 years ago
```properties
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
type.name=htl_autocomplete_clk
transforms.Router.topic.format=${topic}-${timestamp}
schema.compatibility=BACKWARD
topics=htl-autocomplete-clk
tasks.max=3
batch.size=1000
transforms=Router
key.ignore=true // ignore the record key as document id; topic+partition+offset is used instead
schema.ignore=true // insert without using the schema for the mapping
key.converter.schemas.enable=false
value.converter.schema.registry.url=http://dev.schema.navercorp.com:8081
transforms.Router.timestamp.format=yyyyMMdd
connection.url=http://dev-hotel-es000-ncl:9200,http://dev-hotel-es001-ncl:9200,http://dev-hotel-es002-ncl:9200
value.converter=io.confluent.connect.avro.AvroConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms.Router.type=org.apache.kafka.connect.transforms.TimestampRouter
read.timeout.ms=9000
```
kgneng2 commented 2 years ago

schema evolution

The Elasticsearch connector writes data from different topics in Kafka to different indices. All data for a topic will have the same type in Elasticsearch. This allows an independent evolution of schemas for data from different topics. It also simplifies schema evolution, because Elasticsearch enforces only one constraint on mappings: all fields with the same name in the same index must have the same mapping.

Elasticsearch supports dynamic mapping: when it encounters a previously unknown field in a document, it uses dynamic mapping to determine the datatype for the field and automatically adds the new field to the type mapping.

When dynamic mapping is enabled, the Elasticsearch connector supports schema evolution. This is because mappings in Elasticsearch are more flexible than the schema evolution allowed in Connect when different converters are used. For example, when the Avro converter is used, backward, forward, and fully compatible schema evolutions are allowed.

When dynamic mapping is enabled, the Elasticsearch connector allows the following schema changes:

- Adding fields: adding one or more fields to Kafka messages. Elasticsearch adds the new fields to the mapping when dynamic mapping is enabled.
- Removing fields: removing one or more fields from Kafka messages. Missing fields are treated as the null value defined for those fields in the mapping.
- Changing types that can be merged: for example, changing a field from integer type to string type. Elasticsearch can convert integers to strings.

The following change is not allowed:

- Changing types that cannot be merged: for example, changing a field from a string type to an integer type.

Because mappings are more flexible, schema compatibility should still be enforced when writing data to Kafka.
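
A quick way to see dynamic mapping at work outside of the connector (a sketch; the index name dyn-test and the fields are made up, and the _doc endpoint assumes Elasticsearch 6.x or later):

```bash
# Index a document containing fields the index has never seen.
curl -X POST 'http://localhost:9200/dyn-test/_doc' \
  -H 'Content-Type: application/json' \
  -d '{"new_field": "hello", "count": 1}'

# The mapping now contains "new_field" (text) and "count" (long), inferred dynamically.
curl -s 'http://localhost:9200/dyn-test/_mapping'
```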