akka / akka-persistence-dynamodb

DynamoDBJournal for Akka Persistence
Other
85 stars 51 forks source link

Read journal for DynamoDB #50

Open manocha-aman opened 8 years ago

manocha-aman commented 8 years ago

To be able to use events generated on the read side using Akka Persistence Query.

patriknw commented 8 years ago

Contributions here would be very interesting

manocha-aman commented 8 years ago

Thanks for your reply Patrik, I will be very happy to contribute. I will check how it can be done.

On another note, do you think DynamoDB triggers which launch AWS Lambda functions can be used to populate views?

joost-de-vries commented 7 years ago

@patriknw We're looking into implementing this and offering a PR. Do you have any specific requirements that we should keep in mind? F.i. wrt Dynamodb performance. Also; do you have a suggestion which implementation of Akka Persistence Query would be the best example to take as inspiration?

patriknw commented 7 years ago

Sounds good. One consideration might be the cost aspect of dynamosb queries.

The cassandra plugin is good, but it's using ActorPublisher which should probably be implemented with GraphStage instead.

Another thing, cassandra plugin is polling. If DynamoDB has some push notification that would be nice to use.

joost-de-vries commented 7 years ago

@patriknw There's something called Dynamodb Streams which publishes the events on a Dynamodb table if so configured. The aws client provides an api with a so called iterator. That still sounds like polling. But I haven't analysed the source code to see if they use a streaming protocol...

Regarding your remark about using GraphStage: there's now a Akka Streams Dynamodb implementation. I guess we could use that as a basis if we'd go the polling route, right?

patriknw commented 7 years ago

ok

joost-de-vries commented 7 years ago

A quick update: I'm pretty far with implementing support for snapshots. I'm doing this on top of a new project I started so progress is not that fast but it is steady.

joost-de-vries commented 7 years ago

After implementing snapshot support #57 I started working on a read journal.

I first looked into using Dynamodb Streams api to implement this. Once you create a stream writes are written to that stream and are available for 24 hours. So the stream has to be created from the beginning that data is written in Dynamodb and has to run at all times that data is written, otherwise after 24 hours data gets lost. While the Akka Persistence Query api leaves room for instantiating the read journal and querying it at any time. So I don't think the Dynamodb Streams api is suitable for implementing a read journal. (Just as well since they are rather hand wavy about how to properly support resharding and appropriate parallellism and direct you to their Kinesis adapter)

So I've started implementing this by querying Dynamodb itself in a polling manner. The GraphLogic for filtering out 'uncommitted' batches looks rather inscrutable so I hope I'll be able to reuse that as is.

joost-de-vries commented 7 years ago

At Scala Days @patriknw mentioned that it would be nice do the query up to now using the standard polling api and from there to use Dynamodb Streams. That's a good idea. I'll implement it first with polling only. And the use of Dynamodb Streams can be added as a separate optimisation.

joost-de-vries commented 7 years ago

@johanandren @ktoso Since I finished the snapshot support I'm gearing up to implement query.

There are three things I'd like to discuss before I start:

1) Can we break this up into multiple PRs somehow? Perhaps create a query branch and implement features step by step, PR by PR. And have code review discussions piecemeal. The reason I mention this is that it's my current understanding that the optimal implementation of persistenceIds, currentPersistenceIds, eventsByPersistenceId, currentEventsByPersistenceId, eventsByTag and currentEventsByTag is not trivial to implement on Dynamodb and I think it may be quite a bit of work that requires design choices. And its better to have the work and design choices piecemeal I find.

2) I'd like to discuss the intended execution plan of queries before implementing them. Would this issue be the best venue for that? Or creating a temporary markdown file in the query branch?

3) In issue #61 I raise the question of futures vs streams and traits vs functions with implicits. And code reuse. The outcome of that discussion is obviously relevant for implementing this issue. F.i. the recovery logic is very close to eventsByPersistenceId.

joost-de-vries commented 7 years ago

@johanandren @ktoso I've been doing a lot of thinking how to implement query journal in a performant way. Here's some code exploring how to implement currentEventsByTag. The major decision is whether we keep storing the individual events only in the journal. Or whether we store them also in a 'eventsByTag' table. The latter makes it much easier and more performant to get all events in an ordered way. But if I'm not mistaken it would also very much undo the high throughput optimisations of the current implementation. Because the partitionkey would be the tag. And that would get much hotter than persistenceid as partitionkey. And even that got too hot. So I implemented it another way, using the events in the journal table and an extra 'tags' table that's only updated every 100 events. Please let me know what you think of this as a direction.

aquamatthias commented 6 years ago

@joost-de-vries I am currently evaluating dynamodb as akka persistence storage engine and as such also very interested in the read side. What is the status? Is there any progress?

joost-de-vries commented 6 years ago

@aquamatthias I think the best way to implement this is to use Dynamodb Streams. So I've done a bit of work on the side of Alpakka to get that going. Unfortunately Dynamodb Streams api is not that well documented. Currently I haven't got much time to work on it.

lutzh commented 6 years ago

It seems new versions of the akka-persistence-cassandra plugin (from 0.80) no longer use a Cassandra materialized view, but instead introduce a separate table tag_views. I'm wondering if this model is maybe easier to port to dynamodb, and if anyone has given this any thought already?

coreyoconnor commented 5 years ago

The j5ik2o dynamodb plugin implementation:

looks to use a polled scan request lifted to a stream.

coreyoconnor commented 5 years ago

Looking into this further. I agree with https://github.com/akka/akka-persistence-dynamodb/issues/50#issuecomment-302961050

some odd secondary indices may be required ;)

In terms of implementation process, I suspect it will look like so:

  1. prep
  2. currentEventsByPersistenceId
  3. currentPersistenceIds
  4. eventsByPersistenceId
  5. persistenceIds
  6. ... consider tags

each a separate PR and at least snapshot release for each.

kali786516 commented 2 years ago

I need small help converting journal event data to human readable format https://stackoverflow.com/questions/73560217/convert-akka-journal-event-columns-string-value-to-java-object who can help ?