pgspider / influxdb_fdw

InfluxDB Foreign Data Wrapper for PostgreSQL.

InfluxDB Foreign Data Wrapper for PostgreSQL

This is a foreign data wrapper (FDW) that connects PostgreSQL to InfluxDB. This FDW works with PostgreSQL 12, 13, 14, 15 and 16.0 and has been confirmed with

Contents

  1. Features
  2. Supported platforms
  3. Installation
  4. Usage
  5. Functions
  6. Identifier case handling
  7. Generated columns
  8. Character set handling
  9. Examples
  10. Limitations
  11. Contributing
  12. Useful links
  13. License

Features

Common features

GROUP BY time intervals and fill()

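influxdb_fdw supports the InfluxDB GROUP BY time() fill() syntax. fill() is supported by two stub functions:

- influx_fill_numeric(): fills empty intervals with the given numeric value.
- influx_fill_option(): fills empty intervals using the given fill option, such as linear.

These functions are passed as the last argument of influx_time(). The table below illustrates the usage: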

| PostgreSQL syntax | InfluxDB syntax |
|-------------------|-----------------|
| `influx_time(time, interval '2h')` | `time(2h)` |
| `influx_time(time, interval '2h', interval '1h')` | `time(2h, 1h)` |
| `influx_time(time, interval '2h', influx_fill_numeric(100))` | `time(2h) fill(100)` |
| `influx_time(time, interval '2h', influx_fill_option('linear'))` | `time(2h) fill(linear)` |
| `influx_time(time, interval '2h', interval '1h', influx_fill_numeric(100))` | `time(2h, 1h) fill(100)` |
| `influx_time(time, interval '2h', interval '1h', influx_fill_option('linear'))` | `time(2h, 1h) fill(linear)` |
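
A minimal sketch of these functions in a query, assuming the t1 foreign table defined in the Examples section (columns time, tag1, field1); the time range is arbitrary. Because influx_time() and the fill functions are stubs, the GROUP BY has to be pushed down to InfluxDB, and EXPLAIN VERBOSE shows the remote query that was generated.

```sql
-- Sum field1 in 2-hour buckets; empty buckets are filled with 100.
-- Per the table above, the remote query should contain: GROUP BY time(2h) fill(100)
EXPLAIN VERBOSE
SELECT sum(field1)
FROM t1
WHERE time >= '2023-01-01 00:00:00+00' AND time < '2023-01-02 00:00:00+00'
GROUP BY influx_time(time, interval '2h', influx_fill_numeric(100));

-- Same grouping, but empty buckets are filled by linear interpolation:
-- GROUP BY time(2h) fill(linear)
SELECT sum(field1)
FROM t1
WHERE time >= '2023-01-01 00:00:00+00' AND time < '2023-01-02 00:00:00+00'
GROUP BY influx_time(time, interval '2h', influx_fill_option('linear'));
```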

Schemaless feature

Columns of foreign table in schemaless mode

Fetch values in jsonb expression:

For example:
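
A minimal sketch of a schemaless foreign table and of fetching values with jsonb expressions. It assumes the `schemaless` table option and the `tags`/`fields` column options as described in the upstream documentation for your influxdb_fdw version; the measurement sc1, the tag key device_id and the field sig1 are hypothetical.

```sql
-- Schemaless mode: a time column plus one jsonb column holding all tags
-- and one jsonb column holding all fields (option names are assumptions).
CREATE FOREIGN TABLE sc1 (
  time   timestamp with time zone,
  tags   jsonb OPTIONS (tags 'true'),
  fields jsonb OPTIONS (fields 'true')
)
SERVER influxdb_svr
OPTIONS (table 'sc1', tags 'device_id', schemaless 'true');

-- Fetch individual values with the jsonb ->> operator and cast as needed.
SELECT time,
       tags->>'device_id'        AS device_id,
       (fields->>'sig1')::bigint AS sig1
FROM sc1
WHERE tags->>'device_id' = 'dev1';
```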

Pushdown

Notes about features

The presence of NULL values depends on the target list of the remote query sent to InfluxDB.

For example:

The target list contains both functions and the schemaless jsonb fields column.

Also see Limitations.

Time operator support in both schemaless and non-schemaless mode

For example:

```sql
CREATE FOREIGN TABLE tmp_time (
  time timestamp,
  c1 time,
  c2 timestamp,
  c3 timestamp with time zone,
  agvState character varying NULL COLLATE pg_catalog."default",
  value numeric NULL
) SERVER server1 OPTIONS (table 'tmp_time');

INSERT INTO tmp_time (time, c1, agvState, value) VALUES ('1900-01-01 01:01:01', '01:02:03', 'state 1', 0.1);
INSERT INTO tmp_time (time, c1, agvState, value) VALUES ('2100-01-01 01:01:01', '04:05:06', 'state 2', 0.2);
INSERT INTO tmp_time (time, c1, agvState, value) VALUES ('1990-01-01 01:01:01', '07:08:09', 'state 3', 0.3);
INSERT INTO tmp_time (time, c2) VALUES ('2020-12-27 03:02:56.634467', '1950-02-02 02:02:02');
INSERT INTO tmp_time (time, c3, agvState, value) VALUES ('2021-12-27 03:02:56.668301', '1800-02-02 02:02:02+9', 'state 5', 0.5);
INSERT INTO tmp_time (time, c1, c2, c3, agvState, value) VALUES ('2022-05-06 07:08:09', '07:08:09', '2022-05-06 07:08:09', '2022-05-06 07:08:09+9', 'state 6', 0.6);
INSERT INTO tmp_time (time, c1, c2, c3, agvState, value) VALUES ('2023-05-06 07:08:09', '07:08:10', '2023-05-06 07:08:09', '2023-05-06 07:08:09+9', 'state 7', 0.7);

-- Rows 8 and 9 also set c4 (timestamp) and c5 (timestamp with time zone),
-- which the definition above does not declare. The examples for points (6)
-- and (8) assume the foreign table was recreated with these two columns and
-- that these rows were inserted after the earlier examples had been run.
-- Row 8 has the same timestamp as row 7 and no tags, so InfluxDB merges the
-- two points and the field values of row 8 replace those of row 7.
INSERT INTO tmp_time (time, c1, c2, c3, c4, c5, agvState, value) VALUES ('2023-05-06 07:08:09', '07:08:10', '2023-05-06 07:08:09', '2023-05-06 07:08:09+9', '2023-05-06 08:08:09', '2023-05-06 08:08:09+9', 'state 8', 0.8);
INSERT INTO tmp_time (time, c1, c2, c3, c4, c5, agvState, value) VALUES ('2025-05-06 07:08:09', '07:08:10', '2025-05-06 07:08:09', '2025-05-06 07:08:09+9', '2025-05-06 08:08:09', '2025-05-06 08:08:09+9', 'state 9', 0.9);
```

```sql
SELECT * FROM tmp_time WHERE time > '2022-05-06 07:08:09';
        time         |    c1    |         c2          |           c3           | agvstate | value
---------------------+----------+---------------------+------------------------+----------+-------
 2023-05-06 07:08:09 | 07:08:10 | 2023-05-06 07:08:09 | 2023-05-06 07:08:09+09 | state 7  |   0.7
 2100-01-01 01:01:01 | 04:05:06 |                     |                        | state 2  |   0.2
(2 rows)
```

- Example for point (2):
```sql
EXPLAIN VERBOSE
SELECT * FROM tmp_time WHERE c2 > '1950-02-02 02:02:02';
                                   QUERY PLAN                                   
--------------------------------------------------------------------------------
 Foreign Scan on public.tmp_time  (cost=10.00..227.00 rows=227 width=96)
   Output: "time", c1, c2, c3, agvstate, value
   Filter: (tmp_time.c2 > '1950-02-02 02:02:02'::timestamp without time zone)
   InfluxDB query: SELECT "c1", "c2", "c3", "agvstate", "value" FROM "tmp_time"
(4 rows)

SELECT * FROM tmp_time WHERE c2 > '1950-02-02 02:02:02';
        time         |    c1    |         c2          |           c3           | agvstate | value 
---------------------+----------+---------------------+------------------------+----------+-------
 2022-05-06 07:08:09 | 07:08:09 | 2022-05-06 07:08:09 | 2022-05-06 07:08:09+09 | state 6  |   0.6
 2023-05-06 07:08:09 | 07:08:10 | 2023-05-06 07:08:09 | 2023-05-06 07:08:09+09 | state 7  |   0.7
(2 rows)

EXPLAIN VERBOSE
SELECT * FROM tmp_time WHERE c2 = '2022-05-06 07:08:09';
                                                      QUERY PLAN                                                       
-----------------------------------------------------------------------------------------------------------------------
 Foreign Scan on public.tmp_time  (cost=10.00..3.00 rows=3 width=96)
   Output: "time", c1, c2, c3, agvstate, value
   InfluxDB query: SELECT "c1", "c2", "c3", "agvstate", "value" FROM "tmp_time" WHERE (("c2" = '2022-05-06 07:08:09'))
(3 rows)

SELECT * FROM tmp_time WHERE c2 = '2022-05-06 07:08:09';
        time         |    c1    |         c2          |           c3           | agvstate | value 
---------------------+----------+---------------------+------------------------+----------+-------
 2022-05-06 07:08:09 | 07:08:09 | 2022-05-06 07:08:09 | 2022-05-06 07:08:09+09 | state 6  |   0.6
(1 row)
```

```sql
SELECT * FROM tmp_time WHERE time != '2022-05-06 07:08:09';
            time            |    c1    |         c2          |              c3              | agvstate | value
----------------------------+----------+---------------------+------------------------------+----------+-------
 1900-01-01 01:01:01        | 01:02:03 |                     |                              | state 1  |   0.1
 1990-01-01 01:01:01        | 07:08:09 |                     |                              | state 3  |   0.3
 2020-12-27 03:02:56.634467 |          | 1950-02-02 02:02:02 |                              |          |
 2021-12-27 03:02:56.668301 |          |                     | 1800-02-02 02:21:01+09:18:59 | state 5  |   0.5
 2023-05-06 07:08:09        | 07:08:10 | 2023-05-06 07:08:09 | 2023-05-06 07:08:09+09       | state 7  |   0.7
 2100-01-01 01:01:01        | 04:05:06 |                     |                              | state 2  |   0.2
(6 rows)
```

- Example for point (4):
```sql
-- Does not push down a comparison of a timestamp difference (time - c2) with an interval
EXPLAIN VERBOSE
SELECT * FROM tmp_time WHERE time - c2 <= interval '1d';
                                   QUERY PLAN                                   
--------------------------------------------------------------------------------
 Foreign Scan on public.tmp_time  (cost=10.00..227.00 rows=227 width=96)
   Output: "time", c1, c2, c3, agvstate, value
   Filter: ((tmp_time."time" - tmp_time.c2) <= '@ 1 day'::interval)
   InfluxDB query: SELECT "c1", "c2", "c3", "agvstate", "value" FROM "tmp_time"
(4 rows)

SELECT * FROM tmp_time WHERE time - c2 <= interval '1d';
        time         |    c1    |         c2          |           c3           | agvstate | value 
---------------------+----------+---------------------+------------------------+----------+-------
 2022-05-06 07:08:09 | 07:08:09 | 2022-05-06 07:08:09 | 2022-05-06 07:08:09+09 | state 6  |   0.6
 2023-05-06 07:08:09 | 07:08:10 | 2023-05-06 07:08:09 | 2023-05-06 07:08:09+09 | state 7  |   0.7
(2 rows)

-- Does not push down nested time subtraction
EXPLAIN VERBOSE
SELECT * FROM tmp_time WHERE (time - c2) - (c1 - c1) > interval '-1d';
                                              QUERY PLAN                                               
-------------------------------------------------------------------------------------------------------
 Foreign Scan on public.tmp_time  (cost=10.00..227.00 rows=227 width=96)
   Output: "time", c1, c2, c3, agvstate, value
   Filter: (((tmp_time."time" - tmp_time.c2) - (tmp_time.c1 - tmp_time.c1)) > '@ 1 day ago'::interval)
   InfluxDB query: SELECT "c1", "c2", "c3", "agvstate", "value" FROM "tmp_time"
(4 rows)

SELECT * FROM tmp_time WHERE (time - c2) - (c1 - c1) > interval '-1d';
        time         |    c1    |         c2          |           c3           | agvstate | value 
---------------------+----------+---------------------+------------------------+----------+-------
 2022-05-06 07:08:09 | 07:08:09 | 2022-05-06 07:08:09 | 2022-05-06 07:08:09+09 | state 6  |   0.6
 2023-05-06 07:08:09 | 07:08:10 | 2023-05-06 07:08:09 | 2023-05-06 07:08:09+09 | state 7  |   0.7
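(2 rows)
```

```sql
-- influx_time() is a stub: it has to be pushed down to InfluxDB as part of
-- GROUP BY time(). Evaluated locally, as in this WHERE clause, it raises an error.
SELECT * FROM tmp_time WHERE influx_time(time, interval '3m') - interval '3m' < time;
ERROR:  stub influx_time(timestamp with time zone, interval) is called
CONTEXT:  PL/pgSQL function influx_time(timestamp with time zone,interval) line 3 at RAISE
```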

- Example for point (6):
```sql
EXPLAIN VERBOSE
SELECT * FROM tmp_time WHERE time = c2;
                                         QUERY PLAN                                         
--------------------------------------------------------------------------------------------
 Foreign Scan on public.tmp_time  (cost=10.00..3.00 rows=3 width=112)
   Output: "time", c1, c2, c3, c4, c5, agvstate, value
   Filter: (tmp_time."time" = tmp_time.c2)
   InfluxDB query: SELECT "c1", "c2", "c3", "c4", "c5", "agvstate", "value" FROM "tmp_time"
(4 rows)

SELECT * FROM tmp_time WHERE time = c2;
        time         |    c1    |         c2          |           c3           |         c4          |           c5           | agvstate | value 
---------------------+----------+---------------------+------------------------+---------------------+------------------------+----------+-------
 2022-05-06 07:08:09 | 07:08:09 | 2022-05-06 07:08:09 | 2022-05-06 07:08:09+09 |                     |                        | state 6  |   0.6
 2023-05-06 07:08:09 | 07:08:10 | 2023-05-06 07:08:09 | 2023-05-06 07:08:09+09 | 2023-05-06 08:08:09 | 2023-05-06 08:08:09+09 | state 8  |   0.8
 2025-05-06 07:08:09 | 07:08:10 | 2025-05-06 07:08:09 | 2025-05-06 07:08:09+09 | 2025-05-06 08:08:09 | 2025-05-06 08:08:09+09 | state 9  |   0.9
(3 rows)
```

```sql
SELECT * FROM tmp_time WHERE c2 < c4;
        time         |    c1    |         c2          |           c3           |         c4          |           c5           | agvstate | value
---------------------+----------+---------------------+------------------------+---------------------+------------------------+----------+-------
 2023-05-06 07:08:09 | 07:08:10 | 2023-05-06 07:08:09 | 2023-05-06 07:08:09+09 | 2023-05-06 08:08:09 | 2023-05-06 08:08:09+09 | state 8  |   0.8
 2025-05-06 07:08:09 | 07:08:10 | 2025-05-06 07:08:09 | 2025-05-06 07:08:09+09 | 2025-05-06 08:08:09 | 2025-05-06 08:08:09+09 | state 9  |   0.9
(2 rows)
```

- Example for point (8):
```sql
EXPLAIN VERBOSE
SELECT * FROM tmp_time WHERE (time + interval '1d') > now() + interval '-1d';
                                         QUERY PLAN                                         
--------------------------------------------------------------------------------------------
 Foreign Scan on public.tmp_time  (cost=10.00..201.00 rows=201 width=112)
   Output: "time", c1, c2, c3, c4, c5, agvstate, value
   Filter: ((tmp_time."time" + '@ 1 day'::interval) > (now() + '@ 1 day ago'::interval))
   InfluxDB query: SELECT "c1", "c2", "c3", "c4", "c5", "agvstate", "value" FROM "tmp_time"
(4 rows)

SELECT * FROM tmp_time WHERE (time + interval '1d') > now() + interval '-1d';
        time         |    c1    |         c2          |           c3           |         c4          |           c5           | agvstate | value 
---------------------+----------+---------------------+------------------------+---------------------+------------------------+----------+-------
 2025-05-06 07:08:09 | 07:08:10 | 2025-05-06 07:08:09 | 2025-05-06 07:08:09+09 | 2025-05-06 08:08:09 | 2025-05-06 08:08:09+09 | state 9  |   0.9
 2100-01-01 01:01:01 | 04:05:06 |                     |                        |                     |                        | state 2  |   0.2
(2 rows)
```

```sql
SELECT * FROM tmp_time WHERE time = c2 + interval '25896 days 01:00:54.634467';
            time            | c1 |         c2          | c3 | c4 | c5 | agvstate | value
----------------------------+----+---------------------+----+----+----+----------+-------
 2020-12-27 03:02:56.634467 |    | 1950-02-02 02:02:02 |    |    |    |          |
(1 row)
```


Supported platforms
-------------------

`influxdb_fdw` was developed on Linux, and should run on any
reasonably POSIX-compliant system.

`influxdb_fdw` is designed to be compatible with PostgreSQL 12 ~ 16.0.

Installation
------------
### Prerequisites

`influxdb_fdw` supports two different clients:
- Go client
- `Influxdb_cxx` client

The installation of each client is described below.

#### Install InfluxDB Go client library

Go version should be 1.10.4 or later.

```sh
go get github.com/influxdata/influxdb1-client/v2
```

To use the Go client, pass the `GO_CLIENT=1` flag when compiling the source code.

#### Install `Influxdb_cxx` client library

Get the source code from the [`influxdb-cxx`](https://github.com/pgspider/influxdb-cxx) GitHub repository and install it manually:

```sh
git clone https://github.com/pgspider/influxdb-cxx
cd influxdb-cxx
# Configure in a separate build directory so that "cmake .." points at the source tree
mkdir build && cd build
cmake .. -DINFLUXCXX_WITH_BOOST=OFF -DINFLUXCXX_TESTING=OFF
sudo make install
```

Update `LD_LIBRARY_PATH` to include the installation directory of the `Influxdb_cxx` client.

To use `Influxdb_cxx`, pass the `CXX_CLIENT=1` flag when compiling the source code. `gcc` version 7 is required to build `influxdb_fdw` with the `influxdb_cxx` client.

### Source installation
Add the directory containing `pg_config` to `PATH`, then build and install `influxdb_fdw`.
To build `influxdb_fdw` inside a PostgreSQL source tree instead, omit `USE_PGXS=1`.

#### Using Go client
```sh
make USE_PGXS=1 with_llvm=no GO_CLIENT=1
make install USE_PGXS=1 with_llvm=no GO_CLIENT=1
```

`with_llvm=no` is necessary to disable LLVM bitcode generation when PostgreSQL is configured with `--with-llvm`, because influxdb_fdw uses Go code, which cannot be compiled to LLVM bitcode.

#### Using Influxdb_cxx client
```sh
make USE_PGXS=1 with_llvm=no CXX_CLIENT=1
make install USE_PGXS=1 with_llvm=no CXX_CLIENT=1
```

Usage

CREATE SERVER options

influxdb_fdw accepts the following options via the CREATE SERVER command:

- dbname: Target database name.
- host: The address used to connect to the InfluxDB server. It is important to use the http:// or https:// prefix before the host address.
- port: The port used to connect to the InfluxDB server.
- version: The InfluxDB server version to connect to. Available values are 1 for InfluxDB 1.x and 2 for InfluxDB 2.x. If it is not specified, influxdb_fdw first tries to connect to InfluxDB 2.x; if that fails, it tries InfluxDB 1.x, and raises an error if that also fails.
- retention_policy: Retention policy of the target database. See the InfluxDB 2.x documentation.

CREATE USER MAPPING options

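influxdb_fdw accepts the following options via the CREATE USER MAPPING command:

- user: User name used to connect to InfluxDB (used in the InfluxDB 1.x examples).
- password: Password of that user (used in the InfluxDB 1.x examples).
- auth_token: Authentication token (used in the InfluxDB 2.x example).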

CREATE FOREIGN TABLE options

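influxdb_fdw accepts table-level options via the CREATE FOREIGN TABLE command, including:

- table: Name of the InfluxDB measurement to access. If it is omitted, the foreign table name is used.
- tags: Comma-separated list of the tag keys of the foreign table.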

IMPORT FOREIGN SCHEMA options

influxdb_fdw supports IMPORT FOREIGN SCHEMA and accepts the following options via the IMPORT FOREIGN SCHEMA command.

TRUNCATE support

influxdb_fdw does not support the foreign data wrapper TRUNCATE API, which is available from PostgreSQL 14.

Functions

As well as the standard influxdb_fdw_handler() and influxdb_fdw_validator() functions, influxdb_fdw provides the following user-callable utility functions:

Functions are listed by type, with name, arguments and return data type, for each function type that is available:

Common functions

Special aggregations

Selectors

Transformations

Predictors

Technical Analysis

Special time functions

Identifier case handling

PostgreSQL folds unquoted identifiers to lower case by default. The rules for InfluxDB identifiers, and any problems with them, have not yet been tested or described.
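
A minimal sketch for checking how a mixed-case name reaches InfluxDB, whose identifiers are case-sensitive. The table case_demo and its columns are hypothetical, and it is an untested assumption that a double-quoted column name is sent to InfluxDB unchanged; the tmp_time example above only shows that the unquoted agvState is sent as "agvstate".

```sql
-- Hypothetical table; inspect the "InfluxDB query:" line of the plan.
CREATE FOREIGN TABLE case_demo (
  time timestamp with time zone,
  "agvState" text,  -- quoted: PostgreSQL keeps the name agvState
  agvMode text      -- unquoted: PostgreSQL folds the name to agvmode
) SERVER influxdb_svr OPTIONS (table 'case_demo');

-- Assumption: the remote query references "agvState" and "agvmode".
EXPLAIN VERBOSE SELECT "agvState", agvMode FROM case_demo;
```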

Generated columns

Behaviour with generated columns has not yet been tested or described.

For more details on generated columns see:

Character set handling

Not yet described. Using a Unicode encoding (for example UTF8) for PostgreSQL databases used with influxdb_fdw is strongly recommended.
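
A minimal sketch using standard PostgreSQL commands to check the encoding of the current database and to create a UTF8 database for influxdb_fdw; the database name influx_demo is hypothetical.

```sql
-- Encoding of the current database
SHOW server_encoding;

-- New database with UTF8 encoding. template0 is used because template1 may
-- have a different encoding; the locale (LC_COLLATE/LC_CTYPE) must also be
-- compatible with UTF8, otherwise CREATE DATABASE reports an error.
CREATE DATABASE influx_demo ENCODING 'UTF8' TEMPLATE template0;

-- Then, connected to influx_demo as a superuser:
CREATE EXTENSION influxdb_fdw;
```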

Examples

Install the extension:

Run this once in each database where you need it, as a PostgreSQL superuser.

    CREATE EXTENSION influxdb_fdw;

Create a foreign server with appropriate configuration:

Run this once for each foreign data source you need, as a PostgreSQL superuser. Specify the database name with the dbname option.

Go Client connect to InfluxDB ver 1.x

    CREATE SERVER influxdb_svr
    FOREIGN DATA WRAPPER influxdb_fdw
    OPTIONS (
      dbname 'mydb',
      host 'http://localhost',
      port '8086'
    );

Influxdb_cxx Client connect to InfluxDB ver 1.x

    CREATE SERVER influxdb_svr
    FOREIGN DATA WRAPPER influxdb_fdw
    OPTIONS (
      dbname 'mydb',
      host 'http://localhost',
      port '8086',
      version '1'
    );

Influxdb_cxx Client connect to InfluxDB ver 2.x

    CREATE SERVER influxdb_svr
    FOREIGN DATA WRAPPER influxdb_fdw
    OPTIONS (
      dbname 'mydb',
      host 'http://localhost',
      port '8086',
      version '2',
      retention_policy ''
    );

Grant usage on foreign server to normal user in PostgreSQL:

Run this once for each normal (non-superuser) PostgreSQL user, as a PostgreSQL superuser. It is a good idea to use a superuser only where really necessary, so let's allow a normal user to use the foreign server (this is not required for the example to work, but it is a security recommendation).

    GRANT USAGE ON FOREIGN SERVER influxdb_svr TO pguser;

Here pguser is a sample user that works with the foreign server and foreign tables.

User mapping

Create an appropriate user mapping:

Go Client connect to InfluxDB ver 1.x

    CREATE USER MAPPING
    FOR pguser
    SERVER influxdb_svr
    OPTIONS (
      user 'username',
      password 'password'
    );

Influxdb_cxx Client connect to InfluxDB ver 1.x

    CREATE USER MAPPING
    FOR pguser
    SERVER influxdb_svr
    OPTIONS (
      user 'username',
      password 'password'
    );

Influxdb_cxx Client connect to InfluxDB ver 2.x

    CREATE USER MAPPING
    FOR pguser
    SERVER influxdb_svr
    OPTIONS (
      auth_token 'token'
    );

Create foreign table

All CREATE FOREIGN TABLE SQL commands can be executed as a normal PostgreSQL user once the appropriate GRANT USAGE ON FOREIGN SERVER has been given. A PostgreSQL superuser is not needed, and for security reasons is best avoided, although the commands also work as a PostgreSQL superuser.

Create a foreign table referencing an InfluxDB measurement. You need to declare a column named time to access the InfluxDB time column.

    CREATE FOREIGN TABLE t1(
      time timestamp with time zone,
      tag1 text,
      field1 integer
    )
    SERVER influxdb_svr
    OPTIONS (
      table 'measurement1'
    );

Use the tags option to specify the tag keys of a foreign table.

    CREATE FOREIGN TABLE t2(
      tag1 text,
      field1 integer,
      tag2 text,
      field2 integer
    )
    SERVER influxdb_svr
    OPTIONS (
      tags 'tag1, tag2'
    );

You can also import a foreign schema:

    IMPORT FOREIGN SCHEMA public
    FROM SERVER influxdb_svr
    INTO public;
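
To import only some measurements, the standard LIMIT TO clause of IMPORT FOREIGN SCHEMA can be used; PostgreSQL itself filters the tables returned by the FDW by name. This sketch assumes imported tables are named after their measurements; measurement1 is the measurement from the examples above, and the target schema influx is hypothetical.

```sql
CREATE SCHEMA influx;

-- Import only the foreign table for measurement1 into schema influx.
IMPORT FOREIGN SCHEMA public
  LIMIT TO (measurement1)
  FROM SERVER influxdb_svr
  INTO influx;
```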

Access foreign table

    SELECT * FROM t1;

Limitations

Limitations originate from the data model and query language of InfluxDB.

When a query on foreign tables fails, you can find out why by checking the query executed in InfluxDB with EXPLAIN VERBOSE.
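
For example, with the t1 table from the Examples section (the tag value 'a' and the threshold 10 are arbitrary), the plan shows the remote query on its "InfluxDB query:" line, while conditions that could not be pushed down appear on a separate "Filter:" line, as in the tmp_time examples:

```sql
-- Compare the "InfluxDB query:" line with the original WHERE clause
-- to see which conditions were evaluated by InfluxDB.
EXPLAIN VERBOSE
SELECT time, tag1, field1
FROM t1
WHERE tag1 = 'a' AND field1 > 10;
```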

Contributing

Issues and pull requests on GitHub are welcome. The test scripts support multiple PostgreSQL versions, work in a POSIX context, and compare the output of SQL commands in psql with expected output text files. The current expected test results were generated on Rocky Linux 8 with its default glibc 2.28. Tests may fail with other glibc versions in the following cases.

Useful links

Source code

Reference FDW implementation, postgres_fdw

General FDW Documentation

Other FDWs

License

Copyright (c) 2018, TOSHIBA CORPORATION

Copyright (c) 2011-2016, EnterpriseDB Corporation

Permission to use, copy, modify, and distribute this software and its documentation for any purpose, without fee, and without a written agreement is hereby granted, provided that the above copyright notice and this paragraph and the following two paragraphs appear in all copies.

See the LICENSE file for full details.