aio-libs / aiopg

aiopg is a library for accessing a PostgreSQL database from the asyncio framework
http://aiopg.readthedocs.io
BSD 2-Clause "Simplified" License

add support for streaming replication protocol #880

Open pianoyeg94 opened 3 years ago

pianoyeg94 commented 3 years ago

What do these changes do?

1) Add support for the logical and physical streaming replication protocols.

This feature has been extensively battle-tested in a production environment over the last four months without any failures or throughput issues.

It's currently used by one of our worker services running under Uvloop, 
implementing the "Transactional Outbox/Transaction log tailing" pattern 
to provide guaranteed delivery of events/messages to RabbitMQ.

This feature also gives us the ability to reliably and atomically update business entities in our database and publish events related to those entities within a single transaction, which helps maintain data consistency across a distributed system.

Unfortunately, RabbitMQ doesn't have a plugin that provides such functionality, unlike, for example, Kafka with its Debezium connector.

And this is not the only scenario in which the streaming replication protocol can be used. Off the top of my head, it could also be used to continuously index data from Postgres into an Elasticsearch cluster.

2) Due to the nature of this feature, some additional configuration had to be applied dynamically to the Postgres container setups during unit testing (see the sketch after this list): a) configuring Postgres through command-line arguments and mounting a pg_hba.conf file to support logical and physical replication during testing; b) mounting pre-built logical decoding plugins (one wal2json.so for each version of Postgres).

3) If this feature is considered for merging, I would be happy to extend my pull request by writing some example scripts, adding them to the "examples" directory, and fully documenting the feature within Sphinx.
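
For reference, here is a minimal sketch of the kind of Postgres settings point 2 refers to. The concrete values, the pg_hba.conf rule, and the plugin mount path below are assumptions for illustration, not the actual fixture code from this PR:

```python
# Illustrative only: settings that logical/physical replication generally
# requires on the Postgres side; NOT copied from the PR's test fixtures.
POSTGRES_COMMAND = [
    "postgres",
    "-c", "wal_level=logical",        # required for logical decoding (wal2json)
    "-c", "max_wal_senders=4",        # allow streaming replication connections
    "-c", "max_replication_slots=4",  # slots for logical/physical replication
]

# A pg_hba.conf line allowing the test user to open replication connections
# (hypothetical rule; the file mounted by the PR may be stricter):
PG_HBA_REPLICATION_RULE = "host replication all 0.0.0.0/0 trust"

# Pre-built logical decoding plugin mounted into the container, one per
# Postgres version (host and container paths are assumptions):
WAL2JSON_MOUNT = {
    "./plugins/pg13/wal2json.so": "/usr/lib/postgresql/13/lib/wal2json.so",
}
```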

Are there changes in behavior for the user?

1) If a user doesn't require this new functionality, nothing changes for them. The public API of aiopg remains fully backward-compatible.

Internally, the new code is structured in such a way that it behaves almost like a plugin: only a small part of the current codebase had to be adjusted (just a few lines of code in Connection's _cursor() factory method).

2) On the other hand, if a user would like to take advantage of this new functionality, all they have to do is pass psycopg2's LogicalReplicationConnection or PhysicalReplicationConnection type to aiopg's connect() function as the connection_factory parameter.

After that, the user gets access to the whole set of methods provided by psycopg2's replication objects, plus an additional message_stream() asynchronous generator for conveniently iterating over incoming replication messages (see the sketch below).

All cursor methods are fully asynchronous and non-blocking, and they are compatible with both vanilla asyncio's selector-based event loops (tested on Unix and Windows) and Uvloop.
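
To make the intended usage concrete, here is a minimal sketch based on the description above. The message_stream() generator is the one added by this PR; the DSN, the slot name, and the feedback handling are illustrative assumptions, and awaiting the replication cursor methods follows the "fully asynchronous" claim above rather than verified code:

```python
import asyncio

import psycopg2.extras

import aiopg


async def tail_wal(dsn: str) -> None:
    # Pass psycopg2's replication connection class as connection_factory.
    async with aiopg.connect(
        dsn, connection_factory=psycopg2.extras.LogicalReplicationConnection
    ) as conn:
        async with conn.cursor() as cur:
            # start_replication() mirrors psycopg2's replication cursor API;
            # the slot is assumed to already exist and to use wal2json.
            await cur.start_replication(slot_name="outbox_slot", decode=True)
            # message_stream() is the asynchronous generator added by this PR;
            # it is assumed to yield psycopg2 ReplicationMessage objects.
            async for msg in cur.message_stream():
                print(msg.payload)
                # Acknowledge the message so Postgres can recycle WAL
                # (assumed to be synchronous, as in psycopg2).
                msg.cursor.send_feedback(flush_lsn=msg.data_start)


if __name__ == "__main__":
    asyncio.run(tail_wal("dbname=outbox user=postgres"))
```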

Related issue number

287

Checklist

codecov[bot] commented 2 years ago

Codecov Report

Merging #880 (0447714) into master (ed69a06) will decrease coverage by 0.10%. The diff coverage is 91.50%.


@@            Coverage Diff             @@
##           master     #880      +/-   ##
==========================================
- Coverage   93.32%   93.22%   -0.11%     
==========================================
  Files          12       12              
  Lines        1574     1771     +197     
  Branches      187      214      +27     
==========================================
+ Hits         1469     1651     +182     
- Misses         73       77       +4     
- Partials       32       43      +11     
Impacted Files        Coverage Δ
aiopg/__init__.py     100.00% <ø> (ø)
aiopg/connection.py   94.79% <91.28%> (-0.87%) ↓
aiopg/utils.py        91.81% <100.00%> (+0.38%) ↑

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Powered by Codecov. Last update ed69a06...0447714.

pianoyeg94 commented 2 years ago

@Pliner The current unit test setup of docker containers via the session-scoped pg_server pytest fixture doesn't work well with tests that are extensively parameterized and run against multiple versions of Postgres.

The current approach of using a session-scoped fixture leads to extensive thrashing of container startups and teardowns, which results in some test runs exceeding the current 15-minute time limit per CI job.

The streaming replication protocol tests had to be parameterized to avoid a lot of boilerplate code (duplicating each test for physical and logical replication cursors).
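
For context, the parameterization in question looks roughly like this; pg_params is a hypothetical fixture holding the test container's connection parameters, and the test is assumed to run under an asyncio-aware test runner, so this is not the PR's actual test code:

```python
import psycopg2.extras
import pytest

import aiopg


@pytest.mark.parametrize(
    "connection_factory",
    [
        psycopg2.extras.LogicalReplicationConnection,
        psycopg2.extras.PhysicalReplicationConnection,
    ],
)
async def test_replication_cursor(pg_params, connection_factory):
    # A single test body covers both replication flavours instead of two
    # near-identical copies.
    async with aiopg.connect(
        connection_factory=connection_factory, **pg_params
    ) as conn:
        async with conn.cursor() as cur:
            ...  # exercise the replication cursor here
```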

The question is:

1) Should I introduce some boilerplate into my tests by removing the parameterization and duplicating the tests for physical and logical replication cursors?

2) Or could you raise the current time limit per CI job to at least 20 minutes?

3) Or should I refactor the current pytest setup of docker containers so that all of the different versions of Postgres are started at the same time and remain running until the end of a test run? This could involve a lot of refactoring, and it doesn't seem like a good idea to include such changes in a pull request that has nothing to do with introducing a new unit test setup paradigm (it looks more like a separate feature to me).

pianoyeg94 commented 2 years ago

@asvetlov @jettify I'm sorry for disturbing you (I know that you're no longer actively maintaining aiopg), but I haven't received an answer to my question above for 11 days now. Could you help me out and give some advice on what path to take to get the CI pipeline to run the current test suite all the way through? I would really like to see this "streaming replication protocol" feature become part of aiopg.

Pliner commented 2 years ago

> @asvetlov @jettify I'm sorry for disturbing you (I know that you're no longer actively maintaining aiopg), but I haven't received an answer to my question above for 11 days now. Could you help me out and give some advice on what path to take to get the CI pipeline to run the current test suite all the way through? I would really like to see this "streaming replication protocol" feature become part of aiopg.

@pianoyeg94 Sorry for the late reply.

I've increased the timeout from 15 to 20 minutes: https://github.com/aio-libs/aiopg/commit/f543f8c17a8108db02d8d242a87115089280a228#diff-b803fcb7f17ed9235f1e5cb1fcd2f5d3b2838429d4368ae4c57ce4436577f03fR26.

pianoyeg94 commented 2 years ago

@Pliner Thanks a lot! I've noticed some "codecov/patch" and "codecov/project" issues with my PR as well - I'll add some more fine-grained tests and make another push to trigger the CI pipeline again.

Pliner commented 2 years ago

> @Pliner Thanks a lot! I've noticed some "codecov/patch" and "codecov/project" issues with my PR as well - I'll add some more fine-grained tests and make another push to trigger the CI pipeline again.

👍

Also, I hope to have the capacity to review next week.

pianoyeg94 commented 2 years ago

@Pliner Thanks!

pianoyeg94 commented 2 years ago

@Pliner Could you please launch the CI pipeline again? I've added some more fine-grained tests to satisfy codecov - I just want to see whether codecov is happy and whether the new 20-minute time limit per CI job solved the long-running test suites issue (adding even more tests to this feature would amount to needless over-testing). The pipeline isn't triggered automatically after my pushes because I'm a first-time contributor.

pianoyeg94 commented 2 years ago

@Pliner Thanks a lot!

All test runs were successful, except for one flaky test on Python 3.8, "test_execute_timeout" (this is an old test, and it just happened to fail by coincidence).

Also, codecov's current settings require me to write tests that are useless and enforce an over-testing antipattern. These are the issues I'm currently facing with codecov (five uncovered lines in total):

1) It requires me to somehow test line 831 in "connection.py" - this line only marks the method as an async generator, and the method itself is tested within "test_cursor:test_replication_family_standard_cursor" (line 482);

2) it tells me that line 1386 in "connection.py" isn't covered, but there's a fine-grained test for it in "test_cursor:test_read_message_done_callback_verify_fd_via_select" (line 2251);

3) it also requires me to write separate tests for lines 1501, 1518 and 1519 in "connection.py" - these are all covered within a single test, "test_cursor:test_echo_start_replication" (line 1068).

P.S.: I'm sorry for being such a pain in the ass.