dnvriend / akka-persistence-inmemory

Akka-persistence-inmemory is a plugin for akka-persistence that stores journal and snapshot messages in memory, which is very useful when testing persistent actors, persistent FSM and Akka Cluster.
Apache License 2.0

eventsByTag query seems to miss events at the buffer boundary #55

Open LoneEngineer opened 6 years ago

LoneEngineer commented 6 years ago

Hi,

I use Akka 2.5.14 and akka-persistence-inmemory 2.5.1.1, and it seems I have run into the following issue: I emit a lot of events quite fast (more than the default max-buffer-size), and sometimes the 101st, 202nd and 303rd events are not pushed into the stream. I tried to read the code, and I consider the following place suspicious:

  override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
    Source.unfoldAsync[Offset, Seq[EventEnvelope]](offset) { (from: Offset) =>
      def nextFromOffset(xs: Seq[EventEnvelope]): Offset = {
        if (xs.isEmpty) from else xs.last.offset match {
          case Sequence(n)         => Sequence(n)
          case TimeBasedUUID(time) => TimeBasedUUID(UUIDs.startOf(UUIDs.unixTimestamp(time) + 1))
        }
      }
      ticker.flatMapConcat(_ => currentEventsByTag(tag, from)
        .take(maxBufferSize)).runWith(Sink.seq).map { xs =>
        val next = nextFromOffset(xs)
        Some((next, xs))
      }
    }.mapConcat(identity)
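
A simplified, Akka-free model of the paging loop above may make the behaviour easier to follow. It is a sketch under stated assumptions, not the plugin's code: offsets are plain Long tick counts (100 ns units, measured from an epoch that falls on a whole millisecond), the hypothetical `currentFrom` stands in for `currentEventsByTag` and is assumed to return events with offset >= `from` in order, and the loop stops at the first empty page instead of polling forever on the ticker.

```scala
// Simplified model of the eventsByTag paging loop (not the plugin's code).
// Assumption: offsets are 100 ns ticks from an epoch on a whole millisecond,
// so floor(ticks / 10000) behaves like UUIDs.unixTimestamp up to a constant.
object PagingModel {
  val TicksPerMilli = 10000L

  // Mirrors nextFromOffset: resume at the start of the millisecond after
  // the last event of the page, discarding its sub-millisecond ticks.
  def nextFromOffset(from: Long, page: Seq[Long]): Long =
    if (page.isEmpty) from
    else (page.last / TicksPerMilli + 1) * TicksPerMilli

  // Hypothetical stand-in for currentEventsByTag(tag, from);
  // assumed to be inclusive of `from`.
  def currentFrom(events: Seq[Long], from: Long): Seq[Long] =
    events.filter(_ >= from)

  // Runs the unfold loop until a page comes back empty and collects
  // everything that would have been emitted downstream.
  def readAll(events: Seq[Long], maxBufferSize: Int): Seq[Long] = {
    @annotation.tailrec
    def loop(from: Long, acc: Vector[Long]): Vector[Long] = {
      val page = currentFrom(events, from).take(maxBufferSize)
      if (page.isEmpty) acc
      else loop(nextFromOffset(from, page), acc ++ page)
    }
    loop(0L, Vector.empty)
  }
}
```

With three events per millisecond and a buffer of 100, the first page ends on the first event of millisecond 33, the next page starts at millisecond 34, and the other two events of millisecond 33 are never emitted. A buffer larger than the total event count reads everything, which matches the max-buffer-size workaround.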

Let's consider an example (real values from a test; sorry, I cannot share the test yet): the 100th event has TimeBasedUUID(fa7225e0-a223-11e8-b71e-e9435a127f49) and the 101st event has TimeBasedUUID(fa7225e1-a223-11e8-b71e-e9435a127f49).

According to my logs, my stream processing logic never got the 101st event. If we run the code from nextFromOffset:

@ val time1 = UUID.fromString("fa7225e0-a223-11e8-b71e-e9435a127f49") 
time1: UUID = fa7225e0-a223-11e8-b71e-e9435a127f49
@ val next1 = TimeBasedUUID(UUIDs.startOf(UUIDs.unixTimestamp(time1) + 1)) 
next1: TimeBasedUUID = TimeBasedUUID(fa724cf0-a223-11e8-8080-808080808080)
@ val time2 = UUID.fromString("fa7225e1-a223-11e8-b71e-e9435a127f49") 
time2: UUID = fa7225e1-a223-11e8-b71e-e9435a127f49
@ TimeBasedUUID(time2).compare(next1) 
res9: Int = -1

So if currentEventsByTag returned 101 events, the last one is dropped by take(100), and the next offset passed back to unfold will be fa724cf0-a223-11e8-8080-808080808080, which is later than the 101st event's timestamp. The 101st event is therefore never read.
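
The REPL check above can be repeated with only `java.util.UUID`, whose `timestamp()` method returns the 60-bit version-1 timestamp in 100 ns ticks since 1582-10-15. This is a sketch, not the driver's code: `startOfTicks` is a hypothetical helper assumed to mirror the timestamp part of `UUIDs.startOf`, and `0x01B21DD213814000L` is the standard tick offset between the UUID epoch and the Unix epoch.

```scala
import java.util.UUID

// Reproduces the boundary check using java.util.UUID only (no Akka or
// Datastax driver classes).
object BoundaryCheck {
  // 100 ns ticks between 1582-10-15 and 1970-01-01.
  val GregorianOffset = 0x01B21DD213814000L
  val TicksPerMilli = 10000L

  // Unix milliseconds of a version-1 UUID (sub-millisecond ticks are floored away).
  def unixMillis(u: UUID): Long = (u.timestamp() - GregorianOffset) / TicksPerMilli

  // Hypothetical helper: first tick of the given Unix millisecond, assumed to
  // match the timestamp part of UUIDs.startOf(ms).
  def startOfTicks(ms: Long): Long = ms * TicksPerMilli + GregorianOffset

  val e100 = UUID.fromString("fa7225e0-a223-11e8-b71e-e9435a127f49")
  val e101 = UUID.fromString("fa7225e1-a223-11e8-b71e-e9435a127f49")

  // Offset the loop resumes from after a page that ends with the 100th event.
  val nextTicks = startOfTicks(unixMillis(e100) + 1)

  // True when the 101st event lies before the resume point and is skipped.
  val skipped = e101.timestamp() < nextTicks
}
```

The two events are one tick apart and share the same Unix millisecond, so the resume point lands one full millisecond after the 100th event and jumps over the 101st.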

If I set max-buffer-size to a value above the number of events my test can generate, everything works fine.

What do you think?

dnvriend commented 6 years ago

I think you hit an edge case. The workaround of raising max-buffer-size seems reasonable.

LoneEngineer commented 5 years ago

I made a fix and created a pull request: https://github.com/dnvriend/akka-persistence-inmemory/pull/64 Please review.

LoneEngineer commented 5 years ago

Copying the description from the commit message:

The problem happens when the buffer's boundary falls between events with timeuuids like:

...
e3e99ed0-a21d-11e8-b31a-e9435a127f49 // A: last event put into the buffer
e3e99ed1-a21d-11e8-b31a-e9435a127f49 // B: the next one
...

InMemoryReadJournal::eventsByTag::nextFromOffset uses the Unix timestamp in milliseconds to calculate the 'next' offset:

case TimeBasedUUID(time) => TimeBasedUUID(UUIDs.startOf(UUIDs.unixTimestamp(time) + 1))

and it skips event B because both events have the same Unix timestamp, 1534510989629, so the 'next' UUID will be e3e9c5e0-a21d-11e8-b31a-e9435a127f49.

The difference is only visible in the raw UUID timestamps, which count 100-nanosecond intervals since 1582-10-15: 137538037896290000 for A and 137538037896290001 for B.
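
Converting both values to Unix milliseconds shows why the read side cannot tell them apart. Writing $t_A$ and $t_B$ for the two tick counts, and assuming the standard offset $\Delta$ between the UUID epoch and the Unix epoch and that the millisecond conversion floors:

```latex
\begin{aligned}
\Delta &= 122192928000000000 \quad \text{(100 ns ticks from 1582-10-15 to 1970-01-01)} \\
\left\lfloor \frac{t_A - \Delta}{10^4} \right\rfloor &= \left\lfloor \frac{15345109896290000}{10^4} \right\rfloor = 1534510989629 \\
\left\lfloor \frac{t_B - \Delta}{10^4} \right\rfloor &= \left\lfloor \frac{15345109896290001}{10^4} \right\rfloor = 1534510989629 \\
t_{\text{next}} &= (1534510989629 + 1) \cdot 10^4 + \Delta = 137538037896300000 > t_B
\end{aligned}
```

$t_{\text{next}} - t_A = 10^4$ ticks, one full millisecond, which matches the timestamp part of e3e9c5e0-a21d-11e8 being 0x2710 above e3e99ed0-a21d-11e8.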

Where does it come from? InMemoryAsyncWriteJournal uses the following functions to generate the timeuuid for an event:

def nowUuid: UUID = UUIDs.timeBased()
def getTimeBasedUUID: TimeBasedUUID = TimeBasedUUID(nowUuid)
def timeBased(): UUID = {
   new UUID(makeMSB(UUIDUtil.getCurrentTimestamp()), ClockSeqAndNode)
}

If we take a closer look at UUIDUtil.getCurrentTimestamp, we can see the following:

public static final AtomicLong lastTimestamp = new AtomicLong(0L);
   ...
       long now = fromUnixTimestamp(System.currentTimeMillis());
       long last = lastTimestamp.get();
       if (now > last) { 
          ...
        } else { 
          ...
          long candidate = last + 1;

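So if two (or more) events are persisted within the same millisecond, each later one gets the previous timestamp plus one 100-nanosecond tick, and their timeuuids differ only below millisecond precision. That sub-millisecond part is not taken into account when events are read from the journal.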
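
The idea behind the elided getCurrentTimestamp excerpt can be sketched as a compare-and-set loop. This is a simplified illustration, not the driver's actual code: `MonotonicTicks` is a hypothetical class, the clock is injected so the same-millisecond case is reproducible, and ticks are counted from the Unix epoch rather than from 1582-10-15.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified monotonic timestamp source: 100 ns ticks derived
// from a millisecond clock; a call in the same millisecond as the previous
// one gets last + 1 instead of a duplicate value.
final class MonotonicTicks(clockMillis: () => Long) {
  private val TicksPerMilli = 10000L
  private val lastTimestamp = new AtomicLong(0L)

  @annotation.tailrec
  final def next(): Long = {
    val now = clockMillis() * TicksPerMilli
    val last = lastTimestamp.get()
    // Fresh millisecond: use it; otherwise bump the previous value by one tick.
    val candidate = if (now > last) now else last + 1
    // Retry if another thread updated lastTimestamp concurrently.
    if (lastTimestamp.compareAndSet(last, candidate)) candidate else next()
  }
}
```

With a clock frozen at 1534510989629 ms, two consecutive calls return values one tick apart that still floor to the same millisecond, which is exactly the A/B pair from the commit message.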

PS: I also added a test for this scenario. Unfortunately, the test depends heavily on timing (performance) and may NOT fail even with the broken implementation. I was able to choose parameters that give me a ~100% failure rate: the test never passed with the original implementation on my machine, but I cannot guarantee that for other machines.