Open yeomko22 opened 2 years ago
public class SimpleConsumer { private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class); private final static String TOPIC_NAME = "test"; private final static String BOOTSTRAP_SERVERS = "my-kafka:9092"; private final static String GROUP_ID = "test-group"; public static void main(String[] args) { Properties configs = new Properties(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs); consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record: records) { logger.info("{}", record); } } } }
consumer.assign(Collections,singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record: records) { logger.info("{}", record); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, null)); consumer.commitSync(currentOffsets); } } } catch (WakeupException e) { logger.warn("Wakeup consumer"); } finally { consumer.close(); }
// shutdown hook 등록 Runtime.getRuntime().addShutdownHook(new ShutdownThread());
// wakeup 함수 호출하는 shutdown hook static class ShutdownThread extends Thread { public void run() { logger.info("Shutdown hook"); consumer.wakeup(); } }
Consumer API
컨슈머를 운영하는 2가지 방법
컨슈머 그룹 운영
컨슈머 그룹 예시 - 시스템 정보 확인 파이프라인
컨슈머 리밸런싱
오프셋 커밋
컨슈머 내부구조
컨슈머 필수 옵션
컨슈머 선택 옵션
리밸런스 리스너를 가진 컨슈머
파티션을 직접 할당하는 컨슈머
컨슈머 안전 종료
// wakeup 함수 호출하는 shutdown hook static class ShutdownThread extends Thread { public void run() { logger.info("Shutdown hook"); consumer.wakeup(); } }