citusdata / citus

Distributed PostgreSQL as an extension
https://www.citusdata.com
GNU Affero General Public License v3.0
10.36k stars, 658 forks

Push down modifying CTEs in distributed INSERT ... SELECT #1741

Open marcocitus opened 6 years ago

marcocitus commented 6 years ago

To do rollups using INSERT...SELECT, you need a way of tracking which data has already been processed so you don't roll it up twice. That is difficult given the current limitations of both distributed INSERT...SELECT and INSERT...SELECT via the coordinator.

A common pattern is to delete the raw data that you are adding to the rollup. That way, the second time you run the command, it only processes new data.

We would want something like the following to be pushed down:

CREATE TABLE data (key int);
CREATE TABLE rollup (key int primary key, count int);

SELECT create_distributed_table('data', 'key');
SELECT create_distributed_table('rollup', 'key');

WITH deleted_rows AS (DELETE FROM data RETURNING *)
INSERT INTO rollup (key, count)
SELECT key, count(*) FROM deleted_rows GROUP BY key
ON CONFLICT (key) DO UPDATE SET count = rollup.count + EXCLUDED.count;
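To illustrate why deleting the raw data makes the rollup safe to rerun, here is a minimal sketch on a plain, single-node PostgreSQL database (no Citus), using the same table definitions as above. The sample rows are made up for illustration. Running the same statement twice only counts the rows inserted in between:

```sql
-- Plain PostgreSQL tables with the same shape as in the example (no Citus).
CREATE TABLE data (key int);
CREATE TABLE rollup (key int PRIMARY KEY, count int);

-- First batch of raw data (illustrative values).
INSERT INTO data VALUES (1), (1), (2);

-- Rollup: delete the raw rows and add their counts in a single statement.
WITH deleted_rows AS (DELETE FROM data RETURNING *)
INSERT INTO rollup (key, count)
SELECT key, count(*) FROM deleted_rows GROUP BY key
ON CONFLICT (key) DO UPDATE SET count = rollup.count + EXCLUDED.count;

-- New raw data arrives after the first rollup.
INSERT INTO data VALUES (1);

-- Rerunning the same statement only sees the new row,
-- because the previously rolled-up rows were deleted.
WITH deleted_rows AS (DELETE FROM data RETURNING *)
INSERT INTO rollup (key, count)
SELECT key, count(*) FROM deleted_rows GROUP BY key
ON CONFLICT (key) DO UPDATE SET count = rollup.count + EXCLUDED.count;

SELECT key, count FROM rollup ORDER BY key;
-- key | count
--   1 |     3
--   2 |     1
```

Because the DELETE and the INSERT belong to one statement, PostgreSQL commits or rolls them back together, so a failed run does not drop raw rows without rolling them up.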

onderkalaci commented 1 year ago

@marcocitus this seems to work nowadays. Do you want to check before closing?

marcocitus commented 1 year ago

No, I don't think we push down update/delete in CTEs. They work, but are not pushed down along with the INSERT.

onderkalaci commented 1 year ago

Ah, yes, I misunderstood the description.

So, the idea here is that the CTE could potentially preserve the distribution column. It reminded me of https://github.com/citusdata/citus/pull/4355.

I guess if CTE inlining worked for modifying CTEs, it might be possible to inline and push down. But AFAICT, PG does not inline CTEs that can modify the database.

marcocitus commented 1 year ago

You can only have modifying CTEs, not modifying subqueries, so probably inlining does not really apply.

The planner would have to be clever enough to realize that the whole query can be pushed down, and then generate per-shard tasks that contain the CTEs.
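To make this concrete: `data` and `rollup` are both distributed on `key`, and `create_distributed_table` by default colocates tables whose distribution columns have the same type. Every row for a given `key` therefore lands in one pair of colocated shards, and `GROUP BY key` never needs rows from another shard. A pushed-down plan could send each worker the whole statement, CTE included, rewritten against one shard pair. The sketch below shows what one such task might look like. The shard names `data_102008` and `rollup_102040` are hypothetical, and the two CREATE TABLE statements only create plain stand-in tables so the task can run on a single PostgreSQL node.

```sql
-- Plain stand-ins for one pair of colocated shards (hypothetical shard IDs).
CREATE TABLE data_102008 (key int);
CREATE TABLE rollup_102040 (key int PRIMARY KEY, count int);
INSERT INTO data_102008 VALUES (7), (7), (9);  -- illustrative rows

-- One per-shard task: the modifying CTE and the INSERT both target shards
-- that cover the same hash range of "key", so no cross-shard data is needed.
WITH deleted_rows AS (DELETE FROM data_102008 RETURNING *)
INSERT INTO rollup_102040 (key, count)
SELECT key, count(*) FROM deleted_rows GROUP BY key
ON CONFLICT (key) DO UPDATE SET count = rollup_102040.count + EXCLUDED.count;
```

Under this sketch, the coordinator would run one such task per colocated shard pair instead of pulling the deleted rows back to itself.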

jaihind213 commented 8 months ago

Hi, we are running a query of this shape:

WITH cte AS (...)
INSERT INTO distributed_table
SELECT * FROM cte JOIN another_distributed_table ...
ON CONFLICT ... DO UPDATE SET ...;

We are on Citus 10.2 on Azure.

We get:

ERROR:  cannot perform distributed planning for the given modification
DETAIL:  Insert query cannot be executed on all placements for shard 214137

The query works sometimes and errors out other times.

The cluster is not balanced!