dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

Consume EventHub - the best way to do that #5120

Closed darthkurak closed 2 years ago

darthkurak commented 6 years ago

Hi. I'm looking at Orleans and have a scenario where I have to read external data from EventHub (the EventHub is not populated by Orleans applications or clients) and then pass it on to other grains to model flows, etc. Is this achievable inside the Orleans middle tier (to leverage the scaling and distributed nature of the system), or do I have to read the events from EventHub externally (as an Orleans client) and then pass them to the appropriate grains? For example, assume I want to read data from EventHub, partition and cache it by an arbitrary property (UserId), and then write it, grouped, as a single transaction (for each key) to Data Lake. The trigger to write should be the size of the cached data (around 4 MB) or time (if we don't gather 4 MB before the time elapses). In theory, I would use some Orleans service or stream to read the data, extract the key, and pass each message to a grain with the key as its ID. The grain would then batch these events and use an Orleans timer or a size trigger to flush the data to Data Lake. Does this have any chance of success? Are grains holding 4 MB suitable for Orleans? And I still have no idea how to read those events from EventHub. Is something like this achievable in Orleans? @jason-bragg - could you help here? :)
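The flush trigger described in this post (whichever comes first: roughly 4 MB buffered, or a time limit) can be sketched independently of Orleans. The class below is a hypothetical helper, not an Orleans type; its name, its members and the idea of polling `IsExpired` from a grain timer are assumptions made for illustration only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not an Orleans type: buffers payloads for one key and
// reports when a flush is due, either by accumulated size or by elapsed time.
public sealed class SizeOrTimeBatch
{
    private readonly List<byte[]> items = new List<byte[]>();
    private readonly long maxBytes;
    private readonly TimeSpan maxAge;
    private long bufferedBytes;
    private DateTime? firstItemUtc;

    public SizeOrTimeBatch(long maxBytes, TimeSpan maxAge)
    {
        this.maxBytes = maxBytes;
        this.maxAge = maxAge;
    }

    public int Count => items.Count;

    // Adds a payload and returns true once the size threshold has been reached.
    public bool Add(byte[] payload, DateTime nowUtc)
    {
        if (payload == null) throw new ArgumentNullException(nameof(payload));
        if (items.Count == 0) firstItemUtc = nowUtc;
        items.Add(payload);
        bufferedBytes += payload.Length;
        return bufferedBytes >= maxBytes;
    }

    // Meant to be polled periodically: true when the oldest buffered item
    // has been waiting for at least maxAge.
    public bool IsExpired(DateTime nowUtc)
    {
        return firstItemUtc.HasValue && nowUtc - firstItemUtc.Value >= maxAge;
    }

    // Returns the buffered payloads and resets the batch.
    public IReadOnlyList<byte[]> Drain()
    {
        byte[][] drained = items.ToArray();
        items.Clear();
        bufferedBytes = 0;
        firstItemUtc = null;
        return drained;
    }
}
```

A grain keyed by UserId could hold one such batch, call `Drain()` and write to storage when `Add` returns true, and check `IsExpired` from a timer callback. Passing the clock in as a parameter keeps the logic easy to test.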

jason-bragg commented 6 years ago

We do support reading non-orleans written data using EventHub streams. Unfortunately it's non-trivial to do so. We have a work item to simplify this: https://github.com/dotnet/orleans/issues/4621 Until we get this work done, you'll need to customize the queue adapter as described in the linked issue. Please let me know if you encounter any difficulties.

The sort of processing you're describing has been implemented by a couple groups in MSFT using EventHub Streams, so yes, that sort of processing should be fine. The EventHub streams are recoverable, that is, under errors, one can rewind the stream (up to a limit) and reprocess data from a previous checkpoint. This sort of recovery logic was designed for scenarios where stream data is batched up over time and periodically written, like what you've described. If you're considering this approach, you may want to look at the cases covered in our recoverable stream testing, as most of those cases are based off of real world issues seen in production services. https://github.com/dotnet/orleans/blob/master/test/Tester/StreamingTests/ImplicitSubscritionRecoverableStreamTestRunner.cs

darthkurak commented 6 years ago

Hi, I tried the implementation from #4621, but I'm missing something: the log says the pulling manager was initialized with 0 queues. How do I tell it to read from all 32 partitions of my EventHub?

darthkurak commented 6 years ago

This is my implementation:

`public class CustomDataStreamAdapterFactory : EventHubAdapterFactory
{
    // All these are already set in base class, but not accessible here.
    // So we need to duplicate them.
    private readonly EventHubOptions ehOptions;
    private readonly StreamCacheEvictionOptions evictionOptions;
    private readonly StreamStatisticOptions statisticOptions;
    private readonly SerializationManager serializationManager;
    private EventHubClient client;

    public CustomDataStreamAdapterFactory(string name, IOptionsSnapshot<EventHubOptions> ehOptions, IOptionsSnapshot<EventHubReceiverOptions> receiverOptions,
        IOptionsSnapshot<EventHubStreamCachePressureOptions> cacheOptions, IOptionsSnapshot<StreamCacheEvictionOptions> evictionOptions, IOptionsSnapshot<StreamStatisticOptions> statisticOptions,
        IServiceProvider serviceProvider, SerializationManager serializationManager, ITelemetryProducer telemetryProducer, ILoggerFactory loggerFactory)
        : base(name, ehOptions.Get(name), receiverOptions.Get(name), cacheOptions.Get(name), evictionOptions.Get(name), statisticOptions.Get(name), serviceProvider, serializationManager, telemetryProducer, loggerFactory)
    {
        this.ehOptions = ehOptions.Get(name);
        this.evictionOptions = evictionOptions.Get(name);
        this.statisticOptions = statisticOptions.Get(name);
        this.serializationManager = serializationManager;
    }

    // need to write custom eventhub client init code, because base class client is private, 
    // and client is needed to write events in QueueMessageBatchAsync.
    protected override void InitEventHubClient()
    {
        var connectionStringBuilder = new EventHubsConnectionStringBuilder(this.ehOptions.ConnectionString)
        {
            EntityPath = this.ehOptions.Path
        };
        this.client = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
    }

    // need to override this to change the way data is written to eventhub, rather than just having
    // a component that handles that like we do on the read side (cache data adapter)
    public override Task QueueMessageBatchAsync<T>(Guid streamGuid, string streamNamespace, IEnumerable<T> events, StreamSequenceToken token,
        Dictionary<string, object> requestContext)
    {
        if (token != null)
        {
            throw new NotImplementedException("EventHub stream provider currently does not support non-null StreamSequenceToken.");
        }
        // REPLACE - Encode event data as you need 
        EventData eventData = EventHubBatchContainer.ToEventData(this.SerializationManager, streamGuid, streamNamespace, events, requestContext);
        // REPLACE - Encode event data as you need 

        return this.client.SendAsync(eventData, streamGuid.ToString());
    }

    /// <summary>
    /// Get partition Ids from eventhub
    /// </summary>
    /// <returns></returns>
    protected override async Task<string[]> GetPartitionIdsAsync()
    {
        EventHubRuntimeInformation runtimeInfo = await client.GetRuntimeInformationAsync();
        return runtimeInfo.PartitionIds;
    }

    // We can customize the cache without writing an entire queue adapter factory, but no point as
    // we need to customize the adapter factory anyway for the QueueMessageBatchAsync call, so
    // may as well customize the cache here rather than via the configurator
    protected override IEventHubQueueCacheFactory CreateCacheFactory(EventHubStreamCachePressureOptions options)
    {
        var eventHubPath = this.ehOptions.Path;
        var sharedDimensions = new EventHubMonitorAggregationDimensions(eventHubPath);
        return new CustomCacheFactory(options, this.evictionOptions, this.statisticOptions, this.serializationManager, sharedDimensions);
    }

    /// <summary>
    /// Custom cache adapter which converts EventData from the cache to CachedEventHubMessages to be kept in the cache
    /// </summary>
    private class CachedDataAdapter : EventHubDataAdapter, ICacheDataAdapter<EventData, CachedEventHubMessage>
    {
        public CachedDataAdapter(string partitionKey, IObjectPool<FixedSizeBuffer> bufferPool, SerializationManager serializationManager)
            : base(serializationManager, bufferPool)
        {
        }

        StreamPosition ICacheDataAdapter<EventData, CachedEventHubMessage>.GetStreamPosition(EventData queueMessage)
        {
            return new StreamPosition(new StreamIdentity(Guid.Empty, "messages"), new EventHubSequenceTokenV2(queueMessage.SystemProperties.Offset, queueMessage.SystemProperties.SequenceNumber, 0));
        }

        StreamPosition ICacheDataAdapter<EventData, CachedEventHubMessage>.QueueMessageToCachedMessage(ref CachedEventHubMessage cachedMessage, EventData queueMessage, DateTime dequeueTimeUtc)
        {
            // REPLACE - this not need to be modified, QueueMessageToCachedMessage use GetStreamPosition
            return base.QueueMessageToCachedMessage(ref cachedMessage, queueMessage, dequeueTimeUtc);
            // REPLACE - convert EventData to CachedEventHubMessage
        }
    }

    /// <summary>
    /// Custom Eventhub queue cache factory, which creates the queue cache using the CustomDataAdapter
    /// </summary>
    private class CustomCacheFactory : EventHubQueueCacheFactory
    {
        public CustomCacheFactory(
            EventHubStreamCachePressureOptions cacheOptions,
            StreamCacheEvictionOptions evictionOptions,
            StreamStatisticOptions statisticOptions,
            SerializationManager serializationManager, EventHubMonitorAggregationDimensions sharedDimensions,
            Func<EventHubCacheMonitorDimensions, ILoggerFactory, ITelemetryProducer, ICacheMonitor> cacheMonitorFactory = null,
            Func<EventHubBlockPoolMonitorDimensions, ILoggerFactory, ITelemetryProducer, IBlockPoolMonitor> blockPoolMonitorFactory = null)
            : base(cacheOptions, evictionOptions, statisticOptions, serializationManager, sharedDimensions, cacheMonitorFactory, blockPoolMonitorFactory)
        {
        }

        protected override IEventHubQueueCache CreateCache(string partition, StreamStatisticOptions statisticOptions, IStreamQueueCheckpointer<string> checkpointer,
                    ILoggerFactory loggerFactory, IObjectPool<FixedSizeBuffer> bufferPool, string blockPoolId, TimePurgePredicate timePurge,
                    SerializationManager serializationManager, EventHubMonitorAggregationDimensions sharedDimensions, ITelemetryProducer telemetryProducer)
        {
            var cacheMonitorDimensions = new EventHubCacheMonitorDimensions(sharedDimensions, partition, blockPoolId);
            ICacheMonitor cacheMonitor = this.CacheMonitorFactory(cacheMonitorDimensions, loggerFactory, telemetryProducer);
            ILogger cacheLogger = loggerFactory.CreateLogger($"{typeof(EventHubQueueCache).FullName}.{sharedDimensions.EventHubPath}.{partition}");
            ICacheDataAdapter<EventData, CachedEventHubMessage> dataAdapter = new CachedDataAdapter(partition, bufferPool, serializationManager);
            IEvictionStrategy<CachedEventHubMessage> evictionStrategy = new EventHubCacheEvictionStrategy(cacheLogger, timePurge, cacheMonitor, statisticOptions.StatisticMonitorWriteInterval);

            return new EventHubQueueCache(checkpointer, dataAdapter, EventHubDataComparer.Instance, cacheLogger, evictionStrategy, cacheMonitor, statisticOptions.StatisticMonitorWriteInterval);
        }
    }

    public static new IQueueAdapterFactory Create(IServiceProvider services, string name)
    {
        var factory = ActivatorUtilities.CreateInstance<CustomDataStreamAdapterFactory>(services, name);
        factory.Init();
        return factory;
    }
}`

jason-bragg commented 6 years ago

The guid of the stream id (set in the stream position in the adapter) must be unique per EventHub partition. A partition can have multiple streams, but a stream cannot come from multiple partitions. Using a hard coded empty id will put all of the events from all event hub partitions into a single stream, which won't work, or at least is unsupported. If you want a single stream per partition you can generate a unique stream id per partition, something like:

    public static Guid GetPartitionGuid(string partition)
    {
        byte[] bytes = Encoding.UTF8.GetBytes(partition);
        Array.Resize(ref bytes, 10);
        return new Guid(partition.GetHashCode(), bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], bytes[8], bytes[9]);
    }

* above is an example, please don't use GetHashCode() for consistent id in production.
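As the caveat above says, `GetHashCode()` is not a good basis for a consistent id; on .NET Core, string hash codes are randomized per process by default. One possible alternative, sketched below, hashes the partition id with MD5, which produces exactly the 16 bytes a `Guid` needs. The class and method names are hypothetical and not part of Orleans, and MD5 is used here only as a stable hash, not for security.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class PartitionStreamId
{
    // Hypothetical helper (not part of Orleans): derives a Guid from an
    // EventHub partition id that is the same in every process and on every silo.
    public static Guid FromPartition(string partition)
    {
        if (partition == null) throw new ArgumentNullException(nameof(partition));
        using (MD5 md5 = MD5.Create())
        {
            // MD5 always yields 16 bytes, the length the Guid(byte[]) constructor requires.
            byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(partition));
            return new Guid(hash);
        }
    }
}
```

Because the result depends only on the partition string, every silo computes the same stream id for a given partition.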

A unique stream per partition should, at least, allow the system to work, but it may still have performance issues because you'll have a single rather hot stream per partition. This may be ok, depending on your processing patterns. We do have internal users that do this, so it's not necessarily a problem.

Also, if you're not changing the QueueMessageToCachedMessage behavior, you may as well not add the override.

darthkurak commented 6 years ago

Ok, but how should I subscribe to these per-partition streams from one type of grain in order to consume them all? Do I subscribe manually for each unique guid? Will that perform well? Basically, what I want is one type of grain, a stateless worker, that works concurrently, reads events from EventHub (ideally in batches) and automatically scales out across the whole cluster based on the load on the EventHub. This grain will read the events and their system/user properties, and then write the payload to some HDFS-style storage (Blob, Data Lake).

darthkurak commented 6 years ago

I also thought about using a GrainService as the EventProcessorHost, but if I understand correctly:

Requests made to Stateless Worker grains are always executed locally, that is on the same silo where the request originated,

that means all calls to stateless grains from the host will execute on the same silo, so there will be no auto-scaling onto other nodes/silos in the cluster, which creates a bottleneck. I've been trying to achieve this for two days, and honestly wonder whether the best solution is simply to use a ClusterClient as the EventProcessorHost, which just calls a stateless grain on the cluster. Can you elaborate on these three approaches? Which would be best for scaling and performance? :) (ClusterClient, GrainService or Streams)

darthkurak commented 6 years ago

Another thing with the custom stream adapter is that EventData gives us the partitionKey, but we want the partitionId so that we can properly translate it to a streamGuid.

jason-bragg commented 6 years ago

If you use EventHub streams with the custom adapter you can process events from each hub in stateless workers via implicit subscriptions with the namespace "messages" (since that's the namespace used in your adapter). The result of this will be that the EventHub streams will spin up pulling agents for each eventhub partition and the agents will be distributed over the cluster. All the events from a partition will be processed by a grain on the same silo as the partition since the grains are stateless workers. This will result in a fairly efficient event processing system with the load balanced in the same proportions as the queues are balanced. If the default queue balancer does not balance the load as well as you'd like you can configure different queue balancers or even provide your own if necessary.

"we want to have partitionId to properly translate it to streamGuid."

You'll need to write some logic that consistently translates your partition key to a guid, much like the GetPartitionGuid logic above. We do the same thing, but it's easier for us as our partition key is just a guid string.

Three approaches

darthkurak commented 6 years ago

@jason-bragg Big thanks for giving me guidance on this scenario. I'm closer to the right solution, but I still have more questions:

Option 2: I'm worried about one scenario. Reading from EventHub by pulling agents is fairly quick, so we don't need to spread the agents over the whole cluster (one or two silos would do), but we do want the grains that process the events and do the main job to be spread over the whole cluster. With the behavior described above we cannot do that - a pulling agent can't send messages across silos. The same applies to Option 3. As I see it, only Option 1 gives us fairly good grain load balancing over the whole cluster, at the cost of one or two machines dedicated to the EventProcessorHost.

Option 3: I would say the implementation is fairly easy - just use EventProcessorHost and launch this service on every silo we want. But we have the same problem - the stateless worker will run on the same silo as the originating EventProcessorHost.

Option 1:

The only issue I see with this is that you'd need to address recoverability issues yourself.

That means the streaming option provides some kind of recoverability that I'm not aware of. Can you elaborate on this?

You also wrote about logic that consistently translates the partition key to a guid. The problem is that I don't want to translate the partition key, because in this specific scenario I might not have one at all. What I want is a queue for each partition, and a partition is uniquely identified by its partitionId. But in StreamPosition ICacheDataAdapter<EventData, CachedEventHubMessage>.GetStreamPosition(EventData queueMessage) I don't have that information (which partitionId the event comes from). I managed to write some dumb logic that just counts the GetStreamPosition calls and takes them modulo the EventHub partition count, but this is not what I would want in production code.

StreamPosition ICacheDataAdapter<EventData, CachedEventHubMessage>.GetStreamPosition(EventData queueMessage)
        {
            var g = GetPartitionGuid(_partitionIndex.ToString());
            var streamPosition = new StreamPosition(new StreamIdentity(g, "messages"), new EventHubSequenceTokenV2(queueMessage.SystemProperties.Offset, queueMessage.SystemProperties.SequenceNumber, 0));
            _partitionIndex++;
            _partitionIndex = _partitionIndex % PartitionCount;
            return streamPosition;
        }

darthkurak commented 6 years ago

Another thing is subscription itself. Have a look:

[StatelessWorker]
    [ImplicitStreamSubscription("messages")]
    public class ReadFromEventHubGrain : Grain, IReadFromEventHubGrain
    {
        public override async Task OnActivateAsync()
        {
            //Get one of the providers which we defined in config
            var streamProvider = GetStreamProvider("eventhub");
            //Get the reference to a stream
            var stream = streamProvider.GetStream<EventData>(GUID for that partition, "messages");
            //Set our OnNext method to the lambda which simply prints the data, this doesn't make new subscriptions
            await stream.SubscribeAsync(async (data, token) => Console.WriteLine(data.ToString()));

            await base.OnActivateAsync();
        }
    }

Do I have to subscribe to each partition separately (in a for loop, for example)? Or can I somehow subscribe to the whole namespace?

darthkurak commented 6 years ago

Also, after I managed to receive messages via the stream, another problem occurred: the serialization behavior doesn't support external messages from EventHub. The problem is:

private Body GetPayload() => payload ?? (payload = this.serializationManager.DeserializeFromByteArray<Body>(eventHubMessage.Payload))

The payload is an EventHub message sent from an external source, which has a completely different structure than:

[Serializable]
        private class Body
        {
            public List<object> Events { get; set; }
            public Dictionary<string, object> RequestContext { get; set; }
        }

Any suggestions how to workaround this?

jason-bragg commented 6 years ago

grain which will process events and doing the main job - we want them to spread out for whole cluster.

Then they must not be stateless workers (for that scenario), as stateless workers are always local.

That means that Streaming option provide some kind of recoverability, which i'm not aware. Can you elaborate this?

Recoverable stream providers, like eventhub streams, support limited recoverability. If a consumer encounters transient issues processing the stream (say a short outage of storage, or a silo crash) a consumer can rewind the stream and reprocess data without affecting other consumers.

"the problem is, that i don't want translate partition key - because in the specific scenario - i could not have it at all. What i want is queue for each partition - and partition is unique known by partitionId"

If the partitionId is the eventhub partition id, then that information should be available in the data adapter. It's passed in as the partitionKey (poorly named) in the code you referenced.

    public CachedDataAdapter(string partitionKey, IObjectPool<FixedSizeBuffer> bufferPool, SerializationManager serializationManager)
        : base(serializationManager, bufferPool)

"I have to subscribe for each partition separately (in for loop for example)? Or somehow i can subscribe for whole namespace?"

The stream id is the same as the grain ID when using implicit subscriptions. If there is a stream id per partition and a single grain subscribes to all stream id's then a single grain will be processing events from all partitions. Is that what you want?

" Payload is a eventhub message send from external source, which does have completely different structure from Body{}."

Correct. Good catch. I missed that in the list of changes needed to support external data. To address this you'll also need to override the EventHubDataAdapter.GetBatchContainer call.

  protected virtual IBatchContainer GetBatchContainer(EventHubMessage eventHubMessage)
  {
      return new EventHubBatchContainer(eventHubMessage, this.serializationManager);
  }

You'll need to either write your own batch container, or take the byte[] payload body, deserialize it, and pack it into a Body{} for the EventHubBatchContainer.

Alternatively, you can convert the bytes from the eventdata into a serialized body during the QueueMessageToCachedMessage call.

darthkurak commented 6 years ago

grain which will process events and doing the main job - we want them to spread out for whole cluster.

Then they must not be stateless workers (for that scenario), as stateless workers are always local.

Then what should I use? If I use normal grains with [Reentrant], the code will not execute concurrently - and I want this grain to run as concurrently as possible - there will be no state at all.

Recoverable stream providers, like eventhub streams, support limited recoverability. If a consumer encounters transient issues processing the stream (say a short outage of storage, or a silo crash) a consumer can rewind the stream and reprocess data without affecting other consumers.

How can a consumer do that? Or does the runtime do it out of the box?

darthkurak commented 6 years ago

Another thing - can you confirm that if I use Option 1 with a stateless worker, the load will be spread across the whole cluster?

jason-bragg commented 5 years ago

"there will be no state at all."

I'm confused about your usage patterns. In the original post you mentioned accumulating state in the grain (like 4 meg) then performing periodic writes.

"How consumer can do that? Or it is happening out of the box by the runtime?"

For recoverability, grains would need to persist the stream sequence token at every write, and upon errors reload and subscribe to (or resume processing) the stream from that point. See grains used in our recovery tests for some examples. https://github.com/dotnet/orleans/blob/master/test/Grains/TestGrains/ImplicitSubscription_TransientError_RecoverableStream_CollectorGrain.cs https://github.com/dotnet/orleans/blob/master/test/Grains/TestGrains/ImplicitSubscription_NonTransientError_RecoverableStream_CollectorGrain.cs https://github.com/dotnet/orleans/blob/master/test/Tester/StreamingTests/ImplicitSubscritionRecoverableStreamTestRunner.cs

"can you confirm that if I use Option 1 with stateless worker - that will spread out across whole cluster?"

The load should be spread out over all of the active gateways, so if all of the silos in the cluster are configured with gateways the load should be distributed over the entire cluster.

darthkurak commented 5 years ago

Thanks for code reference. :)

I'm confused about your usage patterns. In the original post you mentioned accumulating state in the grain (like 4 meg) then performing periodic writes.

Yes, you are right. Accumulating state was one of a couple of scenarios we considered, and it has been put aside for a while. Right now I want to implement a simple flow that lets me read events from EventHub in batches (as the PartitionReceiver/EventProcessorHost outputs them) and write those batches to storage, as quickly as possible and in an auto-scaling manner. Once I have this base, I'm going to work out how to optimize the number of writes, if that is necessary at all.

Using your guidance I managed to consume EventHub using streaming and a custom adapter based on the EventHub one in the repo. I have one more issue - the grain reads all events one by one. A BatchContainer contains a single EventData from the PartitionReceiver even if the PartitionReceiver hands a batch of them to the pulling agent. Do you know of an easy way to override this, or do I have to completely rewrite the EventHubAdapter?

jason-bragg commented 5 years ago

"all events are read one by one by grain. BatchContainer contain one EventData from partitionReciever even if partitionReciever output to the pulling agent batch of them"

Orleans streams are set up to handle multiple data streams. Events coming from eventhub are associated with a stream and delivered to all of the consumers of that stream one at a time. This is done to preserve ordering and recovery guarantees.

To have all events from a single eventhub read placed into a single batch you'd need to write your own IBatchContainer and IQueueAdapterReceiver, then override the IQueueAdapter.CreateReceiver behavior in the adapter.

For the pattern you described in the last post, the cluster client running an EventProcessorHost would probably be the most straightforward approach.

darthkurak commented 5 years ago

Thanks a lot for the help. I finally ended up with an EventProcessorHost acting as a ClusterClient, which sends messages to an Orleans grain that writes those events to external storage (Blob). That closes this topic. I have a lot more questions in other areas, but I will create separate issues for them.

darthkurak commented 5 years ago

Hi. I have a couple more questions. I'm using the EventHubStreamProvider, modified as discussed before. I have a grain like this:

[StatelessWorker]
    [ImplicitStreamSubscription("messages")]
    public class ReadFromEventHubGrain : Grain, IReadFromEventHubGrain
    {
        public override async Task OnActivateAsync()
        {
            var streamProvider = GetStreamProvider("eventhub");
            var stream = streamProvider.GetStream<EventHubMessage>(this.GetPrimaryKey(), "messages");

            IList<StreamSubscriptionHandle<EventHubMessage>> allMyHandles = await stream.GetAllSubscriptionHandles();

            foreach (var subscriptionHandle in allMyHandles)
            {
                await subscriptionHandle.ResumeAsync((data, token) => HandleSubscription(data, token));
            }

            await base.OnActivateAsync();
        }

        private Task HandleSubscription(EventHubMessage data, StreamSequenceToken token)
        {
            return Task.CompletedTask;
        }
    }

  1. How does checkpointing work with subscription handling? Is it done for each HandleSubscription call? What happens when an exception occurs? What happens when a silo crashes? How does this work when we have multiple consumers? Is there a way to checkpoint manually from the grain handler? How does ResumeAsync work when we provide a sequence token?

Let's take an example: we have two grains:

In both scenarios it is unclear to me what the default behavior will be. It's even harder to guess when we consider both scenarios together, as two subscribers of the same stream: when the checkpoint will happen, and how it is correlated with subscription handling and the sequence token.

It would be nice to get here a nice explanation and guide :)

  2. I noticed that the grain above has more than 32 activations, which is the number of my EventHub partitions. That is weird. I thought that, to preserve ordering, there could be at most 32 activations. Do you know why I see more than 32?

@jason-bragg Can you help here? :)

jason-bragg commented 5 years ago

I suggest looking at https://github.com/dotnet/orleans/blob/master/test/Grains/TestGrains/ImplicitSubscription_RecoverableStream_CollectorGrain.cs and the other grains used in our recoverability tests https://github.com/dotnet/orleans/blob/master/test/Tester/StreamingTests/ImplicitSubscritionRecoverableStreamTestRunner.cs

The logic used in those grains mirrors production code used by other services in MSFT and the faults we inject mimic various points of failure that services have encountered.

First, each grain needs to store the sequence token along with any aggregated state (not necessarily using grain storage, any storage is sufficient). In ImplicitSubscription_RecoverableStream_CollectorGrain.cs the StreamCheckpoint<> state object does this, as it has the accumulated state as well as the sequence token from the first event in the stream it received and the last sequence token it processed. These sequence tokens are used for recoverability and event de-duplication.

Next, the accumulated state and stream tokens need to be persisted periodically. The checkpoint period should be short enough that the messages that have been processed are still in the stream cache. By default the message cache keeps 30 minutes of data (see StreamCacheEvictionOptions.DataMaxAgeInCache). In ImplicitSubscription_RecoverableStream_CollectorGrain.cs we persist the state every 10 events, but that's probably not ok for most systems as the cache is time based.

Upon failure, either due to silo outage or storage failure (in which case we usually deactivate the grain, and let the stream redelivery reactivate it), the last written state is used and stream processing is resumed using its persisted sequence token.
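The three steps above (keep the token with the state, persist both together, resume from the persisted token and drop duplicates) can be sketched without Orleans. In the sketch below a plain `long` stands in for the stream sequence token, and `CheckpointedCollector` is a hypothetical name; the real grains in the linked tests use StreamCheckpoint<> and Orleans sequence tokens instead.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the checkpoint idea described above. A plain long
// replaces the stream sequence token so the sketch compiles without Orleans.
public sealed class CheckpointedCollector
{
    private readonly List<string> accumulated = new List<string>();

    public long? StartSequence { get; private set; }
    public long? LastProcessedSequence { get; private set; }
    public IReadOnlyList<string> Accumulated => accumulated;

    // Returns false when the event was already processed, for example when it
    // is redelivered after the stream has been rewound to an older checkpoint.
    public bool TryProcess(long sequence, string payload)
    {
        if (LastProcessedSequence.HasValue && sequence <= LastProcessedSequence.Value)
        {
            return false;
        }
        if (!StartSequence.HasValue) StartSequence = sequence;
        accumulated.Add(payload);
        LastProcessedSequence = sequence;
        return true;
    }

    // Rebuilds a collector from the state and tokens that were last persisted together.
    public static CheckpointedCollector Restore(long? start, long? last, IEnumerable<string> state)
    {
        var restored = new CheckpointedCollector
        {
            StartSequence = start,
            LastProcessedSequence = last
        };
        restored.accumulated.AddRange(state);
        return restored;
    }
}
```

The point of storing the token and the accumulated state in one write is that a restored collector can safely receive events from before its checkpoint again: anything at or below `LastProcessedSequence` is ignored rather than counted twice.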

"Do you know why I see more than 32?"

By default, stateless workers can have more than one instance, unlike normal grains. To prevent this you need to attribute the grain with [StatelessWorker(1)].

When handling stream processing errors, you need to consider both transient errors, like temporary storage outages, and non-transient errors, like poison messages. For those sorts of errors please look at https://github.com/dotnet/orleans/blob/master/test/Grains/TestGrains/ImplicitSubscription_TransientError_RecoverableStream_CollectorGrain.cs and https://github.com/dotnet/orleans/blob/master/test/Grains/TestGrains/ImplicitSubscription_NonTransientError_RecoverableStream_CollectorGrain.cs

I understand that there is quite a bit of info here and our documentation around how to recover from stream failures is lacking. Please review the above and come back with any questions. I'm reopening this issue until we work through any issues you have.

darthkurak commented 5 years ago

Indeed there is a lot of information here, but that's great! One thing is still unclear to me: how the internal checkpointing (on Azure Storage) is correlated with grain processing and the sequence token. Messages are read from EventHub and put into the internal cache independently. Subscribers then read from the stream out of that cache. But when does the internal checkpointing happen? And with what offset and sequence number? The last message from the cache that was processed successfully? By all subscribers? On some time basis?

jason-bragg commented 5 years ago

By internal checkpoint, I assume you mean the per-partition checkpointing performed by eventhub streams. Those checkpoints are written periodically (see AzureTableStreamCheckpointerOptions.PersistInterval) and store the sequenceId of the oldest event in the cache. This way, when a pulling agent starts due to silo failure or rebalancing, it rebuilds the cache, including older data which has not been purged. Data is purged from the cache based on age (see StreamCacheEvictionOptions.DataMaxAgeInCache). The age in the cache refers to the delta in event generation time of the events in the cache, not the actual amount of time the events are in the cache.

If a consuming grain attempts to subscribe using a sequence token for events that have been purged from the cache (are over 30 minutes old), the consumer will receive a DataNotAvailableException in its OnErrorAsync handler. The system does not support unlimited recovery, only recovery in a timely manner. Basically the consumer must recover within StreamCacheEvictionOptions.DataMaxAgeInCache. There are ways to block reading from a partition until all consumers have recovered to prevent DataNotAvailableExceptions, but that requires more customization of the provider and is not the way the system works out of the box.
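To make the "age is a delta between event times, not wall-clock time" point concrete, here is a deliberately simplified model of the recovery window. It assumes an event counts as still cached while it is no more than DataMaxAgeInCache older than the newest event in the cache; the actual purge logic in the provider may take further conditions into account, and `CacheWindow` is a hypothetical name.

```csharp
using System;

public static class CacheWindow
{
    // Simplified model (assumption, not the provider's actual code): an event can
    // still be replayed while its event time is within dataMaxAge of the newest
    // event currently in the cache. Wall-clock time is deliberately not used.
    public static bool IsStillCached(DateTime eventTimeUtc, DateTime newestEventTimeUtc, TimeSpan dataMaxAge)
    {
        return newestEventTimeUtc - eventTimeUtc <= dataMaxAge;
    }
}
```

Under this model a consumer that checkpoints more often than the window, and recovers within it, can resume from its last token; one that falls further behind would see the DataNotAvailableException described above.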

darthkurak commented 5 years ago

Thanks for the explanation. From the above, it follows that if a consumer does not process a message within 30 minutes, that message is lost for it. So, I wonder what should happen with non-transient errors, like a storage outage lasting longer than 30 minutes.

When handling stream processing errors, you need consider both transient errors, like temporary storage outages and non-transient errors like poison messages. For those sorts of errors please look at https://github.com/dotnet/orleans/blob/master/test/Grains/TestGrains/ImplicitSubscription_TransientError_RecoverableStream_CollectorGrain.cs and https://github.com/dotnet/orleans/blob/master/test/Grains/TestGrains/ImplicitSubscription_NonTransientError_RecoverableStream_CollectorGrain.cs

From those links I don't see any difference between handling transient and non-transient errors: the SequenceToken handling logic is the same. It suggests that Orleans will try to deliver the message forever (until it succeeds or the cache evicts it). Is that correct? If so, I believe we need some additional solutions here to handle non-transient errors, and I see a couple of cases:

darthkurak commented 5 years ago

I have another question: what is the expected behavior in a multi-silo scenario? For example, we have one silo attached to 32 partitions, we have consumers, and everything works. We decide to add a second silo to offload the first machine. What happens here? How do the two silos communicate with each other to avoid duplicating events and so on? Does the Orleans EventHubStreamProvider have some lease-based mechanism like EventHubProcessorHost? Will the second machine steal partitions from the other in a safe way? What about items in the internal cache? Could there be a situation where we have duplicated data because one machine has the cache and the checkpoint, the second machine steals that partition and starts from the checkpoint, and then both have the same data and process it? Or does the first one clear its cache when the partition is stolen? How does this work? :)

Also, I changed to StatelessWorker(1), and in the one-silo scenario I have 32 activations. When I add a second silo, I see more than 32 activations in total. The first silo still has 32 activations, and the second one has around 20. I would expect 32 activations in total, evenly balanced across the two silos. Any ideas why this is not the case?

Another thing: why does StatelessWorker need (1)? Does that mean the StreamProvider activates more than one grain for a single queue? And do they read it independently? Or is it somehow smart and passes each of them a different subset of the batch?

jason-bragg commented 5 years ago

So, i wonder what should happen with non-transient errors

If the pulling agent receives an error while delivering a message to a consumer (usually thrown from OnNextAsync), it will retry delivery for up to a configured period of time (see StreamPullingAgentOptions.MaxEventDeliveryTime). If the message is not successfully delivered within this period, the pulling agent will notify the application layer by calling OnErrorAsync on the consumer and OnDeliveryFailure on the IStreamFailureHandler. After notifying the application layer of the failure, the agent will skip the event and move to the next in the stream. This does not mean the data is lost yet, as the consumer can rewind the stream if it wants, until the data is purged from the cache, at which point the data will be lost.
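The retry-then-skip behavior described above can be modeled roughly as follows. This is a Python sketch of the idea only, not the pulling agent's actual code: `deliver_with_retry`, its parameters and the injected `clock` are hypothetical, and `on_next` / `on_error` merely stand in for the consumer's OnNextAsync and OnErrorAsync.

```python
import time


def deliver_with_retry(event, on_next, on_error, max_delivery_seconds,
                       retry_delay_seconds=0.0, clock=time.monotonic):
    """Retry delivery until it succeeds or the delivery window expires.

    Returns True if the event was delivered, False if it was skipped.
    """
    deadline = clock() + max_delivery_seconds
    while True:
        try:
            on_next(event)
            return True
        except Exception as exc:  # any consumer failure triggers a retry
            last_error = exc
        if clock() >= deadline:
            # Give up: notify the application layer, then let the caller
            # move on to the next event in the stream.
            on_error(event, last_error)
            return False
        time.sleep(retry_delay_seconds)


def pump(events, on_next, on_error, max_delivery_seconds):
    """Deliver events in order, skipping any that exhaust their window."""
    skipped = []
    for event in events:
        if not deliver_with_retry(event, on_next, on_error, max_delivery_seconds):
            skipped.append(event)
    return skipped
```

A skipped event is only skipped for the live cursor. In this model, as in the description above, the consumer could still ask for it again as long as it remains in the cache.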

From that links i don't see any difference between handling TransientError and NonTransient

The main difference between those two tests is how the fault is cleared. The non-transient test does not clear the fault until the agent gives up on delivering the event and calls the grain's OnErrorAsync.

i believe that we need here some additional solutions to handle non-transient errors, and i see a couple of cases:

This really depends on your business needs. I advocate you start with simply monitoring the occurrence to assess how often it happens. There is a risk of overengineering here, but again, this depends on your business needs, which I’m quite ignorant of.

What is expected behavior in multi-silo scenario?

Partitions will be distributed across the cluster. If you have 5 silos in the cluster and 32 partitions, the partitions will be distributed over the 5 silos. There are various algorithms for this which can be configured, including a lease-based solution (see IStreamQueueBalancer and its implementations). There is a risk of duplicate data due to partitions being moved (as well as other causes), so consumers may need to perform de-duplication via the sequence token.
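As a rough illustration of what "distributed over the silos" means, here is a plain round-robin split in Python. This is an assumption made for illustration only: `assign_partitions` is a hypothetical helper, and the balancers that ship with Orleans use their own algorithms.

```python
def assign_partitions(partition_count, silos):
    """Spread partition indexes over silos in round-robin order."""
    assignment = {silo: [] for silo in silos}
    for partition in range(partition_count):
        assignment[silos[partition % len(silos)]].append(partition)
    return assignment


def moved_partitions(before, after):
    """Partitions whose owning silo changed between two assignments."""
    owner_before = {p: s for s, ps in before.items() for p in ps}
    owner_after = {p: s for s, ps in after.items() for p in ps}
    return sorted(p for p in owner_before if owner_after[p] != owner_before[p])


five = assign_partitions(32, ["s1", "s2", "s3", "s4", "s5"])
print([len(ps) for ps in five.values()])  # [7, 7, 6, 6, 6]
```

Every partition that changes owner when the cluster grows is a point where the new owner rebuilds its cache from the checkpoint, which is where duplicate deliveries can come from.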

When i add second silo, I see more than 32 activations in summary. First silo still have 32 activations, and then second one have around 20.

This is likely because the grains on the first silo which were processing the queues that got moved have not been deactivated yet. They are likely idle and will be deactivated after the configured idle period (see GrainCollectionOptions.CollectionAge).

why StatelessWorker needs (1)?

Stateless workers are assumed to have no state, so by default there can be more than one of them per silo. [StatelessWorker(1)] indicates that there can be only one per silo, which prevents duplicate activations. This has nothing to do with streams, rather, it’s due to the fact that we’re using stateless workers in an unconventional way by maintaining state within them.

darthkurak commented 5 years ago

Thank you very much for all your answers. I played a little more with the solution, and here are some new questions:

  1. After notifying the application layer of the failure, the agent will skip the event and move to the next in the stream.

So, basically, if we don't deactivate the grain in OnErrorAsync, that grain will have this event skipped and receive the next one?

  1. The non-transient test does not clear the fault until the agent gives up on delivering the event and calls the grain's OnErrorAsync.

What about this scenario: we have de-duplication logic. We receive an event, set the sequenceToken, don't save it, and a further operation fails due to a transient error. Will the agent retry? If I understand correctly, the agent will retry on the current activation, which has the sequenceToken set in memory from the previous try, so the event will be ignored due to de-duplication. But that doesn't make sense. Obviously I'm missing something here. Can you help me? :)

  1. What is the reason for backpressure when the internal cache is populated with data regardless of the subscriber count? I observe that my machine's memory is eaten quickly even if I don't have any subscribers.

  2. The internal cache has a fixed time window. At high throughput this means I have to lower DataMaxAgeInCache to fit into the machine's memory, but at low throughput I have to raise it. I have to do that manually. I can't scale out of this issue, only scale up by increasing the memory of my machines, which is out of scope for me. This seems to me to be a real obstacle to a fully auto-scaled solution. Am I missing something? Is this real or fantasy? :D

jason-bragg commented 5 years ago

So, basically, if we doesn't deactivate grain on OnErrorAysnc , that grain will have this event skipped and receive the next one?

If the consumer does not rewind the stream, the event will be lost. Correct. One can directly rewind the stream by calling the ResumeAsync on the stream handle. A full deactivation/reactivation is not necessary, we just tend to use that pattern for simplicity and clarity.

agent will re-try on current activation which has sequenceToken set in memory in previously try, and event will be ignored due to de-duplication.

The sequence token should not be updated until after a successful processing of the event. In our test code, we update it before the storage call because we deactivate the grain on storage failure, which removes the in memory token, but if you're not deactivating the grain, then one should not use the new sequence token until the storage call is successful (or reset it to old sequence token if storage fails).
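The ordering described above (commit the token only after the write succeeds) can be sketched like this. It is a Python model, not an Orleans grain: `DedupingConsumer` and `store` are hypothetical, and integer tokens stand in for stream sequence tokens.

```python
class DedupingConsumer:
    """Toy consumer that de-duplicates by token and survives retries."""

    def __init__(self, store):
        self.store = store        # hypothetical callable that persists one event
        self.last_token = None    # highest token processed successfully

    def on_next(self, token, payload):
        if self.last_token is not None and token <= self.last_token:
            return False          # duplicate delivery, ignore it
        # If the write raises, last_token is left untouched, so a retry of
        # the same event on this activation is not mistaken for a duplicate.
        self.store(token, payload)
        self.last_token = token   # commit only after the write succeeded
        return True
```

If the token were assigned before the storage call, the retry after a transient failure would hit the duplicate check and the event would be silently dropped, which is exactly the problem raised in the question.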

What is the reason of backpressure when internal cache is populated with data regardless of subscribers count?

Back pressure ensures that we're not reading faster than consumers can process the messages. If there are no consumers then there is nothing to slow the reading, so in that case, backpressure makes little sense, but that's not understood to be a common case. System isn't optimized around streaming without consumers, as that pattern had no foreseen utility.

Internal cache has fixed-time size. In high throughputs this mean, that i have a lower DataMaxAgeInCache to fit into machine memory size, but in low throughput i have to rise it. I have to do that manually.

Yes. DataMaxAgeInCache needs to be low enough for the cache to fit within memory under peak load. The default eviction strategy does not support a dynamic time window or a fixed memory size. A custom eviction strategy can be used that purges the cache in a manner tuned more to your service’s needs, but the default has a fixed time window. If you'd rather not maintain a custom eviction strategy, you can attempt to update the default to support a dynamic window, or add support for a memory-based eviction strategy rather than a time-based one. We'd welcome pull requests for either change.
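A back-of-the-envelope way to pick the window is to multiply peak rate, message size and window length. The Python sketch below uses illustrative figures (60,000 events per second, 1,000-byte messages, a 30-minute window, a 16 GB budget) and counts payload bytes only; per-event overhead in the real cache is not modeled, and both helper names are hypothetical.

```python
def cache_payload_bytes(events_per_second, event_bytes, window_seconds):
    """Payload held by a time-window cache at a steady ingest rate."""
    return events_per_second * event_bytes * window_seconds


def max_window_seconds(memory_budget_bytes, events_per_second, event_bytes):
    """Longest window that keeps the payload within a memory budget."""
    return memory_budget_bytes / (events_per_second * event_bytes)


peak = cache_payload_bytes(60_000, 1_000, 30 * 60)
print(peak / 1e9)                               # 108.0 GB across the cluster
print(max_window_seconds(16e9, 60_000, 1_000))  # about 266.7 seconds
```

Under these assumptions a 30-minute window at peak load needs on the order of 108 GB of cache across the cluster, which is why the window has to be sized for peak rather than average throughput.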

darthkurak commented 5 years ago

More questions: if the internal cache has a DataMaxAgeInCache window and the pulling agent saves the checkpoint on cache purge using the last item, does that mean that the first checkpoint will not happen until DataMaxAgeInCache has passed? Or am I missing something?

In the case of backpressure: by slowing reading, do you mean holding off filling the internal cache? What is the point of that?

darthkurak commented 5 years ago

And more: 64k partition keys, 32 partitions, one grain attached to one stream namespace with PreferLocalPlacement. I tried a multi-silo scenario (2 silos exactly). First, I launched one and observed how the grains performed: there weren't any errors, though performance degraded as Orleans created more and more streams, up to 64k. Then I launched the second silo. After a while I observed a lot of errors: Item Not Found in Cache. Also, grain activations don't redistribute evenly across the two silos; the first one still has the majority of activations. These errors happen on both silos. I'm using a SequenceToken recovery implementation similar to the ones in the tests provided above. It seems that the recovery token loaded from grain state doesn't exist in the cache on the silo which stole that partition. That's a huge problem. Can you guide me here?

PS: I'm using LeaseBasedQueueBalancer (on Azure Blob).

PS2: Another error which can happen during this operation comes from Azure Blob: “The condition specified using HTTP conditional header(s) is not met”

darthkurak commented 5 years ago

Next one: how should "Item Not Found in Cache." be handled? Or is this done out of the box? If the StreamProvider hits this exception on subscription, will it retry forever? Or does it at some point start to ignore the passed SequenceToken and read events as if it were null?

darthkurak commented 5 years ago

@jason-bragg ? :)

darthkurak commented 5 years ago

I want to summarize my journey so far. I played a little with consuming those events from EventHub and still can't figure out the best setup. This post will be a long one, but I want to introduce all the setups I tried and their flaws.

Background: I'm creating an IoT solution. EventHub/IoTHub are the ingestion queues here. I have multiple devices/clients writing to EventHub, partitioned by their ID (so each device is on only one EventHub partition). I want to check whether Orleans is a good solution for building the whole processing and business layer. The target throughput is around 60K requests per second (1 kB messages). In the end we want to be able to process messages in the device context.

So, we have EventHub. I created an EventHubAdapter, following the suggestion in the discussion above, to read external messages from it.

And here is the first factor: how we map messages to streams.

1) Per partition
2) Per device
3) ??

Going further, here is how the grain implementations can look using those two approaches:

1) Per partition, we can use:
1a. normal grain (there will be 32 activations) and process messages with them, switching device context
1b. normal grain (reader) (32) and normal device grain (processor) (as many as devices)
1c. StatelessWorker(1) (32) and process messages with them, switching device context
1d. StatelessWorker(1) (32) and normal device grain (processor) (as many as devices)
1e. StatelessWorker(1) (32) and StatelessWorker(1) device grain

2) Per device:
2a. normal grain (as many as devices), also having the same number of streams
2b. StatelessWorker(1) (as many as devices), also having the same number of streams

Going one by one:

1a - it seems to be the best option in terms of throughput, but has a few flaws:

1b - a noticeable drop in throughput compared to the above; however, we get grain separation (cleaner code). The other flaws still remain.

1c - this fixes scaling out across machines, but it seems more like a "hack" than expected usage.

1d - a noticeable drop in throughput; however, we get grain separation (cleaner code). The other flaws still remain, plus an issue with scaling out those device grain activations.

1e - I haven't tested it yet, but it seems "hacky" all the way.

I also tried the trick with DeactivateOnIdle in the device grain, but this degrades performance so much that it is out of the question.

Of the implementations mentioned above, 1c seems to have the best performance, but I have to switch device context and hold its state in memory on my own. I'm not gaining anything from Orleans that way.

2a and 2b - both have a problem with so many streams flying around (as many as devices), and I see huge throttling at around 32k streams. I need to process 2M. 2a has scaling-out issues with grains staying active. 2b fixes this, but then I have so many streams and activations to handle when scaling out the cluster that I almost always get a lot of errors in those transitions.

2b is the winner here.

From both forks (stream mapping per partition or per device) I would choose 1c, but this means that I'm not gaining anything from Orleans, and it also has a limit on scaling out.

I have a feeling that the best option would be something between 1 and 2: map neither per partition nor per device, but to something like partition*(cores in machine), and process messages directly in the grain which consumes those streams, switching device context on the fly.

I'm not happy with that. I assume that I'm struggling to prepare a good implementation here and need further guidance. If not, a confirmation that I can't achieve more here would be appreciated. Can you help me? How would you model this kind of processing?

jason-bragg commented 5 years ago

@darthkurak, I apologize for the delayed reply. Thanks for pinging the thread.

first checkpoint will not happen until DataMaxAgeInCache pass?

If there is no partition checkpoint, the partition reader will write a partition checkpoint using the offset of the first message it read from the hub. By default, without a pre-existing checkpoint, the receiver will start reading from the time the receiver is initialized. It can be configured to read from the start of the partition instead by setting EventHubReceiverOptions.StartFromNow to false.

In case of backpressure - by slowing reading, you mean - holding filling the internal cache? What is the point of that?

This prevents the reading thread which fills the cache from reading data faster than it can be delivered to consumers. For instance, if consumers take 10ms to process an event (processing 100 events per second) and the receiver can read 1000 events/second from a partition, then the system would read then purge data before the grain can process the events. Instead, back pressure detects that the grains are not keeping up and will slow down the read rate.
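The numbers in that example can be played through with a small simulation. This Python sketch is a simplification under stated assumptions: the cache is bounded by an event count rather than by a time window, work happens in one-second steps, and `simulate` is a hypothetical helper, not anything from Orleans.

```python
def simulate(seconds, read_rate, consume_rate, cache_capacity, backpressure):
    """Return (events_read, events_lost) after running for `seconds`."""
    cached = 0       # events in the cache that are not yet delivered
    lost = 0         # events purged before any consumer saw them
    read_total = 0
    for _ in range(seconds):
        room = cache_capacity - cached
        # With backpressure the reader only reads what the cache can hold.
        to_read = min(read_rate, room) if backpressure else read_rate
        cached += to_read
        read_total += to_read
        if cached > cache_capacity:
            lost += cached - cache_capacity   # purged while still undelivered
            cached = cache_capacity
        cached -= min(cached, consume_rate)
    return read_total, lost


# Reader at 1000 events/s, consumer at 100 events/s, room for 2000 events.
print(simulate(10, 1000, 100, 2000, backpressure=False))  # (10000, 7100)
print(simulate(10, 1000, 100, 2000, backpressure=True))   # (2900, 0)
```

Without backpressure the reader keeps pulling 1000 events per second and most of them are purged unseen. With it, the read rate settles at the consumer's 100 events per second once the cache is full and nothing is lost.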

grain activations doesn't redistribute evenly for two silos.

If you’re using stateless workers, the grains on the first silo which were processing streams from a partition that was moved to the second silo will become idle, but will not go away until they are collected, which by default takes 2 hours. See GrainCollectionOptions.CollectionAge

It seems, like RecoveryToken loaded from Grain State, doesn't exist in cache on silo which steal that partition.

How often are the grains checkpointing? They need to save their state more frequently than the cache age, by default 30 minutes.

How to handle "Item Not Found in Cache."?

By default, when the streaming infrastructure can’t deliver an item because it’s not in the cache, it will notify the application layer via the OnErrorAsync and IStreamFailureHandler, which is likely the error you’re seeing, then move the cursor to the front of the cache (the most recent data) and start delivering data from there. The application layer can unsubscribe, or resume processing from a different point in the stream if it likes, but it can’t, out of the box, rewind the stream further back, so the data is lost unless the application rereads it from the partition or resets the partition checkpoints.

sergeybykov commented 5 years ago

@darthkurak

2a and 2b - both have problem with so much streams flying around (as many as devices) and i see huge throttling around 32k streams. I need process 2M. 2a have scaling out issues with grain staying actives. 2b fix this, but then i have so much streams and activations to be handled when scaling out cluster, that i almost always getting a lot errors in that transitions.

Are you using implicit or explicit stream subscriptions here? Have you tried to leverage [PreferLocalPlacement] attribute for locality?

darthkurak commented 5 years ago

If there is no partition checkpoint, the partition reader will write a partition checkpoint using the offset of the first message it read from the hub. By default, without a pre-existing checkpoint the receiver will start reading from the time the receiver is initialized. It can be configured to read from the start of the partition instead by configuring EventHubReceiverOptions.StartFromNow to false.

And if there is a partition checkpoint? Next checkpoint will be written after first purge, which is after CollectionAge?

Are you using implicit or explicit stream subscriptions here? Have you tried to leverage [PreferLocalPlacement] attribute for locality?

I'm using implicit subscriptions everywhere. I have tried [PreferLocalPlacement], but it is really slow, and the main portion of activations still remains on the first silo. [StatelessWorker(1)] has much better results: half of the activations are moved to the new silo, and the old ones on the first silo are deactivated after the idle time.

But the overall problem is how to construct an architecture in Orleans that reads messages from EventHub at really high throughput and processes them in the context of a device/client. After spending a lot of time on this, it seems that Orleans gives the flexibility of the actor-based model at the cost of the throughput achievable with a non-actor, task/thread-based model. But I want to believe that this sad conclusion is the result of my bad implementation. To recap the exact use case: an IoT platform with, for example, 2 million devices sending messages to EventHub, at about 30k messages per second overall. That gives us around 1 message per minute per device. We want to process those messages in Orleans in the device context, and have a nice auto-scaling solution where I can easily add new silos, or take them down to reduce costs and adjust to the current throughput. Scaling must not lose events, and neither must an unexpected shutdown. It has to be at-least-once delivery processing, where in the final storage, per device, I can de-duplicate results (not allow duplicates to be written).

If i think about solution without Orleans, I would create two services:

  1. One using EventHubProcessorHost to read events from EventHub in batches, send each message of the batch in parallel to the second service (DeviceService), asynchronously wait for all results, and checkpoint EventHub after success. There would be at most 32 instances of it (one per partition). When a machine dies, we only ever reprocess the whole batch that wasn't checkpointed.
  2. A second service, DeviceService, which processes messages, switching context between different devices, writes results to device storage if needed, and de-duplicates at the storage level. We can scale this independently, probably with machinesCount*machineCores instances.
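The two-service flow in the list above can be sketched as follows, using Python's asyncio purely as a model. Everything here is hypothetical (`process_batch`, `DeviceStore`, the message shape with `device`, `offset` and `body` keys); it does not use the Event Hubs SDK, and it assumes the storage layer can reject a write keyed by device and offset.

```python
import asyncio


class DeviceStore:
    """Toy device storage that de-duplicates on (device_id, offset)."""

    def __init__(self):
        self.rows = {}

    def write(self, device_id, offset, payload):
        key = (device_id, offset)
        if key in self.rows:
            return False          # already written by an earlier attempt
        self.rows[key] = payload
        return True


async def process_batch(batch, handle, checkpoint):
    """Fan a batch out in parallel and checkpoint only if all succeed."""
    if not batch:
        return
    # gather() re-raises the first failure, so the checkpoint below is
    # skipped and the whole batch is read again later (at-least-once).
    await asyncio.gather(*(handle(message) for message in batch))
    await checkpoint(batch[-1]["offset"])
```

Because a failed batch is replayed in full, messages that were already written are delivered again. The de-duplicating write is what keeps the replay from producing duplicates in the final storage.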

Any suggestions on how to improve the solutions already mentioned in Orleans? Or maybe a completely new way to do that? What would you suggest?

jason-bragg commented 5 years ago

And if there is a partition checkpoint? Next checkpoint will be written after first purge, which is after CollectionAge?

Yes, that or AzureTableStreamCheckpointerOptions.PersistInterval, whichever is longer. When cache is fully populated, events tend to be purged from cache about as often as new messages are ingested, which is more frequent than PersistInterval, so persist interval tends to be used.

I have tried [PreferLocalPlacement] but it is really slow, and still, main portion of activations remains on first silo.

PreferLocalPlacement should only be slower for initial activation in most cases, but when partitions are moved, the grains are not moved with them. So if you start a full cluster (say 4 silos), it should be comparable to stateless workers. However, if a service starts processing on a single silo and then scales up to 4, only new grains will be activated on the 3 added silos. This is due to the fact that scaling up a cluster does not automatically balance existing grain activations across the new silos. Since stateless workers are always local, new grains are spun up on the new silo. I add this clarification not to dismiss the concern, only to help clarify what's causing the performance loss. If cluster size does not change often and streams are relatively short lived, this shouldn't be a problem, but if the cluster is configured to auto scale, we may need to optimize this scenario.

how to construct architecture in Orleans to read with really high throughput messages from EventHub and process them in the context of device/client

For a frame of reference, another similar internal service using Orleans and EventHub handles 900k+ events per second, reading from multiple hubs with partition counts ranging from 4 to 250, so I'm confident Orleans -can- address those requirements, but whether there are simpler or more performant solutions is less clear. Let's set up a meeting and walk through the design of your service in person (skype?).

darthkurak commented 5 years ago

Thanks for the great explanation. I would love to have a call/meeting with you to discuss the exact usage and design and how to improve/fix it. What day and time would suit you? I'm from Poland, so the time difference is huge, but I can be available the whole week from 19:00 to 23:00 CET.

darthkurak commented 5 years ago

@jason-bragg ? :)

darthkurak commented 5 years ago

@jason-bragg A small reminder :)

jason-bragg commented 5 years ago

@darthkurak, I apologize for the delayed reply, I was out for the holidays. I'm getting caught back up now. Can we set up something for Thursday? Let's arrange this over email, you should be able to reach me at jbragg@microsoft.com.

ifle commented 4 years ago

@darthkurak This thread is old, but can you please share a summary of your solution? We are planning to use EventHub for application events and messages. Thanks.

rafikiassumani-msft commented 2 years ago

@ifle since we have not had any activity on this thread for the past year, we will close it for housekeeping purposes. Please use the following issue for further updates: https://github.com/dotnet/orleans/issues/4621

ghost commented 2 years ago

Thanks for contacting us. We believe that the question you've raised has been answered. If you still feel a need to continue the discussion, feel free to reopen the issue and add your comments.