ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0

Maybe we can merge identical sink nodes in the logical graph? #700

Open zhuliquan opened 2 months ago

zhuliquan commented 2 months ago

This is an awesome project. I noticed that Arroyo merges source nodes in the logical graph when two SQL queries read from the same source. Why not merge sink nodes as well when the sinks are identical? That would reduce the number of open file handles or Kafka connections.
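The same interning approach used for sources could plausibly be extended to sinks: key each sink node by its connector configuration and reuse the existing node index on a match. A minimal sketch (all names here are hypothetical, not Arroyo's actual API):

```rust
use std::collections::HashMap;

// Hypothetical: identify a sink by the parts of its configuration that
// determine whether two sinks write to the same destination.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct SinkKey {
    connector: String,
    path: String,
    format: String,
}

#[derive(Default)]
struct LogicalGraph {
    nodes: Vec<String>,                  // node descriptions
    sink_index: HashMap<SinkKey, usize>, // interned sink nodes
}

impl LogicalGraph {
    /// Return the node index of an existing identical sink, or create one.
    fn get_or_insert_sink(&mut self, key: SinkKey) -> usize {
        if let Some(&idx) = self.sink_index.get(&key) {
            return idx; // same sink config: reuse the node
        }
        let idx = self.nodes.len();
        self.nodes.push(format!("sink:{}:{}", key.connector, key.path));
        self.sink_index.insert(key, idx);
        idx
    }
}

fn main() {
    let mut g = LogicalGraph::default();
    let key = SinkKey {
        connector: "single_file".into(),
        path: "$output_path".into(),
        format: "json".into(),
    };
    // Both INSERT statements target the same table, so both lookups
    // resolve to a single sink node.
    let a = g.get_or_insert_sink(key.clone());
    let b = g.get_or_insert_sink(key);
    println!("{} {} {}", a, b, g.nodes.len());
}
```

With this kind of interning, the two `INSERT INTO alerts` statements below would share one sink node instead of creating two.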

zhuliquan commented 2 months ago

For example:

CREATE TABLE process_details (
  timestamp TIMESTAMP,
  uuid BIGINT,
  asset_oid BIGINT,
  process_id1 INT,
  process_id2 INT,
  process_id3 INT
) WITH (
  connector = 'single_file',
  path = '$input_dir/test.json',
  format = 'json',
  type = 'source'
);

CREATE TABLE alerts (
  timestamp TIMESTAMP,
  uuid BIGINT,
  asset_oid BIGINT,
  process_id1 INT,
  process_id2 INT,
  process_id3 INT,
  rule_id TEXT
) WITH (
  connector = 'single_file',
  path = '$output_path',
  format = 'json',
  type = 'sink'
);
INSERT INTO alerts SELECT *, 'v1' as rule_id FROM process_details 
WHERE partition_record((asset_oid = 1)) AND ((process_id1 + process_id2 = 2) OR (process_id3 = 4));
INSERT INTO alerts SELECT *, 'v2' as rule_id FROM process_details 
WHERE partition_record((asset_oid = 1)) AND ((process_id1 + process_id2 = 3) OR (process_id3 = 3));

From these two queries we get two logical plans:

# Logical Plan 1
Projection: process_details.timestamp AS timestamp, process_details.uuid AS uuid, process_details.asset_oid AS asset_oid, process_details.process_id1 AS process_id1, process_details.process_id2 AS process_id2, process_details.process_id3 AS process_id3, rule_id AS rule_id, process_details._timestamp
  Projection: process_details.timestamp, process_details.uuid, process_details.asset_oid, process_details.process_id1, process_details.process_id2, process_details.process_id3, Utf8("v1") AS rule_id, process_details._timestamp
    Filter: partition_record(process_details.asset_oid = Int64(1)) AND (process_details.process_id1 + process_details.process_id2 = Int32(2) OR process_details.process_id3 = Int32(4))
      WatermarkNode(process_details): process_details.timestamp, process_details.uuid, process_details.asset_oid, process_details.process_id1, process_details.process_id2, process_details.process_id3, process_details._timestamp
        RemoteTableExtension: process_details.timestamp, process_details.uuid, process_details.asset_oid, process_details.process_id1, process_details.process_id2, process_details.process_id3, process_details._timestamp
          Projection: process_details.timestamp, process_details.uuid, process_details.asset_oid, process_details.process_id1, process_details.process_id2, process_details.process_id3, process_details._timestamp
            TableSourceExtension: process_details.timestamp, process_details.uuid, process_details.asset_oid, process_details.process_id1, process_details.process_id2, process_details.process_id3, process_details._timestamp

# Logical Plan 2
Projection: process_details.timestamp AS timestamp, process_details.uuid AS uuid, process_details.asset_oid AS asset_oid, process_details.process_id1 AS process_id1, process_details.process_id2 AS process_id2, process_details.process_id3 AS process_id3, rule_id AS rule_id, process_details._timestamp
  Projection: process_details.timestamp, process_details.uuid, process_details.asset_oid, process_details.process_id1, process_details.process_id2, process_details.process_id3, Utf8("v2") AS rule_id, process_details._timestamp
    Filter: partition_record(process_details.asset_oid = Int64(1)) AND (process_details.process_id1 + process_details.process_id2 = Int32(3) OR process_details.process_id3 = Int32(3))
      WatermarkNode(process_details): process_details.timestamp, process_details.uuid, process_details.asset_oid, process_details.process_id1, process_details.process_id2, process_details.process_id3, process_details._timestamp
        RemoteTableExtension: process_details.timestamp, process_details.uuid, process_details.asset_oid, process_details.process_id1, process_details.process_id2, process_details.process_id3, process_details._timestamp
          Projection: process_details.timestamp, process_details.uuid, process_details.asset_oid, process_details.process_id1, process_details.process_id2, process_details.process_id3, process_details._timestamp
            TableSourceExtension: process_details.timestamp, process_details.uuid, process_details.asset_oid, process_details.process_id1, process_details.process_id2, process_details.process_id3, process_details._timestamp

I notice that when the logical plans are added to the PlanToGraphVisitor, the source nodes are merged. https://github.com/ArroyoSystems/arroyo/blob/c076bfe087e56cb65bff5c14d558504bbe4d0535/crates/arroyo-planner/src/lib.rs#L602-L606

// Begin DataFusion GraphViz Plan,
// display it online here: https://dreampuf.github.io/GraphvizOnline

digraph {
  subgraph cluster_1
  {
    graph[label="LogicalGraph"]
    0[shape=box label="(id: source_process_details_0, desc: Single File Source)"]
    1 -> 0 [arrowhead=none, arrowtail=normal, dir=back]
    1[shape=box label="(id: value_1, desc: process_details)"]
    2 -> 1 [arrowhead=none, arrowtail=normal, dir=back]
    2[shape=box label="(id: watermark_2, desc: watermark)"]
    5 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
    3 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
    3[shape=box label="(id: value_3, desc: alerts)"]
    4 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
    4[shape=box label="(id: sink_alerts_4, desc: Single File Source)"]
    5[shape=box label="(id: value_5, desc: alerts)"]
    6 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
    6[shape=box label="(id: sink_alerts_6, desc: Single File Source)"]
  }
}
// End DataFusion GraphViz Plan
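For comparison, if the two identical sinks were merged, the graph above might collapse into something like the following (hand-edited for illustration, not actual Arroyo output), with both `alerts` value nodes feeding a single sink node:

```
digraph {
  subgraph cluster_1
  {
    graph[label="LogicalGraph"]
    0[shape=box label="(id: source_process_details_0, desc: Single File Source)"]
    1 -> 0 [arrowhead=none, arrowtail=normal, dir=back]
    1[shape=box label="(id: value_1, desc: process_details)"]
    2 -> 1 [arrowhead=none, arrowtail=normal, dir=back]
    2[shape=box label="(id: watermark_2, desc: watermark)"]
    5 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
    3 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
    3[shape=box label="(id: value_3, desc: alerts)"]
    4 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
    4[shape=box label="(id: sink_alerts_4, desc: Single File Source)"]
    5[shape=box label="(id: value_5, desc: alerts)"]
    4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
  }
}
```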

I think it's not just about merging source nodes; sink nodes should be merged too, because the two sink nodes here are identical and therefore duplicated.