cashapp / pranadb

Concurrency control / synchronous ingest #330

Closed gunnarmorling closed 2 years ago

gunnarmorling commented 2 years ago

First of all, congrats on the release!

IIUC, ingestion into PranaDB is via Kafka only atm., and thus by definition always async, which makes it hard to reject events which are derived from outdated view state. Curious what your plans here are, and whether you intend to provide for rejecting such outdated events, e.g. akin to optimistic offline locks. I think this would open up using Prana for many more use cases.

purplefox commented 2 years ago

Hi Gunnar! Could you elaborate a bit more on the issue, I'm not sure I am on the same page yet. Are you referring to late events, watermarks etc?

gunnarmorling commented 2 years ago

What I'm after is a means of rejecting writes which are derived from an outdated view. Say I have this sequence of events:

Process 1                                   | Process 2
SELECT value FROM BALANCE; -- returns 100   |
                                            | SELECT value FROM BALANCE; -- returns 100
Do some calculation, e.g. value = value * 2 |
                                            | Do some calculation, e.g. value = value * 3
Write back new value: 200                   |
                                            | Write back new value: 300

That write by process 2 is derived from an outdated state; the balance has changed between the time the value was read and when it's written back. So I'd want to reject that write, indicating the stale read, so that process 2 re-reads the now-current value and tries again.

In a regular RDBMS, I could use optimistic locking, for instance by adding a version counter to the table and issuing updates like UPDATE balance SET value = 300 WHERE id = 123 AND version = 1, and rolling back that transaction if that update doesn't yield any rows due to passing an old version value. Right now, I don't think I could do that with PranaDB as all data ingestion is via Kafka and thus async by definition; i.e. even if that write-based-on-stale-view would be detected somehow within Prana itself, that'd be too late as I'd have sent my Kafka message already.
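
For illustration, a rough sketch of that pattern with plain JDBC might look like the following (the table and column names are just placeholders):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class BalanceWriter {
    // Minimal optimistic-locking sketch; assumes a balance table with
    // (id, value, version) columns and autocommit disabled on the connection.
    void writeBalance(Connection conn, long id, long newValue, long readVersion) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(
                "UPDATE balance SET value = ?, version = version + 1 WHERE id = ? AND version = ?")) {
            ps.setLong(1, newValue);
            ps.setLong(2, id);
            ps.setLong(3, readVersion);
            if (ps.executeUpdate() == 0) {
                // Another writer bumped the version since we read it: abort,
                // so the caller can re-read the current value and retry.
                conn.rollback();
                throw new SQLException("stale read - please retry");
            }
            conn.commit();
        }
    }
}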

Does that make sense?

purplefox commented 2 years ago

I think you're asking about read-after-write consistency?

I.e. you publish an event to Kafka, it gets consumed by Prana and updates a bunch of materialized views. Then you want to execute a SQL query against those views to retrieve some state and you want to make sure that the event you published has been fully processed in Prana before you do that?

gunnarmorling commented 2 years ago

I think you're asking about read-after-write consistency?

Not quite, although I think that'd also be desirable. What I am looking for is a way of preventing race conditions between multiple writers. Coming back to the account balance example, let's say I have the invariant that the balance must not become negative. I have an account with current value 10. Now two concurrently running processes try to deduct 10. How can I ensure that the "deduct 10" event from one of them gets rejected, so I don't end up with a balance of -10?

purplefox commented 2 years ago

In Prana you can't insert/update/delete data directly as you can with a traditional RDBMS. All updates are done by Prana itself when ingesting data from topics. Updates to a shard are processed serially so I don't think this situation can happen.

gunnarmorling commented 2 years ago

Updates to a shard are processed serially so I don't think this situation can happen.

How would you ensure though that there aren't two concurrent processes trying to send conflicting writes such as the two "deduct 10" events? Or are you saying that I should make sure to only have a single writer, e.g. for a given account? Or perhaps it's the case that Prana isn't meant to address that sort of use case (just like Kafka)?

gunnarmorling commented 2 years ago

In Prana you can't insert/update/delete data directly as you can with a traditional RDBMS. All updates are done by Prana itself when ingesting data from topics.

Yes, understood. Is this something you'd consider changing down the road, e.g. allowing synchronous insert/update/delete ops?

purplefox commented 2 years ago

Or are you saying that I should make sure to only have a single writer e.g. for a given account

Could you elaborate a bit on what you mean by "writer"?

purplefox commented 2 years ago

Just stepping back a bit....

One way to think of PranaDB is as a functional database: events stream in from Kafka, these are then processed by functional folds which output a new state, and nothing is ever mutated. Users then select data from materialized views. So there's really no concept of a writer. This massively simplifies the implementation as we don't have to deal with distributed locks etc.

gunnarmorling commented 2 years ago

Could you elaborate a bit on what you mean by "writer"?

A process or application interacting with Prana, issuing writes (by sending events to Kafka atm.) to it.

purplefox commented 2 years ago

To clarify:

Now two concurrently running processes try to deduct 10. How can I ensure that the "deduct 10" event from one of them gets rejected, so I don't end up with a balance of -10?

This can't happen. All updates to a MV are serialized at the shard level. There's no concept of concurrent processes writing to the same shard. All writes are done by a single "process" (goroutine) for each shard.

gunnarmorling commented 2 years ago

What I have in mind are client processes external to Prana. Say I have a clustered REST application which uses Prana as its datastore. Two nodes in that cluster receive a "deduct 10" API call at the same time. Right now, there's no way this application could ensure the account balance never becomes negative: when processing the two requests, both application instances would retrieve the current balance (10), see that the request they want to process (deduct 10) fits into the balance, and send the "deduct 10" event to Prana. The events will be serialized there as you say, but the end result will be a balance of -10.

But from what you say, I think this sort of application as a system-of-record just isn't the intended use case for Prana right now, so one would probably use an RDBMS with transactions in front of Prana and then e.g. use CDC (Debezium) for feeding events into Prana for updating materialized views.

purplefox commented 2 years ago

Generally you want to avoid a situation where multiple "processors" can handle requests for a particular customer at the same time, otherwise you have to do some kind of locking to get consistency - this isn't just an issue with event driven systems, it's true in general.

Imho, better to design your app so this can't happen. This can be done by either a) load balancing your HTTP requests according to the customer_id so requests to deduct 10 are always handled by the same processor, and therefore serialized. OR b) (better imho) your web app receives a deduct 10 request on any instance and immediately publishes a "deduct 10 requested" command where key = customer_id. Kafka will hash that and ensure that all requests for the same customer_id are on the same partition. You then have a pool of account_service instances that process the commands, check the balance and either do the deduction or reject it. All requests for a particular customer_id are always handled by the same instance so serialized.

This is a bit like the actor model - there's no shared access to mutable state so no locking is needed, and it's a very sane pattern imho when designing event driven services.
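
To make option b) a bit more concrete, here's a rough sketch using the plain Java Kafka client (the "account-commands" topic name and the payload format are made up; the important bit is that customer_id is the message key, so Kafka's partitioner routes all commands for one customer to the same partition):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class CommandPublisher {
    private final KafkaProducer<String, String> producer;

    CommandPublisher(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    // Publish a "deduct requested" command keyed by customer_id; Kafka hashes
    // the key, so all commands for the same customer go to the same partition
    // and are consumed serially by one consumer instance.
    void publishDeductRequested(String customerId, long amount) {
        String payload = "{\"type\":\"deduct\",\"amount\":" + amount + "}";
        producer.send(new ProducerRecord<>("account-commands", customerId, payload));
    }
}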

purplefox commented 2 years ago

So basically use the partitioned event bus (Kafka) to enforce serialization of processing for any particular domain entity (e.g. customer)

purplefox commented 2 years ago

One thing I want to do with PranaDB is to produce a set of example/template services that use PranaDB and are implemented using Vert.x/Micronaut/whatever, demonstrating good patterns for how to create event driven services that are loosely coupled / scalable / etc and don't need to directly mutate state at all, or do any locking.

gunnarmorling commented 2 years ago

Ok, gotcha. That approach is what I meant above with "are you saying that I should make sure to only have a single writer e.g. for a given account?"

I think something like optimistic locking is a super-useful feature to have in a data store which should be used as a system-of-record, e.g. its absence precludes Kafka from being used for certain applications, and the single writer principle is quite impactful on application design.

In any case, feel free to close this issue if you don't think it's something you'd want to support in PranaDB. Thanks for taking the time to discuss! +1 on the examples idea (also for Quarkus), that'd be cool.

purplefox commented 2 years ago

a super-useful feature to have in a data store which should be used as a system-of-record, e.g. its absence precludes Kafka from being used for certain applications

Gunnar - could you elaborate on this? I'm trying to understand a system where you would absolutely require this and couldn't design the requirement away.

gunnarmorling commented 2 years ago

I don't think it's an absolute requirement, but it simplifies things a lot. As you say, implementing a single writer design can solve the issue, though at the price of increased complexity (in particular for the second solution you describe, as it requires yet another intermediary processing node). Another concern is finding sharding keys that distribute load evenly, so as to avoid single processing nodes being overloaded by specific keys which receive a large share of events (although this could be a problem in any case).

Bottom line, "designing it away" sounds like about all one can do, but having some means of concurrency control built into the store itself would make the lives of application developers simpler in many cases.

purplefox commented 2 years ago

at the price of increased complexity though

In what sense does it increase complexity? I'd have thought it reduces complexity as you don't have to do locking and reason about that.

purplefox commented 2 years ago

in particular for the second solution you describe, as it requires yet another intermediary processing node

You shouldn't need an extra node - you can process messages on the same nodes as your HTTP endpoints, something like this:

// This is all on one node
class MyService {

    // The HTTP handler just turns the request into a command and publishes it
    MyWebHandler(HttpRequest req) {
        sendCommandToKafka(NewCommand(req))
    }

    // This is the command Kafka consumer
    MyCommandHandler(Command com) {

        // This is called serially for a particular partition key
        // You don't have to worry about locking

    }

}

Imho, not having to worry about shared access to mutable state massively reduces complexity (think race conditions, deadlock, scaling of locks, retrying etc)

gunnarmorling commented 2 years ago

you can process messages on the same nodes as your HTTP endpoints

True that, fair enough.

So let's spin this further, how would the web handler signal to the caller the outcome of the operation ("deduction ok", "deduction failed due to negative balance")? By subscribing to some Kafka response topic?

purplefox commented 2 years ago

Thinking more about this... if the logic for checking the current balance is done on the writer side, then some kind of locking would be required anyway.

E.g. if a request to deduct 10 came in for customer 1, the balance was ok so a "balance deducted 10" event was sent. Then another deduct 10 could come in for the same customer and be approved, even though there were no funds, as the first event hadn't caused the balance to be deducted yet. So there would have to be logic in the writer to stall checks for a particular customer until there were no outstanding unprocessed deduct events for that customer. This would essentially be implicit locking.

It seems that this kind of logic, which needs to be atomic, belongs in the processor, not the writer. A materialized view is a type of processor which is expressed using SQL. But we intend to allow processors to also be provided as user functions or even as gRPC endpoints which can live in your service. If the logic for checking and updating the balance lived in there, Prana would ensure that it was invoked serially for the primary key.
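
Purely as a hypothetical sketch (nothing like this exists in Prana yet; the names and the per-key serial invocation contract are assumptions), such a user-function processor might look something like:

class DeductionProcessor {
    record Result(long newBalance, boolean accepted) {}

    // Hypothetical user-function processor for "deduct requested" events.
    // Assumed contract: invoked serially for a given customer_id with the
    // current balance, so the check-and-update below is effectively atomic.
    Result onDeductRequested(long currentBalance, long amount) {
        if (currentBalance - amount < 0) {
            // Insufficient funds: keep the old balance and mark the event rejected
            // (e.g. so it can be routed to an error/rejection topic).
            return new Result(currentBalance, false);
        }
        return new Result(currentBalance - amount, true);
    }
}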

Now it might also be possible to create an MV that conditionally updates itself if there is sufficient balance, just using SQL, but that would require more thought!

Another way we could handle this: When creating the MV we could provide an update_check field and the MV would only be updated if this evaluates to true. When sending the event it would include a version field which would be the current version read from the balance + 1.

create materialized view cust_balance as
select customer_id, sum(amount), max(version_number) as version from
customer_transactions
group by customer_id
with (
  update_check = "customer_transactions.version = cust_balance.version + 1",
  error_topic = "cust_balance_failures"
)

purplefox commented 2 years ago

Gonna close this for now, but thanks for the interesting discussion @gunnarmorling. Before too long we will put together a set of real examples demonstrating how these kinds of use cases can be solved effectively with event driven services.