Open kmg28801 opened 1 year ago
Schema는 정보를 구성하고 해석하는 것을 도와주는 Framework나 개념을 의미한다. 스키마 정의를 통해 데이터베이스의 관리 효율성이 높아지며 데이터 충돌을 방지한다. Kafka에서도 토픽으로 전송되는 메세지에 대해 미리 스키마를 정의한 후 전송 함으로 데이터베이스에서 얻을 수 있는 동일한 효과를 얻을 수 있다.
스키마가 미리 정의되어 있고 반드시 사전에 정의된 스키마의 형태로 데이터를 입력해야 하며, 사전에 정의된 스키마의 내용과 다른 데이터를 추가하려고 시도한다면 작업은 실패하게 된다.
이렇게 스키마를 정의해 두면 데이터 트러블슈팅 감소, 용이한 데이터 포맷 확인, 데이터 스키마 관련 커뮤니케이션 감소 등 얻을 수 있는 이점들이 많기 때문에 카프카에서 스키마 사용은 권장하고 있다.
![2번 프로듀서가 갑자기 메시지 스키마를 변경하여 쓴 상황. 컨슈머는 스키마 변경에 대응하지 못하고 4번 메시지를 처리할 수 없다.]
2번 프로듀서가 갑자기 메시지 스키마를 변경하여 쓴 상황. 컨슈머는 스키마 변경에 대응하지 못하고 4번 메시지를 처리할 수 없다.
컨슈머가 제대로 읽어드리지 못하는 이유는 프로듀서는 직렬화하여 메시지를 발행하고, 컨슈머는 역직렬화하여 메시지를 구독하기 때문입니다. 따라서 프로듀서와 컨슈머에 각각 메시지 구조(스키마)에 따라 직렬화/역직렬화 클래스가 구성되고, 이 둘은 강한 의존성(커플링, Coupling)을 갖게 됩니다. 결국, 구조적인 결합도는 낮췄지만 내부적인 결합도는 여전히 가지고 있게 됩니다.
스키마 레지스트리는 이 결합도를 낮추기 위해 고안되었다.
스키마 레지스트리는 컨플루언트 커뮤니티 라이선스를 갖고 있는데, 비상업적인 용도에 한해 스키마 레지스트리를 무료로 사용할 수 있다. 카프카와 별도로 구성된 독립적 어플리케이션이다.
클라이언트들이 스카마 정보를 사용하기 위해서는 프로듀서와 컨슈머, 스키마 레지스트리 간 직접 통신이 이뤄져야 한다.
에이브로는 시스템, 프로그래밍 언어, 프로세싱 프레임워크 사이에서 데이터 교환을 도와주는 오픈소스 직렬화 시스템이다. 대중적으로 많이 사용하는 포맷은 JSON이지만 컨플루언트는 아래의 사유로 인해 에이브로 포맷 사용을 권장한다.
{
"type": "record", ➊
"name": "Student", ➋
"namespace": "student.avro", ➌
"doc": "This is an example of Avro", ➍
"fields": [ ➎
{
"name": "name",
"type": "string",
"doc": "Name of the student"
},
{
"name": "class",
"type": "int",
"doc": "Class of the student"
} ➏
]
}
번호 | Description |
---|---|
➊ type | 에이브로는 record, enums, arrays, maps 등을 지원하며, 여기서는 record 타입으로 정의 |
➋ name | 레코드의 이름을 나타내는 문자열로서, 필숫 값 |
➌ namespace | 이름을 식별하는 문자열 |
➍ doc | 사용자들에게 이 스키마 정의 대한 설명 제공(주석) |
➎ fields | JSON 배열로서, 필드들의 리스트를 뜻함 |
➏ name | 필드의 이름 |
[Docker SR, SRU 생성 실습]
docker run -p 8081:8081 \
-e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=host.docker.internal:9092, host.docker.internal:9093, host.docker.internal:9094 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
-d confluentinc/cp-schema-registry:latest
환경 변수 | Description |
---|---|
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS | 연결하고자 하는 Kafka Broker 주소, 해당 인자 값은 콤마(,) 기준으로 N개 입력 가능 |
SCHEMA_REGISTRY_HOST_NAME | 해당 서비스의 명칭을 입력하는 Alias |
SCHEMA_REGISTRY_LISTENERS | 외부 입력 도메인 |
docker run -p 8000:8000 \
-e SCHEMAREGISTRY_URL=http://kubernetes.docker.internal:8081 \
-e SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS=GET,POST,PUT,OPTIONS,HEAD \
-e SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN='*' \
-e PROXY=true \
-d landoop/schema-registry-ui:latest
환경 변수 | Description |
---|---|
SCHEMAREGISTRY_URL | SR의 서비스 명(Local환경에서 연결 시, /etc/hosts를 변경해야 함, Default : localhost:8081) |
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS | REST API를 사용하기 위한 허용 메소드 옵션 |
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN | REST API를 행위를 허용 |
PROXY | CORS 문제가 있거나 방화벽을 통과하고 서버를 공유하려는 경우 PROXY옵션을 추가 해야함, SR과 SRU 연동 시 필수 옵션 |
프록싱을 할 때 SCHEMAREGISTRY_URL해당 IP 주소 또는 이에 대해 확인할 수 있는 도메인을 사용해야 함. Localhost에서 SR을 제공하더라도 사용할 수 없다. Docker Container에 자체 Network가 있으므로 Localhost와 Container 내 Localhost는 다르기 때문이다. | | PROXY_SKIP_VERIFY | 스키마 레지스트리가 자체 서명된 SSL 인증서를 사용하는 경우 PROXY_SKIP_VERIFY=true환경 변수를 사용하여 백엔드 TLS 인증서를 확인하지 않도록 프록시에 지시 가능하다. | | CADDY_OPTIONS | 이미지를 구동하는 웹서버인 CADDY 옵션 설정, 현재 Kubernetes에서는 시간 초과를 적용하지 않는 옵션 |
zayden@Justin-MacBook-Pro ~ docker run -p 8081:8081 \
-e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=host.docker.internal:9092,host.docker.internal:9093,host.docker.internal:9094 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
-d confluentinc/cp-schema-registry:latest
649504c9981d353aee1193a3b765d78d8b913899aeb002f0b1be78fa199629da
zayden@Justin-MacBook-Pro ~ docker run -p 8000:8000 \
-e SCHEMAREGISTRY_URL=http://kubernetes.docker.internal:8081 \
-e SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS=GET,POST,PUT,OPTIONS,HEAD \
-e SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN='*' \
-e PROXY=true \
-d landoop/schema-registry-ui:latest
467f80c6d2dad32e7da3693d10eaa626330d5720d32eafb303782a2f44fde869
zayden@Justin-MacBook-Pro ~ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
467f80c6d2da landoop/schema-registry-ui:latest "/run.sh" 6 minutes ago Up 6 minutes 0.0.0.0:8000->8000/tcp beautiful_colden
649504c9981d confluentinc/cp-schema-registry:latest "/etc/confluent/dock…" 6 minutes ago Up 6 minutes 0.0.0.0:8081->8081/tcp infallible_lichterman
SR이 Kafka Broker와 정상적으로 연결되지 않았을 경우
SR이 Kafka Broker와 정상적으로 연결되었을 경우
Field Name | Description |
---|---|
names | |
pace | 이름을 식별하는 문자열 |
type | 에이브로는 record, enums, arrays, maps 등을 지원하며 여기서는 record 타입으로 정의 |
doc | 사용자들에게 이 스키마 정의 대한 설명 제공(주석) |
name | 레코드의 이름을 나타내는 문자열로서, 필숫값 |
fields | JSON 배열로 필드들의 리스트를 뜻함 |
name | 필드의 이름, |
type : boolean, int, long, string 등의 데이터 타입 정의 doc : 사용자 들에게 해당 필드의 의미 설명(주석) |
schema.compatibility.level | 스키마 호환성 레벨을 설정한다. |
---|
[GKE SR, SRU 생성 실습]
Kafka와 Kafka Client 생성 상태 확인
jeonghyeons130@cloudshell:~ (kubernetes-project-397206)$ kubectl get all -n messagequeue
NAME READY STATUS RESTARTS AGE
pod/kafka-master-client 1/1 Running 0 14m
pod/kafka-master-controller-0 1/1 Running 0 15m
pod/kafka-master-controller-1 1/1 Running 0 15m
pod/kafka-master-controller-2 1/1 Running 0 15m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kafka-master ClusterIP 10.68.5.159 <none> 9092/TCP 15m
service/kafka-master-controller-headless ClusterIP None <none> 9094/TCP,9092/TCP,9093/TCP 15m
NAME READY AGE
statefulset.apps/kafka-master-controller 3/3 15m
schemaregistryservice.yml
apiVersion: v1
kind: Service
metadata:
name: messagequeue-schema-registry-service
namespace: messagequeue
spec:
selector:
app: schema-registry
type: NodePort
ports:
- protocol: TCP
port: 80
targetPort: 8081
schemaregistrydeployment.yml
apiVersion: apps/v1
kind: Deployment
metadata:
name: schema-registry
namespace: messagequeue
labels:
app: schema-registry
spec:
replicas: 1
selector:
matchLabels:
app: schema-registry
template:
metadata:
labels:
app: schema-registry
spec:
containers:
- name: schema-registry
image: confluentinc/cp-schema-registry:7.0.1
imagePullPolicy: Always
resources:
requests:
cpu: 250m
memory: 512Mi
limits:
cpu: 500m
memory: 1024Mi
env:
- name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
value: kafka-master.messagequeue.svc.cluster.local**:9092**
- name: SCHEMA_REGISTRY_HOST_NAME
value: schema-registry
- name: SCHEMA_REGISTRY_LISTENERS
value: http://0.0.0.0:8081
ports:
- containerPort: 8081
환경 변수 | Description |
---|---|
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS | 연결하고자 하는 Kafka Broker 주소, 해당 인자 값은 콤마(,) 기준으로 N개 입력 가능 |
SCHEMA_REGISTRY_HOST_NAME | 해당 서비스의 명칭을 입력하는 Alias |
SCHEMA_REGISTRY_LISTENERS | 외부 입력 도메인 |
생성 상태 확인
jeonghyeons130@cloudshell:~ (kubernetes-project-397206)$ kubectl create -f schemaregistryservice.yml
service/messagequeue-schema-registry-service created
jeonghyeons130@cloudshell:~ (kubernetes-project-397206)$ kubectl create -f schemaregistrydeployment.yml
deployment.apps/schema-registry created
jeonghyeons130@cloudshell:~ (kubernetes-project-397206)$ kubectl get all -n messagequeue
NAME READY STATUS RESTARTS AGE
pod/kafka-master-client 1/1 Running 0 17m
pod/kafka-master-controller-0 1/1 Running 0 18m
pod/kafka-master-controller-1 1/1 Running 0 18m
pod/kafka-master-controller-2 1/1 Running 0 18m
pod/schema-registry-677db666c5-r6scg 1/1 Running 0 59s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kafka-master ClusterIP 10.68.5.159 <none> 9092/TCP 18m
service/kafka-master-controller-headless ClusterIP None <none> 9094/TCP,9092/TCP,9093/TCP 18m
service/messagequeue-schema-registry-service NodePort 10.68.13.137 <none> 80:32189/TCP 70s
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/schema-registry 1/1 1 1 59s
NAME DESIRED CURRENT READY AGE
replicaset.apps/schema-registry-677db666c5 1 1 1 59s
NAME READY AGE
statefulset.apps/kafka-master-controller 3/3 18m
apiVersion: apps/v1
kind: Deployment
metadata:
name: schema-registry-ui
namespace: messagequeue
labels:
app: schema-registry-ui
spec:
replicas: 1
selector:
matchLabels:
app: schema-registry-ui
template:
metadata:
labels:
app: schema-registry-ui
spec:
containers:
- name: schema-registry-ui
image: landoop/schema-registry-ui:0.9.5
imagePullPolicy: Always
resources:
requests:
cpu: 250m
memory: 512Mi
limits:
cpu: 500m
memory: 1024Mi
env:
- name: SCHEMAREGISTRY_URL
value: "http://messagequeue-schema-registry-service"
- name: SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS
value: "GET,POST,PUT,OPTIONS,HEAD"
- name: SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN
value: '*'
- name: PROXY
value: "true"
- name: PROXY_SKIP_VERIFY
value: "true"
- name: CADDY_OPTIONS
value: "timeouts none"
ports:
- containerPort: 8000
환경 변수 | Description |
---|---|
SCHEMAREGISTRY_URL | SR의 서비스 명(Local환경에서 연결 시, /etc/hosts를 변경해야 함, Default : localhost:8081) |
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS | REST API를 사용하기 위한 허용 메소드 옵션 |
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN | REST API를 행위를 허용 |
PROXY | CORS 문제가 있거나 방화벽을 통과하고 서버를 공유하려는 경우 PROXY옵션을 추가 해야함, SR과 SRU 연동 시 필수 옵션 |
프록싱을 할 때 SCHEMAREGISTRY_URL해당 IP 주소 또는 이에 대해 확인할 수 있는 도메인을 사용해야 함. Localhost에서 SR을 제공하더라도 사용할 수 없다. Docker Container에 자체 Network가 있으므로 Localhost와 Container 내 Localhost는 다르기 때문이다. | | PROXY_SKIP_VERIFY | 스키마 레지스트리가 자체 서명된 SSL 인증서를 사용하는 경우 PROXY_SKIP_VERIFY=true환경 변수를 사용하여 백엔드 TLS 인증서를 확인하지 않도록 프록시에 지시 가능하다. | | CADDY_OPTIONS | 이미지를 구동하는 웹서버인 CADDY 옵션 설정, 현재 Kubernetes에서는 시간 초과를 적용하지 않는 옵션 |
jeonghyeons130@cloudshell:~ (kubernetes-project-397206)$ kubectl get all -n messagequeue
NAME READY STATUS RESTARTS AGE
pod/kafka-master-client 1/1 Running 0 56m
pod/kafka-master-controller-0 1/1 Running 0 57m
pod/kafka-master-controller-1 1/1 Running 0 57m
pod/kafka-master-controller-2 1/1 Running 0 57m
pod/schema-registry-677db666c5-vpk6p 1/1 Running 0 12s
pod/schema-registry-ui-67f97fc85-5szrx 1/1 Running 0 3m59s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kafka-master ClusterIP 10.68.5.159 <none> 9092/TCP 57m
service/kafka-master-controller-headless ClusterIP None <none> 9094/TCP,9092/TCP,9093/TCP 57m
service/messagequeue-schema-registry-service NodePort 10.68.13.137 <none> 80:32189/TCP 40m
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/schema-registry 1/1 1 1 40m
deployment.apps/schema-registry-ui 1/1 1 1 3m59s
NAME DESIRED CURRENT READY AGE
replicaset.apps/schema-registry-677db666c5 1 1 1 40m
replicaset.apps/schema-registry-ui-67f97fc85 1 1 1 3m59s
NAME READY AGE
statefulset.apps/kafka-master-controller 3/3 57m
zayden@Justin-MacBook-Pro ~ curl -X GET http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}%
Option | Description |
---|---|
GET /schemas | 현재 스키마 레지스트리에 등록된 전체 스키마 리스트 조회 |
GET /schemas/ids/id | 스키마 ID로 조회 |
GET /schemas/ids/id/versions | 스키마 ID의 버전 |
GET /subjects | 스키마 레지스트리에 등록된 subject 리스트 |
subject는 토픽이름-key, 토픽이름-value 형태로 쓰임 | |
GET /subjects/서브젝트이름/versions | 특정 서브젝트의 버전 리스트 조회 |
GET /config | 전역으로 설정된 호환성 레벨 조회 |
GET /config/서브젝트이름 | 서브젝트에 설정된 호환성 조회 |
DELETE /subjects/서브젝트이름 | 특정 서브젝트 전체 삭제 |
DELETE /subjects/서브젝트이름/versions/버전 | 특정 서브젝트에서 특정 버전만 삭제 |
번호 | Description |
---|---|
➊ | 에이브로 프로듀서는 컨플루언트에서 제공하는 io.confluent.kafka.serializers.KafkaAvroSerializer라는 새로운 직렬화를 사용해 스키마 레지스트리의 스키마가 유효한지 여부를 확인한다. 만약 스키마가 확인되지 않으면 에이브로 프로듀서는 스키마를 등록하고 캐시한다. |
➋ | 스키마 레지스트리는 현 스키마가 저장소에 저장된 스키마와 동일한 것인지, 진화한 스키마인지 확인한다. 스키마 레지스트리 자체적으로 각 스키마에 대한 고유 ID를 할당하게 된다. 이 ID는 순차적으로 1씩 증가하지만 반드시 연속적이진 않는다. 스키마에 문제가 없다면 스키마 레지스트리는 프로듀서에게 고유 ID를 응답한다. |
➌ | 이제 프로듀서는 스키마 레지스트리로부터 받은 스키마 ID를 참고해 메세지를 카프카로 전송한다. 이대 프로듀서는 스키마의 전체 내용이 아닌 오로지 메세지와 스키마 ID만 보낸다. JSON은 Key:Value형태로 전체 메세지를 전송해야 하지만 에이브로를 사용하면 프로듀서가 스키마 ID와 Value만 메세지로 보내게 되어 카프카로 전송하는 전체 메세지의 크기를 줄일 수 있으며, 이는 JSON보다 에이브로를 사용하는 편이 더 효율적인 이유이기도 하다. |
➍ | 에이브로 컨슈머는 스키마 ID로 컨플루언트에서 제공하는 io.confluent.kafka.serializers.KafkaAvroDeserializer라는 새로운 역직렬화를 사용해 카프카의 토픽에 저장된 메세지를 읽는다. 이때 컨슈머가 스키마 ID를 갖고 있지 않다면 스키마 레지스트리로부터 가져온다. |
진화된 스키마를 적용한 컨슈머가 진화 전의 스키마가 적용된 프로듀서가 보낸 메세지를 읽을 수 있도록 허용하는 호환성을 말한다. 스키마의 버전 업데이트가 필요하다면 프로듀서와 컨슈머의 스키마도 업데이트해 줘야 하는 데, BACKWARD 호환성에서는 먼저 상위 버전의 스키마를 컨슈머에게 적용하고 난 뒤에 프로듀서에게 상위 버전의 스키마를 적용해야 한다. 만약 모든 하위 버전의 스키마를 호환하고자 한다면 호환성 타입을 BACKWARD_TRANSITIVE로 설정해야 한다.
호환성 레벨 | 지원 버전(현재 최신 버전 : 3) | 변경 허용 항목 | 스키마 업데이트 순서 |
---|---|---|---|
BACKWARD | 자신과 동일한 버전과 하나 아래의 하위 버전(버전 3으로 버전 2도 처리 가능) | 필드 삭제, 기본 값이 지정된 필드 추가 | 컨슈머 → 프로듀서 |
BACKWARD_TRANSITIVE | 자신과 동일한 버전을 포함한 모든 하위 버전(버전 3으로 버전 2, 버전 1 처리가능) | 필드 삭제, 기본 값이 지정된 필드 추가 | 컨슈머 → 프로듀서 |
진화된 스키마가 적용된 프로듀서가 보낸 메세지를 진화 전의 스키마가 적용된 컨슈머가 읽을 수 있게 하는 호환성을 말한다.
호환성 레벨 | 지원 버전(현재 최신 버전 : 3) | 변경 허용 항목 | 스키마 업데이트 순서 |
---|---|---|---|
FORWARD | 자신과 동일한 버전과 하나 위의 상위 버전(버전 2로 버전 3도 처리 가능) | 필드 추가, 기본 값이 지정된 필드 삭제 | 프로듀서 → 컨슈머 |
FORWARD_TRANSITIVE | 자신과 동일한 버전을 포함한 모든 하위 버전(버전2로 버전 3과 그 이상 처리 가능) | 필드 추가, 기본 값이 지정된 필드 삭제 | 프로듀서 → 컨슈머 |
BACKWARD와 FORWARD 호환성 모두를 지원한다. 가장 최근 2개의 버전 스키마를 지원하며, 모든 버전의 스키마를 호환하고자 한다면 FULL_TRANSITIVE로 설정해야 한다.
호환성 레벨 | 지원 버전(현재 최신 버전 : 3) | 변경 허용 항목 | 스키마 업데이트 순서 |
---|---|---|---|
FULL | 자신과 동일한 버전과 하나 위 또는 하나 아래 버전(버전2로 버전 1 또는 버전 3 처리 가능) | 기본 값이 지정된 필드 추가, 기본 값이 지정된 필드 삭제 | 상관 없음 |
FULL_TRANSITIVE | 자신과 동일한 버전을 포함한 모든 상위 버전과 하위 버전(버전 번호 무관하게 모든 버전 처리 가능) | 기본 값이 지정된 필드 추가, 기본 값이 지정된 필드 삭제 | 상관 없음 |
출처
https://always-kimkim.tistory.com/entry/kafka101-schema-registry
https://seojeonghyeon0630.notion.site/10-7cbcd01d9dd245ab85e00a2c584b8bb4?pvs=4