Closed Rouche closed 2 years ago
Are you using Docker Desktop, or a Docker daemon installed into WSL2 and manually exposed to the Windows host? When using Docker Desktop, exposing access to the daemon via tls://localhost:2375
is not necessary, since Testcontainers will connected using Windows-native named pipes.
Can you share your test code? Also, what happens when you run the Kafka image directly from Docker CLI?
Docker Desktop with WSL 2 engine. For tls its to get access via IntelliJ service tab, also when i use SpringBoot to connect directly to mysql or kafka, wich runs perfectly with the right compose stack.
Here is the code, but i think if you have a minimal example working with avro, publish / consume, i could test it in my environment and if it works i adjust with my record and everything. I did not find such an example. I found it but its a bit different. Il try testing it on my machine.
Caint put all the code here but it gives the idea:
package x.x
import kong.unirest.Unirest
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
import org.mockito.kotlin.mock
import org.rnorth.ducttape.TimeoutException
import org.rnorth.ducttape.unreliables.Unreliables.retryUntilTrue
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.junit.jupiter.Testcontainers
import java.time.Duration
import java.util.Properties
import java.util.UUID
import java.util.concurrent.TimeUnit.SECONDS
data class SchemaInfo(var schema: String)
@Testcontainers
@TestInstance(PER_CLASS)
internal class EventsIntegrationTest {
private val config = ConfigLoader.init("test")
private val availsService = mock<AvailsService>()
private val flightDetailMapper = FlightDetailMapper(availsService)
private val eventOutboxTopic = config.getString("kafka.tapEventsTopic")
private val flightDetailsTopic = config.getString("kafka.flightDetailsTopic")
private val schemaRegistryUrl = config.getString("kafka.registryUrl")
private val replicationFactor = config.getInt("kafka.replicationFactor")
private lateinit var schema: Schema
private lateinit var kafka: KafkaContainer
private lateinit var tapEventsStreams: EventsStreams
private lateinit var kafkaStream: KafkaStreams
private lateinit var consumer: Consumer<String, GenericRecord>
private lateinit var producer: Producer<String, GenericRecord>
private var seqId: Long = 0L
@BeforeAll
fun setupKafkaContainer() {
kafka = KafkaContainer().apply {
this.start()
this.execInContainer("/bin/sh", "-c", "/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic $eventOutboxTopic")
this.execInContainer("/bin/sh", "-c", "/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic $detailsTopic")
}
eventsStreams = EventsStreams(
schemaRegistryUrl = schemaRegistryUrl,
brokers = kafka.bootstrapServers,
incomingTopic = eventOutboxTopic,
outgoingTopic = flightDetailsTopic,
replicationFactor = replicationFactor,
flightDetailMapper = flightDetailMapper,
processingGuarantee = AT_LEAST_ONCE
)
kafkaStream = eventsStreams.setup()
kafkaStream.start()
schema = schema()
consumer = consumer()
producer = producer()
consumer.subscribe(listOf(detailsTopic))
}
@AfterAll
fun tearDownContainer() {
consumer.close()
producer.close()
kafka.close()
}
@Test
fun `receives a message and writes into destination topic with a null payload (validation failure)`() {
// Given
val record = record(
schema = schema,
objectInfo = event().copy(creatives = null),
objectType = "type",
operation = "create")
// When
val message = produceAndGet(record)
// Then
assertNull(message?.value())
assertEquals(message?.key(), fortyFourId.toString())
}
private fun produceAndGet(produceRecord: GenericData.Record): ConsumerRecord<String, GenericRecord>? {
lateinit var records: ConsumerRecords<String, GenericRecord>
val key = (seqId++).toString()
producer.send(ProducerRecord(eventOutboxTopic, key, produceRecord))
retryUntilTrue(10, SECONDS) {
records = consumer.poll(Duration.ofMillis(100))
if (records.isEmpty) {
return@retryUntilTrue false
}
true
}
return records.records(flightDetailsTopic).first()
}
private fun schema(): Schema {
val schemaInfo = Unirest.get("http://localhost:8081/subjects/outbox/versions/latest").asObject(SchemaInfo::class.java).body
return Schema.Parser().parse(schemaInfo.schema)
}
private fun consumer(): Consumer<String, GenericRecord> {
val properties = Properties().apply {
this[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafka.bootstrapServers
this[ConsumerConfig.GROUP_ID_CONFIG] = "tests-" + UUID.randomUUID()
this[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
}
return KafkaConsumer(properties, StringDeserializer(), tapEventsStreams.avroSerde().deserializer())
}
private fun producer(): Producer<String, GenericRecord> {
val properties = Properties().apply {
this[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafka.bootstrapServers
this[ProducerConfig.CLIENT_ID_CONFIG] = UUID.randomUUID().toString()
this[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java
}
return KafkaProducer(properties, StringSerializer(), tapEventsStreams.avroSerde().serializer())
}
private fun flightRecord(
schema: Schema,
objectInfo: Event,
objectType: String = "type",
operation: String = "create"
): GenericData.Record {
return GenericData.Record(schema).apply {
put("id", 1)
put("operation", operation)
put("creation_time", System.currentTimeMillis())
put("user", "johnd")
put("object_id", fortyFourId)
put("object_type", objectType)
put("object_info", toJson(objectInfo))
}
}
private fun aDeliveringOrganization() = anOrganization().copy(_deliveryState = enabled)
private fun aNonDeliveringOrganization() = anOrganization().copy(_deliveryState = disabled)
}
I see you are using hardcoded localhost
values in a couple of places, which could be problematic. Can you please share the actual error you are getting?
Also, if you could share a minimal reproducer project on GitHub, this would be very helpful for triaging the problem (and I should be able to replicate it, since I am also on Windows with WSL2).
All your tests are working.
Ill try to reproduce in a minimal project as soon as posible!
Thanks.
----- Edit
I caint believe it. Coming back from weekend i open the computer and it works.
The difference was getting your project i used cp-kafka:6.2.1
but cp-kafka:latest
is failing.
Cain't use cp-kafka:latest
Hi,
Im on last version of TestContainers and TestContainers:Kafka and Docker Windows (WSL 2)
While debugging, i see all my images (only 3) are included in cache correctly and the testcontainers/ryuk:0.3.3 too.
But at
CreateContainerCmdExec
class it fails to post the request. With the message in title.Heres the variables i have just the line before failing:
Any idea if i miss a config? I did not find any specific. And yes my Docker have option enable acces via tls://localhost:2375 The tests are working on BitBucket server but not on a MAC or Windows
Hoping you can help. Thanks.