disqus / pgshovel

A change data capture system for PostgreSQL
Apache License 2.0

Replication agnostic stream start #30

Closed Fluxx closed 9 years ago

Fluxx commented 9 years ago

Continuation of the old (now closed) pull request #26. This one is new because the upstream branch that #26 was based on, new_tests, was merged into the replication branch, and it is impossible to change the upstream branch of a pull request once it has been created. Additionally, I tried to rebase the PR's branch onto the replication branch and force push it, but was then unable to re-open the pull request because I had force pushed and/or recreated the branch (according to the GitHub UI).

In light of all of this, I decided the cleanest way was just to reopen the branch with a new pull request.


Fixes #10 by doing two things:

  1. Adds an expected property to the InvalidEventError, which contains the expected valid event given the state the transaction state machine is in.
  2. Adds code to the replication.streams.kafka:KafkaStream class to "prime" the consumer stream. Priming:
    1. Attempts to validate the first message in the stream.
    2. If the message is valid, it is yielded out.
    3. If it is not valid, and the exception contains expected events, the KafkaStream peeks at the first few messages in the stream looking for one matching an expected type.
    4. If a message is found, that message is yielded and iteration of the stream picks up from there.
    5. If a message is not found the original exception is raised.
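
The priming steps above can be roughly sketched as follows. This is a minimal, standalone illustration and not the code in this pull request: the `InvalidEventError` class here is a stand-in, and `validate_first`, `prime`, `max_peek`, and the dict-shaped messages with a `"type"` key are all hypothetical names chosen for the example. It also assumes that messages skipped while searching are simply discarded.

```python
import itertools


class InvalidEventError(Exception):
    """Stand-in error that carries the event types that would have been valid."""

    def __init__(self, message, expected=None):
        super().__init__(message)
        self.expected = set(expected or ())


def validate_first(message):
    """Hypothetical validator: assume a stream must start with a 'begin' event."""
    if message["type"] != "begin":
        raise InvalidEventError(
            "unexpected %r at start of stream" % message["type"],
            expected={"begin"},
        )


def prime(messages, validate=validate_first, max_peek=5):
    """Yield messages, skipping an invalid leading run if a valid start is near."""
    messages = iter(messages)
    try:
        first = next(messages)
    except StopIteration:
        return  # Empty stream: nothing to prime.

    try:
        validate(first)
    except InvalidEventError as error:
        if not error.expected:
            raise
        # Look at up to `max_peek` further messages for one of an expected type.
        for candidate in itertools.islice(messages, max_peek):
            if candidate["type"] in error.expected:
                first = candidate
                break
        else:
            raise  # No match within the window: re-raise the original error.

    yield first
    yield from messages  # Normal iteration picks up from here.
```

With this sketch, a stream that starts partway through a transaction (for example with a stray `commit`) resumes at the next `begin`, while a stream with no `begin` inside the peek window still raises the original `InvalidEventError`.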

Please see #10 for more detail on the error.

Additionally, when I merge this code into the replication branch I will make sure it's a fast forward.

tkaemming commented 9 years ago

Overall this looks pretty good. I think it would be best to handle the case where the primer is allowed to run unbounded, but I guess you can practically sidestep that by setting an absurdly high value. It would probably also be a good idea to add some notes to the documentation about what it means if this occurs.
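
To illustrate the unbounded case, here is a small hypothetical sketch (the `find_expected` function, its `limit` parameter, and the dict-shaped messages are assumptions for the example, not names from this pull request). It relies on `itertools.islice` accepting `None` as its stop value, which means "no cap".

```python
import itertools


def find_expected(messages, expected, limit=None):
    """Return the first message whose type is in `expected`, or None.

    `limit` caps how many messages are inspected. None means no cap, so the
    search only ends when a match is found or the stream is exhausted.
    """
    for message in itertools.islice(messages, limit):
        if message["type"] in expected:
            return message
    return None
```

Passing a very large `limit` behaves the same as `None` in practice. On a stream that never ends, an unbounded search would presumably block until an expected message arrives, which is the kind of behavior worth describing in the documentation.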

Fluxx commented 9 years ago

Landed in replication branch with bb475ffffa57bfe1c7513c44bb3d4fd75a95eebe.