LuterGS / TIL

today I learned
2 stars 0 forks source link

2023.11.20 - inside ksqlDB (Stateless operation) #15

Open LuterGS opened 1 year ago

LuterGS commented 1 year ago

Confluent Developer - ksqlDB's Architecture 의 글을 번역한 것임을 알립니다.

ksqlDB 의 구조

전형적인 스트리밍 파이프라인은 데이터 소스가 있는 데이터베이스로부터 이벤트를 발행하는 카프카 커넥터와 어플리케이션, 이벤트를 걸러내고 모으는 (aggregate) 스트림 프로세서와, 분석엔진이 접근할 수 있도록 이벤트를 저장하는 저장 장치가 함께합니다.

이건 일반적인 구조이고 잘 작동하지만, 장애가 일어나기 쉽고 확장성, 보안, 모니터링, 디버깅 및 운영을 하나로 통합하기 어렵게 만드는 움직이는 부품들로 구성되어 있습니다.

ksqlDB 구조는 더 간결합니다. 위에서 설명한 많은 외부 구조가 툴 자체로 통합되어있습니다. ksqlDB 는 connector 를 위한 기본 요소가 있고, 자체로도 스트림 프로세싱을 처리합니다. 또한 자체적으로 materialized view 를 수행해 데이터를 외부 소스로 내보낼 필요 없이 데이터베이스처럼 데이터를 쿼리할 수 있습니다.

사실, ksqlDB 는 postgreSQL 와 같은 전통적인 관계형 데이터베이스가 쿼리 실행, 인덱싱, 동시성 제어, 장애 회복과 같은 것들을 직관적인 인터페이스로 숨기고 있는 것처럼 복잡한 구조를 숨기고 있습니다.

ksqlDB 가 상태를 관리하기 위해 kafka 에 의존하기 때문에, 최종적으로 "연산 (ksqlDB) 과 저장소 (kafka)" 의 두 단계 아키텍처로 정리됩니다. 그리고 각자는 서로에 상관없이 스케일 가능합니다.

여러분은 관게형 데이터베이스처럼 하나의 ksqlDB 서버 혹은 여러 서버에 클라이언트-서버 CLI 를 구성해 접근하거나, web UI, REST API 혹은 Java와 같은 프로그래밍 언어를 통해 접근할 수 있습니다.




materialized view (구체화된 view) : (ksqlDB - Materialized Views 에서 참조) 어느 데이터베이스에서나 테이블의 역할은 데이터에 대해 효과적인 접근을 요청하기 위해서입니다. ksqlDB 는 이벤트를 간단한 key-value 구조를 사용해 Kafka 에 변경불가능하게 저장합니다. 그럼 어떻게 이 모델 (Kafka 에 key-value 구조로 저장) 에서 쿼리를 효과적으로 구성할 수 있을까요? 답은 materialized view 를 활용하는 것입니다.

Stream/Table 이중성 stream 과 table 은 아주 가깝게 연관되어 있습니다. stream 는 이벤트의 순서로써 table 을 생성할 수 있는 것입니다. 예를 들어, 대출 신청자의 신용 점수는 시간에 따라 변합니다. 이 때, 신용 점수의 순서는 stream 을 의미합니다. 이 때 현재 신청자의 신용 점수를 표현하기 위해 stream 을 table 로 변환할 수 있습니다.

반대로, 실제 신청자의 현재 신용 점수를 의미하는 table 은 실제로 두 가지의 값이 필요합니다. 각 신청자에 대한 현재 신용 점수와, 신청자에 대한 신용 점수의 변동 순서입니다.

전통적인 데이터베이스는 redo log 가 존재하지만, 변경 기록을 구독하는 것은 번거롭습니다. redo log 는 일반적으로 Kafka 의 changelog topic 보다 훨씬 짧은 retention 을 가집니다. 완전히 압축된 Kafka changelog topic 은 데이터베이스 스냅샷과 동일하다고 할 수 있습니다. 효과적인 쿼리는 여기서 변경점만을 추출해냅니다.

