jet / propulsion

.NET event stream projection and scheduling platform with CosmosDB, DynamoDB, EventStoreDB, MemoryStore, message-db, Equinox and Kafka integrations
https://github.com/jet/dotnet-templates
Apache License 2.0
changefeed cosmosdb cosmosdb-changefeed cosmosdb-changefeedprocessor dynamodb dynamodb-streams equinox eventstore kafka message-db projections

Propulsion

Propulsion provides a granular suite of .NET NuGet packages for building Reactive event processing pipelines. It caters for:

If you're looking for a good discussion forum on these kinds of topics, look no further than the DDD-CQRS-ES Discord's #equinox channel (invite link).

Core Components

Store-specific Components

The ubiquitous Serilog dependency is solely on the core module, not any sinks.

dotnet tool provisioning / projections test tool

Deprecated components

Propulsion supports recent versions of Equinox and other Store Clients within reason - these components are intended for use on a short term basis as a way to manage phased updates from older clients to current ones by adjusting package references while retaining source compatibility to the maximum degree possible.

Related repos

Overview

The Equinox Perspective

Propulsion and Equinox have a Yin and yang relationship; their use cases naturally interlock and overlap.

See the Equinox Documentation's Overview Diagrams for the perspective from the other side (TL;DR: the same topology, with elements that are de-emphasized here being central over there, and vice versa).

C4 Context diagram

Equinox focuses on the Consistent Processing element of building an event-sourced decision processing system, offering relevant components that interact with a specific Consistent Event Store. Propulsion elements support the building of complementary facilities as part of an overall Application. Conceptually one can group such processing based on high level roles such as:

The overall territory is laid out here in this C4 System Context Diagram:

Propulsion c4model.com Context Diagram

See the Overview section in DOCUMENTATION.md for further drill-down.

QuickStart

1. Use propulsion tool to run a CosmosDb ChangeFeedProcessor or DynamoStoreSource projector

dotnet tool uninstall Propulsion.Tool -g
dotnet tool install Propulsion.Tool -g --prerelease

propulsion init -ru 400 cosmos # generates a -aux container for the ChangeFeedProcessor to maintain consumer group progress within
# -V for verbose ChangeFeedProcessor logging
# `-g projector1` represents the consumer group - >=1 are allowed, allowing multiple independent projections to run concurrently
# stats specifies one only wants stats regarding items (other options include `kafka` to project to Kafka)
# cosmos specifies source overrides (using defaults in step 1 in this instance)
propulsion -V sync -g projector1 stats from cosmos

# load events with 2 parallel readers, detailed store logging and a read timeout of 20s
propulsion -VS sync -g projector1 stats from dynamo -rt 20 -d 2

2. Use propulsion tool to run a CosmosDb ChangeFeedProcessor or DynamoStoreSource projector, emitting to a Kafka topic

$env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b

# `-V` for verbose logging
# `-g projector3` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently
# `-l 5` to report ChangeFeed lags every 5 minutes
# `kafka` specifies one wants to emit to Kafka
# `temp-topic` is the topic to emit to
# `cosmos` specifies source overrides (using defaults in step 1 in this instance)
propulsion -V sync -g projector3 -l 5 kafka temp-topic from cosmos

3. Use propulsion tool to inspect DynamoStore Index

Summarize current state of the index being prepared by Propulsion.DynamoStore.Indexer

propulsion index dynamo -t equinox-test

Example output:

19:15:50 I Current Partitions / Active Epochs [[0, 354], [2, 15], [3, 13], [4, 13], [5, 13], [6, 64], [7, 53], [8, 53], [9, 60]]  
19:15:50 I Inspect Index Partitions list events 👉 eqx -C dump '$AppendsIndex-0' dynamo -t equinox-test-index  
19:15:50 I Inspect Batches in Epoch 2 of Index Partition 0 👉 eqx -C dump '$AppendsEpoch-0_2' -B dynamo -t equinox-test-index

4. Use propulsion tool to validate DynamoStoreSource Index

Validate Propulsion.DynamoStore.Indexer has not missed any events (normally you guarantee this by having alerting on Lambda failures)

propulsion index -p 0 dynamo -t equinox-test

5. Use propulsion tool to reindex and/or add missing notifications

In addition to being able to validate the index (see preceding step), the tool facilitates ingestion of missing events from a complete DynamoDB JSON Export. Steps are as follows:

  1. Enable Point in Time Restores in DynamoDB
  2. Export data to S3, download and extract JSON from .json.gz files
  3. Run ingestion job

    propulsion index -t 0 $HOME/Downloads/DynamoDbS3Export/*.json dynamo -t equinox-test

CONTRIBUTING

See CONTRIBUTING.md

TEMPLATES

The best place to start, sample-wise, is with the Equinox QuickStart, which walks you through sample code, tuned for approachability, from dotnet new templates stored in a dedicated repo.

BUILDING

Please note the QuickStart is probably the best way to gain an overview, and the templates are the best way to see how to consume it; these instructions are intended mainly for people looking to make changes.

NB: The Propulsion.Kafka.Integration tests rely on a TEST_KAFKA_BROKER environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run blindly writes to a guid-named topic and trusts the broker will accept the write without any initialization step).

build, including tests

dotnet build build.proj -v n

FAQ

Why do you employ Kafka as an additional layer, when downstream processes could simply subscribe directly and individually to the relevant CosmosDB change feed(s)? Is it to accommodate other messages besides those emitted from events and snapshot updates? :pray: @Roland Andrag

Well, Kafka is definitely not a critical component or a panacea.

You're correct that the bulk of things that can be achieved using Kafka can be accomplished via usage of the ChangeFeed. One thing to point out is that in the context of enterprise systems, a well maintained Kafka cluster that already exists has a lower incremental (or total) cost than it would if you were building a smaller system from nothing.

Some negatives of consuming from the ChangeFeed directly:

Many of these concerns can be alleviated to varying degrees by splitting the storage up into multiple Containers (potentially using database level RU allocations) such that each consumer will intrinsically be interested in a large proportion of the data it will observe. Even so, the write amplification effects of having multiple consumers will always be more significant when reading directly than when having a single reader emit to Kafka. The design of Kafka is specifically geared to running lots of concurrent readers.
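A rough sketch of the amplification argument, under simplifying assumptions that are not from the text above: every direct reader of a Container's change feed observes every item written to it, and cost is counted in items read from the store rather than in RUs. The function names are purely illustrative.

```python
def items_read_direct(items_written: int, consumers: int) -> int:
    """Each consumer runs its own change feed reader and observes every item."""
    return items_written * consumers


def items_read_via_kafka(items_written: int, consumers: int) -> int:
    """A single projector reads the change feed once and emits to Kafka.

    Downstream consumers read from Kafka, so the load on the store does not
    grow with the number of consumers (hence `consumers` is unused here).
    """
    return items_written


written = 1_000_000
for n in (1, 3, 10):
    direct = items_read_direct(written, n)
    relayed = items_read_via_kafka(written, n)
    print(f"{n:>2} consumers: direct={direct:,} via_kafka={relayed:,}")
```

With one consumer the two approaches cost the same under this model; the gap only opens up as consumers are added, which is why the trade-off depends on how many independent readers you expect.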

However, splitting event categories into Containers solely to optimize these effects can also make the management of the transactional workload more complex; the ideal for any given Container is thus to balance the concerns of:

Any tips for testing Propulsion (projection) in an integration/end-to-end fashion? :pray: @James Booth

I know for unit testing, I can just test the obvious parts. Or is end-to-end testing even required?

Depends what you want to achieve. One important technique for doing end-to-end scenarios, especially where some reaction is supposed to feed back into Equinox, is to use Equinox.MemoryStore as the store, and then wire the Propulsion Sink (that will be fed from your real store when deployed in a production scenario) to consume from that using Propulsion.MemoryStore.MemoryStoreProjector.
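The shape of that technique can be sketched in a language-neutral way. The following is a conceptual Python model only: `InMemoryStore`, `make_reactor` and the stream and event names are hypothetical and do not reflect the Equinox.MemoryStore or Propulsion APIs. It also delivers events synchronously, whereas a real pipeline would be asynchronous.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Event = Tuple[str, dict]  # (event type, payload)
Handler = Callable[[str, List[Event]], None]


class InMemoryStore:
    """Hypothetical in-memory event store that pushes appends to subscribers."""

    def __init__(self) -> None:
        self.streams: Dict[str, List[Event]] = defaultdict(list)
        self._subscribers: List[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        self._subscribers.append(handler)

    def append(self, stream: str, events: List[Event]) -> None:
        self.streams[stream].extend(events)
        # Deliver each committed batch to every subscriber, synchronously
        for handler in list(self._subscribers):
            handler(stream, events)


def make_reactor(store: InMemoryStore) -> Handler:
    """Handler under test: a placed order triggers a write to another stream."""

    def handle(stream: str, events: List[Event]) -> None:
        for event_type, payload in events:
            if stream.startswith("Order-") and event_type == "Placed":
                store.append(f"Reservation-{payload['sku']}", [("Reserved", payload)])

    return handle


# The test wires the same handler that production would feed from the real store
store = InMemoryStore()
store.subscribe(make_reactor(store))
store.append("Order-1", [("Placed", {"sku": "abc", "qty": 2})])
print(store.streams["Reservation-abc"])
```

The point of the arrangement is that the handler is identical in test and production; only the source feeding it changes, so a reaction that writes back to the store can be observed end to end without any external infrastructure.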

Other techniques I've seen/heard are:

In general I'd be looking to use MemoryStoreProjector as a default technique, as it provides:

To answer more completely, I'd say given a scenario involving Propulsion and Equinox, you'll typically have the following ingredients:

  1. writing to the store - you can either assume that's well-tested infra or take the view that you need to validate that you wired it up properly

  2. serialization/deserialization - you can either have unit tests and/or property tests to validate round-tripping as an orthogonal concern, or you can take the view that it's critical to know it really works with real data

  3. reading from the store's change feed and propagating to handler - that's harder to configure and has the biggest variability in a test scenario so either:

    • you want to take it out of the equation
    • OR you want to know it's wired properly
  4. validating that triggered reactions are handled and complete cleanly - yes you can and should unit test that, but maybe you want to know it works end-to-end with a much larger proportion of the overall system in play

  5. does it trigger follow-on work, i.e. a cascade of reactions. You can either do triangulation and say it's proven if I observe the trigger for the next bit, or you may want to prove that end to end

  6. does the entire system as a whole really work - sometimes you want to be able to validate workflows rather than having to pay the complexity tax of going in the front door for every aspect (though you'll typically want to have a meaningful set of smoke tests that validate basic system integrity without requiring manual testing or back-door interfaces)

Any reason you didn’t use one of the different subscription models available in ESDB? :pray: @James Booth

TL;DR Differing goals

While the implementation and patterns in Propulsion happen to overlap to a degree with the use cases of ESDB's subscription mechanisms, the main reason they are not used directly stems from the needs and constraints that Propulsion evolved to cover.

One thing that should be clear is that Propulsion is definitely not attempting to be the simplest conceivable projection library with a low concept count that's easy to get started with. If you were looking to build such a library, you'd likely give yourself some important guiding non-goals to enable that, e.g., if you had to add 3 concepts to get a 50% improvement in throughput, whether or not that's worth it depends on the context - if you're trying to have a low concept count, you might be prepared to leave some performance on the table to enable that.

For Propulsion, almost literally, job one was to be able to shift 1TB of ordered events in streams to/from ESDB/Cosmos/Kafka in well under 24h - a naive implementation reading and writing in small batches takes more like 24d to do the same thing. A key secondary goal was to be able to keep them in sync continually after that point (it's definitely more than a one time bulk ingestion system).
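To put those figures in perspective, here is a back-of-envelope calculation of the sustained rate each duration implies. It assumes "1TB" means 10^12 bytes and 1 MB means 10^6 bytes; the text does not say which definitions are intended.

```python
# Assumption: 1 TB = 10**12 bytes, 1 MB = 10**6 bytes
TOTAL_BYTES = 10**12
SECONDS_PER_DAY = 24 * 60 * 60


def required_mb_per_sec(total_bytes: int, days: float) -> float:
    """Sustained rate in MB/s needed to move total_bytes within `days` days."""
    return total_bytes / (days * SECONDS_PER_DAY) / 10**6


one_day = required_mb_per_sec(TOTAL_BYTES, 1)
naive = required_mb_per_sec(TOTAL_BYTES, 24)

print(f"24h target: {one_day:.2f} MB/s sustained")  # 11.57 MB/s
print(f"24d naive:  {naive:.2f} MB/s sustained")    # 0.48 MB/s
print(f"ratio:      {one_day / naive:.0f}x")        # 24x
```

In other words, hitting the 24h mark means sustaining roughly 11.6 MB/s of ordered reads and writes, around 24 times what the naive small-batch approach achieves.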

While Propulsion scales down to running simple subscriptions, it's got quite a few additional concepts compared to using something built literally for that exact job; the general case of arbitrary projections was almost literally an afterthought.

That's not to say that Propulsion's concepts make for a more complex system when all is said and done; there are lots of scenarios where you avoid having to do concurrent/async tricks one might otherwise do more explicitly in a more simplistic subscription system.

When looking at the vast majority of typical projections/reactions/denormalizers one runs in an event-sourced system, it should come as no surprise that EventStoreDB's subscription features offer plenty of ways of achieving those common goals with a good balance of:

That's literally the company's goal: enabling rapidly building systems to solve business problems, without overfitting to any specific industry or application's needs.

The potential upsides that Propulsion can offer when used as a projection system can definitely be valuable when actually needed, but on average, they can equally be overkill for a given specific requirement.

With that context set, here are some notable aspects of using Propulsion for Projectors rather than building bespoke wiring on a case by case basis:

A Brief History of Propulsion's feature set

The order in which the need for various components arose (as a side effect of building out Equinox; solving specific needs in terms of feeding events into and out of EventStoreDB, CosmosDB and Kafka) was also an influence on the abstractions within and general facilities of Propulsion.

Conclusion/comparison checklist

The things Propulsion in general accomplishes in the projections space:

Things EventStoreDB's subscriptions can do that are not presently covered in Propulsion:

What's the deal with the early history of this repo?

This repo is derived from FsKafka; the history has been edited to focus only on edits to the Propulsion libraries.

Your question here

FURTHER READING

See DOCUMENTATION.md and Equinox's DOCUMENTATION.md