NW-book-club / apache-kafka-application-programming

아파치 카프카 어플리케이션 프로그래밍 with 자바 (최원영)
0 stars 0 forks source link

Chapter 3 - 카프카 기본 개념 설명 #3

Open jjy0918 opened 2 years ago

jjy0918 commented 2 years ago

카프카 속도 향상 전략

카프카 캐시 전략

DefaultPartitioner

image

KKambi commented 2 years ago

페이지 캐시

https://en.wikipedia.org/wiki/Page_cache 운영체제는 메인 메모리(RAM)의 사용하지 않는 부분을 디스크 캐시로 사용한다.


카프카와 페이지 캐시

스크린샷 2022-05-22 오후 5 19 10 https://kafka.apache.org/documentation/#persistence https://medium.com/@sunny_81705/what-makes-apache-kafka-so-fast-71b477dcbf0 카프카는 메세지를 보관하고 캐싱하는데 파일시스템에 많이 의존


SSD vs HDD

자세하게는 모르겠지만, SSD는 전기를 HDD는 자기를 이용함


jmx port

https://www.baeldung.com/java-management-extensions JMX(Java Management eXtension)

Instrumentation : 컴퓨터 프로그래밍에서 인스트루먼테이션은 오류를 진단하거나 추적 정보를 쓰기 위해 제품의 성능 정도를 모니터하거나 측정하는 기능을 가리킨다. 프로그래머들은 시스템에서 특정한 구성 요소를 모니터링하는 코드 명령어 형태로 인스트루먼테이션을 구현할 수 있다.


Producer ACKS

https://www.popit.kr/kafka-%EC%9A%B4%EC%98%81%EC%9E%90%EA%B0%80-%EB%A7%90%ED%95%98%EB%8A%94-producer-acks/


ack = 0일 경우


ack = 1일 경우


ack = -1일 경우


min.insync.replicas 프로듀서가 acks = -1의 브로커에게 메세지를 보낼 때, write를 성공하기 위한 최소 복제본의 수

jjy0918 commented 2 years ago

kafka의 heartbeat

  1. kafka가 poll을 시작하면, heartbeat 스레드를 시작한다.

    
    public boolean poll(Timer timer) {
        maybeUpdateSubscriptionMetadata();
    
        invokeCompletedOffsetCommitCallbacks();
    
        if (subscriptions.hasAutoAssignedPartitions()) {
         ...
               if (!ensureActiveGroup(timer)) {
                    return false;
                }
         }
    ...
    }

...

boolean ensureActiveGroup(final Timer timer) { // always ensure that the coordinator is ready because we may have been disconnected // when sending heartbeats and does not necessarily require us to rejoin the group. if (!ensureCoordinatorReady(timer)) { return false; }

    startHeartbeatThreadIfNeeded();
    return joinGroupIfNeeded(timer);

}

...

private synchronized void startHeartbeatThreadIfNeeded() { if (heartbeatThread == null) { heartbeatThread = new HeartbeatThread(); heartbeatThread.start(); } }


2. heartbeat 스레드는 컨슈머 그룹에서 레코드를 정상적으로 받아 올 수 있는 상태인지 판단한다.

