disqus / pgshovel

A change data capture system for PostgreSQL
Apache License 2.0

Allow starting stream consumption at any point. #10

Closed: tkaemming closed this 9 years ago

tkaemming commented 9 years ago

Starting from a position other than the beginning breaks some assumptions in the stream validators.

Sequence Validation

Sequences are assumed to start at 0, regardless of how they were initialized. The first message in a stream should be able to start at any number, and the 0 starting point should only be enforced when a publisher change occurs.
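A minimal sketch of the proposed rule, assuming the consumer state records only the last publisher and sequence number seen. `SequenceState`, `validate_sequence`, and `SequenceGapError` are hypothetical names. `InvalidSequenceStartError` mirrors the error name used elsewhere in this thread, but it is defined locally here rather than imported from pgshovel.

```python
from dataclasses import dataclass
from typing import Optional


class InvalidSequenceStartError(Exception):
    """Hypothetical local stand-in for the error of the same name."""


class SequenceGapError(Exception):
    """Hypothetical error for a non-consecutive sequence number."""


@dataclass(frozen=True)
class SequenceState:
    publisher: str
    sequence: int


def validate_sequence(
    state: Optional[SequenceState], publisher: str, sequence: int
) -> SequenceState:
    if state is None:
        # First message this consumer has seen: accept any starting
        # sequence, since consumption may begin partway through the stream.
        return SequenceState(publisher, sequence)
    if publisher != state.publisher:
        # Publisher changed: only here is a start at 0 enforced.
        if sequence != 0:
            raise InvalidSequenceStartError(
                f"publisher {publisher!r} started at {sequence}, expected 0"
            )
        return SequenceState(publisher, sequence)
    if sequence != state.sequence + 1:
        raise SequenceGapError(f"expected {state.sequence + 1}, got {sequence}")
    return SequenceState(publisher, sequence)
```

With this rule, `validate_sequence(None, "pub-a", 42)` succeeds even though the stream is joined mid-way, while a later switch to `"pub-b"` must arrive with sequence 0.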

Transaction State Validation

Any stream of events that doesn't begin with a BeginCommand will fail state validation. To avoid this, we should be able to drop messages until the first BeginCommand before yielding them to the validator.
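Dropping messages until the first begin event maps directly onto `itertools.dropwhile`. This sketch assumes a hypothetical `Event` type whose `kind` field is a plain string such as `"begin"` or `"mutation"`; pgshovel's real messages are different types, so the predicate would need adapting.

```python
from dataclasses import dataclass
from itertools import dropwhile
from typing import Iterable, Iterator


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for a replication stream message."""
    kind: str  # e.g. "begin", "mutation", "commit", "rollback"
    txid: int


def skip_to_first_begin(events: Iterable[Event]) -> Iterator[Event]:
    """Drop events until the first "begin", then yield it and everything after.

    Events from a transaction that was already in progress when consumption
    started are discarded, so the validator always sees a begin first.
    """
    return dropwhile(lambda event: event.kind != "begin", events)
```

For example, a stream of `mutation(1), commit(1), begin(2), mutation(2), commit(2)` is reduced to the three events of transaction 2. Because `dropwhile` is lazy, this works on an unbounded consumer iterator.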

Fluxx commented 9 years ago

Digging into this, I believe the sequence validation issue is here:

https://github.com/disqus/pgshovel/blob/replication/src/main/python/pgshovel/replication/validation/consumers.py#L77-L82

It seems that if the publisher of a message differs from the one recorded in the state, and that new publisher's sequence number is not 0, an InvalidSequenceStartError is raised. It's not immediately clear to me why this is a problem, though: I don't see any code that relies on the sequence number starting at 0 except the section that raises the error.

For transaction state validation the issue is here:

https://github.com/disqus/pgshovel/blob/replication/src/main/python/pgshovel/replication/validation/transactions.py#L141-L142

When transitioning from the initial None state, a BeginOperation event is required; however, there is no guarantee that the stream actually begins with one. The actual exception (an InvalidEventError) is raised by the validate_state call in KafkaStream:
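To make the failure concrete, here is a hypothetical transition table in which `None` is the initial state and only a begin event may leave it. The state names, `TRANSITIONS`, and `next_state` are illustrative assumptions, not pgshovel's actual validator; `InvalidEventError` is a local stand-in for the error named above.

```python
from typing import Dict, Optional


class InvalidEventError(Exception):
    """Hypothetical local stand-in for the error of the same name."""


# Hypothetical transition table: state -> {event kind: next state}.
# None is the initial state, before any event has been seen.
TRANSITIONS: Dict[Optional[str], Dict[str, str]] = {
    None: {"begin": "in_transaction"},
    "in_transaction": {
        "mutation": "in_transaction",
        "commit": "committed",
        "rollback": "rolled_back",
    },
    "committed": {"begin": "in_transaction"},
    "rolled_back": {"begin": "in_transaction"},
}


def next_state(state: Optional[str], event_kind: str) -> str:
    allowed = TRANSITIONS[state]
    if event_kind not in allowed:
        raise InvalidEventError(f"{event_kind!r} is not valid in state {state!r}")
    return allowed[event_kind]
```

A consumer that joins in the middle of a transaction feeds a `"mutation"` into the `None` state first, which raises immediately, even though nothing is wrong with the stream itself.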

https://github.com/disqus/pgshovel/blob/replication/src/main/python/pgshovel/replication/streams/kafka.py#L45

My first thought on how to fix this is to change the exception raised in this specific state (starting the transaction validator state machine) so that it includes the expected event type (here, BeginOperation). Callers that catch the exception (here, KafkaStream) can then consume messages from their internal Kafka consumer until they find the event they need, or give up.
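A rough sketch of that idea, assuming the exception gains a hypothetical `expected` attribute listing acceptable event kinds. `consume_from_valid_start`, its `kind` and `max_skips` parameters, and the `LookupError` used for giving up are all illustrative choices, not existing pgshovel APIs. Only errors from the initial state are recovered; once a valid start is found, any later error propagates as a real violation.

```python
from typing import Callable, Iterable, Iterator, Optional, Tuple, TypeVar

S = TypeVar("S")
M = TypeVar("M")


class InvalidEventError(Exception):
    """Hypothetical error that records which event kinds were expected."""

    def __init__(self, message: str, expected: Tuple[str, ...] = ()):
        super().__init__(message)
        self.expected = expected


def consume_from_valid_start(
    messages: Iterable[M],
    validate: Callable[[Optional[S], M], S],
    kind: Callable[[M], str],
    max_skips: int = 1000,
) -> Iterator[M]:
    iterator = iter(messages)
    state: Optional[S] = None
    expected: Tuple[str, ...] = ()
    skipped = 0
    # Phase 1: drop messages until one is valid from the initial state.
    for message in iterator:
        if expected and kind(message) not in expected:
            pass  # not the event kind the validator asked for
        else:
            try:
                state = validate(None, message)
            except InvalidEventError as error:
                if not error.expected:
                    raise  # no hint about what to look for
                expected = error.expected
            else:
                yield message
                break
        skipped += 1
        if skipped > max_skips:
            raise LookupError(f"no {expected} event within {max_skips} messages")
    # Phase 2: validate the remainder strictly; errors now propagate.
    for message in iterator:
        state = validate(state, message)
        yield message
```

Using the expected kinds means skipped messages are filtered by a cheap type check rather than by re-running the validator, and `max_skips` gives the "or give up" path an explicit bound.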

Fluxx commented 9 years ago

Just talked with @tkaemming, and we confirmed that the "sequence validation" issue above isn't actually a problem. The code path that checks whether the sequence starts at 0 runs only when a consumer observes a publisher change, not when a consumer first starts up with an empty consumer_state.