Materialized Views materialized view 의 장점은 전체 테이블에 대해 쿼리를 수행하는 것이 아닌, 변경점에 대한 쿼리를 수행한다는 것입니다.

새로운 이벤트가 발행되었을 때, 현재 상태의 view 는 새로운 상태로 변화합니다. 이 변화는 해당 view 를 정의한 aggregation function 을 수행함으로써 정의됩니다. 이 경우에 새로운 이벤트가 발행되더라도 view 는 "전체를 재계산" 하지 않습니다. 대신에 view 는 새로운 정보에 대해서만 값을 조정하며, 이 구조는 이 view 에 대한 쿼리가 매우 효율적임을 의미합니다.

ksqlDB 에서, table 은 view 로 구체화 (materialize) 될 수도 있고, 아닐 수도 있습니다. 만약 table 이 kafka topic 에서 바로 만들어졌다면, 구체화되지 않습니다. 구체화되지 않은 테이블은 비효율적이기 때문에 쿼리가 불가능합니다. 그러나 table 이 다른 collection (다른 table 혹은 stream) 에서 만들어졌다면, ksqlDB 는 그 결과를 구체화하기 때문에 해당 table 에 대해 쿼리가 가능합니다.

ksqlDB 는 각 table 의 구성 요소를 모두 저장함으로써 stream/table 이중성이라는 개념을 활용합니다. 현재 table 의 상태는 RocksDB 를 사용해 ksqlDB 서버 내부에 로컬 및 임시로 저장됩니다. table 에 반영된 변경 기록들은 kafka topic 에 저장되며, kafka broker 에 의해 복제됩니다. 만약 ksqlDB 가 table 의 구체화 (materialization) 에 실패한다면, 새로운 ksqlDB 서버는 kafka changelog 에서 값을 읽어 재구체화 (rematerialization) 합니다.

LuterGS commented 1 year ago

How Stateless Operations Work

다음과 같은 ksqlDB SQL 문이 있다고 합시다.

CREATE STREAM readings (
    sensor VARCHAR KEY,
    reading DOUBLE,
    location VARCHAR
) WITH (
    kafka_topic='readings',
    value_format='json',
    partitions=3
);

위 예제 스키마는 sensor, reading, location 3개로 이루어진 칼럼을 포함하고 있고, 그 타입도 string (varchar), double 을 통해 별도로 정의해주고 있습니다. 또한 sensor 칼럼은 KEY 키워드도 표시되어 있습니다.

추가적인 메타데이터도 있습니다. 위 명령은 어느 Kafka topic 에 (readings) 데이터를 저장할 것인지 나타내고 있고, 직렬화할 포맷 (JSON) 과 topic 의 partition 개수도 표시하고 있습니다. 그럼 이 SQL 문을 ksqlDB 명령줄에 실행했을 때, 내부적으론 어떤 일이 일어날까요?

ksqlDB 서버는 Kafka broker 와 직접적으로 통신합니다. 만약 제시한 topic 이 존재하지 않을 경우엔 kafka 가 새로 topic 을 생성합니다. 타입과 같은 추가 메타데이터는 ksqlDB 의 오브젝트 카탈로그에 저장됩니다. 따라서 명령이 살행되고 난 이후엔, Kafka 에 3개의 partition 을 가진 readings 라는 topic 이 생성됩니다.

다음으로, 이 비어있는 stream 에 데이터를 넣어봅시다. Kafka의 메시지와 동일한, row 를 사용함으로써 데이터를 넣을 수 있습니다. 관계형 데이터베이스와 같이 간단한 SQL 문을 통해 stream 에 row 를 넣을 수 있습니다. 만약 당신이 PostgreSQL 이나 MySQL 과 같은 관계형 데이터베이스를 사용해봤다면, 아래 구문이 익숙할 것입니다.

INSERT INTO readings (sensor, reading, location)
VALUES ('sensor-1', 45, 'wheel');  

