A Kotlin-based event bus that implements the outbox pattern for publishing domain-level events within database transactions. This makes it possible to update the database and publish events atomically. Check this for an in-depth explanation of this pattern.
This solution uses a polling publisher to push messages to the event queue.
This library targets scenarios where we want to reliably persist data and send events to a message bus.
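The idea can be illustrated with a minimal in-memory sketch. It is not the library's implementation: `OutboxRow`, `InMemoryOutbox`, and `pollOnce` are hypothetical names, the list stands in for the database table, and the `publish` callback stands in for the message bus.

```kotlin
import java.time.Instant
import java.util.UUID

// Hypothetical in-memory stand-ins; a real outbox lives in a SQL table
// and the publisher talks to a broker such as Kafka.
class OutboxRow(
    val uuid: String,
    val topic: String,
    val payload: ByteArray,
    val storedTimestamp: Instant
) {
    var delivered: Boolean = false
}

class InMemoryOutbox {
    private val rows = mutableListOf<OutboxRow>()

    // In a real setup this insert runs inside the same database transaction
    // as the business data change, so both commit or roll back together.
    fun store(topic: String, payload: ByteArray): OutboxRow {
        val row = OutboxRow(UUID.randomUUID().toString(), topic, payload, Instant.now())
        rows.add(row)
        return row
    }

    fun pending(): List<OutboxRow> = rows.filter { !it.delivered }
}

// One polling pass: try to publish every pending row and mark the ones that succeed.
// Returns the number of rows delivered in this pass.
fun pollOnce(outbox: InMemoryOutbox, publish: (OutboxRow) -> Boolean): Int {
    var sent = 0
    for (row in outbox.pending()) {
        if (publish(row)) {
            row.delivered = true
            sent++
        }
    }
    return sent
}

fun main() {
    val outbox = InMemoryOutbox()
    outbox.store("orders", "order created".toByteArray())
    outbox.store("orders", "order paid".toByteArray())

    val sent = pollOnce(outbox) { row ->
        println("publishing ${row.payload.decodeToString()} to ${row.topic}")
        true
    }
    println("sent=$sent pending=${outbox.pending().size}")
}
```

Because the event row is written together with the business data, a crash between the commit and the publish step loses nothing: the row is still pending and is picked up on the next polling pass.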
This library has an opinionated view of the internal structure of an event. This may change in the future, but for now events have the following structure (see the EventOutput class):
Field | Data type |
---|---|
uuid | String |
timestamp | Instant |
topic | String |
msgType | String |
mimeType | String |
payload | ByteArray |
The payload is the user-specific content of the message. For now messages are encoded as JSON, but this may also change in the future.
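As a rough illustration of this structure, the sketch below mirrors the fields from the table in a hypothetical `EventSketch` class and wraps a JSON document as the payload. The class, the `jsonEvent` helper, and the `application/json` MIME type are assumptions made for the example; the library's own `EventOutput` class may differ.

```kotlin
import java.time.Instant
import java.util.UUID

// Hypothetical mirror of the fields listed above; not the library's EventOutput class.
class EventSketch(
    val uuid: String,
    val timestamp: Instant,
    val topic: String,
    val msgType: String,
    val mimeType: String,
    val payload: ByteArray
)

// Wrap a JSON document as an event; only the payload is user-specific.
fun jsonEvent(topic: String, msgType: String, json: String): EventSketch =
    EventSketch(
        uuid = UUID.randomUUID().toString(),
        timestamp = Instant.now(),
        topic = topic,
        msgType = msgType,
        mimeType = "application/json", // assumed MIME type for JSON payloads
        payload = json.toByteArray(Charsets.UTF_8)
    )

fun main() {
    val event = jsonEvent("orders", "OrderCreated", """{"orderId": 42}""")
    println("${event.topic}/${event.msgType} (${event.mimeType}): ${event.payload.decodeToString()}")
}
```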
Column | Data type |
---|---|
topic | varchar(255) |
delivered | boolean |
uuid | char(36) |
stored_timestamp | datetime |
send_timestamp | datetime |
msg_type | varchar(255) |
mime_type | varchar(255) |
payload | blob |
The event relay keeps retrying events that fail delivery, and writes a log message at ERROR level each time a failure occurs.
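The retry behaviour can be sketched as follows. This is an assumption-laden illustration rather than the library's relay: `PendingEvent` and `relayPass` are hypothetical names, and `java.util.logging` (where `SEVERE` is the error level) stands in for whatever logger the library uses.

```kotlin
import java.util.logging.Level
import java.util.logging.Logger

// Hypothetical pending-event record; not the library's own type.
class PendingEvent(val uuid: String, val topic: String) {
    var delivered: Boolean = false
    var attempts: Int = 0
}

private val log: Logger = Logger.getLogger("EventRelaySketch")

// Try every undelivered event once. A failure is logged at error level
// and the event stays pending, so the next pass retries it.
fun relayPass(events: List<PendingEvent>, send: (PendingEvent) -> Unit) {
    for (event in events.filter { !it.delivered }) {
        event.attempts++
        try {
            send(event)
            event.delivered = true
        } catch (e: Exception) {
            log.log(Level.SEVERE, "Failed to deliver event ${event.uuid} to ${event.topic}", e)
        }
    }
}

fun main() {
    val events = listOf(PendingEvent("e-1", "orders"), PendingEvent("e-2", "orders"))
    var brokerUp = false
    val send: (PendingEvent) -> Unit = {
        if (!brokerUp) throw IllegalStateException("broker unavailable")
    }

    relayPass(events, send) // both sends fail and are logged
    brokerUp = true
    relayPass(events, send) // both events are retried and delivered
    println(events.map { "${it.uuid}: delivered=${it.delivered}, attempts=${it.attempts}" })
}
```

A consequence of retrying until success is that consumers may see the same event more than once, so handlers should be safe to run repeatedly for the same `uuid`.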
Currently, we use Exposed as our ORM. This means users of this library must also use Exposed to get the transactional guarantees described above.
gradlew clean build
Example:
// imports are omitted. Check the examples section - producer
// Step 1
val dataSource: DataSource = HikariDataSource(HikariConfig().apply {
    driverClassName = "org.h2.Driver"
    jdbcUrl = "jdbc:h2:mem:test"
    maximumPoolSize = 5
    isAutoCommit = false
    transactionIsolation = "TRANSACTION_REPEATABLE_READ"
    validate()
})
// See the producer example (mobi.waterdog.eventbus.example.sql.LocalEventStoreSql)
val localEventStore: LocalEventStore = MyLocalEventStore()
// Step 2
val props = Properties()
//General cluster settings and config
props["bootstrap.servers"] = kafkaServer
//Kafka serialization config
props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
// Step 3
val ebp = EventBusProvider()
ebp.setupProducer(EventBackend.Kafka, localEventStore)
val producer = ebp.getProducer(props)
// Step 4
producer.send(EventInput("test", "OK", "text/plain", "sent at: ${Instant.now()}".toByteArray()))
// imports are omitted. Check the examples section - consumer
// Step 1
val ebp = EventBusProvider()
ebp.setupConsumer(EventBackend.Kafka)
// Step 2
val props = Properties()
//General cluster settings and config
props["bootstrap.servers"] = kafkaServer
props["enable.auto.commit"] = "true"
props["auto.commit.interval.ms"] = "1000"
props["heartbeat.interval.ms"] = "3000"
props["session.timeout.ms"] = "10000"
props["auto.offset.reset"] = "latest"
//Kafka serialization config
props["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
props["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
//Event bus property that controls the sync loop and the auto-commit mode
props["consumer.syncInterval"] = "1000"
props["consumer.streamMode"] = "AutoCommit"
val consumer = ebp.getConsumer(props)
// Step 3
val topic = "my-topic"
val consumerId = "consumer-group-id"
consumer.stream(topic, consumerId)
    .doOnError { it.printStackTrace() }
    .onErrorReturnItem(EventOutput.buildError())
    .subscribe {
        log.info("EVENT: ${it.uuid} @ ${it.timestamp}")
    }
The following metrics are exported (using Micrometer):
// Given: A meter registry
val meterRegistry = SimpleMeterRegistry()
meterRegistry.config().commonTags("service-name", "my-beautiful-service")
// See the producer example (mobi.waterdog.eventbus.example.sql.LocalEventStoreSql)
val localEventStore: LocalEventStore = MyLocalEventStore()
val ebf = EventBusProvider(EventBackend.Kafka, meterRegistry)
ebf.setupProducer(localEventStore)
// You are then able to access your metrics
val timer = meterRegistry.get("events.store.timer").timer()