openstreetmap / operations

OSMF Operations Working Group issue tracking
https://operations.osmfoundation.org/
99 stars 12 forks source link

Need a new way of doing diff replication #154

Closed zerebubuth closed 3 years ago

zerebubuth commented 7 years ago

In #94, we wanted to upgrade to PostgreSQL 9.6, as this is the latest and greatest version. We were unable to, as we set an index on xid, a system column, which is no longer allowed in PG 9.6:

Disallow creation of indexes on system columns, except for OID columns (David Rowley)

Such indexes were never considered supported, and would very possibly misbehave since the system might change the system-column fields of a tuple without updating indexes. However, previously there were no error checks to prevent them from being created.

Although they've been stable and worked for us for many years, it seems they might not be a great idea.

We should think through how to do this properly. Other systems for user-space replication (i.e: not PostgreSQL-to-PostgreSQL WAL-level replication) seem to use TRIGGERs, but there might be other options. Ideally, what we'd want is:

  1. Idempotent. This means that we can get some reference (which we can store to disk) to the state "now", and diff that against a previous state and get the set of changed elements between these two states.
  2. Immutable. Once written, a state cannot be amended to include new elements. This is the problem with timestamps - we have no way to ensure that all the changes before a particular timestamp are visible to the replicator's transaction.
  3. In a commit order. Commits in PostgreSQL happen in some order which is not necessarily strict linear order but which allows for multiple orders, each of which satisfies relational integrity at the point of commit. In order that 3rd parties can apply diffs without needing to reorder them, or relax constraints, we need to be able to extract changes in one of the many orders which satisfy that.

Additional items that we may want to take the opportunity to include:

How do we do replication efficiently, retaining all the properties above, without an index on xid?

kelvich commented 7 years ago

Seems that logical decoding conforms all requirements. In 9.6 there is only sender part and several example in contrib/ about how to implement decoding protocol (binary/json). In 10.0-devel also receiving part was recently commited (logical replication ).

akorotkov commented 7 years ago

Yeah, indexes on xmin were possible, but they never worked properly. For instance, if user update non-indexed field, then you could have heap only tuple (HOT) update. So, you would see xmin updated, but xmin index wouldn't be updated accordingly. Thus, xmin index would just return wrong results to you.

However, it's still possible to you to make an index on xmin: you can have user-defined field maintained by trigger. See following example.

CREATE FUNCTION set_vxmin() RETURNS trigger AS $$
BEGIN
    NEW.vxmin = txid_current();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

create table test (id serial primary key, value text not null, vxmin int8 not null);

CREATE TRIGGER test_set_vxmin_trigger
BEFORE INSERT OR UPDATE ON test
    FOR EACH ROW EXECUTE PROCEDURE set_vxmin();

insert into test (value) values ('aaa'), ('bbb'), ('ccc');

select * from test;
 id │ value │ vxmin
────┼───────┼───────
  1 │ aaa   │   881
  2 │ bbb   │   881
  3 │ ccc   │   881
(3 rows)

update test set value = 'cccc' where id = 3;
UPDATE 1

begin;
BEGIN

update test set value = 'bb' where id = 2;
UPDATE 1

rollback;
ROLLBACK

select * from test;
 id │ value │ vxmin
────┼───────┼───────
  1 │ aaa   │   881
  2 │ bbb   │   881
  3 │ cccc  │   882
(3 rows)

select * from test where vxmin > 881;
 id │ value │ vxmin
────┼───────┼───────
  3 │ cccc  │   882
(1 row)

explain select * from test where vxmin > 881;
                                  QUERY PLAN
───────────────────────────────────────────────────────────────────────────────
 Bitmap Heap Scan on test  (cost=7.07..21.79 rows=377 width=44)
   Recheck Cond: (vxmin > 881)
   ->  Bitmap Index Scan on test_vxmin_idx  (cost=0.00..6.98 rows=377 width=0)
         Index Cond: (vxmin > 881)
(4 rows)

As bonus txid_current() returns you int8. Therefore, you wouldn't have wraparound problems.

tomhughes commented 7 years ago

So aside from the fact that it would work that will also allow us to get rid of the xid_to_int4 function that we currently have to create.

Downside are the need for 8 extra bytes on every row in the nodes, ways and relations table (estimated at 50Gb total) and the question of what the performance impact of the trigger might be.

Currently the xmin indexes are a hack that are on the production database but aren't part of the official schema, which we might have to reconsider if we're going to start adding an actual field.

pnorman commented 7 years ago

For instance, if user update non-indexed field, then you could have heap only tuple (HOT) update

I don't think that ever happens on the tables replication uses, which is why we've not had those problems. Unredacting something might be a HOT update but we've never done that for the fear that it wouldn't work properly with replication

A couple of additional requirements for what we use are

One bug believed to be in the current replication is that if something were unredacted the old object would make it into the replication stream.

Logical decoding seems like the best solution, but triggers seem like they'd be less development to get working since an additional column is closer to what we currently have.

the question of what the performance impact of the trigger might be.

Probably not much. It doesn't require another disk read/write and the trigger is pretty simple. Given everything else involved in diff uploads (network, rails, etc) it'd be minor.

zerebubuth commented 7 years ago

Downside are the need for 8 extra bytes on every row in the nodes, ways and relations table (estimated at 50Gb total) and the question of what the performance impact of the trigger might be.

Currently the xmin indexes are a hack that are on the production database but aren't part of the official schema, which we might have to reconsider if we're going to start adding an actual field.