INSERT INTO readings (sensor, reading, location)
VALUES ('sensor-2', 41, 'motor');  

따라서 readings stream 에 값을 넣어줄 column 까지 직접 정의해서 넣어줬다고 생각해봅시다. 각 INSERT 구문은 kafka producer call 과 동일하며, 데이터는 stream 의 topic 에 저장됩니다.

이제 kafka topic 안에 있는 ksqlDB row 에 대해 표현해 보겠습니다. 첫 INSERT 문을 확인해봅시다.

해당 SQL 은 모든 kafka record 처럼, readings topic 의 첫 번째 파티션의 첫 번째 row, offset 이 0 인 데이터가 됩니다. keysensor-1 이 되고 (stream 생성 시 sensor 필드에 KEY 파라미터를 넣어줬기 때문에), value 는 reading 이 45, locationwheel 인 map 이 됩니다. 각 row 는 key 의 해싱된 값을 기반으로 partition 에 분배됩니다. 동일한 key 를 가진 row 는 같은 partition 에 쓰여집니다.




A Simple Filter

이제 stream 에 데이터가 있다고 가정하고 실시간 프로세싱을 해 봅시다. 가장 간단한 작업은 변환으로, 다음 SQL 구문에선 location 필드를 모두 대문자로 변경합니다.

CREATE STREAM clean AS
    SELECT sensor,
        reading,
        UCASE(location) AS location
    FROM readings
    EMIT changes;

이 쿼리는 readings stream 에서 값을 읽어 location 을 대문자로 변경하고, 새로운 stream 인 clean 에 저장됩니다.

이 쿼리를 실행하면 ksqlDB 서버로 보내지고, 쿼리를 실행하기 위해 ksqlDB 에 의해 Kafka Streams topology 로 변환됩니다. 이 쿼리는 클라이언트의 연결이 끊어지더라도 계속 작동하고 ksqlDB 가 재시작하면 같이 재시작됩니다. 우린 이걸 persistent query 라고 부릅니다.

각 row 가 진행됨에 따라 ksqlDB consumer 의 offset 은 각 partition 에서 최신 상태로 업데이트됩니다. 이 offset 은 kafka consumer 프로토콜에 따라 트래킹됩니다.




Combining Filters

ksqlDB 는 stream 에 대해 필터링 조건을 걸 수 있습니다. 아래 예시는 직전에 생성한 clean stream 에서 reading 의 값이 41보다 큰 값만을 걸러내는 구문입니다.

CREATE STREAM high_readings AS
    SELECT sensor, reading, location
    FROM clean
    WHERE reading > 41
    EMIT CHANGES;

여러 동작을 한 번에 SQL 문으로 구현할 수 있습니다. 아래 구문은 대문자 변환 및 필터링을 한 번에 수행할 수 있게 합니다.

CREATE STREAM high_pri AS
    SELECT sensor,
        reading, 
        UCASE(location) AS location
    FROM readings
    WHERE reading > 41
    EMIT CHANGES;




Multiple Queries Reading the Same Data

ksqlDB SQL 문이 kafka streams topology 로 변환되어 실행되기 때문에, kafka consumer 가 보장하는 것들을 모두 보장합니다. 여러 개의 persistent query 가 하나의 stream 으로부터 값을 읽어올 때도, 각 persistent query 는 독립적인 kafka consumer 로 작동하기 때문에, 고유의 offset 을 가지게 되고 서로 간섭하지 않습니다. 각 persistent query 가 수행하는 데이터는 원본 (source) kafka topic 에 한 번만 존재하고, kafka 는 각 query 에 대해 독립적으로 offset 을 추적합니다. 이 방식으로 각 query 의 의존성을 생각할 필요 없이 하나의 데이터를 많은 어플리케이션에서 consume 할 수 있습니다.

LuterGS commented 1 year ago

Kafka Streams topology 변환 : Under the hood, the engine parses your SQL statements and builds corresponding Kafka Streams topologies. --> KsqlEngine.java