vert-x3 / vertx-zookeeper

Zookeeper based cluster manager implementation
Other
73 stars 67 forks source link

Zookeeper nodes accumulating as net connections are created/closed #128

Closed tsegismont closed 1 year ago

tsegismont commented 2 years ago

Questions

I use websocket through vertx and enable the cluster (zookeeper). I noticed that when a websocket connection is created, an address will be generated and registered to the zookeeper. When the registration is cancelled, only the data in the address will be cleared, not clear the address, causing the zookeeper node to increase indefinitely,Many nodes like this:

vertx.net.fe071115-964d-40d8-b908-3e764188d2ad vertx.ws.014f5f57-829e-45a8-9580-bc42007cb33b

Version

4.1.1

Context

i read the source code,found this :

class  NetSocketImpl

  void registerEventBusHandler() {
    Handler<Message<Buffer>> writeHandler = msg -> write(msg.body());
    registration = vertx.eventBus().<Buffer>localConsumer(writeHandlerID).handler(writeHandler);
  }

and

class  WebSocketImplBase

  void registerHandler(EventBus eventBus) {
    Handler<Message<Buffer>> binaryHandler = msg -> writeBinaryFrameInternal(msg.body());
    Handler<Message<String>> textHandler = msg -> writeTextFrameInternal(msg.body());
    binaryHandlerRegistration = eventBus.<Buffer>localConsumer(binaryHandlerID).handler(binaryHandler);
    textHandlerRegistration = eventBus.<String>localConsumer(textHandlerID).handler(textHandler);
  }

used method <T> MessageConsumer<T> localConsumer(String address) when registering ,

then will arrive this :

class ClusteredEventBus , method :

  @Override
  protected <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
    if (!handlerHolder.isReplyHandler()) {
      RegistrationInfo registrationInfo = new RegistrationInfo(
        nodeId,
        handlerHolder.getSeq(),
        handlerHolder.isLocalOnly()
      );
      clusterManager.addRegistration(handlerHolder.getHandler().address, registrationInfo, Objects.requireNonNull(promise));
    } else if (promise != null) {
      promise.complete();
    }
  }

the handlerHolder,isReplyHandler() is false,but localOnly is true, so the address is registered to the clusterManager。

I think this address should not be registered to the clusterManager,because the address is registered with localConsumer method,

Maybe it should be so?

if (!handlerHolder.isReplyHandler() && !handlerHolder.isLocalOnly())

tsegismont commented 2 years ago

Originally reported by @jitawangzi

wjw465150 commented 1 year ago

I have found that the "remove" method of the "SubsMapHelper" class can be removed by adding only a few lines of code under the "eventbus" address ("/__vertx.subs") that does not subscribe! Here is my modified "remove" method:

  public void remove(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
    try {
      if (registrationInfo.localOnly()) {
        localSubs.computeIfPresent(address, (add, curr) -> removeFromSet(registrationInfo, curr));
        fireRegistrationUpdateEvent(address);
        promise.complete();
      } else {
        curator.delete().guaranteed().inBackground((c, e) -> {
          if (e.getType() == CuratorEventType.DELETE) {
            vertx.runOnContext(aVoid -> {
              //@wjw_del: ownSubs.computeIfPresent(address, (add, curr) -> removeFromSet(registrationInfo, curr));
              //@wjw_add-> 
              Set<RegistrationInfo> regInfoSet = ownSubs.computeIfPresent(address, (add, curr) -> removeFromSet(registrationInfo, curr));
              if (regInfoSet == null) { //There are no child nodes below
                try {
                  String parentPath = keyPath.apply(address);
                  if (parentPath.startsWith(VERTX_SUBS_NAME)) {
                    log.info("removed no child eventbus node:" + parentPath);
                    curator.delete().forPath(parentPath);
                  }
                } catch (Exception e1) {
                  log.error(String.format("remove subs address %s failed.", address), e1);
                }
              }
              //<-@wjw_add 
              promise.complete();
            });
          }
        }).forPath(fullPath.apply(address, registrationInfo));
      }
    } catch (Exception e) {
      log.error(String.format("remove subs address %s failed.", address), e);
      promise.fail(e);
    }
  }

