bvt123 / SCH


Data loss on replication lag #3

Open bvt123 opened 9 months ago

bvt123 commented 9 months ago

For clustered setups where the event stream (e.g. from Kafka) is written to different replicas, rows from the source table can be lost if the replication lag exceeds the delay configured for the destination table.

We already have a workaround for that situation in some processing Templates:

set receive_timeout=300;
SYSTEM SYNC REPLICA {src};  -- wait (up to receive_timeout) for the local replica to process its queue
-- fail if any replica in the cluster still has queued entries for the source table
SELECT throwLog(count() > 0, 'WARNING', 'Replication is Active')
FROM clusterAllReplicas('replicated', system.replication_queue)
WHERE database = extract('{src}', '(.*?)\.') AND table = extract('{src}', '\.(.*)');

But it looks ugly. Waiting for replication to become inactive works in some cases, but querying replication_queue on every cluster node is a strange way to do it.

It could be done better.

When the Prepare for Transform code (Template) reads data from the source table, it can check the sequence of ClickHouse insert blocks across all parts that match the WHERE filter. Part names contain block numbers, so if some part has not yet been fetched from a remote replica, the block sequence will not be monotonic, and an error or warning should be fired, enabling a retry of the Transform attempt.
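
For reference, ReplicatedMergeTree part names follow the pattern <partition>_<min_block>_<max_block>_<level>, so for an unpartitioned table (partition id "all") the covered block range is easy to extract. The part names below are illustrative:

all_1_1_0   -- covers blocks [1,1]
all_3_3_0   -- covers blocks [3,3]; if the part with block 2 is still on a remote replica, the sequence 1,3 has a gap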

create or replace function checkBlockSequence as (arr) ->
-- input:  array of [start,end] block-number pairs, one per part
-- output: true (1) when the block sequence has a gap, i.e. some block is missing
    arraySum(
        arrayFilter(
            -- keep only the differences at even positions: the gaps
            -- between the end of one part and the start of the next
            (x, y) -> y % 2 = 0,
            arrayPopFront(
                arrayDifference(
                    arrayFlatten(arraySort(arr))
                )
            ) as s,
            arrayEnumerate(s)
        ) as t
    ) != length(t)  -- every inter-part gap must be exactly 1
;
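
A quick sanity check of the function with hypothetical block ranges:

select checkBlockSequence([[1, 3], [4, 6]]);  -- 0: ranges 1..3 and 4..6 are contiguous
select checkBlockSequence([[1, 3], [5, 6]]);  -- 1: block 4 is missing, so a gap is detected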

insert into SCH.Offsets (topic, next, last, rows, processor, state, hostid)
    with getTableDependencies(getSetting('agi_topic'), {delay}) as _deps,
         ( select last from SCH.Offsets where topic = getSetting('agi_topic') ) as _last,
         -- new rows since the last processed offset, deduplicated by id
         data as ( select max(pos) as p, id, splitByChar('_', any(_part)) as block
                   from {src}
                   where (pos > _last.1 or (pos = _last.1 and id > _last.2))
                     and pos < _deps.1
                   group by id
                   order by p, id
                   limit {maxstep}
                 ),
         -- row count, the highest (pos, id) pair, and the block sequence check
         ( select count(), max((p, id)),
                  checkBlockSequence(groupUniqArray([toUInt32(block[2]), toUInt32(block[3])]))
           from data
         ) as stat
    select topic,
        stat.2,
        last,
        stat.1                                         as rows,
        if(rows >= {maxstep}, 'FullUniqId', 'UniqId')  as processor,
        if(rows > 0, 'processing', _deps.2)            as state,
        splitByChar(':', getSetting('log_comment'))[1] as hostid
    from SCH.Offsets
    where topic = getSetting('agi_topic')
      and next.1 = toDateTime64(0, 3)
      and throwLog(stat.3, 'ERROR', 'Block Sequence Mismatch. It could be too high replication lag.') = 0
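
With the 'ERROR' level, throwLog should abort the query, so no offset row is written and the Transform attempt can be retried later, once the lagging replica has caught up.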

The source table should not be partitioned as it complicates checking block sequences.
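(Block numbers are allocated per partition, so with partitioning, parts from different partitions would not form a single monotonic sequence: hypothetical parts 202401_1_1_0 and 202402_1_1_0 both cover the block range [1,1].)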

bvt123 commented 9 months ago

test:

create table t4 on cluster '{cluster}' (a UInt32, pos DateTime) engine=ReplicatedMergeTree order by pos;
system stop fetches on cluster '{cluster}' t4;
insert into t4 values (1, now());  -- node 0
insert into t4 values (2, now());  -- node 1
insert into t4 values (3, now());  -- node 0
select *, _part from t4 order by pos;    -- while fetches are stopped, each node sees only its own inserts
select * from system.replication_queue;  -- the fetch of the missing block is pending
with splitByChar('_', _part) as block
select checkBlockSequence(groupUniqArray([toUInt32(block[2]), toUInt32(block[3])])) as a from t4;  -- expect 1: gap
system start fetches on cluster '{cluster}' t4;
system sync replica t4;                  -- wait for the pending fetch to finish
with splitByChar('_', _part) as block
select checkBlockSequence(groupUniqArray([toUInt32(block[2]), toUInt32(block[3])])) as a from t4;  -- expect 0: contiguous