@elgordino This is a great question!
We should do a little bit of research & testing, but I think the following might work: use pg_advisory_xact_lock to make sure that sequence ids are read and used in the order in which transactions are committed. Example:
begin;
-- advisory lock keys are bigints, so use a numeric identifier for the model/table
select pg_advisory_xact_lock(42);
-- create a cursor value from a sequence, e.g. select nextval('my_cursor_seq');
commit; -- the lock is released automatically when the transaction ends
Thoughts?
Hi @coco98!
Thanks for the reply, I think a lock would work but my main concern with a lock is the potential for a performance impact on writes to the table.
This table will have a high number of concurrent writes, some of which can take a few seconds (when a user is making a bulk update), and I wouldn't want to block other users' writes during that time. I will be deploying some new monitoring to production soon that will track how much of an issue this is.
P.S. Nice keynote yesterday!
Hi @coco98 I added some logging to production that records the max difference between 'transaction_timestamp' and 'clock_timestamp' on the rows inserted into the table I want to track.
We see several transactions that take 5 or 6 seconds to complete. If we were to add a lock during these writes, it would block other users' writes for too long.
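For reference, a minimal sketch of how that measurement can be captured (the table and column names here are illustrative, not the production schema):

alter table notify
  add column txn_ts timestamptz default transaction_timestamp(),
  add column clock_ts timestamptz default clock_timestamp();

-- largest gap between transaction start and the moment a row was actually written
select max(clock_ts - txn_ts) as max_lag from notify;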
Is this something that Hasura could be modified to manage? Perhaps with some sort of window covering a period of recent cursor positions?
@elgordino Ok, that's good to know. From my reading of the docs, the pg_advisory_xact_lock shouldn't adversely impact transaction performance that much and is in fact the recommended way of achieving this kind of concurrency control.
Another option is to separate the reads and the writes through a slightly more complicated architecture:
My recommendation:
What do you think?
Edit: Another experiment might be to run a simple transaction that 1) acquires the xact lock, 2) reads nextval from a sequence, 3) inserts, and then run 100 such transactions concurrently (or whatever is closest to your workload) to see if the deterioration in perf is significant.
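A sketch of what that experiment transaction could look like (the notify table, the notify_cursor_seq sequence, and the lock key are assumptions):

begin;
select pg_advisory_xact_lock(42);  -- 1) serialize cursor allocation
-- 2) read the next cursor value and 3) insert the row with it
insert into notify (id, cursor_id)
values (gen_random_uuid(), nextval('notify_cursor_seq'));
commit;  -- the xact lock is released here

Running many of these concurrently (for example with pgbench and a custom script) and comparing latency with and without the pg_advisory_xact_lock call would show whether the lock itself is the bottleneck.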
This is a limitation of Postgres. The only way to achieve a "commit" sequence is to use a lock, which means you are limited to a single writer at a time. The only other possible solution is probably to use the WAL with master and slave tables as mentioned above. Thus my advice is to batch the writes or make the transactions smaller (faster to commit inside the db). This is a well-known problem when implementing an event store in Postgres. Another way is to use a position counter as described here: https://dev.to/kspeakman/comment/ld93 , however that approach also commits in sequence.
Hi @coco98 sorry for the extended delay in replying.
Let me outline the architecture I currently have in place that I am hoping to move to Streaming Subscriptions. It's how we subscribe users to thousands of records per user across many tables at a very low cost.
When a client app boots it may load a few thousand records, and the client then needs to be notified if any of these records change. Rather than subscribing to each individual record, what we do is use a trigger on each 'data' table to write a record into a single 'notify' table. Each client then polls the 'notify' table to see if there are any new records they need to refresh; if they see an entry in the 'notify' table they then fetch the real record from the 'data' table. The data in the 'notify' table is aged out over time, meaning clients are only making a simple request against a small table to keep thousands of records fresh. BTW I have been meaning to write up this approach in much more detail and offer to do a talk at the community call or similar because I think it's pretty cool (I dunno, maybe folks are doing this already, I'm sadly not as in touch as I used to be!).
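A minimal sketch of that fan-in trigger, assuming a data table called projects and a notify table with id and updated_date columns (all names illustrative):

create or replace function record_notify() returns trigger as $$
begin
  -- record that this row changed; clients poll notify instead of every data table
  insert into notify (id, updated_date)
  values (new.id, clock_timestamp());
  return new;
end;
$$ language plpgsql;

create trigger projects_notify
after insert or update on projects
for each row execute function record_notify();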
To make this work the clients currently poll the 'notify' table every 1 second, to avoid the issue with transaction timestamps they request a window that includes data that has been modified in the last 15 seconds. This does cause the client to receive the same data multiple times but it guarantees they receive the data, regardless of timestamp ordering (assuming we don't have a transaction over 15 seconds, which we monitor).
We cannot have this table block when multiple concurrent, independent transactions write to it. For example, if one write is taking 10 seconds, then other writes would also be blocked and all users would be delayed in receiving notifications of their changes. The responsiveness of the client is entirely dependent on receiving notification of change promptly.
I think any solution involving marshalling the cursor ids so they appear in order will always hit this problem, or at least I can't think of a way to avoid it. To enable responsive notification of new data I think Hasura needs to tolerate an out-of-order cursor somehow. Maybe it could, optionally, have a configurable window for the cursor age and then deduplicate the records before notifying the client, or I'm sure there are other approaches.
What do you think?
Thanks - this is great context!
What authorization rules do you have on the changes table that will be streamed out to users?
I'll explain a little more about the 'notify' table.
The idea I had is that when a record in a data table is mutated, the notification of that change needs to go to users of the various 'scopes' who might potentially need to know about the updated record.
For example there could be three scopes: organization, project, and user.
The notify table then looks like this:
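A minimal sketch of that layout, inferred from the queries below (column types are assumptions):

create table notify (
  "id"             uuid,         -- id of the changed data row
  "updatedDate"    timestamptz,
  "organizationId" uuid,         -- scope: the record's organization
  "projectId"      uuid,         -- scope: the record's project
  "userId"         uuid          -- scope: a specific user
);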
The client then polls with 3 queries like so:
notify(where: { updatedDate: { _gte: "previousPollWithOffset" }, organizationId: { _eq: "the user's org" } }) {
  id
}
notify(where: { updatedDate: { _gte: "previousPollWithOffset" }, projectId: { _eq: "the user's project" } }) {
  id
}
notify(where: { updatedDate: { _gte: "previousPollWithOffset" }, userId: { _eq: "the user's id" } }) {
  id
}
This results in a stream of UUIDs being sent to the client. The client then compares those UUIDs against ones it has seen previously, and if it has seen one before it requests the data from the data tables, at which point the regular security rules apply. It will often receive UUIDs it doesn't care about, but it can simply ignore those.
Initially I enforced security rules by checking the user's membership of the organization or the project, or whether the user is the user id.
I ran into an issue with notifying DELETEs to the client because, if say the 'project' is being deleted, then the security rule can't reference the project being deleted. As such I am currently allowing any user to select from this table, because the rows are only a stream of opaque UUIDs which are useless without context anyway. I may in future move DELETEs to a separate table that has no rules, then apply rules for CREATE/UPDATE notifications again.
@elgordino I am following this topic and I have also implemented a similar notify table. I will give you some recommendations on how you can fix your problems.
First, you can generalize your notify table to a more generic structure: id (pk, auto-generated), resource_type (string), resource_id (id/string). This way you can add more resource types in future.
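In SQL that generic structure might look like this (a sketch; the types are assumptions):

create table notify (
  id            bigserial primary key,  -- auto-generated
  resource_type text not null,          -- e.g. 'project', 'task'
  resource_id   text not null           -- id of the changed row
);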
Next, to make sure no data is missed by subscriptions: create an additional table _notify with the same structure you already have, and write the change event to the _notify table inside the transaction. Then create a (traditional Hasura) subscription/poll on your api server to poll the _notify table with a created_at timestamp cursor plus some timestamp offset. In your notify table, create a unique constraint, and insert all new/duplicate events into the notify table with ON CONFLICT DO NOTHING. Here you can use an advisory lock to guarantee the write order and a single writer to the notify table. Since the notify table has a single writer, it does not matter if you use a serial or a created_at column (not the same as the _notify table's) as the cursor. Now you can safely use the notify table with Hasura's cursor subscription. Ideally this infrastructure setup could be handled by Hasura internally.
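A sketch of that relay step as a single statement the poller might run. It assumes notify also carries a source_id column (the _notify row's id) with a unique constraint on it, and the lock key and offset are illustrative; the unique constraint is what makes the overlapping window safe to re-read:

begin;
select pg_advisory_xact_lock(100);              -- only one poller copies rows at a time
insert into notify (source_id, resource_type, resource_id)
select id, resource_type, resource_id
from _notify
where created_at > $1 - interval '15 seconds'   -- $1 = cursor from the previous poll
on conflict (source_id) do nothing;             -- duplicates from the overlap are dropped
commit;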
Thanks @ilijaNL that's a really interesting and creative suggestion.
I guess it could cause the delay in the client receiving the response to double, but that could be countered by reducing the refresh interval in Hasura to 0.5 secs.
Obviously it also means running this service somewhere, which is not a problem, but it will need some careful design to make sure it meets availability requirements, which TBH I'd rather not have the headache of thinking about.
As you say, ideally Hasura should handle this internally somehow. The feature is promoted as being simple to use, and the documentation doesn't cover any of these nuances; I feel users will lose data unless they are aware of the implications and complexity here. The docs say "Ideally, the cursor chosen should represent unique and sortable values so that each row is sent exactly once to a subscriber."
but they don't describe the difficulty of achieving this or the impact of failing to do so.
If I have to start building stuff outside of Hasura then I might change the notification channel back to the client entirely. There are lower latency options available.
Thinking of the 3 ways messages are normally delivered, you have: at most once, at least once, and exactly once.
Hasura is currently achieving 'at most once', however I need 'at least once' for my environment. I don't care if clients receive the same message multiple times; I do care if they never receive the message, because they will wait forever. Perhaps this is something that could be achieved by allowing some sort of 'offset' window to be configured (say 15 seconds), so that messages are simply repeated to clients each time the DB is polled during that period. Ideally Hasura would de-duplicate them first, but if that's not possible then, at least in my use case, the client can do that.
At least once is not possible with Hasura's stream subscriptions since it requires an acknowledgement from the receiving side, and it is more suitable for server-to-server scenarios (see message brokers like RabbitMQ). I think you should use the Hasura subscription as a notify mechanism, and then fetch the missing data. Which means: when an event comes in on the subscription, it means there is something new, and then you catch up with a simple request (using a cursor).
I guess it could cause the delay in the client receiving the response to double, but that could be countered by reducing the refresh interval in Hasura to 0.5 secs.
For lowest latency you should check out the native Postgres notify mechanism: https://www.postgresql.org/docs/current/sql-notify.html however this also requires some backend setup
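A minimal sketch of that mechanism (channel and table names are illustrative): a trigger publishes the changed row's id, and a long-lived backend connection listens and fans it out to clients. Note that NOTIFY payloads are delivered on commit and are not persisted, so a disconnected listener misses them.

create or replace function notify_change() returns trigger as $$
begin
  perform pg_notify('data_changed', new.id::text);  -- payload is the changed row's id
  return new;
end;
$$ language plpgsql;

create trigger notify_change_trigger
after insert or update on notify
for each row execute function notify_change();

-- in a long-lived backend session:
listen data_changed;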
Ah yeah, I guess I shouldn't say 'at least once' as such. The client is managing its own position and refreshing data as necessary if there is a disconnect or some trouble on the websocket etc., and the notify just needs to be a notify; the client then refreshes the full data as necessary. What I do need, though, is to ensure that the notification of change does get sent while the client is happily connected.
I could use standard Hasura subscriptions, but I think I would need to recreate the subscription each time I wanted to move the cursor forward, which is possible but not desirable.
I'm happy to consider solutions outside of Hasura, but I'd really rather not have to.
TBH the polling the client does now works well enough. One option I have considered is moving that polling to a server, deduplicating the data there, and having the client make a websocket connection to that server: effectively recreating streaming subscriptions, but again I'd rather not have to.
@elgordino Thanks for the context on the permission rules :) We're working on a thing to make this easier for this use-case and will share it soon!
@coco98 excellent pleased to hear it. I'll look forward to giving it a go :)
@elgordino Heyo! Sorry for the wait.
We tried a few benchmarks with a monotonic id generated via pg_advisory_xact_lock and it is really fast. It can be set up via a BEFORE INSERT trigger. The first one is an example of a "global" lock over all inserts to generate this id:
CREATE SEQUENCE serial_chats;

CREATE FUNCTION add_serial_id()
RETURNS trigger AS $$
DECLARE
  next_id bigint;
BEGIN
  -- serialize id allocation across all inserts so ids follow commit order;
  -- the xact lock is held until the inserting transaction commits
  PERFORM pg_advisory_xact_lock(1);
  SELECT nextval('serial_chats') INTO next_id;
  NEW.id := next_id;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER add_serial_trigger
BEFORE INSERT ON chats
FOR EACH ROW
EXECUTE FUNCTION add_serial_id();
Depending on your application, you can optimize this further by taking a lock over a "partition" value (which is going to be part of your filter expression). For example, in a chat app you only need monotonicity for chats in a single room.
CREATE FUNCTION add_serial_id_partitioned()
RETURNS trigger AS $$
DECLARE
  next_id bigint;
BEGIN
  -- take the lock over a partition value (here the room), so only inserts
  -- into the same room serialize against each other
  PERFORM pg_advisory_xact_lock(NEW.room_id);
  SELECT nextval('serial_chats') INTO next_id;
  NEW.id := next_id;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Let us know if you are able to try this out. Also, we can hop on a call and run a few tests/benchmarks on your app as well. Here is my Calendly to help schedule a time: https://calendly.com/tiru-hasura/30min
The solution mentioned by @tirumaraiselvan won't fix the issue for @elgordino since his transactions take a long time and the lock is only released after the transaction commits. An advisory lock per partition key improves it (less concurrent locking), however it depends on the number of partitions and on the stream subscription filters, i.e. you should filter per partition and not globally. If you want to stick to a single transaction and have a monotonic incremental id with the least concurrent locking, the best you can do is this:
CREATE TABLE bench.events (
  position bigint,  -- commit-order cursor, assigned after insert (must stay nullable)
  event_id uuid primary key default gen_random_uuid(),
  stream text NOT NULL,
  event_name text NOT NULL,
  version integer NOT NULL,
  data json,
  meta_data json,
  created_on timestamp with time zone not null default now()
);
CREATE OR REPLACE FUNCTION proc_set_primary_key()
RETURNS TRIGGER
LANGUAGE plpgsql
AS
$$
BEGIN
  -- this can be improved by partition locking
  PERFORM pg_advisory_xact_lock(1723683380);
  -- bench.event_order is a separate sequence, read under the lock at commit time
  UPDATE bench.events SET position = nextval('bench.event_order') WHERE event_id = NEW.event_id;
  RETURN NULL;
END;
$$;

CREATE CONSTRAINT TRIGGER set_commit_order
AFTER INSERT ON bench.events
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
EXECUTE PROCEDURE proc_set_primary_key();
The takeaway is that the trigger should be DEFERRABLE INITIALLY DEFERRED, and the position should be nullable and inserted as null.
However, to make this as performant as possible you should avoid locking here altogether. In that case you remove the trigger and just call proc_set_primary_key (slightly modified) from your backend asynchronously, updating the order in the background.
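A sketch of what that asynchronous variant might look like (an illustration, not code from this thread): a single background worker periodically assigns positions to rows that don't have one yet, so the insert path never takes the advisory lock.

-- run periodically by a single background worker; because only this worker ever
-- assigns positions, the inserting transactions need no lock at all
update bench.events
set position = nextval('bench.event_order')
where position is null;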
Not sure I understand.
Both before and after triggers fire at the point the insert statement happens within the transaction.
Additionally. I think this trigger would be applied to the CDC audit table and not the actual transaction table. Doesn’t matter from a perf pov since everything will be a single transaction, but the lock acquisition is the last step.
I saw the benchmarks that tiru ran, including on slow transactions with random delays large and small introduced and there’s not a significant perf impact.
@elgordino: would love for you to give this a shot and perhaps set up a quick chat with tiru at your convenience. This is hopefully both really easy and non-intrusive to set up, and light on perf, especially for your use case with long-running concurrent transactions. Excited to understand the impact!
Of course - we also checked whether the load testing created any missing events, and that wasn't the case!
Additionally. I think this trigger would be applied to the CDC audit table and not the actual transaction table. Doesn’t matter from a perf pov since everything will be a single transaction, but the lock acquisition is the last step.
The advisory lock will still block other transactions, and the performance impact can be significant if you can't use / don't have partitions and have long transactions. The AFTER INSERT is used to reduce (even if only slightly) the lock time. I am not sure how much time it saves, but I assume it can be significant since Postgres does some constraint checks per statement instead of per transaction, and with AFTER INSERT you skip those. But again, it depends on your transactions.
Thank you @ilijaNL the insight to take the transaction lock in a deferred trigger has worked very nicely.
The process I have tested inserts into the 'notify' table without a sequence number and then assigns the number in a deferred trigger that runs at commit time, under an advisory lock.
This is the trigger I have been testing with to add the sequence number to the 'notify' table:
CREATE OR REPLACE FUNCTION deferred_notify_upd() RETURNS TRIGGER AS $notify_upd$
BEGIN
  -- serialize sequence allocation; runs at commit time because the trigger is deferred
  PERFORM pg_advisory_xact_lock(1);
  UPDATE notify SET
    "deferred_notify_serial" = nextval('deferred_serial')
  WHERE id = NEW.id;
  RETURN NULL;
END;
$notify_upd$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS "deferred_notify_upd" ON "notify";
CREATE CONSTRAINT TRIGGER "deferred_notify_upd"
AFTER INSERT ON "notify"
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE deferred_notify_upd();
I have been testing with multiple clients inserting batches of 10,000 records (which takes about 50 seconds) and other clients inserting smaller batches of records at the same time.
With the deferred trigger I am seeing a lock for approx 0.2 seconds to add the sequence numbers to the notify table, which is excellent!
I need to make some more tweaks before adopting this in production, and I will change the lock so it is more granular, but I now expect to be able to deploy this successfully.
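One possible way to make the lock more granular (an illustration, not necessarily what will be deployed) is to key the advisory lock on a partition value such as the organization, so unrelated organizations don't serialize against each other. The "organizationId" column here is an assumption:

-- inside deferred_notify_upd(), instead of a single global key:
PERFORM pg_advisory_xact_lock(hashtext(NEW."organizationId"::text));

As with the chat-room example above, this only preserves ordering within the partition used in the subscription's filter.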
Thanks for your assistance with this @coco98 and @ilijaNL I hope to find time to write up my entire approach at some point in the next couple of months.
I'm going to close this issue since I think we are all good now!
If the deferred trigger is not sufficient you can always call deferred_notify_upd() asynchronously, e.g. from a background job. This will increase latency but removes the lock from the write path entirely.
Is a monotonic id still a thing? I'm just using a timestamptz as the initial value and it seems to work fine.
Is a monotonic id still a thing? I'm just using a timestamptz as the initial value and it seems to work fine.
Depends on your write rate and whether you can afford to miss data
After some testing it seems a monotonic id is still required; batching doesn't work at all if I insert more entries than batch_size with the same timestamp.
Sorry for the double post.
@ilijaNL Can you explain where exactly bench.event_order in your example comes from? Is it a sequence?
What would you prefer for a high traffic chat application? Your solution or the one elgordino posted?
@elgordino Any chance for your writeup soon?
@ilijaNL Can you explain where exactly bench.event_order in your example comes from? Is it a sequence?
That is correct; more specifically, a bigserial sequence
What would you prefer for a high traffic chat application? Your solution or the one elgordino posted?
That kind of depends on your transactions. If they are heavy (thus taking a long time), I would prefer the deferred after insert trigger approach; if they are really short (e.g. only inserting a message) I would go for the pg_advisory_xact_lock before every insert, using a partition key for the lock key (the chat id, for example).
@elgordino Any chance for your writeup soon?
TBH I haven't much to add to what I posted above, other than to say we've been using it for a while and it's been working fine.
Version Information
Server Version: 2.8.1 also seen on 2.7.0
Environment
Docker local dev with community edition
What is the expected behaviour?
I expect streaming subscription to return new rows at least once to the connected client.
Keywords
streaming subscriptions, transaction, updated data, clock timestamp, serial, cursor
What is the current behaviour?
The streaming subscriptions rely on a cursor to identify new rows. This cursor needs to be unique and monotonically increasing (from Hasura's perspective), however this is not easily achieved in Postgres, and may not be achievable at all.
The problem arises when two transactions occur in parallel. For example: transaction A starts at 11:59:40 and takes 30 seconds to commit, while transaction B starts at 12:00:10 and commits immediately. Hasura sees B's row first and moves the cursor forward to 12:00:10, so when A finally commits, its row (stamped 11:59:40) is already behind the cursor and is never sent.
The same problem is also observed when using clock_timestamp or a bigserial for the cursor column, because the inserted rows are not visible to Hasura until the transactions are committed, and as such can appear out of order to Hasura.
Using a huge, say 10000, batch size does not resolve the issue.
How to reproduce the issue?
Screenshots or Screencast
I have put together a video showing this problem in detail here: https://youtu.be/qIsS0YX8g1k
I can upload the repo used in the video if it is helpful.
Any possible solutions?
My suggestion is that Hasura needs to maintain some sort of lagging cursor, that lags the real cursor position by a configurable period.
For example if the lagging cursor is set to 30 seconds, then in my example above all rows would be returned because Hasura would still be looking for new rows created after 11:59:40 when transaction B commits.
Obviously this results in more deduplication effort by Hasura and potentially more load on the database, however it would be much safer.
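Roughly, instead of polling strictly after the last delivered cursor value, Hasura would poll with a trailing window and de-duplicate. A sketch of the idea (not Hasura's actual query; column names are illustrative):

select id
from notify
where created_at > $1 - interval '30 seconds'  -- $1 = newest cursor value already delivered
order by created_at;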
Alternatively Hasura could potentially use LISTEN/NOTIFY to observe the table directly?
I have had some discussion with karthikeyan in this discord thread: https://discord.com/channels/407792526867693568/983643992988196914 but a resolution has not yet been reached.
Can you identify the location in the source code where the problem exists?
No
If the bug is confirmed, would you be willing to submit a PR?
Sorry, way outside my competence!