I was thinking we wouldn't want to alter the table definition. Instead we'd do something more like:

create table replication_updates
  (nwr nwr_enum, id bigint, version bigint, txid bigint);
create index replication_update_txid_idx
  on replication_updates(txid);

create function replicate_node() as $$
begin
  insert into replication_updates(nwr, id, version, txid)
    values('node'::nwr_enum, OLD.node_id, OLD.version, current_txid());
end;
$$ language plpgsql;

create trigger replicate_nodes after insert on nodes
  for each row execute procedure replicate('node');

-- ... etc ...

Or possibly have a separate table per-type, which would make it easier to include changesets and changeset discussions in this scheme. Additional columns might be necessary to do redactions, or we could just have a redaction_updates table for that.

Another advantage of having a separate table (or tables) beyond not modifying the schema that Rails is maintaining is that we can potentially drop older rows that we no longer need - e.g: look at the state file from a month ago and delete from replication_updates where txid < last_month_complete_txid;

Komzpa commented 7 years ago

Instead we'd do something more like:

This is going to be a hell of write amplification. You're going to write not just 8 bytes per entity, but 24 (tuple header) + 8 (txid) + 8 (object id) + 8 (version), all this in addition to index on txid and a level of indirect lookup to have a look for the object itself, on read phase.

I think it's time to stop complicating the system and look for cleaner approaches.

Can we do that on application level? Do we have to depend on database implementation? DB-modifying queries send data back to user, this can be collected and concatenated.

Can we refactor API to make changesets atomic? Or a pair of 'changeset+api_call_seq_id'. Then we won't have to depend on database transaction, since we'll have our own. This can be done without breaking API0.6 compatibility.

zerebubuth commented 7 years ago

This is going to be a hell of write amplification.

The node record is already approximately 72 bytes, so we're talking about an amplification around 11% for adding a bigint or 67% for adding a row in a separate table. This is on the assumption that there are no tags for that node, and indeed many nodes have no tags. But many do, and ways have nodes, and relations have members. So the average write-amplification is probably lower.

It would be useful to see some benchmarks based on reasonable assumptions, and I think it would be premature to rule an approach out before that had been done. But it might be an argument in favour of the logical replication approach.

Can we do that on application level?

Theoretically, yes. In practice, I think the website code depends very strongly on the behaviour of the database. In particular, we depend on the database to assign each edit a position in some history of edits subject to constraints.

DB-modifying queries send data back to user, this can be collected and concatenated.

This creates the failure mode where the database commits, sends data back to the website instance, which then crashes before it collects and forwards the data to whatever else is collecting all the edits to make the diffs. In effect, it would mean the website instances would not be stateless and mean they were part of the system of record. If we implement a recovery system to fall back to the database, then we may as well use that as the primary system.

The replication output uses local disk in a similar way, which introduces a failure mode when the "services" machine is down or we swap to another site. That was the motivation for me adding the "distributed operation" item to the would-like-to-have.

Can we refactor API to make changesets atomic? Or a pair of 'changeset+api_call_seq_id'.

We effectively already do have an api_call_seq_id; Postgres calls it txid. By using this system, it means we don't have to reinvent it for ourselves. Postgres gives us not only a handy, compact edit IDs, but also the visibility functions to determine the current value of "now" in terms of those IDs.

pnorman commented 7 years ago

The node record is already approximately 72 bytes, so we're talking about an amplification around 11% for adding a bigint or 67% for adding a row in a separate table. This is on the assumption that there are no tags for that node, and indeed many nodes have no tags. But many do, and ways have nodes, and relations have members. So the average write-amplification is probably lower.

I wouldn't measure amplification bytes, but in ios. Any write is going to be at least one page, and even with multiple nodes, the cost difference between the two options is going to be the number of operations, not the size, even on NVMe.

What I like about the separate table is the ability to trim it, getting rid of no longer required data. Aside from that, it's the standard question of what level of normalization to apply to your database schema.

It would be useful to see some benchmarks based on reasonable assumptions, and I think it would be premature to rule an approach out before that had been done. But it might be an argument in favour of the logical replication approach.

I think the logical replication approach a superior architecture in every way, it just requires more work because it can reuse less of our current system.

zerebubuth commented 7 years ago

I think the logical replication approach a superior architecture in every way, it just requires more work because it can reuse less of our current system.

I worry that the logical replication system is so new that we'll hit some problems; either writing, building, running the system - or when we come to upgrade and find it's no longer supported or has changed significantly between releases. I don't like the "code in the database" aspect of TRIGGERs, but at least they're a very stable, very well-tested part of PostgreSQL.

The "cleaner" architecture of logical replication attracts me, too. But, as @Komzpa mentioned, depending on the database implementation at such a low level might restrict our options in the future, or make change more difficult. Other database engines might have equivalents of txid, but I don't know how many would have equivalents of logical replication.

pnorman commented 7 years ago

I've discussed discussed what we're doing at a few Postgres meetups. The conclusions reached based on those discussions are

I think the following are the two best options

  1. Add a trigger and created_txid column, switch osmosis to them, then drop the txid to int function and txid index. This is the fastest way to let us upgrade because the changes to the DB and osmosis are both very small.

  2. Pay someone to write code that does diffs from the logical replication stream. I don't see any of our existing devs have the combination of knowledge, interest, and free time to implement this.


One drawback of logical replication not mentioned so far is it would still require a connection to the DB for information like user_names which isn't being modified but is still needed to produce the XML output

mmd-osm commented 7 years ago

