Alachisoft / NCache

NCache: Highly Scalable Distributed Cache for .NET
http://www.alachisoft.com
Apache License 2.0

[Question] Pub/Sub Message TimeSpan Expiration and General Pub/Sub Performance #36

Open killemth opened 5 years ago

killemth commented 5 years ago

What is the purpose of the default topic expiration, or message level expiration?

Can I safely use TimeSpan.MinValue or Zero for messages to have them not persist, and only be sent to all current subscribers at that very moment?

Furthermore, I have yet to encounter when the actual message expiration is used -- merely subscribing to a topic doesn't automatically get all messages from what I've seen -- does it perhaps only get used on a temporary disconnect/reconnect event?

Currently I am using the following logic to publish a message;

public void PublishPubSubMessage(String topicName, Object message)
{
    ITopic topic = _cache.MessagingService.GetTopic(topicName) ?? _cache.MessagingService.CreateTopic(topicName);
    Metrics.Count("Cache Operation", "PubSub Request");

    topic.Publish(new Message(message, TimeSpan.FromSeconds(10)), DeliveryOption.All);
}

The use case is a very dynamic PubSub system where clients subscribe to adhoc topics and messages are sent to all subscribers, but there is no need to store these messages for new connections, as the objects are also persisted in normal cache and accessible by another Get method.

killemth commented 5 years ago

Also, how are topics maintained if there are no subscribers? Should this be a concern, since it's a guarantee that adhoc topics will be used and eventually not be needed anymore -- will NCache automatically close and delete topics when there are no subscribers? If not, is there a way I can get the total number of subscribers on an unsubscribe action, and then delete it if there are none?

killemth commented 5 years ago

The topics are definitely not being evicted, I see we've gone from 0 to over 100,000 topics overnight, with the vast majority of those undoubtedly being vacant.

killemth commented 5 years ago

@NCacheDev @Brad-NCache, Also, what would cause the ResponseQueueCount for the PubSub cache to increase and remain? Could this be due to dynamically scaling clients (servers) that were added and then removed when load decreased?

It seems the bigger issue is topic count persisting.

Something is also causing the CPU utilization to spike and remain higher after starting usage of the PubSub cache, which has grown over the past 2 days.

Originally we would see spikes up to about 10% CPU utilization when just using regular cache operations, dropping to 2% during low transaction volumes. After introducing the PubSub cache we saw spikes up to 50% on day 1, but during periods of low transaction volume or few connections it would drop back to 10% or so. On day 2 the spike went to 80%, and dropped no lower than 50% during the same low transaction volume. Today (day 3) we see spikes up to 95% and it has yet to settle below 70%.

I've also been watching the Average us/Cache Operation on the PubSub cache, and it is jumping to around 125,000 (125ms).

All of these days see nearly the same traffic patterns.

CPU Graph for cluster: image

PubSub Cache Requests/sec for cluster (monitored every 1 minute): image

Other (non PubSub) Cache Requests/sec image

AWS ELB Active Connections to ELB Serving PubSub: image

For more details on the cache, the config is as follows:

  <cache-config cache-name="...PubSub...">
    <cache-settings inproc="False">
      <logging enable-logs="True" trace-errors="True" trace-debug="False" log-path=""/>
      <performance-counters enable-counters="True" snmp-port="0"/>
      <cache-notifications item-remove="False" item-add="False" item-update="False"/>
      <cleanup interval="15sec"/>
      <storage type="heap" cache-size="5000mb"/>
      <eviction-policy default-priority="normal" eviction-ratio="5%"/>
      <cache-topology topology="partitioned">
        <cluster-settings operation-timeout="60sec" stats-repl-interval="60sec" use-heart-beat="False">
          <cluster-connection-settings port-range="2" connection-retries="4" connection-retry-interval="2secs" cluster-port="7821"/>
        </cluster-settings>
      </cache-topology>
      <client-death-detection enable="False" grace-interval="60sec"/>
    </cache-settings>
    <cache-deployment>
      <servers>
        ...
      </servers>
    </cache-deployment>
  </cache-config>

Some notable messages in the cache log are as follows;

System.Exception: Message with given id does not exist
   at Alachisoft.NCache.Caching.Topologies.Clustered.PartitionedServerCache.AssignmentOperation(MessageInfo messageInfo, SubscriptionInfo subscriptionInfo, TopicOperationType type, OperationContext context)
   at Alachisoft.NCache.Caching.Messaging.MessageManager.RevokeExpiredAssignments()
   at Alachisoft.NCache.Caching.Messaging.MessageManager.ProcessMessage()
ClusteredCacheBase.AnnouncePresence()   Alachisoft.NCache.Serialization.CompactSerializationException: Unable to cast object of type 'TopicStats' to type 'Alachisoft.NCache.Caching.Statistics.NodeInfo'.
   at Alachisoft.NCache.IO.CompactBinaryWriter.WriteObject(Object graph)
   at Alachisoft.NCache.Common.Util.SerializationUtility.SerializeDictionary[K,V](IDictionary`2 dictionary, CompactWriter writer)
   at Alachisoft.NCache.Caching.Statistics.CacheStatistics.Serialize(CompactWriter writer)
   at Alachisoft.NCache.Caching.Statistics.NodeInfo.Serialize(CompactWriter writer)
   at Alachisoft.NCache.Serialization.Surrogates.ContextSensitiveSerializationSurrogate.Write(CompactBinaryWriter writer, Object graph)
   at Alachisoft.NCache.IO.CompactBinaryWriter.WriteObject(Object graph)
   at Alachisoft.NCache.Caching.Topologies.Clustered.Function.Alachisoft.NCache.Runtime.Serialization.ICompactSerializable.Serialize(CompactWriter writer)
   at Alachisoft.NCache.Serialization.Surrogates.ContextSensitiveSerializationSurrogate.Write(CompactBinaryWriter writer, Object graph)
   at Alachisoft.NCache.Serialization.Formatters.CompactBinaryFormatter.Serialize(Stream stream, Object graph, String cacheContext, Boolean closeStream)
   at Alachisoft.NCache.Caching.Topologies.Clustered.ClusterService.SerializeMessage(Object msg)
   at Alachisoft.NCache.Caching.Topologies.Clustered.ClusterService.SendNoReplyMessage(Address dest, Object msg, Priority priority, Boolean isSeqRequired)
   at Alachisoft.NCache.Caching.Topologies.Clustered.ClusterCacheBase.AnnouncePresence(Boolean urgent)

killemth commented 5 years ago

So, another bit of information -- I am now also seeing the amount of time for a client to publish data to the PubSub cache taking in excess of 6,000ms

killemth commented 5 years ago

Today I have logs of the PublishPubSubMessage command taking over 30,000ms to complete...

Similar to yesterday, below are the activity charts.

Another anomaly from yesterday, the CPU did actually drop down to an average of 20%.

CPU Graph for cluster: image

PubSub Cache Requests/sec for cluster (monitored every 1 minute): image

Other (non PubSub) Cache Requests/sec image

AWS ELB Active Connections to ELB Serving PubSub: image

Here is another interesting graph that shows the Message Count data (sampled every 1 minute, not sure why one of the nodes is missing). image

killemth commented 5 years ago

I almost feel like I need to create a key for each topic that tracks presence, and if it ever drops, remove the topic, and then re-create the topic the next time a user joins.

We're using topics like keyed tokens that represent message channels for user 1:1 chat or other object updates to sync with web clients.

It would be really nice if the API allowed getting an active subscriber count for a topic, and even an API that allowed getting all current topics so that I could create a process that prunes them on a timer.

Brad-NCache commented 5 years ago

Hi Killemth,

Below are answers to your questions, given one by one in the order in which they were put forward:

What is the purpose of the default topic expiration, or message level expiration?

Please note that expiration always applies at the message level. The default topic expiration is simply the default expiration value (TimeSpan.MaxValue) given to a topic's messages when you don't specify an expiration on a message individually. Expiration never removes the topics that have been created; it applies to messages only.

Moreover, as far as message behavior is concerned, if one or more subscribers consume the message, it is removed from the cache immediately, regardless of its expiration time. Only when no subscriber consumes the message is it removed by expiration, once the expiration time has elapsed with the message still in the cache.

Topics are removed/unregistered by calling the DeleteTopic method explicitly. More information on NCache pub/sub messages and topics can be found at the following links:

Pub/Sub Messages: http://www.alachisoft.com/resources/docs/ncache/prog-guide/publish-subscribe-messages.html

Pub/Sub Topics: http://www.alachisoft.com/resources/docs/ncache/prog-guide/publish-subscribe-messages.html

Can I safely use TimeSpan.MinValue or Zero for messages to have them not persist, and only be sent to all current subscribers at that very moment?

Setting TimeSpan.MinValue as the expiration period on a message will throw an exception, because TimeSpan.MinValue is negative, which makes no sense for an expiration period. Setting TimeSpan.Zero, on the other hand, marks the message for expiration almost immediately, so it might be removed on the next cleanup interval. This can lead to messages being removed before they are sent to the subscriber(s).

A quick way to test message expiration is to use an expiration value of 1-2 minutes and publish messages without any subscriber. You will notice that all messages in the topic are automatically removed once the expiration time has elapsed. If subscribers are connected, messages are delivered and removed immediately, without waiting for the expiration time.
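As a minimal sketch of the point above, the check below rejects expiration values that are negative or zero before they are handed to a message. `ExpirationGuard` and `IsUsable` are hypothetical names for illustration and are not part of the NCache API; the only facts relied on are that `TimeSpan.MinValue` is negative and that `TimeSpan` values can be compared directly.

```csharp
using System;

// Hypothetical helper, not an NCache API.
public static class ExpirationGuard
{
    // Returns true only for a strictly positive expiration period.
    // TimeSpan.MinValue is negative and TimeSpan.Zero expires immediately,
    // so both are reported as unusable.
    public static bool IsUsable(TimeSpan expiration)
    {
        return expiration > TimeSpan.Zero;
    }
}
```

With this check, `ExpirationGuard.IsUsable(TimeSpan.MinValue)` and `ExpirationGuard.IsUsable(TimeSpan.Zero)` both return false, while `ExpirationGuard.IsUsable(TimeSpan.FromSeconds(10))` returns true.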

As mentioned in answer to question 1, messages will be removed from cache as soon as they are consumed by current subscribers, and will not wait for end of expiration period. This behavior is same whether your DeliveryOption enum set on the message is All or Any in the ITopic Publish method parameter.

Furthermore, I have yet to encounter when the actual message expiration is used -- merely subscribing to a topic doesn't automatically get all messages from what I've seen -- does it perhaps only get used on a temporary disconnect/reconnect event?

Please note that a standard Pub/Sub platform is loosely coupled in nature: messages are always sent to the current subscribers and are then removed from the cache immediately. New subscribers may not get messages that have already been delivered and removed from the cache.

You also set a DeliveryOption for each message. If the DeliveryOption is set to Any, the message is sent to any connected subscriber and is not guaranteed to be received by the others. Thus, messages may be removed from the cache without reaching all the subscribers, even if those subscribers are connected and listening.

If the DeliveryOption is set to All, currently connected subscribers receive all the messages being published, whereas a subscriber that joins later gets only the most recently published messages. Older messages, having already been consumed by the then-current subscribers before the new subscriber came along, will already have been removed from the cache on successful delivery.

In either case, if the message expires because its expiration period is too short, it could be removed before being delivered to any subscriber. This is why I suggested running a test with 1-2 minutes of expiration time to understand message expiration better.

Also, how are topics maintained if there are no subscribers?

Topics don't have an expiration period of their own on them and they can persist for as long as the cache is up and running. If the cache is stopped or the topic is explicitly removed by calling the DeleteTopic method, only then will the topic be removed. The number of subscribers currently listening to the topic has no bearing on the topic lifetime.

will NCache automatically close and delete topics when there are no subscribers?

As mentioned above, NCache does not automatically close and delete topics. Topics are only removed by stopping the cache or calling the DeleteTopic method on the topic explicitly.

If not, is there a way I can get the total number of subscribers on an unsubscribe action, and then delete it if there are none?

Pub/Sub messaging is built so that publishers and subscribers can exchange messages, but it is always a loosely coupled setup in which subscribers and publishers have no knowledge of each other. Further, users cannot delete subscribers directly, because subscribers are registered against auto-generated ids which are kept private.

There is, however, a PowerShell cmdlet Get-Topics that details all the topics registered on a specific cache along with the number of publishes and subscribers each of the topics has. This tool is the only means of viewing the topics state of the cache. More information about the tool can be found at the following link:

http://www.alachisoft.com/resources/docs/ncache/powershell-ref/get-topics.html
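Since the API does not expose a subscriber count, the application has to keep its own. The following is a minimal sketch of such bookkeeping, assuming a single process; `TopicSubscriberRegistry` and its methods are hypothetical names and not part of NCache. With several client servers, as in the deployment described in this thread, the counts would have to live in a shared store rather than in process memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of NCache: tracks subscriber ids per topic
// inside the current process only.
public sealed class TopicSubscriberRegistry
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, HashSet<string>> _subscribers =
        new Dictionary<string, HashSet<string>>();

    // Records a subscriber. Returns true when the topic had no subscribers before.
    public bool Add(string topicName, string subscriberId)
    {
        lock (_gate)
        {
            if (!_subscribers.TryGetValue(topicName, out HashSet<string> ids))
            {
                ids = new HashSet<string>();
                _subscribers[topicName] = ids;
            }
            bool wasEmpty = ids.Count == 0;
            ids.Add(subscriberId);
            return wasEmpty;
        }
    }

    // Removes a subscriber. Returns true when this call left the topic with
    // no subscribers, so the caller may decide to delete the topic.
    public bool Remove(string topicName, string subscriberId)
    {
        lock (_gate)
        {
            if (!_subscribers.TryGetValue(topicName, out HashSet<string> ids))
            {
                return false; // unknown topic: nothing to clean up
            }
            ids.Remove(subscriberId);
            if (ids.Count > 0)
            {
                return false;
            }
            _subscribers.Remove(topicName);
            return true;
        }
    }

    // Number of subscribers currently recorded for the topic.
    public int Count(string topicName)
    {
        lock (_gate)
        {
            return _subscribers.TryGetValue(topicName, out HashSet<string> ids) ? ids.Count : 0;
        }
    }
}
```

When `Remove` returns true, the caller could then invoke the DeleteTopic method mentioned above. Doing the count and the decision under one lock avoids two unsubscribing clients both concluding that the other is still present.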

High CPU and High Response Times Issue:

As for the resource utilization you are seeing in terms of CPU bursts and topic counts increasing, please integrate topic deletion strategies into your code and see if that resolves the matter. To really solidify the concepts related to Pub/Sub, I would suggest going through our detailed discussion on the various moving parts of this NCache feature that can be found at the following link:

http://www.alachisoft.com/resources/docs/ncache/prog-guide/publish-subscribe-ncache.html

Once you are reasonably familiar with the terminology used, please run the sample located at %NCHOME%\NCache\samples\dotnet\PubSub. It should help clarify Pub/Sub concepts such as topic creation and deletion, message creation and setting message properties, subscribing and publishing, raising events, and registering for event notifications.

Please let me know if there are any questions.

killemth commented 5 years ago

@Brad-NCache thanks again for your response, the information is greatly appreciated.

I have implemented the following to manage topics and subscriptions and will post back results tomorrow and later in the week.

There has also been a change to publishing messages, as previously when publishing I was creating the topic if it didn't exist -- which would also better explain why the Message Count was higher than zero at times (and perhaps response queue).

I will be adding a new CacheItem keyed on the topic name and client socket ID (GUID I generate on connection), and tagging with the topic name. This will be used on Subscribe and Unsubscribe actions, and when unsubscribing, if the count of subscribers is zero, the topic will be deleted. The topic will then be re-created when the next subscriber comes in if it doesn't already exist.

        private const String TopicSubscriberTokenName = "PubSubTopicSubscriber";

        /////////////////////////////////////////////////

        private static String TagTopicName(String topicName)
        {
            return String.Format("PUBSUB-TPC-{0}", topicName);
        }

        /////////////////////////////////////////////////

        private static void AddTopicSubscriber(String topicName, String clientSocketId)
        {
            LowCache.Insert(
                BuildCacheKey(TopicSubscriberTokenName, topicName, clientSocketId),
                new CacheItem(true)
                {
                    Tags = new[]
                    {
                        new Tag(TagTopicName(topicName))
                    },
                    AbsoluteExpiration = DateTime.UtcNow.AddHours(3)
                });
        }

        private static void RemoveTopicSubscriber(String topicName, String clientSocketId)
        {
            LowCache.Delete(BuildCacheKey(TopicSubscriberTokenName, topicName, clientSocketId));
        }

        private static Int32 CountTopicSubscribers(String topicName)
        {
            return LowCache.GetKeysByTag(new Tag(TagTopicName(topicName))).Count;
        }

        /////////////////////////////////////////////////

        public static ITopicSubscription Subscribe(String topicName, String clientSocketId, Action<String, Object> messageCallback, Action<String> topicClosedCallback)
        {
            if (String.IsNullOrEmpty(topicName))
            {
                return null;
            }

            AddTopicSubscriber(topicName, clientSocketId);

            return PubSubCache.Subscribe(topicName, messageCallback, topicClosedCallback);
        }

        public static void Unsubscribe(ITopicSubscription subscription, String clientSocketId)
        {
            String topicName = subscription.Topic.Name;
            RemoveTopicSubscriber(topicName, clientSocketId);

            Boolean deleteTopic = (CountTopicSubscribers(topicName) == 0);
            PubSubCache.Unsubscribe(subscription, deleteTopic);
        }

        public static void Publish(String topicName, Object message)
        {
            try
            {
                if (String.IsNullOrEmpty(topicName) || message == null)
                {
                    return;
                }

                PubSubCache.PublishPubSubMessage(topicName, message);
            }
            catch (Exception ex)
            {
                LoggingHelper.Write(ex);
            }
        }

The PubSubCache is a wrapper for the Alachisoft Cache class, in-part containing;

        public ITopicSubscription Subscribe(String topicName, Action<String, Object> messageCallback, Action<String> topicClosedCallback)
        {
            ITopicSubscription subscription = null;
            ProfileTracer tracer = ProfileTracer.CreateAsTrackedFunction("Cache : PubSub Subscribe");
            ProfileTracer.TraceString("Topic = " + topicName);
            Metrics.Count("Cache Operation", "PubSub Request");
            Metrics.Count("Cache Operation", "Request");

            tracer.Exec(() =>
            {
                ITopic topic = _cache.MessagingService.GetTopic(topicName);
                if (topic == null)
                {
                    topic = _cache.MessagingService.CreateTopic(topicName);
                    Metrics.Count("Cache Operation", "PubSub Topic Create");
                }

                subscription = topic.CreateSubscription(
                    (sender, args) =>
                    {
                        messageCallback(args.Topic.Name, args.Message.Payload);
                    });

                subscription.Topic.OnTopicDeleted = (sender, args) =>
                {
                    topicClosedCallback(args.TopicName);
                };
            });

            return subscription;
        }

        public void Unsubscribe(ITopicSubscription subscription, Boolean removeTopic)
        {
            ProfileTracer tracer = ProfileTracer.CreateAsTrackedFunction("Cache : PubSub Unsubscribe");
            // Trace the topic's name; concatenating the ITopic object itself would not log the name.
            ProfileTracer.TraceString("Topic = " + subscription.Topic.Name);
            Metrics.Count("Cache Operation", "PubSub Request");
            Metrics.Count("Cache Operation", "Request");

            String topicName = subscription.Topic.Name;

            tracer.Exec(
                () =>
                {
                    subscription.UnSubscribe();

                    if (removeTopic)
                    {
                        _cache.MessagingService.DeleteTopic(topicName);
                        Metrics.Count("Cache Operation", "PubSub Topic Delete");
                    }
                });
        }

        public void PublishPubSubMessage(String topicName, Object message)
        {
            ProfileTracer tracer = ProfileTracer.CreateAsTrackedFunction("Cache : PubSub Publish");
            ProfileTracer.TraceString("Topic = " + topicName);
            Metrics.Count("Cache Operation", "PubSub Request");
            Metrics.Count("Cache Operation", "Request");

            tracer.Exec(
                () =>
                {
                    // ITopic topic = _cache.MessagingService.GetTopic(topicName) ?? _cache.MessagingService.CreateTopic(topicName);
                    ITopic topic = _cache.MessagingService.GetTopic(topicName);
                    if (topic == null || topic.IsClosed)
                    {
                        return;
                    }

                    topic.Publish(new Message(message, TimeSpan.FromSeconds(60)), DeliveryOption.All);
                });
        }

killemth commented 5 years ago

Using PowerShell to Stop-Cache and then Start-Cache did in-fact "restart" the PubSub cache as expected, however, after Start-Cache the Topics Count counter surged back to over 100,000.

I issued the Stop-Cache command on all cluster nodes, and then did a rolling Start-Cache -- so there shouldn't have been any state transfer, right?

I have logic within the web socket class that will re-create a topic if it is deleted while a subscriber is still active, however, I have no logs of that happening. In addition to this, I am inserting a CacheItem in another Cache for every topic that is created, and then adding a Tag so that I can perform a GetKeysByAllTags to see all active topics (since the PowerShell tool you referenced isn't available). I am only seeing 400 topics, which is very far from 100,000 from the performance counter.

Is the cache dumping a copy of all topics and then reloading them after start? I also reloaded the AppPool serving the web socket on all servers, so it should have been a completely new connection to NCache after the cache was started.

Update: That was really strange -- I performed that Stop-Cache process a few different times, and each time I would Start-Cache the topic count would go down.

Ron-Hussain commented 5 years ago

Killemth,

Based on the update you posted on this thread, I believe you are now seeing the expected behavior, where the topic count goes to zero after the cache is stopped.

If you are still seeing this issue, please let us know and I can have it reviewed. We will need the complete steps that you performed, along with a simple console sample app that helps us reproduce the situation.

Moreover, I would highly recommend that you create separate posts for different issues, as that will be more readable for everyone.

Regards,

Ron Hussain

killemth commented 5 years ago

@Brad-NCache and @Ron-Hussain so far things have been performing better, with the exception of publishing times.

I'm logging when the PublishMessage request takes longer than 5-seconds; there are quite a few logs in here where it takes in excess of 10-seconds.

I'm not logging the number of topic-specific subscribers, however, the total unique connections with at least one subscription are logged and charted; at the time of the increased latency, there were only 300 connections.

During a period of about 5-hours, the publish latency averaged 10-seconds for JSON string objects that were about 800 bytes in size.

The CPU utilization for all of caching nodes and all of the "client" servers never spiked above 80%.
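The slow-publish logging described above could be sketched as follows. `SlowCallLogger` and its parameters are hypothetical names used for illustration; the sketch only relies on `System.Diagnostics.Stopwatch` and makes no assumption about how the publish call itself is implemented.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper: times an action and reports it when it is slow.
public static class SlowCallLogger
{
    // Runs the action, invokes onSlow with the elapsed time if it meets or
    // exceeds the threshold, and returns the elapsed time in every case.
    public static TimeSpan Run(Action action, TimeSpan threshold, Action<TimeSpan> onSlow)
    {
        Stopwatch watch = Stopwatch.StartNew();
        try
        {
            action();
        }
        finally
        {
            // Stop and report even if the action throws, so failed slow calls are logged too.
            watch.Stop();
            if (watch.Elapsed >= threshold)
            {
                onSlow(watch.Elapsed);
            }
        }
        return watch.Elapsed;
    }
}
```

A caller would wrap the publish call in the `action` delegate, pass `TimeSpan.FromSeconds(5)` as the threshold, and write the elapsed time to its own log inside `onSlow`.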

Ron-Hussain commented 4 years ago

@Brad-NCache and @Ron-Hussain so far things have been performing better, with the exception of publishing times.

[Ron] Thanks for letting us know that this is working better now, I will review issue with publishing times in detail. Please see my comments below.

I'm logging when the PublishMessage request takes longer than 5-seconds; there are quite a few logs in here where it takes in excess of 10-seconds. I'm not logging the number of topic-specific subscribers, however, the total unique connections with at least one subscription are logged and charted; at the time of the increased latency, there were only 300 connections. During a period of about 5-hours, the publish latency averaged 10-seconds for JSON string objects that were about 800 bytes in size. The CPU utilization for all of caching nodes and all of the "client" servers never spiked above 80%.

[Ron] This could be a capacity issue. Please share the details below with us.

  1. What is the total number of caching servers and client servers used here?
  2. What are the hardware specs of these servers?
  3. Please share NCache version and edition details.
  4. Share complete NCache cache logs from all NCache Servers (C:\Program Files\NCache\log-files).
  5. Also share your log snippets where it is taking 10 seconds or more to publish messages.

We may have to set up some performance counters to review the influx of data and the number of client connections, but we will wait for the above information to be shared first.