shayhatsor / zookeeper

Apache ZooKeeper .NET async Client
https://nuget.org/packages/ZooKeeperNetEx/
Apache License 2.0
236 stars 53 forks source link

Calling getChildrenAsync could take forever #52

Open laurenceSaes opened 2 years ago

laurenceSaes commented 2 years ago

Hello,

I was trying to implement a zookeeper change monitor. It seems that a situation could occur where calling getChildrenAsync would take forever. I made a small test program to demonstrate this behavior.

Steps: 1) Set a breakpoint at the TrackChildren(eventPath); line in the ChildrenElementsChanged method. 2) Step into the TrackChildren call and step over _zookeeper.getChildrenAsync. 3) The application is now stuck on this line

The program:

namespace ZooKeeperDemo
{
  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Threading.Tasks;
  using org.apache.zookeeper;

  internal sealed class ChildrenBug
  {
    public static async Task Start(string connectString, string root)
    {
      var connection = new ZooKeeper(connectString, 10_000_000, null);
      var listener = new TreeMonitor(connection, "/");
      listener.Start();

      await CreateElement(root, connection);

      var first = Guid.NewGuid();

      for (var i = 0; i < 3; i++)
      {
        await CreateElement(root + "/" + first, connection);
        await connection.deleteAsync(root + "/" + first);
        await CreateElement(root + "/" + Guid.NewGuid(), connection);
      }

      while (true)
      {
        await Task.Delay(TimeSpan.FromDays(1));
      }
    }

    private static async Task CreateElement(string path, ZooKeeper zookeeper)
    {
      var channelExists = await zookeeper.existsAsync(path);
      if (channelExists != null)
      {
        return;
      }

      await zookeeper.createAsync(path, Array.Empty<byte>(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    class TreeMonitor
    {
      private readonly object _lockObject = new object();
      private readonly ZooKeeper _zookeeper;
      private readonly string _root;
      private readonly SimpleWatcher _childrenWatcher;
      private readonly SimpleWatcher _childWatcher;
      private readonly Dictionary<string, byte[]> _state = new Dictionary<string, byte[]>();

      public TreeMonitor(ZooKeeper zookeeper, string root)
      {
        _zookeeper = zookeeper;
        _root = root == "/" ? string.Empty : root;
        _childrenWatcher = new SimpleWatcher(ChildrenElementsChanged);
        _childWatcher = new SimpleWatcher(ChildElementsChanged);
      }

      public void Start()
      {
        lock (_lockObject)
        {
          TrackChildren(_root);
        }
      }

      private void TrackChildren(string trackPath)
      {
        var rootToUse = trackPath == string.Empty ? "/" : trackPath;

        var children = _zookeeper.getChildrenAsync(rootToUse, _childrenWatcher).ConfigureAwait(false).GetAwaiter().GetResult();

        foreach (var child in children.Children.Where(child => !_state.ContainsKey(child)))
        {
          var childPath = $"{(trackPath != "/" ? trackPath : string.Empty)}/{child}";
          TrackChild(childPath);
          TrackChildren(childPath);
        }
      }
      private void TrackChild(string path)
      {
        if (path == null)
        {
          return;
        }

        try
        {
          var dataFetch = _zookeeper.getDataAsync(path, _childWatcher).ConfigureAwait(false).GetAwaiter().GetResult();
          _state[path] = dataFetch.Data;
        }
        catch (KeeperException.NoNodeException)
        {
          _state.Remove(path);
        }
      }

      private Task ChildrenElementsChanged(WatchedEvent watchedEvent)
      {
        if (watchedEvent.get_Type() == Watcher.Event.EventType.NodeChildrenChanged)
        {
          lock (_lockObject)
          {
            var eventPath = watchedEvent.getPath();
            TrackChildren(eventPath);
          }
        }

        return Task.CompletedTask;
      }

      private Task ChildElementsChanged(WatchedEvent watchedEvent)
      {
        var child = watchedEvent.getPath();
        var eventType = watchedEvent.get_Type();

        lock (_lockObject)
        {
          if (eventType == Watcher.Event.EventType.NodeDataChanged || eventType == Watcher.Event.EventType.NodeCreated)
          {
            TrackChild(child);
          }

          if (eventType == Watcher.Event.EventType.NodeDeleted)
          {
            _state.Remove(child);

            // The child could already be created in the meantime. Try to track it, it will fail when it is still missing.
            TrackChild(child);
          }
        }

        return Task.CompletedTask;
      }
    }
  }
}