Open ChrisKujawa opened 1 year ago
I investigated that further.
The following statement:
This may cause reading the first key-value pair of the next logical column family (that contains data).
is not true.
I did a test:
@Test
public void shouldLoopWhileEqualPrefixEndOfCF() {
// given
upsertInDefaultCF(10, 1);
upsertInDefaultCF(10, 2);
upsertInCompositeCF(11, "foo", "baring");
upsertInCompositeCF(11, "foo2", "different value");
upsertInCompositeCF(12, "hello", "world");
upsertInCompositeCF(12, "this is the one", "as you know");
upsertInCompositeCF(13, "another", "string");
upsertInCompositeCF(14, "might", "be good");
upsertInOtherCF(10, 1);
upsertInOtherCF(10, 2);
firstKey.wrapLong(14);
// when
final List<String> values = new ArrayList<>();
compositeCF.whileEqualPrefix(
firstKey,
(KeyValuePairVisitor<DbCompositeKey<DbLong, DbString>, DbString>)
(key, value) -> values.add(value.toString()));
// then
assertThat(values).containsExactly("be good");
}
The debugger shows that we are NOT iterating over the column family bound. This is because we use .useFixedLengthPrefixExtractor(Long.BYTES)
and .setPrefixSameAsStart(true).
The combination of these options allows RocksDB to check always the prefix of 8 bytes length, which is our ColumnFamily prefix. If we reach the end, the rocksdb iterator becomes invalid (and or loop ends).
See related comment. and wiki page.
Still, the prefix check in the loop is necessary. For loops inside the same column family.
\cc @romansmirnov
I think it makes sense to separate the current iterating method, into two.
It looks like there is some bug in the prefix iteration, previously also mentioned by @romansmirnov. He was able to produce this yesterday. Today I had a look at it and I was also able to reproduce it.
I remove the manual prefix check for the normal foreach methods and from time to time engine tests seems to fail. It seems to be not deterministic, which is weird.
I was able to debug a case where the test failed. We can see that the iterating should be inside CF 14 (PENDING DEPLOYMENT)
When we run out of bounce we are standing at CF 34 (INCIDENTS)
java.lang.IndexOutOfBoundsException: index=8 length=4 capacity=8
at org.agrona.AbstractMutableDirectBuffer.boundsCheck0(AbstractMutableDirectBuffer.java:1716) ~[agrona-1.18.1.jar:1.18.1]
at org.agrona.concurrent.UnsafeBuffer.ensureCapacity(UnsafeBuffer.java:646) ~[agrona-1.18.1.jar:1.18.1]
at org.agrona.AbstractMutableDirectBuffer.getInt(AbstractMutableDirectBuffer.java:185) ~[agrona-1.18.1.jar:1.18.1]
at io.camunda.zeebe.db.impl.DbInt.wrap(DbInt.java:27) ~[classes/:?]
at io.camunda.zeebe.db.impl.DbCompositeKey.wrap(DbCompositeKey.java:47) ~[classes/:?]
at io.camunda.zeebe.db.impl.rocksdb.transaction.TransactionalColumnFamily.visit(TransactionalColumnFamily.java:426) ~[classes/:?]
at io.camunda.zeebe.db.impl.rocksdb.transaction.TransactionalColumnFamily.lambda$forEachWithoutPrefix$21(TransactionalColumnFamily.java:409) ~[classes/:?]
at io.camunda.zeebe.db.impl.rocksdb.transaction.ColumnFamilyContext.withPrefixKey(ColumnFamilyContext.java:112) ~[classes/:?]
at io.camunda.zeebe.db.impl.rocksdb.transaction.TransactionalColumnFamily.forEachWithoutPrefix(TransactionalColumnFamily.java:398) ~[classes/:?]
at io.camunda.zeebe.db.impl.rocksdb.transaction.TransactionalColumnFamily.lambda$forEach$7(TransactionalColumnFamily.java:163) ~[classes/:?]
at io.camunda.zeebe.db.impl.rocksdb.transaction.TransactionalColumnFamily.lambda$ensureInOpenTransaction$19(TransactionalColumnFamily.java:316) ~[classes/:?]
at io.camunda.zeebe.db.impl.rocksdb.transaction.DefaultTransactionContext.runInTransaction(DefaultTransactionContext.java:31) ~[classes/:?]
at io.camunda.zeebe.db.impl.rocksdb.transaction.TransactionalColumnFamily.ensureInOpenTransaction(TransactionalColumnFamily.java:315) ~[classes/:?]
at io.camunda.zeebe.db.impl.rocksdb.transaction.TransactionalColumnFamily.forEach(TransactionalColumnFamily.java:161) ~[classes/:?]
at io.camunda.zeebe.engine.state.deployment.DbDeploymentState.foreachPendingDeploymentDistribution(DbDeploymentState.java:128) ~[classes/:?]
at io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentRedistributor.runRetryCycle(DeploymentRedistributor.java:56) ~[classes/:?]
at io.camunda.zeebe.stream.api.scheduling.SimpleProcessingScheduleService.lambda$runAtFixedRate$0(SimpleProcessingScheduleService.java:35) ~[classes/:?]
Right now it looks like we need the manual prefix check because of this bug/limitation. I would need to investigate further why this happens.
Might be related to transactions and missing prefix extractor support, see https://github.com/facebook/rocksdb/issues/5100
Some other related issues:
The idea was that we can remove our manual prefix check for some cases, since this should be already guaranteed by Rocksdb since we use prefix extractor etc. But there seems to be some issues as described here https://github.com/camunda/zeebe/issues/12036#issuecomment-1532638474
/* * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under * one or more contributor license agreements. See the NOTICE file distributed * with this work for additional information regarding copyright ownership. * Licensed under the Zeebe Community License 1.1. You may not use this file * except in compliance with the Zeebe Community License 1.1. */ package io.camunda.zeebe.db.impl; import static org.assertj.core.api.Assertions.assertThat; import io.camunda.zeebe.db.ColumnFamily; import io.camunda.zeebe.db.TransactionContext; import io.camunda.zeebe.db.ZeebeDb; import io.camunda.zeebe.db.ZeebeDbFactory; import io.camunda.zeebe.db.ZeebeDbTransaction; import java.io.File; import java.util.HashMap; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; public final class MultiColumnFamilyTest { @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); private final ZeebeDbFactorydbFactory = DefaultZeebeDbFactory.getDefaultFactory(); private ZeebeDb zeebeDb; private ColumnFamily , DbString> compositeCF; private DbLong compositeCFFirstKey; private DbString compositeCFSecondKey; private DbCompositeKey compositeKey; private DbString value; private DbLong defaultKey; private DbLong defaultValue; private ColumnFamily defaultCF; private DbString otherKey; private DbString otherValue; private ColumnFamily otherCF; private TransactionContext transactionContext; @Before public void setup() throws Exception { final File pathName = temporaryFolder.newFolder(); zeebeDb = dbFactory.createDb(pathName); transactionContext = zeebeDb.createContext(); compositeCFFirstKey = new DbLong(); compositeCFSecondKey = new DbString(); compositeKey = new DbCompositeKey<>(compositeCFFirstKey, compositeCFSecondKey); value = new DbString(); compositeCF = zeebeDb.createColumnFamily( ColumnFamilies.COMPOSITE, transactionContext, compositeKey, value); defaultKey = new DbLong(); defaultValue = new DbLong(); defaultCF = zeebeDb.createColumnFamily( ColumnFamilies.DEFAULT, transactionContext, defaultKey, defaultValue); otherKey = new DbString(); otherValue = new DbString(); otherCF = zeebeDb.createColumnFamily(ColumnFamilies.OTHER, transactionContext, otherKey, otherValue); } @Test public void shouldLoopWhileEqualPrefixEndOfCF() throws Exception { // given upsertInDefaultCF(10, 1); upsertInDefaultCF(11, 2); upsertInCompositeCF(11, "foo", "baring"); upsertInCompositeCF(11, "foo2", "different value"); upsertInCompositeCF(12, "hello", "world"); upsertInCompositeCF(12, "this is the one", "as you know"); upsertInCompositeCF(13, "another", "string"); upsertInCompositeCF(14, "might", "be good"); upsertInOtherCF("10", "1"); upsertInOtherCF("10", "2"); // when defaultCF.forEach((key, value) -> keyValues.put(key.getValue(), value.getValue())); // then assertThat(keyValues).containsOnlyKeys(10L, 11L).containsValues(1L, 2L); } private void upsertInDefaultCF(final long key, final long value) { defaultKey.wrapLong(key); defaultValue.wrapLong(value); defaultCF.upsert(defaultKey, defaultValue); } private void upsertInOtherCF(final String key, final String value) { otherKey.wrapString(key); otherValue.wrapString(value); otherCF.upsert(otherKey, otherValue); } private void upsertInCompositeCF( final long firstKey, final String secondKey, final String value) { compositeCFFirstKey.wrapLong(firstKey); compositeCFSecondKey.wrapString(secondKey); this.value.wrapString(value); compositeCF.upsert(compositeKey, this.value); } enum ColumnFamilies { DEFAULT, COMPOSITE, OTHER, } }
The test runs without issues, even in a loop.
I checked again where and when actually the problem occurred. It was for example in the PendingDeploymentCheck. This is scheduled within the ProcessingScheduleService, which is the same actor as the StreamProcessor.
The stream processor creates for the processing a transaction in order to accumulate all state changes, the transaction will be committed later in a different actor job. There is a chance that the pending deployment check is executed interleaving before the transaction commit. This is also the reason why the issue was not deterministically reproducible as described earlier https://github.com/camunda/zeebe/issues/12036#issuecomment-1532638474.
The running between process and committing the transaction might be not an issue IF the pending deployment checker (or redistributor) uses a different transaction, but it uses the same. :bug: This is quite problematic since it can see intermediate state changes, which might be not committed at all, because the transaction can still be rolled back on an error. (Issue: https://github.com/camunda/zeebe/issues/12647).
Due to this reusing of the transaction and the issue (which I referenced earlier), e.g https://github.com/facebook/rocksdb/issues/5100 and https://github.com/facebook/rocksdb/issues/2343. It runs into the issue. It seems as soon as the transaction is dirty the prefix iterating doesn't work anymore, and it will fail.
I was able to reproduce this with my previous test:
@Test
public void shouldLoopWhileEqualPrefixEndOfCF() throws Exception {
// given
upsertInDefaultCF(10, 1);
upsertInDefaultCF(11, 2);
upsertInCompositeCF(11, "foo", "baring");
upsertInCompositeCF(11, "foo2", "different value");
upsertInCompositeCF(12, "hello", "world");
upsertInCompositeCF(12, "this is the one", "as you know");
upsertInCompositeCF(13, "another", "string");
upsertInCompositeCF(14, "might", "be good");
upsertInOtherCF("10", "1");
upsertInOtherCF("10", "2");
// when
final var keyValues = new HashMap<Long, Long>();
+ final ZeebeDbTransaction currentTransaction = transactionContext.getCurrentTransaction();
+ currentTransaction.run(
+ () -> {
+ upsertInCompositeCF(114, "12", "be good");
+ upsertInOtherCF("10", "1121");
+ });
+ // we don't commit, so it is still on going
+ // currentTransaction.commit();
defaultCF.forEach((key, value) -> keyValues.put(key.getValue(), value.getValue()));
// then
assertThat(keyValues).containsOnlyKeys(10L, 11L).containsValues(1L, 2L);
}
The test will fail with the ongoing transaction and using the same transaction context. If we would commit the transaction the test would be green again, or when using a different transaction context.
Right now it looks like that we will still need the manual prefix check in our foreach implementation, due to the limitation in RocksDB. :black_square_button:
Description
Related to https://github.com/camunda/zeebe/issues/12033 Came up in https://github.com/camunda/zeebe/issues/11813