@pnorman : if at all possible at this stage, could you maybe also outline in a few words, how each option would impact minutely diff consumers, if at all. Will state and diff file stay as is, or do you expect some (maybe subtle) changes?

pnorman commented 7 years ago

I wouldn't tie this to any differences to consumers. Its possible we might find subtle undocumented oddities which don't get reproduced. It might also make sense to run both systems in parallel and change the replication URL.

Its premature to plan too much on any changes because we don't have a decided approach.

jburgess777 commented 6 years ago

@zerebubuth I dug into the Osmosis source and believe that your existing osmlogical plugin probably produces enough information for it to replace the XID usage in osmosis.

I don't think it is possible to provide the all the data needed to generate the change files from just the logical stream. As was mentioned earlier, the user information typically won't be present. I also think the object/tag relationship causes problems. The postgres docs make me think that the logical plugin cannot make its own queries to lookup this information so I think the stream consumer will need to make its own queries to fill in the missing data.

The piece of osmosis code which needs to be replaced appears to be getHistory(ReplicationQueryPredicates). Currently this takes in XIDs information and uses it determine which objects have changed. The function places the object ids & versions in selectedEntityTableName.

This code gets called three times, for each of: nodes, ways & relations. Once this data is collected it then gets fed to various other routines which join it with the tables to derive the complete information each object.

My guess is that we could write a new getHistory() which takes in a LSN range and use this and the osmlogical plugin to populate selectedEntityTableName with the equivalent object ID + version information that is currently gathered using XIDs. Once we have that information the rest of the code should work as it currently does.

The predicates passed into getHistory are created in buildQueryPredicates(). Currently these are based on the XIDs in the state file and data returned by txid_current_snapshot().

The postgres docs imply that the LSNs references used by logical replication give us a nice linear view of the commit history without the complexity of excluding transactions in progress as happened with XIDs. It does not look difficult adjust the osmosis code to track state using LSNs instead, the code should be simpler.

I see that the JDBC has existing support for interacting with a logical replication stream: JDBC Replication

Komzpa commented 6 years ago

@jburgess777 Can object/tag relationship issue be resolved by burning tags into node/way/relation object as a jsonb column? This would also save database IO for object reads, and space due to less tuple headers.

zerebubuth commented 6 years ago

I don't think it is possible to provide the all the data needed to generate the change files from just the logical stream.

I agree. The information we can get out of the logical replication looks like:

openstreetmap=# SELECT * FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL);
 location   |  xid  |     data
------------+-------+---------------
59/DE3EF8C0 | 53776 | BEGIN
59/DE3EF8C0 | 53776 | NEW nodes 1 3
59/DE3F0150 | 53776 | NEW nodes 2 3
59/DE3F02D0 | 53776 | NEW nodes 3 4
59/DE3F0450 | 53776 | NEW nodes 4 3
59/DE3F07A8 | 53776 | COMMIT
(6 rows)

Following this, we need a 2-step process. First, something needs to buffer and batch committed changes into sets of updated node, way and relation versions. Second, something needs to "decorate" those with additional data from the database and write out osmChange diffs.

The first thing needs to remember the mapping from xid to the elements which changed, and mark them as ready when it sees the COMMIT. These are not guaranteed to happen in the same call to pg_logical_slot_get_changes, unlike the example above. It's also possible that there is a ROLLBACK instead of COMMIT, in which case we'd want to drop the changes.

Since the "changes" records are a continuous stream, it will in general be impossible to find synchronisation points at which there are no open transactions. This means we'll need to buffer the "changes" somewhere until we see the COMMIT (and/or hit a replication interval - minutely or whatever).

