disqus / pgshovel

A change data capture system for PostgreSQL
Apache License 2.0

Replication agnostic stream start #26

Closed: Fluxx closed this pull request 9 years ago.

Fluxx commented 9 years ago

Fixes #10 by doing two things:

  1. Adds an expected property to InvalidEventError, which contains the valid event(s) expected given the current state of the transaction state machine.
  2. Adds code to the replication.streams.kafka:KafkaStream class to "prime" the consumer stream. Priming:
    1. Attempts to validate the first message in the stream.
    2. If the message is valid, it is yielded out.
    3. If it is not valid, and the exception contains expected events, the KafkaStream peeks at the first few messages in the stream looking for one matching an expected type.
    4. If a message is found, that message is yielded and iteration of the stream picks up from there.
    5. If no such message is found, the original exception is raised.
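The priming steps above can be sketched roughly as follows. This is a minimal, backend-agnostic sketch, not the code from this PR: InvalidEventError here is a stand-in carrying an expected attribute, messages are assumed to be (event type, payload) tuples, validate is a hypothetical callable that raises InvalidEventError for an unacceptable message, and the lookahead limit of 10 is a hypothetical stand-in for "the first few messages".

```python
import itertools
from typing import Callable, Iterable, Iterator, Sequence, Tuple

# Hypothetical message shape: (event type, payload).
Message = Tuple[str, object]


class InvalidEventError(Exception):
    """Stand-in for the real error; `expected` lists the event types that
    would have been valid in the state machine's current state."""

    def __init__(self, message: str, expected: Sequence[str] = ()):
        super().__init__(message)
        self.expected = tuple(expected)


def prime(messages: Iterable[Message],
          validate: Callable[[Message], None],
          lookahead: int = 10) -> Iterator[Message]:
    """Yield messages starting from the first one the state machine accepts.

    `validate` raises InvalidEventError for an unacceptable message;
    `lookahead` (hypothetical) bounds how many messages are peeked at.
    """
    it = iter(messages)
    first = next(it, None)  # messages are tuples, so None means "empty"
    if first is None:
        return
    try:
        validate(first)  # step 1: validate the first message
    except InvalidEventError as error:
        if not error.expected:
            raise  # nothing to look for: surface the original error
        # Step 3: peek at the next few messages for an expected type.
        # Messages skipped here are discarded.
        for candidate in itertools.islice(it, lookahead):
            if candidate[0] in error.expected:
                first = candidate  # step 4: resume from this message
                break
        else:
            raise error  # step 5: no match, re-raise the original error
    yield first    # steps 2/4: emit the starting message
    yield from it  # normal iteration continues from here
```

Note that messages skipped while peeking are dropped, so the primed stream begins at the first message whose type the state machine said it expected.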

Please see #10 for more detail on the error.

Fluxx commented 9 years ago

@ivanov, please look at this today. If you don't have any comments, I'm just going to merge it.

tkaemming commented 9 years ago

I haven't had a chance to do a thorough read-through of this, but isn't the priming logic orthogonal to the Kafka stream backend? (For example, if you were reading the operations from a log file that was being rotated, you'd have to account for the same behavior.) To me, it seems worth adding a Primer as a separate abstraction and finding a way to move it into a base stream abstraction. A lot of what's in KafkaStream right now isn't really Kafka-specific; it was just left as-is due to lack of time to abstract it further. (If you guys don't choose to implement this now, it'd probably be worth adding a ticket to come back and improve it later.)
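One way the suggested split could look, as a sketch under stated assumptions: a hypothetical BaseStream owns the shared iteration logic and runs a hypothetical Primer over whatever a backend's raw_messages() yields, so a Kafka backend, a rotated-log-file backend, or the in-memory ListStream used here for illustration would only supply raw messages. None of these class or method names come from pgshovel; InvalidEventError is again a stand-in with an expected attribute, and the lookahead of 10 is arbitrary.

```python
import abc
import itertools
from typing import Callable, Iterable, Iterator, List, Sequence, Tuple

Message = Tuple[str, object]  # hypothetical (event type, payload) shape


class InvalidEventError(Exception):
    """Stand-in error carrying the event types that would have been valid."""

    def __init__(self, message: str, expected: Sequence[str] = ()):
        super().__init__(message)
        self.expected = tuple(expected)


class Primer:
    """Backend-agnostic priming step (hypothetical abstraction)."""

    def __init__(self, validate: Callable[[Message], None], lookahead: int = 10):
        self.validate = validate
        self.lookahead = lookahead

    def __call__(self, messages: Iterable[Message]) -> Iterator[Message]:
        it = iter(messages)
        first = next(it, None)  # messages are tuples, so None means "empty"
        if first is None:
            return
        try:
            self.validate(first)
        except InvalidEventError as error:
            if not error.expected:
                raise
            expected = error.expected
            # Look a bounded distance ahead for an acceptable starting event.
            window = itertools.islice(it, self.lookahead)
            match = next((m for m in window if m[0] in expected), None)
            if match is None:
                raise error
            first = match
        yield first
        yield from it


class BaseStream(abc.ABC):
    """Shared stream behaviour; backends only supply raw_messages()."""

    def __init__(self, primer: Primer):
        self.primer = primer

    @abc.abstractmethod
    def raw_messages(self) -> Iterator[Message]:
        """Backend-specific: yield decoded messages in order."""

    def __iter__(self) -> Iterator[Message]:
        # Priming happens here, once, for every backend.
        return self.primer(self.raw_messages())


class ListStream(BaseStream):
    """In-memory backend standing in for Kafka or a rotated log file."""

    def __init__(self, messages: List[Message], primer: Primer):
        super().__init__(primer)
        self._messages = list(messages)

    def raw_messages(self) -> Iterator[Message]:
        return iter(self._messages)
```

The design choice here is composition: the Primer is passed in rather than hard-coded, so a backend that never starts mid-transaction could be given a different primer without changing the base class.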

Fluxx commented 9 years ago


Yeah, agreed on all counts. I put the primer abstraction in KafkaStream since it's the only stream that exists at present, and creating a base/abstract stream is less urgent than other, higher-priority work. I'll make a new ticket to abstract out the Stream more.

Fluxx commented 9 years ago

Alright, I implemented the method I mentioned in earlier comments. It's less scary, more accurate, and more straightforward.

Fluxx commented 9 years ago

Made all the changes from review. I'm waiting for the commits on the replication branch to be squashed, and then I will rebase replication off of it.