Open dspangen opened 3 months ago
Is your node leaving because it crashed, or is it a graceful shutdown?
Not sure if I could follow your idea fully.
I fear an update on the NodeInfo might just as well get lost when a node leaves without running a graceful shutdown. Or was the idea to just update the local state on all cluster members and then double-check against the cluster map whether the node really left? Can you come up with a test that covers this behaviour?
As a backup solution, any join or leave event could trigger fetching the member list to make sure the caches are still in sync.
In general I would like to make sure that the vert.x side does not compensate for a missing feature or bug on the Ignite side.
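To make the backup-check idea above concrete, here is a minimal sketch, assuming direct access to the Ignite instance and the __vertx.nodeInfo cache; the class and method names are made up:

// Purely illustrative sketch: on a join/leave event, compare the node IDs recorded in
// __vertx.nodeInfo with the members Ignite currently reports and flag any strays.
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;

final class NodeInfoConsistencyCheck {

  static Set<String> findStaleNodeInfoEntries(Ignite ignite, IgniteCache<String, ?> nodeInfo) {
    // Node IDs Ignite believes are currently part of the cluster
    Set<String> liveNodes = ignite.cluster().nodes().stream()
        .map(n -> n.id().toString())
        .collect(Collectors.toSet());

    // Entries in __vertx.nodeInfo whose node is no longer in the topology
    Set<String> stale = new HashSet<>();
    for (Cache.Entry<String, ?> entry : nodeInfo) {
      if (!liveNodes.contains(entry.getKey())) {
        stale.add(entry.getKey());
      }
    }
    return stale;
  }
}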
For details: the service is hosted in Amazon ECS, and ECS can terminate a container quite ungracefully (and sometimes multiple containers at once, since we run 9 cluster members).
The solution is to ensure that __vertx.subs gets properly cleaned even if the node that is doing the cleaning crashes. Today the cleaning of __vertx.subs is triggered by successfully deleting from __vertx.nodeInfo, and since the two cache operations are not transactional (and IgniteCache::removeAll is explicitly not transactional), Ignite cannot guarantee that the changes to __vertx.subs will ever happen, given the cleanup is only invoked once.
Instead of the current "first deleter wins", __vertx.nodeInfo would also store the state of "removing" nodes, so that the cluster manager is always able to clean the subs map of all subscriptions for removed nodes. I don't have the exact details of that cleaning worked out, but presumably a node takes a lock for some period of time and executes IgniteCache::removeAll, giving up / refreshing the lock periodically until all entries are removed, at which point the entry in __vertx.nodeInfo can be removed as well.
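To make the idea concrete, here is a rough sketch; the names, the status value, and the use of Ignite's failover-safe lock (instead of a manually refreshed one) are assumptions, not worked-out details:

// Hypothetical sketch of the "removing" state idea (names and types simplified): the winner
// marks the departed node as REMOVING, cleans its __vertx.subs entries under a per-node lock,
// and only then deletes the nodeInfo entry. If the cleaner dies mid-way, the REMOVING marker
// survives so another member can pick the work up.
import java.util.HashSet;
import java.util.Set;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteBiPredicate;

final class RemovingStateSketch {

  static <K> void cleanSubsForRemovedNode(Ignite ignite,
                                          IgniteCache<String, String> nodeInfo,   // status simplified to a string
                                          IgniteCache<K, Boolean> subs,
                                          IgniteBiPredicate<K, Boolean> belongsToRemovedNode,
                                          String removedNodeId) {
    // Instead of a manually refreshed lock, this sketch uses Ignite's failover-safe lock,
    // which is released automatically if the owning node crashes.
    IgniteLock lock = ignite.reentrantLock("vertx.removal." + removedNodeId, true, false, true);
    if (!lock.tryLock()) {
      return; // another member is already cleaning up after this node
    }
    try {
      nodeInfo.put(removedNodeId, "REMOVING");   // stays visible until the cleanup really finished
      Set<K> keys = new HashSet<>();
      for (Cache.Entry<K, Boolean> e : subs.query(new ScanQuery<>(belongsToRemovedNode))) {
        keys.add(e.getKey());
      }
      subs.removeAll(keys);                      // not transactional, but if we crash before...
      nodeInfo.remove(removedNodeId);            // ...this delete, the REMOVING marker remains
    } finally {
      lock.unlock();
    }
  }
}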
I can attempt to write a test, but in practice it is triggered in a Kubernetes / other container-management situation where 2+ nodes are killed at roughly the same time.
Might not be the full extent of what you had in mind, but have a look at https://github.com/vert-x3/vertx-ignite/pull/142 and let me know if this goes in the right direction.
As the cache is configured as atomic, there is even a retry behind the scenes for the atomic remove methods, so this could be promising.
I extended the test in ClusteredEventBusTest to not only start 3 nodes and kill one, but to start 20 nodes and stop 18 of them:
@Test
public void testSubsRemovedForKilledNode() throws Exception {
  int start = 20;
  int stop = 18;
  int messages = 30;
  testSubsRemoved(start, stop, messages, latch -> {
    for (int i = 1; i <= stop; i++) {
      VertxInternal vi = (VertxInternal) vertices[i];
      Promise<Void> promise = vi.getOrCreateContext().promise();
      vi.getClusterManager().leave(promise);
      promise.future().onComplete(onSuccess(v -> {
        latch.countDown();
      }));
    }
  });
}

private void testSubsRemoved(int start, int stop, int messages, Consumer<CountDownLatch> action) throws Exception {
  if ((start - stop) < 2) {
    fail("start count needs to be at least stop count + 2");
  }
  startNodes(start);
  CountDownLatch regLatch = new CountDownLatch(stop);
  AtomicInteger cnt = new AtomicInteger();
  vertices[0].eventBus().consumer(ADDRESS1, msg -> {
    int c = cnt.getAndIncrement();
    assertEquals("foo" + c, msg.body());
    if (c == messages - 1) {
      testComplete();
    }
    if (c >= messages) {
      fail("too many messages");
    }
  }).completionHandler(onSuccess(v -> {
    for (int i = 1; i <= stop; i++) {
      vertices[i].eventBus().consumer(ADDRESS1, msg -> {
        fail("shouldn't get message");
      }).completionHandler(onSuccess(v2 -> {
        regLatch.countDown();
      }));
    }
  }));
  awaitLatch(regLatch);
  CountDownLatch closeLatch = new CountDownLatch(1);
  action.accept(closeLatch);
  awaitLatch(closeLatch);
  // Allow time for the kill to be propagated
  Thread.sleep(2000);
  vertices[start - 1].runOnContext(v -> {
    // Now send some messages from the last node - they should ALL go to node 0
    EventBus ebSender = vertices[start - 1].eventBus();
    for (int i = 0; i < messages; i++) {
      ebSender.send(ADDRESS1, "foo" + i);
    }
  });
  await();
}
I get some Ignite exceptions, but it might be that the shutdowns are still too graceful...
@dspangen what do you think?
I think this is a useful addition, but not sure it will be everything that is needed.
For context, we've been using the following in prod for the last few weeks and are still getting some instability. (We use Vert.x in Quarkus, fwiw.) After a restart we may get a few straggler addresses that contain node info for departed nodes, triggering a Connecting to server <cluster uuid> failed log message. It is much better with this bean running on each node, though.
@ApplicationScoped
public class IgniteVertxSubscriptionsPeriodicCleaner {

  private static final Logger LOGGER = Logger.getLogger(IgniteVertxSubscriptionsPeriodicCleaner.class);

  @ConfigProperty(name = "co.explo.vertx.ignite.periodic-subscriptions-cleaner-initial-delay")
  Duration cleanerInitialDelay;

  @ConfigProperty(name = "co.explo.vertx.ignite.periodic-subscriptions-cleaner-period")
  Duration cleanerPeriod;

  @ConfigProperty(name = "co.explo.vertx.ignite.periodic-subscriptions-cleaner-settle-period")
  Duration settlePeriod;

  final Random entropy = new Random();

  @Inject
  IgniteFactory igniteFactory;

  Ignite ignite;

  @Inject
  Vertx vertx;

  IgniteCache<String, IgniteNodeInfo> nodeInfo;
  IgniteCache<String, IgniteNodeRemovalStatus> nodeRemovalStatus;
  IgniteCache<IgniteRegistrationInfo, Boolean> subsMap;
  String currentMemberNodeId;

  @Priority(999)
  void startPeriodicCleaner(@Observes final StartupEvent e) {
    if (igniteFactory.isIgniteConfigured()) {
      ignite = igniteFactory.getIgniteInstance();
      currentMemberNodeId = ignite.cluster().localNode().id().toString();
      nodeInfo = ignite.getOrCreateCache("__vertx.nodeInfo");
      nodeRemovalStatus = ignite.getOrCreateCache("__vertx.nodeRemovalStatus");
      subsMap = ignite.getOrCreateCache("__vertx.subs");
      nodeInfo.query(new ContinuousQuery<String, IgniteNodeInfo>()
          .setAutoUnsubscribe(true)
          .setTimeInterval(100L)
          .setPageSize(128)
          .setLocalListener(this::listenOnNodeInfoChanges));
      vertx.setPeriodic(
          entropy.nextLong(1000, cleanerPeriod.toMillis()),
          cleanerPeriod.toMillis(),
          ignored -> vertx.executeBlockingAndForget(() -> {
            blockingFindUnremovedNode();
            return null;
          }));
    }
  }

  void blockingFindUnremovedNode() {
    final Instant minimumSettledStartedAt = Instant.now().minus(settlePeriod);
    final Set<String> currentClusterNodes =
        ignite.cluster().nodes().stream().map(n -> n.id().toString()).collect(Collectors.toSet());
    LOGGER.debugf("Attempting to find unremoved nodes");
    nodeRemovalStatus
        .query(new ScanQuery<String, IgniteNodeRemovalStatus>(
            // Find entries where the processing node is no longer part of the cluster
            // (i.e., crashed in the middle) and has settled (to ensure the node that started processing
            // the removal is actually part of the cluster state cluster-wide)
            (k, v) -> v.getCurrentStartedAt().isBefore(minimumSettledStartedAt)
                && !currentClusterNodes.contains(v.getCurrentNodeId())))
        .getAll()
        .stream()
        .map(entry -> {
          LOGGER.infof(
              "Node removal for node %s by node %s orphaned; attempting to steal task",
              entry.getKey(), entry.getValue().getCurrentNodeId());
          // So we have an entry where the processing node is gone and this task is orphaned. Let's replace
          // the entry and attempt to mark us as the new owner. If we succeed at marking ourselves as the
          // owner we can proceed immediately to continuing to clean the subs map.
          if (nodeRemovalStatus.replace(
              entry.getKey(),
              entry.getValue(),
              new IgniteNodeRemovalStatus(Instant.now(), currentMemberNodeId))) {
            blockingDeleteForNode(entry.getKey());
            return true;
          }
          return false;
        })
        .findFirst() // only process a single un-removed node per loop
        .ifPresent(ignored -> {
          // ignored
        });
  }

  void blockingDeleteForNode(final String removedNodeId) {
    LOGGER.infof("Node removal starting for node %s", removedNodeId);
    final TreeSet<IgniteRegistrationInfo> toRemove = subsMap
        .query(new ScanQuery<IgniteRegistrationInfo, Boolean>(
            (k, v) -> k.registrationInfo().nodeId().equals(removedNodeId)))
        .getAll()
        .stream()
        .map(Cache.Entry::getKey)
        .collect(Collectors.toCollection(TreeSet::new));
    // Cannot use streams here as that would remove the ordering from the TreeSet
    for (final IgniteRegistrationInfo info : toRemove) {
      try {
        subsMap.remove(info, Boolean.TRUE);
      } catch (IllegalStateException | CacheException e) {
        LOGGER.warnf("Could not remove subscriber (for nodeId=%s): %s", removedNodeId, e.getMessage());
      }
    }
    nodeRemovalStatus.remove(removedNodeId);
    LOGGER.infof("Node removal complete for node %s", removedNodeId);
  }

  void listenOnNodeInfoChanges(final Iterable<CacheEntryEvent<? extends String, ? extends IgniteNodeInfo>> events) {
    vertx.executeBlockingAndForget(() -> {
      StreamSupport.stream(events.spliterator(), false)
          .filter(e -> e.getEventType() == EventType.REMOVED)
          .forEach(e -> blockingOnNodeRemove(e.getKey()));
      return null;
    });
  }

  void blockingOnNodeRemove(final String removedNodeId) {
    if (nodeRemovalStatus.putIfAbsent(
        removedNodeId, new IgniteNodeRemovalStatus(Instant.now(), currentMemberNodeId))) {
      LOGGER.infof("Node removal recorded for node %s", removedNodeId);
      vertx.setTimer(
          cleanerInitialDelay.toMillis(),
          ignored -> vertx.executeBlockingAndForget(() -> {
            blockingDeleteForNode(removedNodeId);
            return null;
          }));
    }
  }
}
I am not sure if this is only valid for the Ignite cluster manager. I would go with that solution as a last resort and maybe have it as a toggle-able option.
I would prefer to have something like an interface where such a recovery can be hooked into, for example for handling clustered event bus exceptions when a peer cannot be reached. That event could trigger a consistency check to make sure the node is still part of the cluster and/or does not still have subscriptions that were never cleared.
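Something along these lines is what I picture; this is entirely hypothetical, no such hook exists today:

// Hypothetical recovery hook: nothing like this exists in vertx-ignite today, it is only
// meant to illustrate where a consistency check could be plugged in.
import java.util.Set;

public interface ClusterRecoveryHook {

  // Called when the clustered event bus fails to reach a peer, e.g. on a
  // "connecting to server <uuid> failed" error. Implementations could re-check
  // membership and purge stale __vertx.subs entries for nodes that are gone.
  void onPeerUnreachable(String nodeId);

  // Called after a node-left/node-failed event has been processed locally,
  // giving the application a chance to verify the subs map is consistent.
  void onMembershipChange(Set<String> currentMembers);
}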
@tsegismont what do you think about the above problem, or do you see something obvious on the Ignite side that could go wrong?
A recovery interface seems like a good idea... at least until the root cause is identified. Today it's not possible to respond to a subs map inconsistency programmatically in any way (besides forking the cluster manager) hence this heavy-handed approach.
@dspangen I'm not familiar with ECS; is there any way to configure rolling updates and up/downscaling so that it does not confuse the data grid? In the vertx-hazelcast doc we have:
https://vertx.io/docs/vertx-hazelcast/java/#_rolling_updates and https://vertx.io/docs/vertx-hazelcast/java/#_cluster_administration
I know it's not the same data grid, but I think the concepts apply as well.
Apologies in advance for a long explanation.
Looking into this further, I believe I found the root cause: a misunderstanding of the Ignite distributed cache contract. The IgniteClusterManager (and its SubsMapHelper) are written with the assumption that updates observed via a ContinuousQuery are immediately visible when performing a subsequent ScanQuery. That, in my reading, isn't the case for REPLICATED caches in Ignite (which all vert.x internal state caches are marked as): it's quite possible that you read from a partition copy that hasn't received all updates.
Here's how it manifests:
1. A node leaves the cluster (crashed or killed).
2. IgniteClusterManager observes the event locally on all nodes, and every member races to delete the departed node's entry from __vertx.nodeInfo.
3. The member that wins that delete becomes the node removal master and starts clearing the subs, similar to the IgniteVertxSubscriptionsPeriodicCleaner that I shared above.
4. Each node also maintains a local ConcurrentMap in SubsMapHelper.
5. SubsMapHelper (on every other node) relies on reading updates from the __vertx.subs cache that the node removal master is writing and reflecting them locally, also querying the distributed cache, and so potentially reading data from a non-primary or backup copy.
I tested this hypothesis by forking the IgniteClusterManager and SubsMapHelper to explicitly mark nodes that are in the removal process in a local ConcurrentMap when a node loss event is observed. Entries in said map are maintained until the node removal process is complete, which is signified by the IgniteVertxSubscriptionsPeriodicCleaner bean (which I shared above) removing the entry from the __vertx.nodeRemovalStatus cache.
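In sketch form, the bookkeeping added by the fork looks roughly like this (simplified, with hypothetical names; not the actual forked code):

// Simplified sketch: nodes seen leaving are tracked locally and treated as "still being
// removed" until the cleaner bean deletes their entry from __vertx.nodeRemovalStatus.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RemovingNodesTracker {

  private final Map<String, Long> removingNodes = new ConcurrentHashMap<>();

  // Called from the cluster manager when a node-left/node-failed event is observed locally.
  void markRemoving(String nodeId) {
    removingNodes.put(nodeId, System.currentTimeMillis());
  }

  // SubsMapHelper can consult this before trusting registrations read from __vertx.subs.
  boolean isBeingRemoved(String nodeId) {
    return removingNodes.containsKey(nodeId);
  }

  // Called once the __vertx.nodeRemovalStatus entry disappears, i.e. cleanup finished.
  void removalComplete(String nodeId) {
    removingNodes.remove(nodeId);
  }
}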
Importantly, with this change we have observed no inconsistency in a week. Previously, after every update to the cluster (approx once a day) several registrations would stick around in perpetuity, failing approximately 0.5-1.0% of all requests to our application with the infamous connecting to server <guid> failed
log message. This would force us to manually kill off containers until we got each node into a state without stale data in the local subs map. (See below for details of our application).
Given that this state is maintained across two unrelated objects it will take some work to clean that all up into a PR for submission, but I am happy to do so if of interest.
Explo provides embedded analytics and BI to companies looking to share data with their customers. We provide a SQL-first analytic engine on top of customer databases and data warehouses. We use Vert.x via Quarkus 3 to maintain our connection pooling infrastructure for our customer data warehouse connections. Given that we serve hundreds of customers and thousands of databases and data warehouses we needed a sharding solution to maintain pools of database connections through a cluster of nodes. We use Vert.x to manage that.
Our application is architected as follows:
- Each node is part of an haGroup and has many shards assigned to it.
Thanks for the long explanation, that helps a lot in tracking down the potential problem.
The reason for using REPLICATED in combination with writeSynchronizationMode FULL_SYNC is that it should guarantee that the replicas (not partitions) are always updated on a successful write. So reading should be quick and consistent; only writes can potentially take a performance hit.
To stay consistent for the rest of the shared data caches as well, the default there is PARTITIONED with readFromBackup turned off.
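For reference, that corresponds to cache settings along these lines (a minimal sketch using the plain Ignite API, not the actual vertx-ignite configuration code):

// Sketch of the two configurations being discussed.
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;

class CacheConfigSketch {

  // Internal vert.x state caches (__vertx.subs, __vertx.nodeInfo, ...): every node holds a
  // full copy and a write only completes once all copies have been updated.
  static CacheConfiguration<Object, Object> internalStateCache(String name) {
    return new CacheConfiguration<>(name)
        .setCacheMode(CacheMode.REPLICATED)
        .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
  }

  // Default for shared data caches: partitioned, with reads always going to the primary copy.
  static CacheConfiguration<Object, Object> sharedDataCache(String name) {
    return new CacheConfiguration<>(name)
        .setCacheMode(CacheMode.PARTITIONED)
        .setReadFromBackup(false);
  }
}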
Can you refer to the Ignite documentation where this is stated?
Were you able to test my PR draft to check whether it gets you a lower failure rate?
This is a continuation of https://github.com/vert-x3/vertx-ignite/issues/94 as it's hitting us in production. Looking into it, it seems the culprit is in handling a node leave event--the subscriptions are cleared by the cluster member that gets to delete the node info from the cache. But that can take a little while, during which the member that is handling the removal (essentially the leader for this event) could crash or be shut down, leaving the subs map in a bad state.
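Roughly, the race as I read it (a paraphrased sketch with made-up names, not the actual vertx-ignite source):

// The member that wins the nodeInfo delete is the only one that ever cleans the subs,
// and nothing retries if it dies in the middle.
import org.apache.ignite.IgniteCache;

final class NodeLeftRace {

  private final IgniteCache<String, Object> nodeInfoCache;  // __vertx.nodeInfo
  private final IgniteCache<Object, Boolean> subsCache;     // __vertx.subs

  NodeLeftRace(IgniteCache<String, Object> nodeInfoCache, IgniteCache<Object, Boolean> subsCache) {
    this.nodeInfoCache = nodeInfoCache;
    this.subsCache = subsCache;
  }

  void onNodeLeft(String leftNodeId) {
    if (nodeInfoCache.remove(leftNodeId)) {   // only one member succeeds at this delete
      removeSubsEntriesFor(leftNodeId);       // if that member crashes or is killed before
    }                                         // finishing, the stale subs stay behind
  }

  private void removeSubsEntriesFor(String leftNodeId) {
    // scan __vertx.subs for registrations of leftNodeId and remove them (details elided)
  }
}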
I think the solution here is to add a status field to IgniteNodeInfo of {STARTED, STOPPING} and mark the entry for a node as stopping. Then, whoever gets to update that entry gets the first chance to clear the subs. There would also be a background executor service that would poll for stopping members, maybe synchronized by a semaphore id'd to the removed member.
Does this seem like a reasonable solution to the problem?
Version
4.5.7