yeomko22 / TIL

Today I learned
1 stars 0 forks source link

kafka producer api #110

Open yeomko22 opened 3 years ago

yeomko22 commented 3 years ago

카프카 클라이언트

public class SimpleProducer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        String messageValue = "testMessage";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
        producer.send(record);
        producer.flush();
        producer.close();
    }
}

11

프로듀서 필수 옵션

프로듀서 선택 옵션

메시지 키를 가진 데이터 전송 프로듀서

// key 설정 레코드
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "23");

// partition 지정 레코드
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 0, "Pangyo", "23");

커스텀 파티셔너

브로커 정상 전송 여부 확인 프로듀서

ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
RecordMetadata metadata = producer.send(record).get();
logger.info(metadata.toString());