marcocitus opened this issue 3 years ago (status: Open).
One of the customers asked for a scalable way to migrate data from Citus to S3 buckets in CSV or Parquet format, so I prototyped a scalable 'COPY distributed_table TO PROGRAM'. The UDF scalable_copy_to_csv accepts a table name and an output folder in which to persist the CSV files. It pushes the command COPY <dist_table> TO PROGRAM 'bash s3-uploader.sh <output folder>'
down to all workers, so the table's shards are copied in parallel. Each shard is copied into a sample program that reads all of the shard's data and persists it to a CSV file named after the shard. Once the data is persisted to CSV, custom S3 upload logic can run. This part is not implemented and is left to the customer; it could upload to any provider.
Here is the repo for the solution.
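The repo's s3-uploader.sh is not reproduced here. As a minimal sketch of what the per-shard program does, here is a Python analog. It assumes the program receives the output folder and the shard name (the original invocation only passes the folder) and the shard's rows on stdin; persist_shard and upload_hook are hypothetical names, and upload_hook stands in for the customer's upload logic.

```python
import shutil
from pathlib import Path
from typing import Callable, Optional, TextIO


def persist_shard(
    stream: TextIO,
    output_dir: str,
    shard_name: str,
    upload_hook: Optional[Callable[[Path], None]] = None,
) -> Path:
    """Persist one shard's COPY output as <output_dir>/<shard_name>.csv.

    Hypothetical stand-in for the prototype's per-shard program: with
    COPY ... TO PROGRAM, the shard's rows arrive on the program's stdin.
    """
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    target = out_dir / f"{shard_name}.csv"
    # Stream the rows to disk rather than buffering the whole shard in memory.
    with target.open("w", newline="") as out:
        shutil.copyfileobj(stream, out)
    # Upload logic is left to the customer; any provider can be plugged in here.
    if upload_hook is not None:
        upload_hook(target)
    return target
```

A wrapper script invoked by COPY ... TO PROGRAM would call persist_shard(sys.stdin, output_dir, shard_name), passing as upload_hook a function that uploads the file to S3 or another provider.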
I wonder whether we could make this solution part of Citus. Some important points that could be questioned:
COPY data_102008 TO '{PATH}/data_102008';
This should just work. A convenient feature to implement for this would be:
COPY data TO '{PATH}/data' WITH (COPY_SHARDS);
which would then propagate COPY data_xxxx TO '{PATH}/data_xxxx';
to the worker that holds each shard.
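A minimal sketch of how the proposed COPY_SHARDS option could be expanded into per-shard commands, grouped by worker so that each worker's list can run in parallel with the others. COPY_SHARDS is the option proposed in this thread, not an existing Citus option; propagate_copy_to is a hypothetical name, and the (shard_id, worker) placements are assumed to come from Citus metadata (for example the citus_shards view). The sketch does no SQL quoting of the path.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def propagate_copy_to(
    table: str, path: str, placements: Iterable[Tuple[int, str]]
) -> Dict[str, List[str]]:
    """Expand COPY <table> TO '<path>/<table>' WITH (COPY_SHARDS).

    placements: (shard_id, worker) pairs, assumed to be read from metadata.
    Returns worker -> COPY commands for the shards that worker holds.
    """
    commands: Dict[str, List[str]] = defaultdict(list)
    for shard_id, worker in sorted(placements):
        # Shards are named <table>_<shardid>, e.g. data_102008.
        shard_name = f"{table}_{shard_id}"
        # No quoting/escaping of path: illustration only.
        commands[worker].append(f"COPY {shard_name} TO '{path}/{shard_name}';")
    return dict(commands)
```

For example, propagate_copy_to("data", "/exports", [(102008, "worker-1"), (102009, "worker-2")]) yields one COPY command per worker, each writing a file named after its shard.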
Optionally, also:
COPY data FROM '{PATH}/data' WITH (COPY_SHARDS);
where {PATH} would be scanned for data_xxxx files and, for each one,
COPY data_xxxx-real FROM '{PATH}/data_xxxx';
propagated, where xxxx-real is the actual shard ID, which may or may not be the same as the shard ID in the file name.
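The scan-and-propagate step could be sketched in Python as follows. propagate_copy_from is a hypothetical name, and the pairing rule is an assumption: the thread only says that xxxx-real may differ from xxxx, so this sketch matches files to the table's current shards in shard-ID order.

```python
import re
from pathlib import Path
from typing import List


def propagate_copy_from(table: str, path: str, current_shard_ids: List[int]) -> List[str]:
    """Pair <path>/<table>_<old id> files with the table's current shards.

    Assumption: the i-th old shard (ordered by ID) covers the same hash range
    as the i-th current shard, i.e. both were created with the same shard
    count. This need not hold in general, e.g. after shards have been split.
    """
    pattern = re.compile(rf"{re.escape(table)}_(\d+)")
    old_ids = []
    for entry in Path(path).iterdir():
        match = pattern.fullmatch(entry.name)
        if entry.is_file() and match:
            old_ids.append(int(match.group(1)))
    old_ids.sort()
    new_ids = sorted(current_shard_ids)
    if len(old_ids) != len(new_ids):
        raise ValueError(
            f"found {len(old_ids)} shard files but the table has {len(new_ids)} shards"
        )
    # data_xxxx-real <- file data_xxxx, one command per shard.
    return [
        f"COPY {table}_{new} FROM '{path}/{table}_{old}';"
        for old, new in zip(old_ids, new_ids)
    ]
```

Failing loudly on a count mismatch is deliberate: with a different shard count, no one-to-one file-to-shard pairing can be correct.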
This second suggestion is trickier, though, since it has some implicit constraints that I can think of, such as:
- PATH
- v: all values in the distribution column of file data_xxxx that are copied into the data_xxxx-real shard. The hash function used to distribute values to shards should send v to the data_xxxx-real shard.
I'm a bit cautious about accessing a shard directly via data_102008 or similar names, for the following reasons:
So, if possible, a syntax like COPY dist_table ... WITH (only_shardid:=XXX) looks nicer to me.
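Whichever syntax is used, loading rows straight into one shard bypasses the normal row routing, so the hash constraint on the distribution column has to be checked somewhere. A minimal sketch of such a check follows. hash_ranges models hash-range assignment for a newly created hash-distributed table as equal-width ranges over the signed 32-bit space. stand_in_hash is a hypothetical placeholder: Citus actually uses the PostgreSQL hash function for the column's type, which this does not reproduce.

```python
import zlib
from typing import Iterable, List, Tuple

INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1


def hash_ranges(shard_count: int) -> List[Tuple[int, int]]:
    """Split the signed 32-bit hash space into shard_count contiguous ranges."""
    step = 2**32 // shard_count
    ranges = []
    for i in range(shard_count):
        lo = INT32_MIN + i * step
        # The last shard absorbs any remainder up to INT32_MAX.
        hi = INT32_MAX if i == shard_count - 1 else lo + step - 1
        ranges.append((lo, hi))
    return ranges


def stand_in_hash(value: str) -> int:
    """Hypothetical stand-in for the column type's hash (NOT what Citus uses)."""
    h = zlib.crc32(value.encode())
    # crc32 is unsigned; reinterpret it as a signed 32-bit integer.
    return h - 2**32 if h > INT32_MAX else h


def rows_outside_shard(values: Iterable[str], shard_range: Tuple[int, int]) -> List[str]:
    """Return distribution-column values whose hash falls outside shard_range."""
    lo, hi = shard_range
    return [v for v in values if not lo <= stand_in_hash(v) <= hi]
```

A COPY ... WITH (only_shardid:=XXX) implementation would reject the load (or the offending rows) whenever rows_outside_shard returns anything for shard XXX's range.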
The process utility hook gives us some liberty to deal with non-existent names in utility commands such as COPY.
That means we could support commands that copy directly to/from shards via the coordinator, e.g.:
If the table does not exist, the coordinator could check whether it matches a shard name and redirect the COPY to the shard.
This would be helpful for migrations to and from Citus.
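A minimal sketch of the name check the coordinator could do in the utility hook when a COPY names a relation that does not exist. It is in Python for illustration, not as it would be written in Citus (which is C). It assumes shard names are exactly <table>_<shardid>, ignoring schemas, quoting, and Citus's handling of long names, and that shard ownership is available as a map (e.g. built from pg_dist_shard). resolve_copy_target is a hypothetical name.

```python
import re
from typing import Dict, Optional, Set, Tuple

# Greedy table part, so "my_table_102008" splits into ("my_table", 102008).
SHARD_NAME = re.compile(r"(?P<table>.+)_(?P<shard_id>\d+)")


def resolve_copy_target(
    name: str,
    existing_tables: Set[str],
    shard_owner: Dict[int, str],
) -> Optional[Tuple[str, int]]:
    """Decide whether COPY <name> should be redirected to a shard.

    Returns (distributed_table, shard_id) if name is not an existing table but
    looks like <table>_<shardid> for a shard of that table; otherwise None,
    meaning the COPY is handled (or fails) as usual.
    """
    if name in existing_tables:
        return None  # a real relation: never reinterpret it
    match = SHARD_NAME.fullmatch(name)
    if match is None:
        return None
    table, shard_id = match.group("table"), int(match.group("shard_id"))
    # Only redirect if the shard really belongs to the table named in it.
    if shard_owner.get(shard_id) != table:
        return None
    return table, shard_id
```

Checking existing relations first keeps the redirect from ever shadowing a real table whose name happens to end in digits.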