hackermd opened this issue 7 years ago
Hi @hackermd,
> I would like to use the `range` distribution method (see use case below). At the moment, this is possible by manually updating the `pg_dist_partition` and `pg_dist_shard` tables. It would be nice if this functionality were exposed at a higher level via the `create_distributed_table()` function.
You can create range distributed tables with `create_distributed_table()` by using the optional third parameter:

```sql
SELECT create_distributed_table('table_name', 'distribution_column', 'range');
```
> Why is the `range` distribution method not exposed (and documented)?
Hash distribution works fine for most people and is less of a burden to maintain. Range distribution requires manual intervention, such as setting shard min/max values.
Depending on your use-case, you may consider the append distribution: https://docs.citusdata.com/en/v6.2/reference/append.html
> Do you see any problems with the above use case? Any suggestions?
You need to be careful while playing with the metadata. Also, some functionality that works for hash may not work for range partitioning.
What is your use case? What type of application are you trying to power? Is it possible to put an aggregator in front of your batch processes and use fewer COPY operations in parallel?
@metdos, thanks for your feedback!
Indeed, one can specify the distribution method `range` (haven't actually tried this, since the docs state that only `append` and `hash` are valid arguments). But as you have pointed out, range distribution has a few intricacies:
1) Explicit co-location is not possible
```sql
CREATE TABLE foo (id BIGINT, name VARCHAR);
SELECT create_distributed_table('foo', 'id', 'range');
CREATE TABLE bar (id BIGINT, foo_id BIGINT);
SELECT create_distributed_table('bar', 'foo_id', 'range', colocate_with=>'foo');
```
```
ERROR: cannot distribute relation
DETAIL: Currently, colocate_with option is only supported for hash distributed tables.
```
How is co-location handled for range-distributed tables? Will it still be inferred from the metadata tables? How will this impact performance and SQL support?
2) Shards are not created
Specifying `range` distribution method in `create_distributed_table()` correctly creates the entry in the `pg_dist_partition` metadata table:
```sql
SELECT * FROM pg_dist_partition;
```
```
logicalrelid | partmethod | partkey | colocationid | repmodel
--------------------------------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
foo | r | {VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 0 | c
```
However, it doesn't create the shards, and consequently there are no entries in `pg_dist_shard` or `pg_dist_shard_placement`. The shards must then be created subsequently using `master_create_empty_shard()` (each shard needs to be created individually), and the `shardminvalue` and `shardmaxvalue` columns of the `pg_dist_shard` table must be updated manually. It would be great if this were handled automatically by `create_distributed_table()`, similar to the `hash` distribution method, at least in the case where the type of the distribution column is `INTEGER` or `BIGINT`, where the range for each shard could be determined based on the value of `citus.shard_count`.
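The manual workaround described above can be sketched roughly as follows. This is only a sketch: the table name, shard count, and value ranges are illustrative assumptions, and editing the Citus metadata tables directly is at your own risk.

```sql
-- Create the range-distributed table; this only writes pg_dist_partition.
SELECT create_distributed_table('foo', 'id', 'range');

-- Each call creates one empty shard with no min/max values yet.
SELECT master_create_empty_shard('foo');
SELECT master_create_empty_shard('foo');
SELECT master_create_empty_shard('foo');
SELECT master_create_empty_shard('foo');

-- Manually assign contiguous, non-overlapping ranges to the new shards.
-- shardid values are cluster-specific, so they are matched by position here;
-- the range width of 1000 per shard is an arbitrary example.
WITH shards AS (
  SELECT shardid, row_number() OVER (ORDER BY shardid) - 1 AS i
  FROM pg_dist_shard
  WHERE logicalrelid = 'foo'::regclass
)
UPDATE pg_dist_shard s
SET shardminvalue = (shards.i * 1000)::text,
    shardmaxvalue = (shards.i * 1000 + 999)::text
FROM shards
WHERE s.shardid = shards.shardid;
```

After this, rows with `id` between 0 and 999 would route to the first shard, 1000 to 1999 to the second, and so on.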
3) Some functionality is not supported
> Also, some functionality that works for hash may not work for range partitioning.
Which functionality is supported for `hash` but not for `range`?
> What type of application are you trying to power? Is it possible to put an aggregator in front of your batch processes and use fewer COPY operations in parallel?
The application is TissueMAPS.
I need to create millions of so-called "mapobjects" on a batch cluster. Each object has one or more segmentations (geometries such as polygons or points) and a potentially large number of features associated with it. There are three tables: `mapobjects`, `mapobject_segmentations` and `feature_values`, which are all distributed by `mapobject_id` and should be co-located. I need to be able to insert into these tables concurrently using many parallel connections (one connection for each batch job). Each batch job inserts a few hundred up to tens of thousands of entries in bulk. Ideally they would all end up on the same shard, or at least on the same worker, so that I can use transactions or `COPY`, since creating each entry individually has too much overhead. Once all entries are written to the table, I must be able to perform spatial queries in real time to power an interactive web application with support for multi-tenancy (think of Google Maps). So I need to be able to ingest large amounts of data in bulk while being able to query the data in a responsive manner. The number of connections for ingestion is fixed (determined by the number of available compute units), but other connections from the web application may come in simultaneously and run SELECT queries on distributed tables.
> Is it possible to put an aggregator in front of your batch processes and use fewer COPY operations in parallel?
I am considering putting PgBouncer in front of the coordinator. But it is still hard to tell how many connections the coordinator will create on the workers, since this depends on the distribution method. This is why I would prefer `range` over `hash`: it gives me better control over the number of connections.
In the meantime, I have already figured out that co-location does not fully work as expected for range-distributed tables: I cannot use foreign keys for range-distributed tables created as follows:

```sql
SELECT create_distributed_table('foo', 'id', 'range');
```

However, when I create distributed tables using the `hash` method and then later change the distribution method and the min/max values for the shards in the metadata tables, it works fine.
@hackermd, we don't document and promote `range` partitioning because of the reasons you listed. We have customers who use range-partitioned tables, but they mostly manage metadata and co-location themselves.
For your individual batch COPY jobs, how many unique `mapobject_id` values do they use? If this is limited, or you have some control over them, that would help with the connection problem. COPY only opens connections to the shards to which it needs to send data, so if a COPY operation only writes data into one shard, it will only open one connection.
> For your individual batch COPY jobs, how many unique `mapobject_id` values do they use?
Between 10^2 and 10^5.
> COPY only opens connections to the shards to which it needs to send data, so if a COPY operation only writes data into one shard, it will only open one connection.
Exactly! But how can I guarantee that only a single shard will be targeted? This is my problem with `hash` distribution: I have little to no control over shard placement, because I don't know on which shard the rows will end up after their partition key gets hashed. They may end up all over the place, and every query (including subsequent `SELECT` queries) will open multiple connections for the different shard placements.
> we don't document and promote range partitions because of the reasons you listed.
I think it would be worth documenting it and improving the interface. To me, `range` distribution is really powerful, because it gives me much more control over where my data will end up and thereby allows me to tune performance. Your example with `tenant_id` is nice, but for more complex distribution and access patterns `hash` distribution can become a bottleneck, since there is no support for bulk ingestion using multi-row inserts or transactions, and using multiple parallel COPY commands will overload the workers.
The nicest solution, in my opinion, would be a function `master_modify_single_shard()` that would allow me to bulk-ingest multiple rows into a single shard. Since only a single shard would be modified, this function could provide full SQL support including transactions (based on my limited understanding of the Citus internals), and the partition key could be auto-generated using a shard-specific `SEQUENCE` that generates values in the range `[shardminvalue, shardmaxvalue]`.
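To be clear, `master_modify_single_shard()` is a hypothetical function that does not exist in Citus; but the shard-specific sequence part of the idea could be approximated today. In this sketch, the shard id `102008` and the range 0..999 are made-up values standing in for one shard's `shardminvalue`/`shardmaxvalue` from `pg_dist_shard`:

```sql
-- A per-shard sequence constrained to that shard's assumed key range,
-- so every generated id is guaranteed to route to the same shard.
CREATE SEQUENCE foo_shard_102008_seq
  MINVALUE 0
  MAXVALUE 999
  NO CYCLE;

-- Bulk-insert rows whose partition key is drawn from the sequence;
-- all of them land on the single shard covering [0, 999].
INSERT INTO foo (id, name)
SELECT nextval('foo_shard_102008_seq'), 'object-' || g
FROM generate_series(1, 100) AS g;
```

The `NO CYCLE` clause makes the sequence error out rather than wrap around once the shard's range is exhausted.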
@hackermd, I updated the issue title to reflect what was discussed here.
> The nicest solution, in my opinion, would be a function `master_modify_single_shard()` that would allow me to bulk-ingest multiple rows into a single shard.
If you use a stable function as the default value of the partition column (e.g. `now()`), then it won't change during the COPY and thus everything goes into a single shard, while the COPY commands are still distributed across different shards.
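A minimal sketch of this suggestion, with table and column names that are my own invention: `now()` is stable within a transaction, and a COPY runs in a single transaction, so every row of one COPY receives the same default value and hashes to the same shard.

```sql
-- Assumed schema: an extra hash partition column whose default is the
-- stable function now().
CREATE TABLE mapobjects (
  batch_key timestamptz NOT NULL DEFAULT now(),
  id bigint,
  name text
);
SELECT create_distributed_table('mapobjects', 'batch_key');

-- batch_key is omitted from the column list, so the default applies:
-- all rows of this COPY share one timestamp and land on one shard,
-- while COPYs from other jobs (other timestamps) hit other shards.
COPY mapobjects (id, name) FROM STDIN WITH (FORMAT csv);
```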
@marcocitus thanks for the suggestion! That would require a change to my schema, but introducing an additional column for the partition key could be an elegant solution. The only problem I see with this approach is that it would probably make subsequent `SELECT` queries more difficult, since the partition key would no longer be part of the `WHERE` clause. I guess this would then require the coordinator to send these `SELECT` queries to all workers and run them on all shards, wouldn't it?
Following along these lines, it would be really cool if I could use a PostGIS geometry column, such as `ST_Polygon`, as the partition key, and if this were used by the coordinator to determine the shard placement for `SELECT` queries that have a `WHERE` clause with `ST_Intersects()`. This would allow me to distribute my "mapobjects" based on spatial location and co-locate nearby objects.
> I guess this would then require the coordinator to send these SELECT queries to all workers and run them on all shards, wouldn't it?
Yes, so it's probably a good idea to keep the shard count relatively small or use PgBouncer in front of the workers. In Citus Cloud we actually set up outbound PgBouncers on the coordinator to keep a pool of connections to the workers.
@marcocitus thanks for the suggestions!
> If you use a stable function as the default value of the partition column (e.g. `now()`), then it won't change during the COPY and thus everything goes into a single shard, while the COPY commands are still distributed across different shards.
I'm currently testing the `COPY` approach using a `hash` partition key whose value is the same for all rows generated by a particular batch job (e.g. the ID of the process or the hostname of the compute node). This makes things a bit more tricky with respect to foreign-key and unique constraints when co-locating different tables, but it seems to work nicely.
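A sketch of this setup, with all names being assumptions rather than the actual TissueMAPS schema: each batch job writes a constant `batch_id`, so its COPY touches exactly one shard.

```sql
-- Assumed schema: distribute by a per-job constant instead of mapobject_id,
-- so each batch job's COPY writes to a single shard.
CREATE TABLE feature_values (
  batch_id text NOT NULL,        -- e.g. hostname or process ID of the job
  mapobject_id bigint NOT NULL,
  value double precision
);
SELECT create_distributed_table('feature_values', 'batch_id', 'hash');

-- Each job supplies its own constant batch_id in the data it streams in:
COPY feature_values (batch_id, mapobject_id, value)
FROM STDIN WITH (FORMAT csv);
```

The trade-off mentioned above follows from this: uniqueness and foreign keys on distributed tables must include the partition column, so constraints on `mapobject_id` alone are no longer possible.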
> Yes, it's probably a good idea to keep the shard count relatively small or use PgBouncer in front of the workers. In Citus Cloud we actually set up outbound PgBouncers on the coordinator to keep a pool of connections to the workers.
Interesting! Wouldn't this create problems for the coordinator when it doesn't hear back from the workers within 4s?
> Interesting! Wouldn't this create problems for the coordinator when it doesn't hear back from the workers within 4s?
No, because in transaction pooling mode PgBouncer will happily accept new connections but queue up individual commands/transactions. The timeout only applies to connection establishment.
### Feature request

I would like to use the `range` distribution method (see use case below). At the moment, this is possible by manually updating the `pg_dist_partition` and `pg_dist_shard` tables. It would be nice if this functionality were exposed at a higher level via the `create_distributed_table()` function.

### Use case

According to the docs, the method of choice for bulk ingestion is `COPY`. I have about 200 parallel processes (batch cluster) that all need to insert a lot of data into a distributed table in parallel. The `COPY` approach would probably create too many connections on the workers. However, the documentation states that `COPY` will only open connections to the workers that hold the shards the data will be copied to. So, I would like to explicitly target an individual shard, i.e. "directly" copy data from a compute node to a database worker node without overloading the database servers with too many connections. In order to achieve this, I need to choose values for the partition key such that the records are guaranteed to be co-located on the same shard. This doesn't seem to be possible when these values get hashed.

### Questions