@l3r8yJ that's how Eventuate, a popular event sourcing framework, implements a Kafka consumer: https://github.com/eventuate-tram/eventuate-tram-core/blob/master/eventuate-tram-consumer-kafka/src/main/java/io/eventuate/tram/consumer/kafka/EventuateTramKafkaMessageConsumer.java
You can dig deeper into their classes and see how the consumer fetches the data.
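For reference, a bare-bones poll loop with the plain Kafka Clients API looks roughly like the sketch below; the bootstrap servers, group id and topic name are placeholders, not values taken from this project:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class PollLoopSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("some-topic"));
            // Each poll() fetches the next batch of records from the subscribed topics.
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5L));
            for (final ConsumerRecord<String, String> record : records) {
                System.out.printf("%s:%d -> %s%n", record.topic(), record.partition(), record.value());
            }
        }
    }
}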
@h1alexbel thank you very much, I'll check it out when I'm a bit freer than I am now.
@l3r8yJ I found 2 bugs: #93 and #94. They can help with implementing the consumer logic.
@h1alexbel is this issue still relevant?
@l3r8yJ yes, in the master branch you can see only a draft solution for consumer data polling.
@h1alexbel I've dived a bit into consumers; should it be closeable or not?
@l3r8yJ yes, it should be closeable.
@l3r8yJ by the way, you can use the Kafka Clients API, and https://github.com/eventuate-tram/eventuate-tram-core/blob/master/eventuate-tram-consumer-kafka/src/main/java/io/eventuate/tram/consumer/kafka/EventuateTramKafkaMessageConsumer.java
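If the question is how to make the consumer closeable, a minimal sketch on top of the Kafka Clients API could look like this (the class and its shape are hypothetical, not the actual KfConsumer code):

import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical wrapper: delegates to the underlying KafkaConsumer and
// releases it when the try-with-resources block ends.
public final class CloseableConsumer<K, V> implements Closeable {

    private final KafkaConsumer<K, V> origin;

    public CloseableConsumer(final KafkaConsumer<K, V> origin) {
        this.origin = origin;
    }

    public void subscribe(final Collection<String> topics) {
        this.origin.subscribe(topics);
    }

    public ConsumerRecords<K, V> poll(final Duration timeout) {
        return this.origin.poll(timeout);
    }

    @Override
    public void close() {
        // KafkaConsumer#close() commits offsets if auto-commit is enabled
        // and leaves the consumer group cleanly.
        this.origin.close();
    }
}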
@h1alexbel yes, I'm going by the sources you gave.
here's the thing: https://github.com/eo-cqrs/eo-kafka/blob/master/src/main/java/io/github/eocqrs/kafka/consumer/KfConsumer.java#:~:text=%3C%3E(-,13,-)%3B
why 13?
@l3r8yJ it's just the initial size of the list. Do you have a better idea?
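For context, the ArrayList constructor argument is an initial capacity, not a size: the list starts empty either way, and the number only controls when the backing array has to grow. A plain-JDK illustration, unrelated to eo-kafka specifics:

import java.util.ArrayList;
import java.util.List;

public final class CapacitySketch {
    public static void main(final String[] args) {
        // All three lists start empty; the argument only pre-sizes the backing array.
        final List<String> thirteen = new ArrayList<>(13);
        final List<String> zero = new ArrayList<>(0);
        final List<String> dflt = new ArrayList<>();
        System.out.println(thirteen.size() + " " + zero.size() + " " + dflt.size()); // prints: 0 0 0
    }
}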
@h1alexbel we usually set it to 0, that's why I'm asking.
Also, I see this:
this.origin
    .poll(timeout)
    .records(topic)
    .forEach(
        data ->
            iterate.add(
                new KfData<>(
                    data.value(),
                    topic,
                    data.partition()
                ).dataized()
            )
    );
Isn't this data polling already, or am I misunderstanding something?
We need to test this; maybe it's a bad implementation. I mean, is the result of this method simple enough for the user?
@h1alexbel do you know what this may be?
In the integration test I have a Maven project with JUnit and eo-kafka deps. Inside a test I'm creating a new producer and consumer:
try (
    final Producer<String, String> producer =
        new KfProducer<>(new KfProducerSettings<>("settings.xml"));
    final Consumer<String, String> consumer =
        new KfConsumer<>(new KfConsumerSettings<>("consumer.xml"))
) {
    consumer.subscribe(new ListOf<>("pic"));
    producer.send(
        "my_key",
        new KfData<>("value", "pic", 1)
    );
    MatcherAssert.assertThat(
        "Consumes data right",
        consumer.iterate("pic", Duration.ofSeconds(5L)).size(),
        Matchers.equalTo(1)
    );
} catch (final Exception ex) {
    throw new IllegalStateException(ex);
}
In the logs I see:
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1-1, groupId=1] Subscribed to topic(s): pic
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node -1 disconnected.
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
As far as I understand, the consumer is okay, but there is some problem with the producer?
settings.xml for the producer:
<producer>
  <bootstrapServers>localhost:9092</bootstrapServers>
  <groupId>1</groupId>
  <keySerializer>org.apache.kafka.common.serialization.StringSerializer</keySerializer>
  <valueSerializer>org.apache.kafka.common.serialization.StringSerializer</valueSerializer>
</producer>
consumer.xml for the consumer:
<consumer>
  <bootstrapServers>localhost:9092</bootstrapServers>
  <groupId>1</groupId>
  <keyDeserializer>org.apache.kafka.common.serialization.StringDeserializer</keyDeserializer>
  <valueDeserializer>org.apache.kafka.common.serialization.StringDeserializer</valueDeserializer>
</consumer>
what do you think?
@l3r8yJ you need to start a Kafka broker on localhost:9092. You can use this docker-compose file:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: micro-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: micro-kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
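Assuming Docker with the Compose plugin is installed, running docker compose up -d (or docker-compose up -d with the standalone binary) next to this file should bring the broker up on localhost:9092 before the test runs.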
depends on #134
@l3r8yJ how can I help you in this ticket?
This functionality, to my knowledge, already works fine. Some tuning would be appreciated.
@h1alexbel In the integration test that we have, we need to reproduce the situation where we produce data with KfProducer and read it with KfConsumer. I think you can play with that in #236.
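A rough sketch of that round trip with the plain Kafka clients, just to pin down the scenario; the real test would go through KfProducer/KfConsumer and the XML settings instead, and it assumes a broker is already running on localhost:9092 (topic, key and value are taken from the test above):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public final class RoundTripSketch {
    public static void main(final String[] args) throws Exception {
        final Properties prod = new Properties();
        prod.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prod.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prod.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        final Properties cons = new Properties();
        cons.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cons.put(ConsumerConfig.GROUP_ID_CONFIG, "it-group");
        cons.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        cons.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        cons.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (
            KafkaProducer<String, String> producer = new KafkaProducer<>(prod);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cons)
        ) {
            consumer.subscribe(List.of("pic"));
            // get() blocks until the broker acknowledges the record.
            producer.send(new ProducerRecord<>("pic", "my_key", "value")).get();
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5L));
            System.out.println("records consumed: " + records.count());
        }
    }
}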
The puzzle 41-80474f37 has disappeared from the source code, that's why I closed this issue.
The puzzle 41-80474f37 from #41 has to be resolved: https://github.com/eo-cqrs/eo-kafka/blob/457754bdc447e1199b0a1b6af6c0f97a8e71bbd5/src/main/java/io/github/eocqrs/kafka/consumer/KfConsumer.java#L52-L60
The puzzle was created by @rultor on 01-Mar-23.
Estimate: 30 minutes, role: DEV.
If you have any technical questions, don't ask me, submit new tickets instead. The task will be "done" when the problem is fixed and the text of the puzzle is removed from the source code. Here is more about PDD and about me.