waterdog-oss / outbox-event-bus

A technology agnostic event bus
MIT License
3 stars 1 forks source link

Entity not found, Event Sync failed Error #9

Closed aanciaes closed 5 years ago

aanciaes commented 5 years ago

With a producer sending an event every 5 seconds, an error occasionally occurs; the stack traces are shown below.

The consumer and producer are as follows:

// Consumer side of the reproduction: subscribes to the "telemetry" stream and
// passes the payload of every "DecodedFrame" event to parseMessage.
fun start() {
        // Presumably kotlin.concurrent.thread (import not shown), which would run
        // the subscription on a separate thread — confirm.
        thread {
            // Obtained through the `inject()` property delegate; the DI setup is not part of this snippet.
            val consumer: EventConsumer by inject()

            // The two arguments look like a topic name and a consumer identifier —
            // verify against the EventConsumer.stream signature.
            consumer.stream("telemetry", "telemetry-consumer")
                // Drop every event whose msgType is not "DecodedFrame".
                .filter { it.msgType == "DecodedFrame" }
                // Only the payload is forwarded; parseMessage is defined outside this snippet.
                .subscribe { parseMessage(it.payload) }
        }
    }

// Producer side of the reproduction: repeatedly sends the same "DecodedFrame"
// event on the "telemetry" topic via sendAsync.
fun testProducer() {
        // Obtained through the `inject()` property delegate; the DI setup is not part of this snippet.
        val producer: EventProducer by inject()

        // Presumably kotlin.concurrent.timer(name, daemon, initialDelay, period) with
        // millisecond values, i.e. first run after 5 s and then every 5 s — this matches
        // the "every 5 seconds" in the report, but the import is not shown; confirm.
        timer("producer", false, 5000, 5000) {
            producer.sendAsync(
                // Positional arguments appear to be topic, message type, MIME type and
                // payload bytes — verify against the EventInput constructor.
                EventInput(
                    "telemetry",
                    "DecodedFrame",
                    "application/json",
                    // JsonFrames.gpsFrame is defined elsewhere; toByteArray() is called
                    // without an explicit charset argument.
                    JsonFrames.gpsFrame.toByteArray()
                )
            )
        }
    }

with the following configurations:

// Configures the event bus provider for the Kafka backend and returns its consumer.
private fun initBusEventBus(): EventConsumer {
        // `get()` is resolved outside this snippet (presumably a DI lookup — confirm).
        val ebf: EventBusProvider = get()

        // Kafka client settings, all stored as String values.
        val props = Properties()
        props["bootstrap.servers"] = "kafka-service:9092"
        props["enable.auto.commit"] = "true"
        props["group.id"] = "test-consumer-group"
        props["auto.commit.interval.ms"] = "1000"
        props["auto.offset.reset"] = "earliest"
        props["session.timeout.ms"] = "30000"
        // Both deserializers and serializers are set; presumably the same Properties
        // object configures the consumer and the producer — confirm.
        props["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
        props["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
        props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
        props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
        // NOTE(review): unlike every other entry, this value is an Int rather than a String.
        // java.util.Properties.getProperty returns null for non-String values, so whether
        // this setting takes effect depends on how the library reads it — confirm.
        // The key is not a Kafka setting; it looks like an event-bus sync option.
        props["sync.delayMillis"] = 1000

        // Select the Kafka backend with the properties built above.
        ebf.setup(EventBackend.Kafka, props)

        return ebf.getConsumer()
    }

The errors occur some time apart and only occasionally, and not always for the same event:

org.jetbrains.exposed.exceptions.EntityNotFoundException: Entity EventDAO, id=4 not found in database
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:495)
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:497)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:20)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:13)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:16)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:9)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.inTopLevelTransaction(ThreadLocalTransactionManager.kt:103)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:74)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:57)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2.doResume(DatabaseConnection.kt:15)
        at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:42)
        at kotlinx.coroutines.experimental.DispatchedTask$DefaultImpls.run(Dispatched.kt:168)
        at kotlinx.coroutines.experimental.DispatchedContinuation.run(Dispatched.kt:13)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
23:00:27.855 [Thread-5] ERROR m.w.e.p.PersistentEventWriter - Event sync failedEntity EventDAO, id=5 not found in database
org.jetbrains.exposed.exceptions.EntityNotFoundException: Entity EventDAO, id=5 not found in database
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:495)
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:497)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:20)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:13)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:16)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:9)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.inTopLevelTransaction(ThreadLocalTransactionManager.kt:103)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:74)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:57)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2.doResume(DatabaseConnection.kt:15)
        at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:42)
        at kotlinx.coroutines.experimental.DispatchedTask$DefaultImpls.run(Dispatched.kt:168)
        at kotlinx.coroutines.experimental.DispatchedContinuation.run(Dispatched.kt:13)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
23:00:32.857 [Thread-5] ERROR m.w.e.p.PersistentEventWriter - Event sync failedEntity EventDAO, id=6 not found in database
org.jetbrains.exposed.exceptions.EntityNotFoundException: Entity EventDAO, id=6 not found in database
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:495)
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:497)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:20)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:13)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:16)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:9)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.inTopLevelTransaction(ThreadLocalTransactionManager.kt:103)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:74)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:57)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2.doResume(DatabaseConnection.kt:15)
        at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:42)
        at kotlinx.coroutines.experimental.DispatchedTask$DefaultImpls.run(Dispatched.kt:168)
        at kotlinx.coroutines.experimental.DispatchedContinuation.run(Dispatched.kt:13)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
felix19350 commented 5 years ago

Closed with PR #17