scullxbones / akka-persistence-mongo

Implementation of akka-persistence storage plugins for mongodb
Apache License 2.0
103 stars 55 forks

duplicate key error (E11000) #218

Open gbrd opened 5 years ago

gbrd commented 5 years ago

[Edited] The problem does not seem to be linked to the Scala driver.

I am getting "duplicate key error (E11000)" errors:

com.mongodb.MongoBulkWriteException: Bulk write operation error on server XXX:27017. Write errors: [BulkWriteError{index=0, code=11000, message='E11000 duplicate key error collection: akka-persistence.akka_persistence_journal_-foo-bar index: akka_persistence_journal_index dup key: { : "-foo-bar_toto_titi", : 55, : 55 }', details={ }}]

(built with openjdk8, running on openjdk11) version 2.2.2 ("com.github.scullxbones" %% "akka-persistence-mongo-scala" % "2.2.2")

I have not taken the time to investigate yet.

I guess the stack trace will not help, but just in case:

    at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:176)
    at com.mongodb.operation.BulkWriteBatch.getError(BulkWriteBatch.java:235)
    at com.mongodb.operation.MixedBulkWriteOperation.addBatchResult(MixedBulkWriteOperation.java:483)
    at com.mongodb.operation.MixedBulkWriteOperation.access$1500(MixedBulkWriteOperation.java:67)
    at com.mongodb.operation.MixedBulkWriteOperation$6.onResult(MixedBulkWriteOperation.java:468)
    at com.mongodb.operation.MixedBulkWriteOperation$6.onResult(MixedBulkWriteOperation.java:443)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:227)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:461)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:379)
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:356)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:651)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:618)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:494)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:491)
    at com.mongodb.internal.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:237)
    at com.mongodb.internal.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:220)
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
    at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:158)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:560)
    at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:277)
    at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:298)
    at com.mongodb.internal.connection.AsynchronousSocketChannelStream.readAsync(AsynchronousSocketChannelStream.java:132)
    at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:491)
    at com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:74)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:608)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:593)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:494)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:491)
    at com.mongodb.internal.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:237)
    at com.mongodb.internal.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:220)
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
    at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:219)
    at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecuto

gbrd commented 5 years ago

Supposition (based on real observation):

Is it really possible?

I have seen another problem: on the first persist failure (due to a timeout) it is "normal" that the circuit breaker opens. But on the second one, the issue concerns one particular "resource", not all database accesses, yet the circuit breaker will probably open and impact all database operations. Could we avoid that? Is it even possible with the akka-persistence plugin API? Should I open a separate issue?

gbrd commented 5 years ago

Hmm, OK. In fact this is a well-known subject: https://doc.akka.io/docs/akka/2.5/persistence-journals.html

* Please also note that requests for the highest sequence number may be made concurrently
 * to this call executing for the same `persistenceId`, in particular it is possible that
 * a restarting actor tries to recover before its outstanding writes have completed. In
 * the latter case it is highly desirable to defer reading the highest sequence number
 * until all outstanding writes have completed, otherwise the PersistentActor may reuse
 * sequence numbers.

Thus it may depend on the write/read concern and the read preference on MongoDB whether a client is able to see its own write...
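
To make the race concrete, here is a minimal in-memory sketch, not the plugin's code. `FakeJournal` and `StaleReadDemo` are hypothetical names, and the unique index is simplified to (persistenceId, sequenceNr), whereas the real index in the error above has three key fields. It only illustrates how reading the highest sequence number before an outstanding write becomes visible leads to a reused sequence number and an E11000-style failure.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a journal collection with a unique
// index on (persistenceId, sequenceNr). This is not the plugin's code.
final class FakeJournal {
  private val committed = mutable.Set.empty[(String, Long)]

  // Highest sequence number visible to a reader right now (0 if none).
  def highestSequenceNr(pid: String): Long =
    committed.foldLeft(0L) { case (acc, (p, n)) => if (p == pid) math.max(acc, n) else acc }

  // Returns Left with an E11000-style message when the key already exists.
  def write(pid: String, seqNr: Long): Either[String, Unit] =
    if (committed.add((pid, seqNr))) Right(())
    else Left(s"E11000 duplicate key: ($pid, $seqNr)")
}

object StaleReadDemo {
  def run(): (Either[String, Unit], Either[String, Unit]) = {
    val journal = new FakeJournal
    val pid = "-foo-bar_toto_titi"
    (1L to 54L).foreach(n => journal.write(pid, n))

    // The first incarnation of the actor still has a write for 55 in flight
    // when persist times out and the actor restarts. The restarted actor
    // reads the highest sequence number before that write is visible.
    val staleHighest = journal.highestSequenceNr(pid)

    val delayedWrite = journal.write(pid, 55L)              // in-flight write lands
    val retriedWrite = journal.write(pid, staleHighest + 1) // restarted actor reuses 55
    (delayedWrite, retriedWrite)
  }
}
```

In this sketch the delayed write succeeds and the write from the restarted actor is rejected, which matches the shape of the error reported at the top of the issue.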

scullxbones commented 5 years ago

Which write concern are you using?

I've never seen this behavior, but I don't have a super high rate of messages or especially recovery. I do use journaled write concern.

gbrd commented 5 years ago

Acknowledged. MongoDB 3.0, replica set = 1 primary and 3 secondaries (1 is "hidden").

The read preference is primary by default. Is it overridden by the plugin?

scullxbones commented 5 years ago

I don't think the plugin does anything to actively touch read preference.

Acknowledged write concern does have some consistency tradeoffs for performance, enough to scare me off from using that mode in production for whatever that's worth.

gbrd commented 5 years ago

OK. But whatever the write concern, we still have a problem if the "persist" operation times out while the write is still queued on the Mongo side (not yet taken into account), and the persistent actor then restarts (maybe on another server, using another journal actor). Yes, it is very much a race condition.

Am I right?

The question is then how to implement "defer reading the highest sequence number until all outstanding writes have completed".

scullxbones commented 5 years ago

I think the best path is to retry persist in the PA with some kind of exponential backoff. Both when the circuit breaker is open, as well as when a duplicate key exception is received.

Since only the mongo server knows when all outstanding writes have completed, and the plugin does not control how/when the library (akka-persistence) invokes maxSequence logic, I'm not seeing how it can be handled by the plugin. It would be complicated (I think) but it's possible to put in a gate delaying maxSequence calls. It's not clear to me how to know when that gate should be open or closed, without knowing the state of the mongo write queue.

gbrd commented 5 years ago

I don't think we can retry persist: in the doc of onPersistFailure we can read: "The actor is always stopped after this method has been invoked." Thus a retry would mean letting the actor crash and launching it again; that's what we currently do naturally, and it could be acceptable. The thing that might be a problem is that the persistent actor could remain alive without writing data, and will see newer data when it restarts.

One solution I can see would be to use write-concern=majority for all writes and read-concern=linearizable when reading the highest sequence number. This would cover the cases where the write was acknowledged. For the race condition where it times out, we could just overwrite the previous event on the second persist (after the duplicate-key error). This solution is certainly not ideal because it would mask some other errors.

Maybe we could ask @patriknw how this is managed in the Cassandra plugin? In the code here: https://github.com/akka/akka-persistence-cassandra/blob/master/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala

I see that there is a data structure writeInProgress used to defer read of maxSeqNum but I guess it will not work if the PA is recreated on another server in an akka cluster (which will have a separate writeInProgress collection).

scullxbones commented 5 years ago

I don't think we can retry persist: in the doc of onPersistFailure we can read: "The actor is always stopped after this method has been invoked." Thus a retry would mean letting the actor crash and launching it again; that's what we currently do naturally, and it could be acceptable.

Ah yes, of course. Good point. You could use exponential backoff if you had a request/reply command protocol on the outside, but not inside the PA. The advantage of this is that the maxSequenceNr would eventually increment and no longer trigger a unique index exception. That said, the events would need to be idempotent if they will potentially be duplicated.
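
A rough sketch of what such an outside retry with exponential backoff could look like. `Backoff`, `delay` and `retry` are hypothetical helpers written for illustration, not part of Akka or this plugin, and the command is modelled as a plain synchronous function rather than an actual ask to the PA.

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helper: retries a command from outside the persistent actor,
// doubling the delay after each failed attempt.
object Backoff {
  // Delay before retry number `attempt` (0-based), capped at `max`.
  def delay(attempt: Int, min: FiniteDuration, max: FiniteDuration): FiniteDuration = {
    val scaled = min.toMillis * math.pow(2.0, attempt.toDouble)
    if (scaled >= max.toMillis) max else scaled.toLong.millis
  }

  // Runs `command` up to `maxAttempts` times and returns the first success,
  // or the last failure once the attempts are exhausted.
  @annotation.tailrec
  def retry[A](maxAttempts: Int, min: FiniteDuration, max: FiniteDuration, attempt: Int = 0)(
      command: () => Try[A]): Try[A] =
    command() match {
      case ok @ Success(_) => ok
      case failed @ Failure(_) if attempt + 1 >= maxAttempts => failed
      case Failure(_) =>
        // Blocking sleep keeps the sketch short; real code would schedule instead.
        Thread.sleep(delay(attempt, min, max).toMillis)
        retry(maxAttempts, min, max, attempt + 1)(command)
    }
}
```

Because each retry may replay a command whose first attempt did reach the journal, the idempotency caveat above still applies.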

One solution I can see would be to use write-concern=majority for all writes and read-concern=linearizable when reading the highest sequence number.

Is your replica set flapping / doing master re-elections frequently? If not, is there a way to figure out why the database is so slow as to cause the circuit breaker to time out? Perhaps the logs, if slow logging is enabled. I know if the mongo working set is bigger than available RAM it bogs down pretty seriously.

gbrd commented 5 years ago

We don't have frequent re-elections. I was just trying to find a solution where overwriting could be acceptable (and thus in all cases, even with master changes, if the first write was acknowledged it is not acceptable to overwrite).

I'm still working on this, but it seems that part of our persist failures are due to other causes (probably a problem with akka-cluster-sharding that ends with 2 PA with the same persistent id at the same time...). I'm still investigating, but this part probably has no link with the plugin.

scullxbones commented 5 years ago

probably a problem with akka-cluster-sharding that ends with 2 PA with the same persistent id at the same time

Ouch - ok, so a network partition split-brain. That would definitely cause constraint failures even by itself. Are you using a cluster downing library to take down isolated nodes? There's the Lightbend subscription that provides one, and I think there are also a few OSS ones on GitHub.

gbrd commented 5 years ago

We have our own implementation of a split brain resolver (implemented before an OSS one was available)... But the cluster itself is not split, and I've reproduced a complex situation where a shard is allocated on 2 servers at the same time. Still under investigation...

patriknw commented 5 years ago

It's right that the writeInProgress in the Cassandra plugin protects against PA restarts while there are still writes in flight from the same PA, but it's only for restarts of the PA on the same node (same journal Actor instance). For a restart on another node the thinking is that such a failover would anyway take a long time (10s of seconds) and by that time any pending writes from the old node should have been completed.
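
As an illustration of the idea only (this is not the Cassandra plugin's code, and `WriteGate`, `trackWrite` and `readHighest` are hypothetical names), a same-JVM gate that defers the highest-sequence-number read until in-flight writes for the same persistenceId have finished might be sketched like this:

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical sketch: tracks in-flight writes per persistenceId so that a
// read of the highest sequence number can wait for them. Single JVM only.
final class WriteGate(implicit ec: ExecutionContext) {
  private val inProgress = mutable.Map.empty[String, Future[Unit]]

  // Register a write; the entry is removed once all tracked writes complete.
  def trackWrite(pid: String, write: Future[Unit]): Future[Unit] = synchronized {
    val previous = inProgress.getOrElse(pid, Future.unit)
    // A failed write also ends the wait, hence the recover.
    val done = previous.flatMap(_ => write.recover { case _ => () })
    inProgress.update(pid, done)
    done.onComplete(_ => synchronized {
      if (inProgress.get(pid).contains(done)) inProgress.remove(pid)
    })
    write
  }

  // Run `read` only after any tracked write for this pid has completed.
  def readHighest(pid: String)(read: => Future[Long]): Future[Long] = synchronized {
    inProgress.get(pid) match {
      case Some(pending) => pending.flatMap(_ => read)
      case None          => read
    }
  }
}

object WriteGateDemo {
  def run(): Long = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val gate = new WriteGate
    val stored = new AtomicLong(54L)   // pretend journal state
    val pendingWrite = Promise[Unit]() // a write that has not completed yet

    gate.trackWrite("pid-1", pendingWrite.future.map(_ => stored.set(55L)))
    val highest = gate.readHighest("pid-1")(Future.successful(stored.get()))

    pendingWrite.success(()) // the outstanding write finally completes
    Await.result(highest, 3.seconds)
  }
}
```

The map lives in one process, so the sketch has the same limitation described above: it does nothing for a PA restarted on another node.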

gbrd commented 5 years ago

OK. Our main issue was a bad serializer configuration that caused journal events not to be replayed (only snapshots) for internal akka-cluster-sharding data, and that led to complex situations...

But the issue described initially here remains present and I will continue to work on it. Thanks @patriknw for your answer, maybe a similar solution would fix this issue.