Open JayCesar opened 6 months ago
O local onde as informações do Kafka ficam armazenadas se chama ZooKepper, logo eu tenho que baixar o Kafka e também o ZooKeeper Link 01: https://kafka.apache.org/ Link 02: https://zookeeper.apache.org/releases.html
Descrever os tópicos: bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Subir o zooKeeper bin/zookeeper-server-start.sh config/zookeeper.properties
Listar os tópicos bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Subir o Kafka bin/kafka-server-start.sh config/server.properties
Cadastrar um tópico (documentação) bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
Consumir um tópico bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ECOMMERCE_NEW_ORDER --from-beginning
Send é assÃncrono! Logo eu preiso adicionar o .get() para esperar! Mas isso pode gerar uma exception, por isso preciso lançar.
A medida que eu envio eu preciso indicar se recebi ou não, por isso eu uso a interface CallBack()
Assim ele está consumindo:
Eu preciso de um Listener!
Código:
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Tem dois parâmetros: tipo da chave e tipo da mensagem
var producer = new KafkaProducer<String, String>(properties());
var value = "132123,675223,91238419";
// Agora que eu tenho um producer, posso enviar algo:
// var record = new ProducerRecord<String, String>("ECOMMERCE_NEW_ORDER", value, value);
// Ele percebe que é String, String, então eu posso remover:
var record = new ProducerRecord<>("ECOMMERCE_NEW_ORDER", value, value);
producer.send(record, (data, ex) ->{
if(ex != null){
ex.printStackTrace();
return;
}
System.out.println("sucesso enviando nesse tópico:" + data.topic() + ":::partition " + data.partition() + "/ offset " + data.offset() + "/ timestamp" + data.timestamp());
}).get(); // é um registro porque ficará registrado no kafka pelo tempo que eu quiser, basta eu configurar
}
private static Properties properties() {
var properties = new Properties();
// Preciso apontar ONDE estão rodando minhas kafkas
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// Além disso, eu preciso passar os transformadores de Strings para Byte (Serializadores)
// Key Serializer é o transformador, depois preciso intigar que será de String, tanto p/ chave quanto p/ o valor
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties;
}
The offset in Kafka is not exactly the "amount" of messages, but rather a unique identifier assigned to each message within a partition. It represents the position of a message within the partition's sequence.
[Topic: Weather Data]
|
|--- [Partition 0]
| |--- Offset: 0 Key: SensorID-123 Message: {"temperature": 25, "humidity": 60}
| |--- Offset: 1 Key: SensorID-456 Message: {"temperature": 23, "humidity": 55}
|
|--- [Partition 1]
|--- Offset: 0 Key: SensorID-789 Message: {"temperature": 28, "humidity": 70}
Todos / Cada um deles precisa receber TODAS as mensagens, logo, ao criar um Consumer, eu preciso dizer qual é o ID do grupo das mensagens.
Neste caso o ID será o nome da própria classe:
É comum numa aplicação a gente fazer / aplicar serviços em paralelo, ou seja, pode ser no mesmo computador duas threads. Enquanto isso, neste exemplo, eu aviso, olha, sua compra está sendo processada.
Ou seja, são muitos processos, num exemplo de um sistema de delivery! Em outras palavras, são muitas requisições HTTP. Cada caixinha seria um URI;
Para tudo isso eu preciso de um log, onde tem dado de TUDO que acontece! Todo mundo tem que acessar, tudo precisa ir pro sistema de log.
Tudo isso gera uma CONFUSÃO GIGANTE:
É aqui que entra a necessidade de REPENSAR toda essa arquitetura
O Broker recebe uma mensagem do tipo do serviço que está sendo requisitado, pode ser compra, devolução etc...
O e-mail está recebendo esse assunto, esse tópico, o reservar estoque também está, o Log também está recebendo, todos estão! O servidor HTTP não sabe disso! Todos os sistemas estão de recebendo as mensagens / tópicos.
Quando um serviço termina, ele também envia uma mensagem! E todos vão ouvir!
Isso é o conceito de mensageria, todos escutam as mensagens de cada tópico.
O Kafka utiliza algumas sacadas de mensageria, uma delas é: eu posso ter mais de um broker, posso ter várias máquinas rodando, eu posso escalar conforme o necessário.
Eu vou removendo os pontos de falhas! Eu posso ter cluster de brokers. Eu consigo ter mais Reliability