Closed julianpistorius closed 7 years ago
Thanks Julian! I don't have any prototypes, design documents, or notes. I was thinking of something that would continue to be updated after it was initialised perhaps by subscribing to published events locally or remotely. In other words, to continue it would first need to get up to date, so there's a need somehow to initialise and then continue.
What is persisted could perhaps be persisted in the event store. But also updating something like Elasticsearch could be done in a similar way. Updating an index in Elasticsearch perhaps isn't an event sourced projection, but more directly CQRS?
There are two different ways of following an event stream remotely that are described in the Implementing Domain Driven Design book: deduplication from a message queue in the receiving context; and polling an archived log that is somehow filled with a sequence of immutable pages of domain events.
The other related aspect is getting all the events from the event store in the order they happened. There isn't a single event log, because it is distributed across the aggregates. Still, it might be useful to have a log of the event IDs.
There's quite a lot of different things mixed up there. So perhaps we could start with a concrete example, like a running count of the number of entities of a particular type? That could be developed into a persistent index that is being updated in an event driven. What do you think?
Hi John,
That sounds good. Your second-to-last paragraph reminds me of stream processing, particularly joining between streams using Kafka Streams.
I'll have a play around with the code and see if I can make a proof of concept.
Hey Julian, how's it going? did you get anywhere making a proof of concept? was just thinking about this stuff and was curious if you'd been working on it :)
Not yet. I am still planning to have a go, but I've been a bit busy. Don't let me hold anybody up though - I won't feel offended if somebody feels like having a crack at it. :)
@johnbywater I see you've been doing interesting things. :) I was looking at d5-kanban-python yesterday and how they implement projections. Looks doable.
that's great to hear @julianpistorius :) good luck!
if my memory is correct, that code uses a list of IDs when reading the full event stream to pick out all the entities with one pass through the full stream. When it has all the event for all the entities, it replays them through a different set of mutators that pertain to the event sourced projection. In our case, the events for each ID would be retrieved separately (perhaps in different threads) because they are stored separately. But then they could be replayed through a different set of mutators that pertain to a "projection". Is that how you see it?
the other thing I always starting thinking about here is materialised views, ie having a view that involves persistent state that is perhaps driven by the domain events, that perhaps needs to be initialised from an already existing event stream, something that could run remotely either by deduplicating events that received via a message queue, or by polling for archive log pages of domain events. I guess the state of the materialised view could also be determined by a sequence of events, so that it would effectively be an "event sourced materialised view". I don't know if we're also calling this an event sourced projection? It's more or less the "read model" thing. If we don't have a name for it yet, I was feeling a good name might be event sourced materialised views? Remote event propagation / read model stuff is perhaps beyond the scope of event sourcing, defined as a persistence mechanism for DDD, so it probably calls instead for a different "cohesive mechanism", and should stop bringing it up!
anyway, let me know if you want to chat about event sourced projections? :)
@johnbywater Thank you immensely, for the awesome work you're doing with eventsourcing (Python), especially the latest refactoring. Well done!
Please, do you have any recommendation on "Projection" from raw stored event stream?
Not sure if I'm right, it seems the current eventstore repository implementations ONLY allow per-entity streaming. Is it possible to get a repo that could stream and republish all event to the mutators, for custom projections? Devs can then use a custom publish-subscribe implementation, so it won't confused the default PersistencePolicy
It'll really help.
Thanks, once again!
Hey! Thanks a lot for your generous and encouraging comment!
Yes, the state of the application is partitioned across the entities. Which causes the problem of getting "everything".
A repository only ever needs events for the entity it has been asked for. Repositories only provide entities, so to get something other than just an entity, you want to use something other than just a repository.
Just as the repository gets events from the event store, we can make something that gets events for more than one entity - maybe all of them.
Looking at the existing code, it would be quite straightforward to get all the events from SQLAlchemy. I don't really know how to get them all from Cassandra. In Cassandra, there is a "token" which allows everything in a table to be retrieved, but I think they would not be in order, so might not be very useful. And getting all events in a random order, and then sorting, probably wouldn't scale very well (which is the reason for using Cassandra).
But if you are using SQLAlchemy and not Cassandra, I guess you could do something like this (I just tested this against HEAD of the "develop" branch, so pull that first from github):
from eventsourcing.tests.example_application_tests.base import WithExampleApplication
from eventsourcing.tests.sequenced_item_tests.test_sqlalchemy_active_record_strategy import \
WithSQLAlchemyActiveRecordStrategies
class TestGetAllEventFromSQLAlchemy(WithSQLAlchemyActiveRecordStrategies, WithExampleApplication):
def test(self):
with self.construct_application() as app:
domain_events = self.get_all_domain_events(app)
self.assertEqual(len(domain_events), 0)
app.register_new_example('a1', 'b1')
app.register_new_example('a2', 'b2')
app.register_new_example('a3', 'b3')
domain_events = self.get_all_domain_events(app)
self.assertEqual(len(domain_events), 3)
def get_all_domain_events(self, app):
es = app.version_entity_event_store
active_records = es.active_record_strategy.filter()
domain_events = []
for r in active_records:
i = es.active_record_strategy.from_active_record(r)
e = es.sequenced_item_mapper.from_sequenced_item(i)
domain_events.append(e)
return domain_events
You could then map
that list through a new mutate
function to project the application event stream however you wish.
Hope that helps? If so, let's try to fold this into the library? Thanks again for getting in touch!
@adebisi-fa I just pushed changeset d8f9082 which implements the code snippet above on the EventStore
class and ActiveRecordStrategy
classes. Please note, I renamed the method get_all_domain_events()
in the next changeset to all_domain_events()
. As the commit message says, from SQLAlchemy the events will be in order, but that isn't possible with Cassandra so you'd need to somehow post-process the stream if you use Cassandra and need all events in chronological order. Of course, if you just wanted to identify all entity IDs, you wouldn't need to sort them. The method all_domain_events()
returns an iterator which pulls progressively on the database, so it might scale. Is that what you were looking for?
Whao, whao, whao, @johnbywater!
It's much more than I was looking for! Really appreciate the speed, and your commitment to making "eventsourcing" work for Devs, on python. I've been rocking it on the Write-Side of a project, until I got stucked on the Read-Side (the CQRS or CReadRS needs both sides for completeness, you know).
One more thing, please. Since the processor that republishes to custom subscribe() may likely not want to pull from the beginning of the stream every time it publishes events (especially if it give itself a breather in between), it'll be an "icing" on the cake if [get_]all_domain_events() can accept a "timestamp" or "version" parameter (depending on the strategy), and pull only events from such to date.
That'll seal it up! [Of course, the tradeoff of it working ONLY with SQLAlchemy, due to the limitation with Cassandra, is bearable]
So much appreciation and regard for your effort, @johnbywater.
Thanks please! You rock!
Thanks a lot! Good point about adding parameters. This reminds me of the SequenceItemIterator
classes which address similar concerns (especially resumption), but it's not exactly the same because they only deal with items in a single sequence (entity). Here, we want to progress through all the events in all sequences. I think that means exposing the underlying token, that is the autoincrementing id
field in the SQLAlchemy model, and the underlying token in Cassandra. I think doing it by "version" parameter might not be very useful because version 1 of something could happen after version 10 of something else. It makes sense for timestamped sequences, because that would lead to a chronological order. But that order in the SQLAlchemy table would be the same as the auto-incrementing field (I would hope). Again, it just isn't possible to select like that in Cassandra. The important thing seems to be having a token that you can use to break and resume iterating through everything, right?
You are very right on the version not being usable for pulling events chronologically. There'll surely be a number of v1(s) for a multi-entity stream of events, I actually had "integer-based sequences" in mind.
The important thing seems to be having a token that you can use to break and resume iterating through everything, right?
Exactly, the point! A token, (or checkpoint) for resuming iteration after break. You're awesome @johnbywater!
Thanks, please.
Ok! I'll see what I can do along those lines :)
Sorry for the delay, am just returning to this issue. Have been writing about it here: https://github.com/johnbywater/eventsourcing/blob/feature/better_iterators/docs/topics/user_guide/remote.rst
Have also developed a class BigArray which could be a workable application log: https://github.com/johnbywater/eventsourcing/blob/feature/better_iterators/eventsourcing/domain/model/array.py#L145
Feels like a solution is emerging, but would be interested to hear your thoughts :-)
@adebisi-fa there is now an all_records() method on the active record strategies which has a "resume" arg. It generates a sequence of tuples (record, resume) and the second value in the tuple can be used to resume. Hope that helps? Maybe let me know if it works for you?
Closing this issue, because there are now the Array, BigArray, NotificationLog, and NotificationLogReader classes, from which an application log can be constructed that will allow the application stream to be projected. I'm sure there are loads of improvements, but let's register them as separate issues? Please reopen if you disagree.
Hi @johnbywater! I hope this is an appropriate place to discuss features. :) If not, maybe create a Wiki page?
I am interested in your plans for these forthcoming features:
I'd like to help out.
d5-kanban-python
projections implementation were you thinking of borrowing?Thanks!