This PostgreSQL extension is a Foreign Data Wrapper (FDW) for accessing Parquet files on a local file system and on Amazon S3. This version of parquet_s3_fdw works with PostgreSQL 13, 14, 15 and 16.
Apache Parquet foreign data wrapper supporting S3 access for PostgreSQL.
- CMake 3.26.3+
- C++11 compiler
- libcurl-devel
- openssl-devel
- libuuid-devel
- pulseaudio-libs-devel
- libarrow and libparquet: confirmed version is 12.0.0 (required). Please refer to the building guide.
- AWS SDK for C++ (libaws-cpp-sdk-core, libaws-cpp-sdk-s3): confirmed version is 1.11.91 (required). Please refer to the building guide.
**Attention!**
We recommend building libarrow, libparquet and the AWS SDK for C++ from source. Linking may fail with pre-compiled binaries because the gcc version differs between arrow and the AWS SDK.
make install
or in case when PostgreSQL is installed in a custom location:
make install PG_CONFIG=/path/to/pg_config
It is possible to pass additional compilation flags through either the custom `CCFLAGS` variable or the standard `PG_CFLAGS`, `PG_CXXFLAGS` and `PG_CPPFLAGS` variables.
CREATE EXTENSION parquet_s3_fdw;
CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw;
If using MinIO instead of AWS S3, use the `use_minio` option when creating the server.
CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw OPTIONS (use_minio 'true');
You have to specify a user name and password if accessing Amazon S3.
CREATE USER MAPPING FOR public SERVER parquet_s3_srv OPTIONS (user 's3user', password 's3password');
Now you should be able to create a foreign table from Parquet files. Currently parquet_s3_fdw supports the following column types (to be extended shortly):
Arrow type | SQL type |
---|---|
INT8 | INT2 |
INT16 | INT2 |
INT32 | INT4 |
INT64 | INT8 |
FLOAT | FLOAT4 |
DOUBLE | FLOAT8 |
TIMESTAMP | TIMESTAMP |
DATE32 | DATE |
STRING | TEXT |
BINARY | BYTEA |
LIST | ARRAY |
MAP | JSONB |
Currently parquet_s3_fdw
doesn't support structs and nested lists.
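For illustration, a foreign table covering several of the supported types might look like this (a sketch; the file path and column names are hypothetical):

```sql
-- Hypothetical table; each SQL type follows the Arrow-to-SQL mapping above.
CREATE FOREIGN TABLE sensor_readings (
    sensor_id   int8,        -- Arrow INT64
    reading     float8,      -- Arrow DOUBLE
    recorded_at timestamp,   -- Arrow TIMESTAMP
    label       text,        -- Arrow STRING
    samples     int4[],      -- Arrow LIST of INT32
    attrs       jsonb        -- Arrow MAP
)
SERVER parquet_s3_srv
OPTIONS (filename 's3://bucket/dir/sensor_readings.parquet');
```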
Following options are supported:

- **filename** - space-separated list of paths to Parquet files to read. A path on AWS S3 starts with `s3://`. The mix of local path and S3 path is not supported;
- **sorted** - space-separated list of columns that the Parquet files are presorted by; that helps postgres to avoid redundant sorting when running a query with an `ORDER BY` clause or in other cases when having a presorted set is beneficial (Group Aggregate, Merge Join);
- **files_in_order** - specifies that files specified by `filename` or returned by `files_func` are ordered according to the `sorted` option and have no intersection rangewise; this allows to use a Gather Merge node on top of a parallel Multifile scan (default `false`);
- **use_mmap** - whether memory map operations will be used instead of file read operations (default `false`);
- **use_threads** - enables Apache Arrow's parallel columns decoding/decompression (default `false`);
- **files_func** - user defined function that is used by parquet_s3_fdw to retrieve the list of parquet files on each query; the function must take one `JSONB` argument and return a text array of full paths to parquet files;
- **files_func_arg** - argument for the function specified by `files_func`;
- **max_open_files** - the limit for the number of Parquet files opened simultaneously;
- **region** - the AWS region to connect to (default `ap-northeast-1`);
- **endpoint** - the address and port to connect to (default `127.0.0.1:9000`).

A foreign table may be created for a single Parquet file or for a set of files. It is also possible to specify a user defined function, which would return a list of file paths. Depending on the number of files and table options parquet_s3_fdw may use one of the following execution strategies:
may use one of the following execution strategies:
Strategy | Description |
---|---|
Single File | Basic single file reader |
Multifile | Reader which process Parquet files one by one in sequential manner |
Multifile Merge | Reader which merges presorted Parquet files so that the produced result is also ordered; used when sorted option is specified and the query plan implies ordering (e.g. contains ORDER BY clause) |
Caching Multifile Merge | Same as Multifile Merge , but keeps the number of simultaneously open files limited; used when the number of specified Parquet files exceeds max_open_files |
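For example, a table over two files (paths hypothetical) lets the planner choose between these strategies; with the `sorted` option set, an `ORDER BY` on the sort column can use the Multifile Merge reader instead of an explicit sort:

```sql
-- Hypothetical files, both presorted by ts.
CREATE FOREIGN TABLE events (
    ts  timestamp,
    msg text
)
SERVER parquet_s3_srv
OPTIONS (
    filename 's3://bucket/dir/events1.parquet s3://bucket/dir/events2.parquet',
    sorted 'ts'
);

SELECT * FROM events;              -- Multifile: files are read one by one
SELECT * FROM events ORDER BY ts;  -- Multifile Merge: presorted files are merged
```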
GUC variables:

- **parquet_fdw.use_threads** - global switch that allows the user to enable or disable threads (default `true`);
- **parquet_fdw.enable_multifile** - enable the Multifile reader (default `true`);
- **parquet_fdw.enable_multifile_merge** - enable the Multifile Merge reader (default `true`).

Example:
CREATE FOREIGN TABLE userdata (
id int,
first_name text,
last_name text
)
SERVER parquet_s3_srv
OPTIONS (
filename 's3://bucket/dir/userdata1.parquet'
);
SELECT * FROM userdata;
parquet_s3_fdw also supports parallel query execution (not to be confused with the multi-threaded decoding feature of Apache Arrow).
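As a sketch, using standard PostgreSQL settings (whether a parallel plan is actually chosen depends on costs and the number of files):

```sql
SET max_parallel_workers_per_gather = 2;
-- Inspect whether the planner chose a parallel scan over the Parquet files.
EXPLAIN (COSTS OFF) SELECT count(*) FROM userdata;
```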
parquet_s3_fdw
also supports IMPORT FOREIGN SCHEMA
command to discover parquet files in the specified directory on filesystem and create foreign tables according to those files. It can be used as follows:
IMPORT FOREIGN SCHEMA "/path/to/directory"
FROM SERVER parquet_s3_srv
INTO public;
It is important that remote_schema
here is a path to a local filesystem directory and is double quoted.
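By analogy with the local-directory form above and the S3 import examples later in this document, an S3 location can be given as the remote schema (bucket path hypothetical):

```sql
IMPORT FOREIGN SCHEMA 's3://bucket/dir/'
FROM SERVER parquet_s3_srv
INTO public;
```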
Another way to import parquet files into foreign tables is to use import_parquet_s3
or import_parquet_s3_explicit
:
CREATE FUNCTION import_parquet_s3(
tablename text,
schemaname text,
servername text,
userfunc regproc,
args jsonb,
options jsonb)
CREATE FUNCTION import_parquet_s3_explicit(
tablename text,
schemaname text,
servername text,
attnames text[],
atttypes regtype[],
userfunc regproc,
args jsonb,
options jsonb)
The only difference between import_parquet_s3 and import_parquet_s3_explicit is that the latter allows specifying a set of attributes (columns) to import. attnames and atttypes here are the attribute names and attribute types arrays respectively (see the example below).
userfunc
is a user-defined function. It must take a jsonb
argument and return a text array of filesystem paths to parquet files to be imported. args
is a user-specified jsonb object that is passed to userfunc
as its argument. A simple implementation of such function and its usage may look like this:
CREATE FUNCTION list_parquet_s3_files(args jsonb)
RETURNS text[] AS
$$
BEGIN
RETURN array_agg(args->>'dir' || '/' || filename)
FROM pg_ls_dir(args->>'dir') AS files(filename)
WHERE filename ~~ '%.parquet';
END
$$
LANGUAGE plpgsql;
SELECT import_parquet_s3_explicit(
    'abc',
    'public',
    'parquet_s3_srv',
    array['one', 'three', 'six'],
    array['int8', 'text', 'bool']::regtype[],
    'list_parquet_s3_files',
    '{"dir": "/path/to/directory"}',
    '{"sorted": "one"}'
);
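When the column subset is not needed, `import_parquet_s3` can be called with the same user function (a sketch reusing the arguments from the example above):

```sql
SELECT import_parquet_s3(
    'abc',
    'public',
    'parquet_s3_srv',
    'list_parquet_s3_files',
    '{"dir": "/path/to/directory"}',
    '{"sorted": "one"}'
);
```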
Schemaless mode is enabled by the `schemaless` option:

- `schemaless` option is `true`: enable schemaless mode.
- `schemaless` option is `false`: disable schemaless mode (we call it `non-schemaless` mode).
- If the `schemaless` option is not configured, the default value is `false`.
- The `schemaless` option is supported in `CREATE FOREIGN TABLE`, `IMPORT FOREIGN SCHEMA`, `import_parquet_s3()` and `import_parquet_s3_explicit()`.

A schemaless foreign table needs at least one jsonb column to represent data:
CREATE FOREIGN TABLE example_schemaless (
id int,
v jsonb
) OPTIONS (filename '/path/to/parquet_file', schemaless 'true');
SELECT * FROM example_schemaless;
id | v
----+---------------------------------------------------------------------------------------------------------------------------------
| {"one": 1, "six": "t", "two": [1, 2, 3], "five": "2018-01-01", "four": "2018-01-01 00:00:00", "seven": 0.5, "three": "foo"}
| {"one": 2, "six": "f", "two": [null, 5, 6], "five": "2018-01-02", "four": "2018-01-02 00:00:00", "seven": null, "three": "bar"}
(2 rows)
Create foreign table:
With `IMPORT FOREIGN SCHEMA`, `import_parquet_s3()` and `import_parquet_s3_explicit()`, the foreign table will be created with a fixed column definition like below:
CREATE FOREIGN TABLE example (
v jsonb
) OPTIONS (filename '/path/to/parquet_file', schemaless 'true');
Query data:
-- non-schemaless mode
SELECT * FROM example;
one | two | three | four | five | six | seven
-----+------------+-------+---------------------+------------+-----+-------
1 | {1,2,3} | foo | 2018-01-01 00:00:00 | 2018-01-01 | t | 0.5
2 | {NULL,5,6} | bar | 2018-01-02 00:00:00 | 2018-01-02 | f |
(2 rows)
-- schemaless mode
SELECT * FROM example_schemaless;
v
---------------------------------------------------------------------------------------------------------------------------------
{"one": 1, "six": "t", "two": [1, 2, 3], "five": "2018-01-01", "four": "2018-01-01 00:00:00", "seven": 0.5, "three": "foo"}
{"one": 2, "six": "f", "two": [null, 5, 6], "five": "2018-01-02", "four": "2018-01-02 00:00:00", "seven": null, "three": "bar"}
(2 rows)
Fetch values in jsonb expression:
Use the `->>` jsonb arrow operator, which returns text. The user may cast the jsonb expression to get the corresponding data representation. For example, in the expression `v->>'col'`, the fetched value `col` is a column name in the parquet file; we call this expression a `schemaless variable`, or `slvar`.
SELECT v->>'two', sqrt((v->>'one')::int) FROM example_schemaless;
?column? | sqrt
--------------+--------------------
[1, 2, 3] | 1
[null, 5, 6] | 1.4142135623730951
(2 rows)
Some features differ from `non-schemaless` mode:

- Schemaless mode supports the following expressions in the `WHERE` condition:
  - `slvar::type {operator} const`. For example: `(v->>'int64_col')::int8 = 100`
  - `const {operator} slvar::type`. For example: `100 = (v->>'int64_col')::int8`
  - `slvar::boolean is true/false`. For example: `(v->>'bool_col')::boolean is false`
  - `!(slvar::boolean)`. For example: `!(v->>'bool_col')::boolean`
- Schemaless mode supports the jsonb `exist` operator: `((v->>'col')::jsonb) ? element`, `(v->'col') ? element` and `v ? 'col'`
- The `sorted` option works the same as in non-schemaless mode, but use `slvar` instead of the column name in the `ORDER BY` clause.

CREATE FOREIGN TABLE example_sorted (v jsonb)
SERVER parquet_s3_srv
OPTIONS (filename '/path/to/example1.parquet /path/to/example2.parquet', sorted 'int64_col', schemaless 'true');
EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY (v->>'int64_col')::int8;
QUERY PLAN
--------------------------------
Foreign Scan on example_sorted
Reader: Multifile Merge
Row groups:
example1.parquet: 1, 2
example2.parquet: 1
(5 rows)
- Schemaless mode supports fetching values with the jsonb `->` operator.
SELECT * FROM example_schemaless;
v
----------------------------------------------------------------------------
{"array_col": [19, 20], "jsonb_col": {"1": "foo", "2": "bar", "3": "baz"}}
{"array_col": [21, 22], "jsonb_col": {"4": "test1", "5": "test2"}}
(2 rows)
SELECT v->'array_col'->1, v->'jsonb_col'->'1' FROM example_schemaless;
 ?column? | ?column?
----------+----------
 20       | "foo"
 22       |
(2 rows)
- The PostgreSQL cost of evaluating `(jsonb->>'col')::type` is much larger than fetching a column directly in `non-schemaless` mode, so the query plan in `schemaless` mode can differ from `non-schemaless` mode for some complex queries.
For other features, `schemaless` mode works the same as `non-schemaless` mode.
The user can issue INSERT, UPDATE and DELETE statements against a foreign table that has key columns set.
CREATE FOREIGN TABLE userdata (
id1 int OPTIONS(key 'true'),
id2 int OPTIONS(key 'true'),
first_name text,
last_name text
) SERVER parquet_s3_srv
OPTIONS (
filename 's3://bucket/dir/userdata1.parquet'
);
In schemaless mode, key columns are specified by the `key_columns` option:
CREATE FOREIGN TABLE userdata (
v JSONB
) SERVER parquet_s3_srv
OPTIONS (
filename 's3://bucket/dir/userdata1.parquet',
schemaless 'true',
key_columns 'id1 id2'
);
The `key_columns` option can be used in the IMPORT FOREIGN SCHEMA feature:
-- in schemaless mode
IMPORT FOREIGN SCHEMA 's3://data/' FROM SERVER parquet_s3_srv INTO tmp_schema
OPTIONS (sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- corresponding CREATE FOREIGN TABLE
CREATE FOREIGN TABLE tbl1 (
v jsonb
) SERVER parquet_s3_srv
OPTIONS (filename 's3://data/tbl1.parquet', sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- in non-schemaless mode
IMPORT FOREIGN SCHEMA 's3://data/' FROM SERVER parquet_s3_srv INTO tmp_schema
OPTIONS (sorted 'c1', key_columns 'id1 id2');
-- corresponding CREATE FOREIGN TABLE
CREATE FOREIGN TABLE tbl1 (
    id1 INT OPTIONS (key 'true'),
    id2 INT OPTIONS (key 'true'),
    c1 TEXT,
    c2 FLOAT
) SERVER parquet_s3_srv
OPTIONS (filename 's3://data/tbl1.parquet', sorted 'c1');
### insert_file_selector option
The `insert_file_selector` option names a user defined function that parquet_s3_fdw uses to retrieve the target parquet file on an INSERT query:
```sql
CREATE FUNCTION insert_file_selector_func(one INT8, dirname text)
RETURNS TEXT AS
$$
SELECT (dirname || '/example7.parquet')::TEXT;
$$
LANGUAGE SQL;
CREATE FOREIGN TABLE example_func (one INT8 OPTIONS (key 'true'), two TEXT)
SERVER parquet_s3_srv
OPTIONS (
insert_file_selector 'insert_file_selector_func(one, dirname)',
dirname '/tmp/data_local/data/test',
    sorted 'one');
```
The function specified by `insert_file_selector` must have the form `[function name]([arg name], [arg name] ...)` and return `TEXT` (the full path to a parquet file). Each `[arg name]` must be a foreign table column name or `dirname`:

- `dirname` arg: takes the value of the `dirname` option;
- column args: taken from the inserted slot by name.

parquet_s3_fdw supports keeping the sorted columns still sorted in the modify feature.
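Given the `example_func` table above, an INSERT is routed by the selector function (a sketch; with this particular selector every row goes to `example7.parquet` under `dirname`):

```sql
-- The selector above always returns dirname || '/example7.parquet',
-- so this row is written to /tmp/data_local/data/test/example7.parquet.
INSERT INTO example_func VALUES (1, 'foo');
```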
Basically, the parquet file schema is defined by a list of column names and corresponding types, but parquet_s3_fdw's scan assumes that all columns with the same name have the same type. The modify feature relies on the same assumption.
Primitive type mapping:

SQL type | Arrow type |
---|---|
BOOL | BOOL |
INT2 | INT16 |
INT4 | INT32 |
INT8 | INT64 |
FLOAT4 | FLOAT |
FLOAT8 | DOUBLE |
TIMESTAMP/TIMESTAMPTZ | TIMESTAMP |
DATE | DATE32 |
TEXT | STRING |
BYTEA | BINARY |
The default time precision for arrow::TIMESTAMP is microseconds, in the UTC timezone.
LIST columns are created from their element type; only primitive types are supported as elements.
MAP columns are created from their jsonb element types:

jsonb type | Arrow type |
---|---|
text | STRING |
numeric | FLOAT8 |
boolean | BOOL |
null | STRING |
other types | STRING |
In schemaless mode, the first nested jsonb level maps as:

jsonb type | Arrow type |
---|---|
array | LIST |
object | MAP |
-- non-schemaless mode
CREATE FOREIGN TABLE example_insert (
c1 INT2 OPTIONS (key 'true'),
c2 TEXT,
c3 BOOLEAN
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example_insert.parquet');
INSERT INTO example_insert VALUES (1, 'text1', true), (2, DEFAULT, false), ((select 3), (select i from (values('values are fun!')) as foo (i)), true);
INSERT 0 3
SELECT * FROM example_insert;
c1 | c2 | c3
----+-----------------+----
1 | text1 | t
2 | | f
3 | values are fun! | t
(3 rows)
-- schemaless mode
CREATE FOREIGN TABLE example_insert_schemaless (
v JSONB
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example_insert.parquet', schemaless 'true', key_columns 'c1');
INSERT INTO example_insert_schemaless VALUES ('{"c1": 1, "c2": "text1", "c3": true}'), ('{"c1": 2, "c2": null, "c3": false}'), ('{"c1": 3, "c2": "values are fun!", "c3": true}');
SELECT * FROM example_insert_schemaless;
v
-----------------------------------------------
{"c1": 1, "c2": "text1", "c3": "t"}
{"c1": 2, "c2": null, "c3": "f"}
{"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)
Target file selection on INSERT:

- If `insert_file_selector` exists, the target file is the result of this function.
- If `insert_file_selector` does not exist and the `dirname` option has been specified, a new file is created with the name format:
  `[foreign_table_name]_[date_time].parquet`
-- non-schemaless mode
CREATE FOREIGN TABLE example (
c1 INT2 OPTIONS (key 'true'),
c2 TEXT,
c3 BOOLEAN
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example.parquet');
SELECT * FROM example;
 c1 |       c2        | c3
----+-----------------+----
  1 | text1           | t
  2 |                 | f
  3 | values are fun! | t
(3 rows)

UPDATE example SET c3 = false WHERE c2 = 'text1';
UPDATE 1

SELECT * FROM example;
 c1 |       c2        | c3
----+-----------------+----
  1 | text1           | f
  2 |                 | f
  3 | values are fun! | t
(3 rows)

DELETE FROM example WHERE c1 = 2;
DELETE 1

SELECT * FROM example;
 c1 |       c2        | c3
----+-----------------+----
  1 | text1           | f
  3 | values are fun! | t
(2 rows)
-- schemaless mode
CREATE FOREIGN TABLE example_schemaless (
    v JSONB
) SERVER parquet_s3_srv
OPTIONS (filename 's3://data/example.parquet', schemaless 'true', key_columns 'c1');

SELECT * FROM example_schemaless;
                       v
-----------------------------------------------
 {"c1": 1, "c2": "text1", "c3": "t"}
 {"c1": 2, "c2": null, "c3": "f"}
 {"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)

UPDATE example_schemaless SET v='{"c3":false}' WHERE v->>'c2' = 'text1';
UPDATE 1

SELECT * FROM example_schemaless;
                       v
-----------------------------------------------
 {"c1": 1, "c2": "text1", "c3": "f"}
 {"c1": 2, "c2": null, "c3": "f"}
 {"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)

DELETE FROM example_schemaless WHERE (v->>'c1')::int = 2;
DELETE 1

SELECT * FROM example_schemaless;
                       v
-----------------------------------------------
 {"c1": 1, "c2": "text1", "c3": "f"}
 {"c1": 3, "c2": "values are fun!", "c3": "t"}
(2 rows)
## Limitations
- Transaction is not supported.
- Cannot create a single foreign table using parquet files on both file system and Amazon S3.
- The 4th and 5th arguments of the `import_parquet_s3_explicit()` function are meaningless in `schemaless` mode.
  - These arguments should be set to `NULL`.
  - If these arguments are not `NULL`, the `WARNING` below will occur:

```
WARNING: parquet_s3_fdw: attnames and atttypes are expected to be NULL. They are meaningless for schemaless table.
HINT: Schemaless table imported always contain "v" column with "jsonb" type.
```
- `schemaless` mode does not support creating a partitioned table with `CREATE TABLE parent_tbl (v jsonb) PARTITION BY RANGE((v->>'a')::int)`.
- `parquet_s3_fdw` modifies the parquet file by creating modifiable cache data from the target parquet file and overwriting the old one:
  - `sorted` columns only support the following types: `int2`, `int4`, `int8`, `date`, `timestamp`, `float4`, `float8`.
  - `key` columns only support the following types: `int2`, `int4`, `int8`, `date`, `timestamp`, `float4`, `float8` and `text`.
  - `key` column values must be unique; `parquet_s3_fdw` does not check uniqueness of key column values, so the user must ensure it.
  - `key` columns are only required for UPDATE/DELETE.

## Contributing

Opening issues and pull requests on GitHub are welcome.
Copyright (c) 2021, TOSHIBA Corporation
Copyright (c) 2018 - 2019, adjust GmbH
Permission to use, copy, modify, and distribute this software and its documentation for any purpose, without fee, and without a written agreement is hereby granted, provided that the above copyright notice and this paragraph and the following two paragraphs appear in all copies.
See the LICENSE.md
file for full details.