```java
if (state != MemberState.STABLE) {
     // the group is not stable (perhaps because we left the group or because the coordinator
    // kicked us out), so disable heartbeats and wait for the main thread to rejoin.
    disable();
   continue;
}
  1. 브로커에 연결될 수 있는 상태인지 판단한다.

    image
  2. 브로커에 연결되었다면, 세션 타임 아웃과 poll 타임 아웃, 설정한 heartbeatTImeout을 체크한다.

image
  1. 위 조건을 만족한다면 heartbeat를 전송한다.
image
  1. heartbeat를 전송이 성공하는 경우 타이머를 업데이트 한다.

    image image
  2. heartbeat를 전송이 실패하는 경우 리벨런싱 중인지 판단하고, 그렇지 않다면 인터벌 타임을 조절하여 hearbeat를 전송한다.

    image

정리

https://chrzaszcz.dev/2019/06/kafka-heartbeat-thread/

KKambi commented 2 years ago

KStream, KTable, GlobalKTable

https://gunju-ko.github.io/kafka/kafka-stream/2018/05/24/KTable-GlobalKTable.html https://m.blog.naver.com/syam2000/222158302362

핵심 : 레코드의 모음(특정 토픽)을 레코드의 스트림(시간순 중요)으로 추상화해서 데이터를 조회하는 것 ex) Test 토픽의 데이터를 KStream으로 조회한다.


import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Long> wordCounts = builder.stream(
    "word-counts-input-topic", /* input topic */
    Consumed.with(
      Serdes.String(), /* key serde */
      Serdes.Long()   /* value serde */
    )
);

KStream


스크린샷 2022-05-29 오후 3 55 00 KTable


GlobalKTable

추상화된 스트림 간의 Join

Kstream & KTable

KStream, KTable & GlobalKTable

jjy0918 commented 2 years ago

카프카의 기본 설정

Properties

StreamConfig


val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = APPLICATION_NAME
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVERS
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name

... 

val streams = KafkaStreams(builder.build(), props)
streams.start()
image image image image image image image image image
KKambi commented 2 years ago

카프카 스트림즈의 장점

https://www.youtube.com/watch?v=vKxhPUUEDmM

  1. 카프카 호환성 (with 카프카 릴리즈)
  2. 유실 & 중복처리되지 않도록 딱 한 번만 처리되도록 하는 기능
  3. 별도의 스케줄링 도구 X (주로 사용하는 스파크 스트림을 쓰면, 별도의 클러스터 관리자 / 리소스 매니저 필요)
  4. 스트림즈DSL (for 이벤트 데이터 파이프라인) & 프로세서API (스트림즈에 없는 기능 추가 구현) 대부분은 스트림즈DSL로 해결 가능
  5. 자체적인 상태저장소 보유 (rocksDB, 상태에 대한 변환 정보는 카프카의 changelog에 저장)


실시간으로 데이터를 처리하는 방식은 크게 2가지

  1. 비상태기반 처리
    • 데이터가 들어올 때마다 바로 처리하고 프로듀스
    • 필터링, 데이터변환
  2. 상태기반 처리
    • 매우 어려움 (이전에 받았던 데이터를 메모리에 저장하고, 다음 데이터를 참조해서 처리)
    • window, join, aggregation
    • 스트림즈는 상태저장소를 가지고 있으므로 장애 복구에 유리

이것저것

https://engineering.linecorp.com/ko/blog/applying-kafka-streams-for-internal-message-delivery-pipeline/

Fault-tolerance local state DB

Loopback replicator

jjy0918 commented 2 years ago

카프카 미러 메이커 규칙

  1. 단방향 복사
    • 기본적으로 카프카 미러메이커는 단방향으로 복사가 진행된다.
    • 클러스터 A -> 클러스터 B / 클러스터 B -> 클러스터 A
  2. 하나 이상의 Topic 복사
    • 미러 메이커는 기본적으로 1개 이상의 Topic을 복사한다.
  3. 각 Topic은 원격 Topic에 정확하게 복제된다(일대일 매핑)
    • Topic뿐만 아니라 레코드도 동일한 파티션에 정확히 일대일로 매핑되어 복사된다.
  4. 자동 Topic 생성
    • 복사될 Topic이나 그 설정(파티션 수)이 존재하지 않는 경우에 생성하여 복사된다.
  5. 원격 Topic 이름 변경
    • Topic을 복사하면서 생성되는 Topic의 이름은 클러스터명을 포함한다.
    • {원본 클러스터 이름}.{토픽이름}
      • 원본 클러스터 이름이 A이고, 토픽이 topic인 경우 복사되는 토픽의 이름은 A.topic이 된다.
      • clusterA -> clusterB, clusterB -> clusterC 형태가 되는 경우 clusterC의 토픽 이름은 clusterA.clusterB.topicName 이 된다.
        1. 다중 mirror flow
      • 미러메이커2는 기본적으로 양방향을 포함하여 클러스터간 여러 토폴리지를 형성할 수 있다.
      • Fan out(1개의 소스, 여러개의 타겟)
      • 클러스터A -> 클러스터B
      • 클러스터A -> 클러스터C
      • Fan in(여러개의 소스, 1개의 타겟)
      • 클러스터 A -> 클러스터 B
      • 클러스터 C -> 클러스터 B
      • Pipe(여러 클러스터 전달)
      • 클러스터 A -> 클러스터 B -> 클러스터C
      • Bidirectional
      • 클러스터 A -> 클러스터 B
      • 클러스터 B -> 클러스터 A
      • Complex
      • 클러스터 A -> 클러스터 B
      • 클러스터 B -> 클러스터 A
      • 클러스터 C -> 클러스터 B
      • 클러스터 A -> 클러스터 D
        1. 사이클 방지
      • 양방향 클러스터등을 통해 infinity mirror와 같은 현상이 발생할 수 있다.
      • 이러한 것을 방지하기 위해 미러메이커2는 topic으로 필터링하는 기능을 제공한다.
        1. 자신의 클러스터 이름을 가진 토픽 복사 방지
      • 미러메이커2는 기본적으로 토픽에 자기 클러스터 이름이 들어가 있는 경우 복사를 하지 않는다.
      • A클러스터에 B.topic1이 있는 경우 B클러스터는 B.topic1을 복사하지 않는다

출처 : https://www.instaclustr.com/blog/kafka-mirrormaker-2-theory/

KKambi commented 2 years ago

https://hyperconnect.github.io/2021/01/11/cdc-platform.html

카프카와 CDC Platform


CDC Platform에서 활용하는 카프카 커넥트

운영 환경이므로 분산 모드 커넥트만 생각한다. 카프카 커넥트의 아키텍쳐는 크게 3가지 모델로 나뉜다.

  1. Connector Model : 어떤 데이터를 어떤 형식으로 복제할 것인지?
  2. Worker Model : 워커가 태스크를 어떤 로직으로 수행할 것인지?
  3. Data Model : 메세지(데이터)의 형식과 내용은 어떤지?


하나의 카프카 커넥트

image

  1. Datasource로부터 변경된 데이터를 읽음 (Source에 맞는 커넥터 사용)
  2. 데이터를 가공할 수 있는 Transform (특정 값 추가,제거 / 날짜 형태 변경 등)
  3. 가공한 데이터를 Kafka에 전송하기 위한 형태로 변환하는 Converter (Json, ProtoBuf 등의 직렬화 등 -> 최종에는 카프카에 전송하기 위한 ByteArray)


다수의 카프카 커넥트

image


MySQL Source Connector

MySQL은 Create, Drop과 같은 DDL / Insert, Update, Delete와 같은 DML문으로 데이터를 변경할 때 해당 이벤트를 로그 파일로 기록


https://engineering.linecorp.com/ko/blog/line-shopping-platform-kafka-mongodb-kubernetes/ 널리 사용되는 대부분의 DBMS는 데이터 수준의 로그를 남긴다 (MongoDB도 있다고 함)

Kafka 커넥터를 제작할 때 JDBC와 같은 통일된 규격이 존재하지 않음

KKambi commented 2 years ago

ExecutorService

https://www.baeldung.com/java-executor-service-tutorial

ForkJoinPool을 지원하는 fork/join framework와 다른 것!

ExecutorService - 각 스레드가 각자 다른 일을 담당할 경우 fork/join - 각 스레드가 하나의 일을 분할(fork)해서 정복(join)할 경우

단계1 - 인스턴스화

ExecutorService 타입에 인스턴스를 할당해야 한다.

방법1) Executors 클래스의 팩토리 메소드 사용

  1. newCachedThreadPool
  2. newFiexdThreadPool
  3. newScheduledThreadPool
    ExecutorService executor = Executors.newFixedThreadPool(10);

방법2) 직접 ExecutorService 생성 ExecutorService는 인터페이스이므로, 어떤 구현체든 사용할 수 있다. java.util.concurrent 패키지에 정의된 구현체를 사용해도 되고, 직접 정의해도 된다.

// 사실 newSingleThreadExecutor() 팩토리 메소드의 소스 코드와 유사하다.
// 대부분의 케이스에서 팩토리 메소드로 해결 가능
ExecutorService executorService = 
  new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,   
       new LinkedBlockingQueue<Runnable>()
  );

단계2 - 태스크 할당 & 수행

ExecutorService는 Runnable / Callable 타입의 태스크를 실행한다. (책에서 Runnable 인터페이스를 구현한 커스텀 Worker 클래스를 만듬)

  1. 둘 중 한 타입을 구현한 객체를 생성한다.
  2. ExecutorService 객체의 메소드 중 하나를 선택해 태스크를 할당한다.
    1. execute() : task 수행 결과를 얻거나 체크할 수 없음 (return void)
    2. submit() : Future 타입 반환
    3. invokeAny() : 태스크 컬렉션 할당. 모두 실행하고, 컬렉션 중 성공한 하나의 결과를 반환
    4. invokeAll() : 태스크 컬렉션 할당. 모두 실행하고 List<Future> 반환
      executorService.execute(runnableTask);
      Future<String> future = executorService.submit(callableTask);
      String result = executorService.invokeAny(callableTasks);
      List<Future<String>> futures = executorService.invokeAll(callableTasks);

단계3 - ExecutorService 종료

일반적으로 태스크가 없어도 해당 객체는 자동 파괴되지 않음 (생성한 스레드 풀이 잔재)

Oracle이 추천하는 방법. 일정 시간 동안 작업 수행을 기다린 뒤, 이후에도 끝나지 않으면 즉시 종료.

executorService.shutdown();
try {
    if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow();
    } 
} catch (InterruptedException e) {
    executorService.shutdownNow();
}


Future 인터페이스

submit()과 invokeAll()의 반환 객체. 비동기 작업의 수행 결과를 담는 인터페이스.


Executors 팩토리 메소드에 파라미터를 넘기지 않으면?

Q. 생성된 Pool은 몇 개의 스레드를 갖고 있는 걸까요? A. 스레드 풀의 종류마다 다르다

키바나 인덱스 패턴

엘라스틱서치

키바나

class MetricHelper:
    def __init__(self):
        self.host = '엘라스틱 서치 url'
        self.port = 9200
        self.es = Elasticsearch([{'host': self.host, 'port': self.port}, ], timeout=10)
        self._index = "item_count-{:%Y.%m}".format(datetime.now(timezone('Asia/Seoul')))

이 경우 월별로 다른 인덱스에 데이터를 저장하게 될 것이다. 이 때 키바나에서 모든 item_count 인덱스들의 데이터를 시각화하고 싶다면, 인덱스 패턴을 item_count-*로 설정하게 된다.

jjy0918 commented 2 years ago

무중단 배포 방법

롤링

image

블루-그린

image

카나리

image

https://www.koyeb.com/blog/blue-green-rolling-and-canary-continuous-deployments-explained

jjy0918 commented 2 years ago

스프링 카프카 AutoConfiguration

KafkaProperties

image

KafkaAutoConfiguration

image image

AutoConfiguration 과정

ConfigurationProperties

@ConfigurationProperties("my.service")
public class MyProperties {

    private boolean enabled;

    private InetAddress remoteAddress;

    private final Security security = new Security();

    // getters / setters...

    public static class Security {

        private String username;

        private String password;

        private List<String> roles = new ArrayList<>(Collections.singleton("USER"));

        // getters / setters...

    }

}

AutoConfiguration

ConditionalOnClass / ConditionalOnMissingBean

springboot.autoconfiguration

https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.external-config.yaml

KKambi commented 2 years ago

Future 좀 더 파헤치기

Future와 작업 결과

image Future는 비동기 작업의 결과를 담는 역할


비동기 작업의 결과를 콜백 함수로 처리하기 위해선 다양한 방법이 있다.

ListenableFuture

image 기존 Future처럼 사용하되, 객체 생성 후 callback을 나중에 등록할 수 있음

CompleteableFuture

작업이 완료될 것을 가정하고 fluent한 함수형 프로그래밍 스타일로 코드를 작성할 수 있는 future

Hadoop

분산처리 시스템을 제공하는 아파치 재단의 오픈 소스 프레임워크

Amazon S3, 네이버에서 우리가 사용하는 스토리지 머더라 그것도 분산 파일 시스템

장점

단점