tikv / client-java

TiKV Java Client
https://tikv.github.io/client-java/
Apache License 2.0
111 stars 109 forks source link

[BUG] CDCRegionClient's key range is not working #256

Open shanzi opened 3 years ago

shanzi commented 3 years ago

The CDC region client passes a key range in the ChangeDataRequest, assuming that TiKV will filter on that key range so that only change logs within the range are streamed back. However, this assumption does not hold: a gRPC request to a region returns all change logs in that region, regardless of the key range.

Test code

package org.tikv.tiflink;

import com.google.common.collect.Range;
import java.math.BigInteger;
import org.tikv.cdc.CDCClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.key.Key;
import org.tikv.common.key.RowKey;
import org.tikv.common.meta.TiTableInfo;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Coprocessor.KeyRange;
import org.tikv.shade.com.google.protobuf.ByteString;
import org.tikv.txn.TwoPhaseCommitter;

/**
 * Stand-alone reproduction for the CDC key-range problem described in this issue.
 *
 * <p>The program splits a table's row-key space into a lower and an upper half, starts one
 * {@link CDCClient} per half, commits a single write at the start key of the lower half, and then
 * checks whether the event received by the upper-half client lies inside the upper half.
 *
 * <p>Arguments: {@code args[0]} is handed to {@code TiConfiguration.createDefault}; {@code
 * args[1]} and {@code args[2]} are handed to the catalog's {@code getTable}.
 */
public class CDCClientTest {
  public static void main(final String[] args) throws InterruptedException {
    TiConfiguration configuration = TiConfiguration.createDefault(args[0]);
    TiSession session = TiSession.create(configuration);
    TiTableInfo table = session.getCatalog().getTable(args[1], args[2]);

    // Bounds of the table's row-key space.
    RowKey lowerKey = RowKey.createMin(table.getId());
    RowKey upperKey = RowKey.createBeyondMax(table.getId());

    // Look up the regions holding each bound and report whether they are the same region.
    TiRegion lowerRegion = session.getRegionManager().getRegionByKey(lowerKey.toByteString());
    TiRegion upperRegion = session.getRegionManager().getRegionByKey(upperKey.toByteString());
    boolean sameRegion = lowerRegion.getId() == upperRegion.getId();
    System.out.println(sameRegion);

    // Midpoint of the two handles, summed in BigInteger before halving.
    long tableId = table.getId();
    BigInteger handleSum =
        BigInteger.valueOf(lowerKey.getHandle()).add(BigInteger.valueOf(upperKey.getHandle()));
    long splitHandle = handleSum.divide(BigInteger.valueOf(2)).longValue();
    RowKey splitKey = RowKey.toRowKey(tableId, splitHandle);

    // Lower half: [lowerKey, splitKey); upper half: [splitKey, upperKey).
    KeyRange lowerHalf =
        KeyRange.newBuilder()
            .setStart(lowerKey.toByteString())
            .setEnd(splitKey.toByteString())
            .build();
    KeyRange upperHalf =
        KeyRange.newBuilder()
            .setStart(splitKey.toByteString())
            .setEnd(upperKey.toByteString())
            .build();

    CDCClient lowerClient = new CDCClient(session, lowerHalf);
    CDCClient upperClient = new CDCClient(session, upperHalf);

    // Both clients are started from the same timestamp, before the write below is committed.
    long startTs = session.getTimestamp().getVersion();
    lowerClient.start(startTs);
    upperClient.start(startTs);

    ByteString payload = ByteString.copyFromUtf8("test");

    // Write a single value at the start key of the lower half.
    TwoPhaseCommitter committer = new TwoPhaseCommitter(session, startTs);
    committer.prewritePrimaryKey(
        ConcreteBackOffer.newRawKVBackOff(), lowerKey.getBytes(), payload.toByteArray());
    committer.commitPrimaryKey(
        ConcreteBackOffer.newRawKVBackOff(),
        lowerKey.getBytes(),
        session.getTimestamp().getVersion());

    Range<Key> upperHalfKeys =
        Range.closedOpen(Key.toRawKey(upperHalf.getStart()), Key.toRawKey(upperHalf.getEnd()));
    // The upper-half client should not be able to get this key.
    Key receivedKey = Key.toRawKey(upperClient.get().getKey());
    // Per this issue report, the line below prints false.
    boolean insideUpperHalf = upperHalfKeys.test(receivedKey);
    System.out.println(insideUpperHalf);
  }
}

One possible solution is to filter on the client side, inside the region client.

liuzix commented 3 years ago

Yes. For now, the best approach is to support filtering on the client side.