getindata / flink-http-connector

HTTP connector for Apache Flink. Provides sources and sinks for the DataStream, Table, and SQL APIs.
Apache License 2.0
136 stars 39 forks source link

Support SQL queries with project pushdown after a lookup join #83

Closed: OlivierZembri closed this issue 3 months ago.

OlivierZembri commented 3 months ago

Enable the projection pushdown capability, so that the result of a SQL lookup join can be used in a further query on the resulting dynamic table that reads only a subset of its columns.

Example:

-- Probe-side table for the lookup join below. The datagen connector emits one
-- row per second. `id` is a sequence from 1 to 5, after which the bounded
-- sequence ends. No generator kind is configured for `row`, so datagen's
-- default generator fills the nested value.
-- `proc_time` is a processing-time attribute. The view below needs it for
-- FOR SYSTEM_TIME AS OF.
CREATE TABLE Orders (
    proc_time AS PROCTIME(),
    id        STRING,
    `row`     ROW<`aStringColumn` STRING, `anIntColumn` INT, `aFloatColumn` FLOAT>
) WITH (
    'connector'       = 'datagen',
    'rows-per-second' = '1',
    'fields.id.kind'  = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end'   = '5'
);

-- Lookup (dimension) table served by the HTTP connector's 'rest-lookup'
-- source. Each lookup is sent to the configured URL as a POST request with a
-- JSON body. The response is decoded as JSON into these columns.
-- NOTE(review): 'fail-on-missing-field' presumably applies to the JSON request
-- encoding and 'asyncPolling' presumably enables async lookups; confirm
-- against the connector docs.
CREATE TABLE Customers (
    `enrichedInt`    INT,
    `enrichedString` STRING,
    `row`            ROW<`aStringColumn` STRING, `anIntColumn` INT, `aFloatColumn` FLOAT>
) WITH (
    'connector'     = 'rest-lookup',
    'url'           = 'http://localhost:9090/client',
    'lookup-method' = 'POST',

    'lookup-request.format'                           = 'json',
    'lookup-request.format.json.fail-on-missing-field' = 'true',
    'format'                                          = 'json',

    'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',
    'asyncPolling'  = 'true'
);

-- Enrich each order with customer data by looking up the order's nested `row`
-- value in Customers, as of the order's processing time (a lookup join).
CREATE TEMPORARY VIEW lookupResult AS
    SELECT
        o.`id`,
        o.`row`,
        c.`enrichedInt`,
        c.`enrichedString`
    FROM Orders AS o
    INNER JOIN Customers FOR SYSTEM_TIME AS OF o.`proc_time` AS c
        ON o.`row` = c.`row`;

-- Read only two of the view's four columns. This is the query shape that
-- projection pushdown on the lookup source is meant to support.
SELECT
    r.`id`,
    r.`enrichedInt`
FROM lookupResult AS r;