We'll need to make sure that the buffer is fully committed to storage before ACKing the stream to ensure that we don't drop changes. (The replication slot can't go backwards, as far as I can see.) The best way seems to be to peek at the changes with pg_logical_slot_peek_changes, commit them to disk, then call pg_logical_slot_get_changes to consume the changes.

Once we have a committed set of changes on disk, it's a simple matter for the second thing to query the database to find out the metadata, tags, node position, way nodes, relation members, etc... These are mostly append-only and immutable (with the exception of redaction information).

To my mind, the biggest questions are:

  1. Testing. How do we make sure that this behaves the way we expect?
  2. Fail-over. The current WAL-level replication ensures that xids are the same on the master and all replicas, and we store in-flight xids in the state.txt file. This means we can (in theory) fail-over replication without missing any changes. It's not clear to me that the same is true of the logical replication state (i.e: slot position) and the logical replication state is opaque and managed by PostgreSQL internally. How can we reassure ourselves that we'll be able to fail-over replication without missing or double-counting changes?
  3. Stable storage. Committing to local disk is easy using something like SQLite. However, having stuff committed on the local disk of a failed machine, or one at a down site, is pretty useless. Should we be committing to distributed storage (i.e: #169 or etcd) instead?
  4. Redactions. The logical replication should be able to handle redactions, and other useful stuff such as changeset metadata updates, which are currently more difficult to handle. Should we build that in from the beginning?

Overall, I'd prefer not to shoe-horn this into Osmosis if it's just to save the small amount of time necessary to write some code to take a set of node, way and relation versions and output osmChange diffs. If that's the blocker, I'll happily volunteer to write that small piece.

Can object/tag relationship issue be resolved by burning tags into node/way/relation object as a jsonb column?

Yeah, I think it's a good idea. If we could enforce referential integrity (with a custom CONSTRAINT, perhaps?) then I think we should inline way nodes and relation members too.

However, I think that's orthogonal to the central difficulties of replication and, with the current setup, would be a migration requiring significant downtime. Since the migration would have to write every row in both the current and historical way, node and relation tables, it seems like it would take a very long time, and Rails doesn't (AFAIK) allow running migrations concurrently with normal operations.

To avoid the lengthy migration, we could make another use of PostgreSQL's logical replication to keep a master and replica in-sync with different schemas (since they are informationally identical). Then we could do the software switch and fail over to "upgrade" in a matter of minutes rather than hours or days.

jburgess777 commented 6 years ago

The first thing needs to remember the mapping from xid to the elements which changed, and mark them as ready when it sees the COMMIT. These are not guaranteed to happen in the same call to pg_logical_slot_get_changes, unlike the example above. It's also possible that there is a ROLLBACK instead of COMMIT, in which case we'd want to drop the changes.

What I read implied that ROLLBACK was a non-issue and we would only see data once the commit had been issued:

Output Plugin Callbacks Concurrent transactions are decoded in commit order, and only changes belonging to a specific transaction are decoded between the begin and commit callbacks. Transactions that were rolled back explicitly or implicitly never get decoded.

In the description of pg_logical_slot_get_changes() it seemed to imply we would always see the data for complete transactions returned in one go. It even states that it exceeds the "upto_nchanges" you specify to ensure the complete data is returned:

If upto_nchanges is non-NULL, decoding will stop when the number of rows produced by decoding exceeds the specified value. Note, however, that the actual number of rows returned may be larger, since this limit is only checked after adding the rows produced when decoding each new transaction commit.

You wrote:

Since the "changes" records are a continuous stream, it will in general be impossible to find synchronisation points at which there are no open transactions. This means we'll need to buffer the "changes" somewhere until we see the COMMIT (and/or hit a replication interval - minutely or whatever).

This is not how I read things. I think we see a nice stream of commits. Each time we are notified about a commit, we get the entire data for that transaction.

We'll need to make sure that the buffer is fully committed to storage before ACKing the stream to ensure that we don't drop changes. (The replication slot can't go backwards, as far as I can see.) The best way seems to be to peek at the changes with pg_logical_slot_peek_changes, commit them to disk, then call pg_logical_slot_get_changes to consume the changes.

The JDBC code accesses the logical replication stream via the streaming interface and not via the SQL functions like pg_logical_slot_peek_changes(). This allows it greater flexibility than the SQL interface provides. My reading is that we can go backwards but only in a limited window, the low level command to begin stream has a start LSN argument and comments in the code say:

logicalfuncs.c

Callers on the walsender interface are expected to keep track of their own progress and don't need it written out. But SQL-interface users cannot specify their own start positions and it's harder for them to keep track of their progress, so we should make more of an effort to save it for them.

This start position is used and exposed in pg_recvlogical: https://github.com/postgres/postgres/blob/master/src/bin/pg_basebackup/pg_recvlogical.c#L234

You are correct that we need to be careful when to ACK the data, once it has been ACKd we might not be able to go back to it again. The obvious point at which to issue the ACK is after we finished writing and syncing the changes to disk but I think there might be a better option.

I think we should delay the ACK until the next time we generate a changeset: at the start of generating the changeset for N minutes, we start be issuing the ACK for data processed at N-1 (based on what we read from the last state file). This gives us the chance of detecting bad output data and repeating the change generation again.

  1. Testing. How do we make sure that this behaves the way we expect?

Osmosis has test cases. For the APIDB change generation they seem to apply one change to the DB and verify that the change file written out matches what was applied.

  1. Fail-over. The current WAL-level replication ensures that xids are the same on the master and all replicas, and we store in-flight xids in the state.txt file. This means we can (in theory) fail-over replication without missing any changes. It's not clear to me that the same is true of the logical replication state (i.e: slot position) and the logical replication state is opaque and managed by PostgreSQL internally. How can we reassure ourselves that we'll be able to fail-over replication without missing or double-counting changes?

My reading of the logical replication suggests that the WAL LSNs are kept in sync across the replicas and that the logical decoding references LSNs from the WAL. Provided we store the LSN in the state file then I think this allows us to generate complete data with no duplicates. I agree we need to think more about how a master/slave switch affects the change generation.

  1. Stable storage. Committing to local disk is easy using something like SQLite. However, having stuff committed on the local disk of a failed machine, or one at a down site, is pretty useless. Should we be committing to distributed storage (i.e: #169 or etcd) instead?

This is an interesting question but I think that how and where the data ends up being stored can be handled at another layer, e.g. after the changes are generated they can be copied or moved somewhere else.

  1. Redactions. The logical replication should be able to handle redactions, and other useful stuff such as changeset metadata updates, which are currently more difficult to handle. Should we build that in from the beginning?

Do we have a description of what happens today and what problems need to be solved? Today I don't think osmosis cares about changesets at all.

Overall, I'd prefer not to shoe-horn this into Osmosis if it's just to save the small amount of time necessary to write some code to take a set of node, way and relation versions and output osmChange diffs. If that's the blocker, I'll happily volunteer to write that small piece.

I think the amount of change needed in osmosis is pretty small and should allow everything around it to stay the way it is. About the only externally visible change should be the addition of an LSN reference into the state file.

I agree that we could adapt other code we have for reading the DB to perform the same work of determining the changes and writing them out along with a state file.

One advantage from breaking away from Osmosis is that the usage of the temporary table could be removed. If I recall correctly, this is what current prevents it being run against a DB slave?

I think we should inline way nodes and relation members too.

I think we should try to keep to the existing schema. It allows the old and new schemes to be run in parallel for a while to verify consistency.

zerebubuth commented 6 years ago

Transactions that were rolled back explicitly or implicitly never get decoded.

Ah, you're right. I should have read that more carefully. That makes the whole thing a lot easier.

Stable storage. Committing to local disk is easy using something like SQLite. However, having stuff committed on the local disk of a failed machine, or one at a down site, is pretty useless. Should we be committing to distributed storage (i.e: #169 or etcd) instead?

This is an interesting question but I think that how and where the data ends up being stored can be handled at another layer, e.g. after the changes are generated they can be copied or moved somewhere else.

This is a problem that we have with the current replication system: Osmosis stores the state.txt to the filesystem on a single computer. When that computer goes down, or the site becomes unreachable, then we have a problem: the only copy of the state has disappeared.

If the LSN is the only state we need, then that simplifies matters greatly, and having #169 would make this a simple matter of storing to the object store instead of local disk (with a careful reading of the consistency guarantees provided by the object store). It's not a hard requirement to commit to stable storage, but I don't think ignoring it entirely is a good idea.

I think we should inline way nodes and relation members too.

I think we should try to keep to the existing schema. It allows the old and new schemes to be run in parallel for a while to verify consistency.

Sorry, I should have been clearer: I think we should do it, but not now, and not in relation to the changes we're making to the diff replication.

jburgess777 commented 6 years ago

I updated osmosis to consume and use the logical replication stream from the osm-logical plug-in and it seems to pass some simple tests. The code can be found on the branch below as well as README describing the setup steps and the results:

jburgess-logical-replication branch README-logical-replication.txt

This code is meant as a 'proof of concept' and is not intended to be deployed in its current state.

mmd-osm commented 6 years ago

I think we see a nice stream of commits. Each time we are notified about a commit, we get the entire data for that transaction.

I have a question on how long running transactions are handled.

In today's replication, an external consumer of diff files would see an indication of such in-flight transactions in state.txt in txnActiveList. While those numbers don't make much sense on their own, an absense of any such transaction number would imply, that the current diff contains all data up to including the current point of time. That's an important information for downstream applications like Overpass Augmented Diffs, which aim at publishing all changes for a given point in time.

Will the new mechanism provide a similar indication about the existance of in-flight transactions?

If not, this would severely impact such data consumers, as you cannot be sure that the next minutely diff would suddenly return data that is 2 hours old, but was still part of an in-flight transaction right now.

(Please see https://github.com/drolbr/Overpass-API/issues/346 for a rather lengthy discussion of this topic.)

eulerto commented 6 years ago

Will the new mechanism provide a similar indication about the existance of in-flight transactions?

In logical replication, transactions are decoded at commit order. It means that if the transactions was not committed yet, it was not available to the consumers/replicas. The in-flight transactions info are only available in the master server.

If not, this would severely impact such data consumers, as you cannot be sure that the next minutely diff would suddenly return data that is 2 hours old, but was still part of an in-flight transaction right now.

I don't know the details of mentioned applications but why bother with a long transaction that you are not sure that it will complete? Also, long transactions are evil for databases.

zerebubuth commented 6 years ago

That's an important information for downstream applications like Overpass Augmented Diffs, which aim at publishing all changes for a given point in time.

Absolutely. There are two problems here, both of which are orthogonal to this particular issue, and could be solved independently:

  1. The frontends process diff uploads in a streaming fashion, allowing a slow client to arbitrarily delay transaction commit. This seemed like a good idea at the time, on the basis that it kept memory overheads low and allowed bigger diffs to be uploaded. But now I think it would be better to buffer the upload either on disk or in memory and process it only after it has been fully uploaded. This would mean the transaction would be open for a much shorter length of time - seconds, rather than minutes.
  2. The timestamps are assigned to elements by the frontend, at the wall-clock time that they were parsed. Instead, they should use current_timestamp from the database to be consistent.

The first item is more important, the second one is likely less beneficial under normal circumstances, since all the servers are synced to within a few milliseconds by NTP anyway.

If anyone wants to have a go at either of those and there's any way I can help, please let me know.

mmd-osm commented 6 years ago

Just a quick comment, as I don't want to hijack this issue...

The frontends process diff uploads in a streaming fashion, allowing a slow client to arbitrarily delay transaction commit.

Agreed that we need to do further analysis with regards to long running transactions. It seems to me that upload (or the framework calling upload) collects the full client provided payload already in memory (request.raw_post), before starting the update processing via DiffReader, only which is part of a transaction. DiffReader does indeed streaming processing of the XML document string to reduce memory footprint. So a slow client shouldn't be causing those long running transactions.

zerebubuth commented 6 years ago

It seems to me that upload ... collects the full client provided payload already in memory (request.raw_post), ... So a slow client shouldn't be causing those long running transactions.

You're right! That's a surprise to me, it wasn't what I originally intended. But now I'm confused; I thought that was the most likely culprit. I've added https://github.com/openstreetmap/openstreetmap-website/issues/1710 to follow up, since it seems like more of a software bug to me (but I could be wrong).

jburgess777 commented 6 years ago

In today's replication, an external consumer of diff files would see an indication of such in-flight transactions in state.txt in txnActiveList.

In theory we could also include the txnActiveList as before but it might not be precisely consistent with the data that the change file includes.

suddenly return data that is 2 hours old

From the point of view of the database, the information doesn't really exist for any external consumer until it is committed so it isn't really 2 hours old. Also, if the commit never happens then any changes which were part of that transaction effectively vanish, as if they never existed. We cannot go back and rewrite the previous changes to include edits which have only now been committed. We also don't want to hold up the entire replication stream until any open transactions are committed.

jburgess777 commented 6 years ago

One area of the implementation which probably needs more thought is whether we should lookup the changes inside a transaction or not. The old replication code worked by starting a transaction, querying the transaction XIDs and then processing this snapshot of data.

In the new code, we receive a stream of logical changes. In the code branch I mentioned in an earlier comment I removed the transaction. When we see a change in the logical stream it should be visible to us in the DB.

Another option would be to start a transaction and then only attempt to use the logical messages for transactions which committed prior to ours. This might mean falling back to looking at the XIDs in a similar way to the old code. When we see a logical change for transaction which is newer than ours then we should probably end this state file. One option might be to commit our own empty transaction, we could then easily identify when this appears in the logical decode stream.

pnorman commented 6 years ago

If not, this would severely impact such data consumers, as you cannot be sure that the next minutely diff would suddenly return data that is 2 hours old, but was still part of an in-flight transaction right now.

Even with the changes discussed to reduce long running transactions, this will always be true.

The following sequence of events is always possible

  1. Start transaction 1
  2. Start transaction 2
  3. Finish transaction 2
  4. Generate minutely diff 1
  5. Finish transaction 1
  6. Generate minutely diff 2

This is inherent in MVCC and committing stuff in parallel, and not out of order. Transaction 2 exists for other people before transaction 1 does.

Conflicts aren't a problem either because one of the transactions would fail, and never get seen.

So although I'm glad to see long-running transactions reduced for performance reasons, they can't have any impact on correctness of minutely diffs.

zerebubuth commented 6 years ago

When we see a change in the logical stream it should be visible to us in the DB.

I agree. As you pointed out before, we only see changes which have been committed, so we should see the data as it was committed unless a later (also committed) transaction altered it.

Such alterations should be rare, since the nodes, ways and relations tables are insert-only and, apart from redactions, immutable. If it does happen that we see an (id, version) pair in the logical stream and not in the database, I think that would be abort condition.

PostgreSQL does have the ability to synchronise snapshot views in transactions. But I'm not sure this is helpful to us, since the version of the data we want to synchronise with was visible in the WAL rather than a user session.

mmd-osm commented 6 years ago

@pnorman:

So although I'm glad to see long-running transactions reduced for performance reasons, they can't have any impact on correctness of minutely diffs.

Right. Although I'm somewhat confident that we can significantly reduce the runtime of those long running transactions down to a few seconds, that sequence of events is still perfectly valid.

In the redesigned bulk upload, the changeset would be locked all the time, and could be an indication of an ongoing transaction. As noted before, evaluating this lock won't work, as the replication stream processing will start several seconds later only.

The only option I could think of would be to introduce a new "start of new upload timestamp" entry in a dedicated table, immediately commit this row, and then continue with the usual upload processing.Once the upload transaction finishes or gets rolled back, the marker would have to be removed again.

During stream replication processing in osmosis, we could query this table (+ checking the current replication stream?) for all markers with ongoing transactions and could derive the "earliest timestamp" up to which transactions are included in the change file. Or in other words, we guarantee that no subsequent diff file will ever include data before this earliest timestamp.

I know, this is quite a pain, and probably not worthwhile... YMMV.

@jburgess777:

I updated osmosis to consume and use the logical replication stream from the osm-logical plug-in and it seems to pass some simple tests. The code can be found on the branch below as well as README describing the setup steps and the results: This code is meant as a 'proof of concept' and is not intended to be deployed in its current state.

I tested this code locally (JOSM --> mass upload to openstreetmap db --> osmosis w/ stream replication -> state files --> minutely diff upload in Overpass API), and it looks good so far. One thing I noticed was the long processing time even with only a few thousand changed nodes. I guess this could be improved by using bulk loading?

Yesterday I was somehow under the impression that one relation without members was missing in the diff files, but unfortunately couldn't reproduce it anymore today. If others are testing, maybe keep a close look on what's in the diff file.

mmd-osm commented 6 years ago

Looking back in this issue, the option to use triggers was mentioned several times, e.g. in https://github.com/openstreetmap/operations/issues/154#issuecomment-326544587.

There were some concerns that this might create a huge load on the DB. For this reason I took a look at all minutely diffs created during the past 35 days and evaluated the number of objects affected in each file. This figure should correspond to the number of rows we would have to insert into a dedicated replication table during each minute, as outlined in https://github.com/openstreetmap/operations/issues/154#issuecomment-286087517

Object ids affected per minute during the last 35 days (cumulative histogram):

min=0, average=2201, max=34740 objects / min

grafik

These figures seem fairly small to me after all.

This is the fastest way to let us upgrade because the changes to the DB and osmosis are both very small.

What would be the actual steps to have something testable based on db triggers?

Extending a bit on what @zerebubuth posted earlier, we would need to define a new replication table, some triggers.

SQL statements to create db trigger (click to expand me) ```sql create table replication_updates (nwr nwr_enum, id bigint, version bigint, txid bigint); create index replication_update_txid_idx on replication_updates(txid); DROP TRIGGER IF EXISTS replicate_nodes ON nodes; DROP TRIGGER IF EXISTS replicate_ways ON ways; DROP TRIGGER IF EXISTS replicate_relations ON relations; CREATE OR REPLACE FUNCTION replicate_nodes() RETURNS trigger as $replicate_nodes$ BEGIN INSERT INTO replication_updates(nwr, id, version, txid) values('Node'::nwr_enum, NEW.node_id, NEW.version, txid_current()); RETURN NEW; END; $replicate_nodes$ language plpgsql; CREATE OR REPLACE FUNCTION replicate_ways() RETURNS trigger as $replicate_ways$ BEGIN INSERT INTO replication_updates(nwr, id, version, txid) values('Way'::nwr_enum, NEW.way_id, NEW.version, txid_current()); RETURN NEW; END; $replicate_ways$ language plpgsql; CREATE OR REPLACE FUNCTION replicate_relations() RETURNS trigger as $replicate_relations$ BEGIN INSERT INTO replication_updates(nwr, id, version, txid) values('Relation'::nwr_enum, NEW.relation_id, NEW.version, txid_current()); RETURN NEW; END; $replicate_relations$ language plpgsql; CREATE TRIGGER replicate_nodes AFTER INSERT ON nodes FOR EACH ROW EXECUTE PROCEDURE replicate_nodes(); CREATE TRIGGER replicate_ways AFTER INSERT ON ways FOR EACH ROW EXECUTE PROCEDURE replicate_ways(); CREATE TRIGGER replicate_relations AFTER INSERT ON relations FOR EACH ROW EXECUTE PROCEDURE replicate_relations(); ```


Example output in replication table: (click to expand) | nwr | id | version | txid | | ---------- | ------------- | --------- | -------- | | Node | 5001127976 | 1 | 935204 | | Node | 5001127977 | 1 | 935204 | | Way | 4000036780 | 1 | 935204 | | Node | 5001127978 | 1 | 935219 | | Node | 5001127986 | 1 | 935219 | | Way | 4000036781 | 1 | 935219 | | Way | 4000036782 | 1 | 935219 | | Relation | 80000000072 | 1 | 935219 | | Relation | 80000000073 | 1 | 935219 |


What would be needed on osmosis side for this to work? Today's logic seems to be in org.openstreetmap.osmosis.apidb.v0_6.impl.EntityDao, method getHistory. Instead of going directly to the entityName table and selecting via xid_to_int4(xmin) we would have to select txid on the new replication table and do some inner join based on id & version with the actual entityName table.

I created a small proof of concept prototype using this approach for further discussion: https://github.com/mmd-osm/osmosis/tree/patch/trigger_replication

For testing, just execute the SQL statements to create the db triggers once prior to running the replication.

woodpeck commented 6 years ago

As a side note, this change might also help us solve a problem that exists with redactions.

A redaction is the only event where an older version of an object is changed in the database (by applying a redaction_id to it). Due to the way Osmosis queries modified objects, such a redaction would have led to the old, redacted object being put into the replication feed. Therefore we added ".. and redaction_id is null" to the respective queries in Osmosis. This solves redactions, but does not solve the - currently hypothetical - case of an un-redaction: If we were to set the redaction_id of an old object to NULL, that old version of the object would then land in the replication stream and potentially confuse consumers.

If we switch to a trigger-based system, and do not execute the trigger on any changes to redaction_id, then un-redactions would become possible without fuss.

gravitystorm commented 6 years ago

Of course, if we unredact an object and don't put it in the replication, then we'd still confuse consumers since they wouldn't have that data. No easy way to deal with unredactions.

mmd-osm commented 6 years ago

If you start out with a full planet dated back 2012-Sep-1? and apply all subsequent minutely diffs, you will never encounter any redactions at all since then, as this redaction information is never part of a minutely diff file. That's also why you still see all those redacted objects in Overpass API and any other consumer using the same kind of processing.

OTOH, consumers using a recent planet file (including history) would need to have the unredaction details as part of a minutely diff, as they really don't have the respective object information around, as @gravitystorm mentioned.

IMHO, the whole redaction process has some severe shortcomings when used in conjunction with diff files. I'd be happy to see some solution for this, but I'm suspecting that this might be extremely hard to get right.

pnorman commented 6 years ago

Of course, if we unredact an object and don't put it in the replication, then we'd still confuse consumers since they wouldn't have that data. No easy way to deal with unredactions.

You don't want to put it in conventional replication, because the object is still deleted. If we can do unredactions without screwing up someone keeping a local copy of the planet up to date, that's an improvement.


The main use-case for replication diffs is a data consumer using OSM data, not meta-data. For someone who needs full history, full metadata, or other unusual requirements, we don't provide anything except the weekly dumps. I know it looks like replication diffs can be used for full history or metadata, but it only gets you 90% of the way there. This is one issue, but there are others like username renames, account actions, etc.

mmd-osm commented 6 years ago

Not sure if anyone noticed this post by Brett: https://lists.openstreetmap.org/pipermail/osmosis-dev/2018-October/001847.html:

It's a good time to mention that this will probably be the last Osmosis release I make.

Now that @brettch unfortunately won't be able to maintain osmosis in the future, we're not only stuck with Postgresql 9.5, but the two alternative approaches we're discussing here (logical replication vs. trigger) will have a much harder to get integrated, as noone is maintaining osmosis anymore.

I guess osmium also isn't anywhere close to taking over any minutely/hourly/daily replication tasks.

Any thoughts on this?

pnorman commented 5 years ago

I guess osmium also isn't anywhere close to taking over any minutely/hourly/daily replication tasks.

osmium is a library, not a stand-alone tool. There's osmium-tool, which does some basic stuff, but it's not a replacement for all the stuff that Osmosis does. Just the stuff most people use ;)

mmd-osm commented 5 years ago

osmium is a library, not a stand-alone tool.

Right, it was meant to allude to @brettch's comment [...] Osmosis has been largely superseded by better/faster tools such as Osmium [...]. In reality, there's libosmium, and osmium-tool, like you said.

The last OWG summary from a year ago mentioned some possible contracting option to continue work on the logical replication topic. I guess this never materialized in 2018? Are there some new plans for 2019? (Btw: it's a bit of a pity that the owg somehow seems to have stopped publishing reports in 2018, otherwise I wouldn't be asking those silly questions).

gravitystorm commented 5 years ago

(Btw: it's a bit of a pity that the owg somehow seems to have stopped publishing reports in 2018, otherwise I wouldn't be asking those silly questions).

Yeah, sorry about that. We haven't had any volunteers to write them, and so they haven't been done recently. Feel free to ask questions though, they aren't silly.

The last OWG summary from a year ago mentioned some possible contracting option to continue work on the location replication topic.

That's still on the table, but there's been no action taken since that report was written. Nobody has volunteered to do the work yet in writing a proposal or finding contractors.

simonpoole commented 5 years ago

@zerebubuth wrote

Additional items that we may want to take the opportunity to include:

* Changeset replication. This is currently handled by a [different tool](https://github.com/zerebubuth/openstreetmap-changeset-replication), but would be nicer if all this stuff was in the same place.

* Redactions. Currently these aren't included in diff feeds, but this would be an opportunity to add them.

* Distributed operation. State files are quite small, and could be stored in [Zookeeper](https://zookeeper.apache.org/), [etcd](https://github.com/coreos/etcd) or something similar. This would (in combination with the Idempotent/Immutable properties) mean that replication could be robustly run across multiple sites.

At the danger of some scope creep, but woudn't the feature with easily the most value add, be to generate diffs in the "augmented diff" format, aka including modified and deleted elements in their prior state in the diff. I haven't checked, but my assumption is that generating current format diff files from augmented diff format must be totally trivial.

mmd-osm commented 5 years ago

my assumption is that generating current format diff files from augmented diff format must be totally trivial.

Nope, it's even impossible. While Overpass-style augmented diffs omit all intermediate versions of an object in a one-minute time interval, the .osc file must contain every single object version, so there's no way to reconstruct an osc file based on this definition of augmented diffs.

If you're planning to provide a timeslot based augmented diff, you inherit pretty much all issues outlined here: https://github.com/drolbr/Overpass-API/issues/346

You could of course define something along the lines of the augmented diff format with different semantics, though. Not sure if this is what you're suggesting here.

mmd-osm commented 4 years ago

Follow up seems to be here now: https://github.com/joto/osmdbt

simonpoole commented 4 years ago

@mmd-osm "impossible" is a big word.

And yes I wasn't implying that diffs with context need to have exactly the same semantics as the overpass augmented diffs. Matter of fact they probably definitely shouldn't, given that the overpass format is both fairly arcane, contains more data than strictly necessary and the "one minute" constraint is at odds with everything else irl (as the linked issues show).

mmd-osm commented 4 years ago

@simonpoole : people seem to bring up this topic about every other week, in particular on twitter. I haven't seen anything even halfway qualifying as some kind of specification or even a list of requirements ⟶ 🤷‍♂️

mmd-osm commented 4 years ago

@tomhughes : while reviewing the new solution, the question about the target system environment came up. Do we already have some idea which Ubuntu and Postgresql versions will be deployed when this is supposed to move to production? Do we need a similar PPA build process, like for cgimap? Is it planned to have a test period on the dev server as well?

tomhughes commented 4 years ago

No idea yet until I have some view of what is involved.

Ideally we would have a test period but I'd need to look at how it might work given we don't do replication on dev at all - we might at least be able to test the postgres plugin bits though even if we don't save the diffs at the end of the day.

Then I guess we would likely move to switch to the new system with postgres 9.5 which will I assume involved some sort of flag day that makes everybody reset their consumers? and then migrate to postgres 12 in due course.

zerebubuth commented 4 years ago

Then I guess we would likely move to switch to the new system with postgres 9.5 which will I assume involved some sort of flag day that makes everybody reset their consumers?

I'm hoping that won't be necessary. I think it should be possible to have the process which writes out the diffs also read the last N diffs and filter out any object versions that have already been written. Which would make the transition seamless for any consumers.

However, I think it would be a good idea to test my "should" on dev to make sure it actually works and is practical.

mmd-osm commented 4 years ago

Right, we have this seamless transition topic also for another use case, and I’m assuming we’d need some plan for this one here as well.

https://github.com/openstreetmap/osmdbt/issues/11

woodpeck commented 4 years ago

A standard osmosis or pyosmium based replication consumer will not be hurt by receiving the same data again (in fact it is often recommended during setup to start the replication "a little earlier" than would absolutely be necessary "to be on the safe side"). While a totally seamless no-duplicate upgrade would be great, an upgrade that involves sending a few objects again wouldn't be a big issue. (At least not as big as an upgrade that would not send some objects at all...)

mmd-osm commented 4 years ago

an upgrade that involves sending a few objects again wouldn't be a big issue.

In the worst case, we might be sending v3, v2, and v3 of an object, and I don't really know, if all data consumers are resilient enough to cope with it. Maybe this is really a non-issue, and we just don't know.

pnorman commented 3 years ago

This is in production now.