Open easyfordev opened 1 year ago
이번 범위도 별로 정리할 게 없어서... 카프카 관련해서 검색하다 나온 글 정리 내용이 어렵지 않아 이해하기 쉬움!
검색 서비스를 고도화하면서, 다양한 정보를 검색 시스템에 색인하기 위해 여러 카프카 토픽 운영
리밸런싱 원인
1) max.poll.interval.ms
max.poll.records
결국, poll() 메소드를 호출했을 때 가져온 max.poll.records
개의 레코드를 max.poll.interval.ms
의 시간 동안 처리하지 못해 리밸런싱이 일어난다.
개선1 한 번에 가져오는 레코드의 수를 줄여 리밸런싱이 일어나지 않게 했다. (근본적 해결 X)
spring:
kafka:
consumer:
max-poll-records: 100
개선2 컨슈머가 담당하는, 검색 색인 프로세스의 throughput을 늘리는 것이 더욱 근본적인 해결 방법이다.
spring:
kafka:
listener:
type: batch
// Batch + Acknowledge + ConsumerAware
public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V> implements BatchAcknowledgingConsumerAwareMessageListener<K, V> {
@Override
public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
...
}
다양한 정보를 조합해 검색 시스템에 색인한다
→ 조합을 위해 Kafka Streams 사용
Stream DSL을 이용한 순수 자바 파이프라인
public class StreamsProcessor {
private static final String sourceTopicA = "MSG-SOURCE-A";
private static final String sourceTopicB = "MSG-SOURCE-B";
private static final String mergedTopic = "MSG-MERGED";
public static void buildPipeline() {
StreamsBuilder builder = new StreamsBuilder();
KTable<String, SourceAMessage> sourceATable
= builder.table(sourceTopicA, Consumed.with(
STRING_SERDE,
CustomSerdes.sourceAMessageSerde()
));
KStream<String, SourceBMessage> sourceBStream
= builder.stream(sourceTopicB, Consumed.with(
STRING_SERDE,
CustomSerdes.sourceBMessageSerde()
));
sourceBList.join(sourceATable, joiner)
.filter((key, value) -> Objects.nonNull(value))
.to(mergedTopic);
KafkaStreams streams = new KafkaStreams(builder.build(), kafkaProperties);
streams.start();
}
public static void main(String[] args) {
buildPipeline();
}
}
Spring ApplicationListener를 사용하여 결합 → 문제 발생 메세지 스펙이 맞지 않아 스트림 처리가 되지 않는 경우, 스프링 어플리케이션은 정상 상태이지만 스트림즈 스레드의 상태는 체크할 수 없음.
@Component
@RequiredArgsConstructor
public class IndexStreamProcessor implements ApplicationListener<ApplicationStartedEvent> {
/**
* 카프카 스트림즈 파이프라인을 구성합니다.
*/
public void buildPipeline() {
StreamsBuilder builder = new StreamsBuilder();
...
KafkaStreams streams = new KafkaStreams(builder.build(), kafkaProperties);
streams.start();
}
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
buildPipeline();
}
}
스프링 카프카 - 카프카 스트림즈 지원!
// 1) Configuration 설정
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapAddress;
@Value("${spring.kafka.streams.application-id:kafka-streams-app}")
private String applicationName;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, String().getClass().getName());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "3");
return new KafkaStreamsConfiguration(props);
}
}
// 2) 스트림 처리 로직이 포함된 프로세서를 컴포넌트로 생성
@Component
@RequiredArgsConstructor
public class IndexStreamProcessor {
@Autowired
public void buildPipeline(StreamsBuilder builder) {
KTable<String, SourceAMessage> sourceATable
= builder.table(sourceTopicA, Consumed.with(
STRING_SERDE,
CustomSerdes.sourceAMessageSerde()
));
KStream<String, SourceBMessage> sourceBStream
= builder.stream(sourceTopicB, Consumed.with(
STRING_SERDE,
CustomSerdes.sourceBMessageSerde()
));
sourceBList.join(sourceATable, joiner)
.filter((key, value) -> Objects.nonNull(value))
.to(mergedTopic);
}
}
이후 헬스체크는 Spring Actuator
를 통해 HTTP 헬스체크 엔드포인트를 만들었다.