1

wjw465150 commented 1 year ago

add: First see if we have children

  public void remove(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
    try {
      if (registrationInfo.localOnly()) {
        localSubs.computeIfPresent(address, (add, curr) -> removeFromSet(registrationInfo, curr));
        fireRegistrationUpdateEvent(address);
        promise.complete();
      } else {
        curator.delete().guaranteed().inBackground((c, e) -> {
          if (e.getType() == CuratorEventType.DELETE) {
            vertx.runOnContext(aVoid -> {
              //@wjw_del: ownSubs.computeIfPresent(address, (add, curr) -> removeFromSet(registrationInfo, curr));
              //@wjw_add-> 
              Set<RegistrationInfo> regInfoSet = ownSubs.computeIfPresent(address, (add, curr) -> removeFromSet(registrationInfo, curr));
              if (regInfoSet == null) { //There are no child nodes below
                try {
                  String parentPath = keyPath.apply(address);
                  if (parentPath.startsWith(VERTX_SUBS_NAME)) {
                    int childSize = curator.getChildren().forPath(parentPath).size();  //First see if we have children
                    if (childSize == 0) {
                      log.info("removed no child eventbus node:" + parentPath);
                      curator.delete().forPath(parentPath);
                    }
                  }
                } catch (Exception e1) {
                  log.warn(String.format("remove subs address %s failed.", address), e1);
                }
              }
              //<-@wjw_add 
              promise.complete();
            });
          }
        }).forPath(fullPath.apply(address, registrationInfo));
      }
    } catch (Exception e) {
      log.error(String.format("remove subs address %s failed.", address), e);
      promise.fail(e);
    }
  }
neterium commented 1 year ago

We have this issue as well. The content of the /io.vertx/vertx.subs is increasing, the stability of the cluster is at risk and we had issues in production multiple times. When a consumer is unregistered, the /io.vertx/vertx.subs/XX/NodeID entry is deleted, but not the /io.vertx/__vertx.subs/XX entry (when no more child). Not sure it's the same issue but this starts being critical on our side, please let us know what we can do to help speed up the delivery of this fix.

Thanks!

wjw465150 commented 1 year ago

I have already submitted the modified code, but it has not been accepted. The solution is: Use creatingParentContainersIfNeeded instead of creatingParentsIfNeeded, Can solve the legacy orphan node problem with no child nodes!

neterium commented 1 year ago

Indeed, according to this: CreateMode Enum

Looks like it's the correct way to create those kind of nodes.

neterium commented 1 year ago

But then I'm wondering why the code snipped you have suggested above is still required if the creatingParentContainersIfNeeded () method is used ?

wjw465150 commented 1 year ago

If you use the creatingParentContainersIfNeeded() method, you no longer need to delete it manually!You can refer to the branch of my fork: https://github.com/wjw465150/vertx-zookeeper.

neterium commented 1 year ago

We will issue a tested pull request soon, which will hopefully get merged quickly

vmorsiani commented 1 year ago

Hi,

See attached PR which is based on wjw465150

As explained in the PR we have the following scenario:

We have a scenario where we have multiple verticles which are used to index large sets of data. each verticle register a consumer on an address such "indexing." where the session id is a uuid (dynamic) once the indexing is done the consumer are unregistered and properly deleted, but the parent node not removed. Using a Container as Parent solves the issue. Not deleting the parent result in nodes accumulating in Zookeeper which cause the system to become unstable.

If you think using a Container as a parent is not a suitable option, please advise on how to approach the issue and I'll gladly look at providing an updated patch.

neterium commented 1 year ago

@tsegismont Can you please confirm this will be included in the upcoming release ?

tsegismont commented 1 year ago

@tsegismont Can you please confirm this will be included in the upcoming release ?

:+1:

tsegismont commented 1 year ago

Closed in 02f6d94