apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] LookupStreamingReader should not apply valueFilter in scanning kv table #3605

Open nonggialiang opened 1 week ago



Paimon version

0.8.0

Compute Engine

flink 1.17.1

Minimal reproduce step


CREATE TABLE paimon.xxx.dim_0625 (
  id bigint,
  k string,
  v string,
  PRIMARY KEY (id) NOT ENFORCED 
)
WITH (
    'bucket' = '1'
);

CREATE TABLE paimon.xxx.fact_0625 (
  name string,
  k string,
  proctime as PROCTIME()
)
WITH (
    'bucket' = '1'
);

select 
a.name,
a.k as ak,
b.k as bk,
b.v
from paimon.xxx.fact_0625  /*+ OPTIONS('scan.mode'='latest') */ a
left join paimon.xxx.dim_0625 /*+ OPTIONS('continuous.discovery-interval'='3s') */ FOR SYSTEM_TIME AS OF a.proctime AS b 
on a.k = b.k
where b.v = 'x'

Run the SQL above in streaming mode. The operators in the DAG are shown below; note that Flink turns the join into an InnerJoin because of the WHERE condition on the lookup table:

 [5690]:TableSourceScan(table=[[paimon, xxx, fact_0625]], fields=[name, k], hints=[[[OPTIONS options:{scan.mode=latest}]]])
+- [5691]:LookupJoin(table=[paimon.xxx.dim_0625], joinType=[InnerJoin], lookup=[v=_UTF-16LE'x', k=k], where=[(v = 'x')], select=[name, k, k])
   +- [5692]:Calc(select=[name, k AS ak, k0 AS bk, CAST('x' AS VARCHAR(2147483647)) AS v])
      +- [5693]:Sink(table=[default_cat.default_db.print], fields=[name, ak, bk, v])

Now run these SQL statements in batch mode sequentially to produce records in both the dim and fact tables:

insert into paimon.xxx.dim_0625
select 1 as id, 'k' as k, 'x' as v;

insert into paimon.xxx.fact_0625
select 'r1' as name, 'k' as k;

insert into paimon.xxx.dim_0625
select 1 as id, 'k' as k, 'y' as v;

(now the dim table should contain no records with 'x' in field v)

insert into paimon.xxx.fact_0625
select 'r2' as name, 'k' as k;

insert into paimon.xxx.fact_0625
select 'r3' as name, 'k' as k;

The output will be:

+I[r1, k, k, x]
+I[r2, k, k, x]
+I[r3, k, k, x]

What doesn't meet your expectations?

+I[r2, k, k, x]
+I[r3, k, k, x]

These two records should not join against v = 'x', since that version of the row had already been overwritten.

Anything else?

In this case the predicate [v = 'x'] is applied to the Scan used by LookupStreamingReader.
The predicate ultimately becomes the valueFilter in KeyValueFileStoreScan and is used in filterWholeBucketByStats, causing the v = 'y' record (and the whole LSM tree) to be filtered out by the stats of the ManifestEntry.
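The effect of the stats-based pruning can be sketched with a simplified Python model (this is illustrative only, not Paimon's actual code): each data file carries min/max stats for column v, and filterWholeBucketByStats-style pruning skips files whose stats cannot match the predicate, so the file containing the v = 'y' update never reaches the lookup reader.

```python
# Simplified model of stats-based file pruning during a lookup scan.
# A "file" is (stats_min, stats_max, records) for column v.

def scan(files, value_filter=None):
    """Return all records, optionally pruning whole files by their v-stats."""
    out = []
    for stats_min, stats_max, records in files:
        if value_filter is not None:
            lo, hi = value_filter  # predicate: lo <= v <= hi ('x' = 'x' here)
            if stats_max < lo or stats_min > hi:
                continue  # whole file skipped, like filterWholeBucketByStats
        out.extend(records)
    return out

# File 1 holds the original v='x' record; file 2 holds the v='y' update
# for the same primary key.
files = [
    ("x", "x", [{"id": 1, "k": "k", "v": "x"}]),
    ("y", "y", [{"id": 1, "k": "k", "v": "y"}]),
]

# Without the value filter, the reader sees the update and can refresh
# the cached row from v='x' to v='y'.
all_records = scan(files)

# With valueFilter v='x', the file containing the update is pruned, so
# the lookup side keeps serving the stale v='x' row.
filtered = scan(files, value_filter=("x", "x"))
```

Here `scan(files)` returns both versions of the row, while `scan(files, value_filter=("x", "x"))` returns only the stale v = 'x' record, which is exactly the behavior observed in the repro.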

On the other hand, the old version of the record in the lookup table is only retracted when an UPDATE_AFTER record with the same key arrives (see SecondaryIndexLookupTable.refresh()).

Are you willing to submit a PR?