dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

Stream data adapter #4621

Open: jason-bragg opened this issue 6 years ago

jason-bragg commented 6 years ago

Custom handling of the conversion of application data to and from the underlying queue is a common concern for users of Orleans persistent streams. There are many reasons for this:

Some stream providers, like Azure Queue, memory, and EventHub, provide some support for custom data conversion, but there is no common pattern, and some of the existing patterns are quite painful to use.

Every internal service using Orleans persistent streams has had to customize its handling of queue data for one of the above reasons, and in every case this has made using Orleans persistent streams significantly more complicated. Given that, I am of the opinion that addressing this should be a fairly high priority.
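A minimal C# sketch of the kind of common pattern being asked for may help make this concrete: a single component that owns both directions of the conversion, so that changing the encoding does not require subclassing the provider. Every name here (`QueueMessage`, `IStreamDataAdapter`, `JsonStreamDataAdapter`, and the `streamGuid`/`streamNamespace` property keys) is hypothetical and not part of the Orleans API, and System.Text.Json is just one illustrative encoding choice.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical queue message: an opaque body plus string properties.
// Loosely modelled on what queues such as Event Hubs carry; not an Orleans type.
public sealed record QueueMessage(byte[] Body, IReadOnlyDictionary<string, string> Properties);

// Hypothetical adapter contract: one component owns both directions of the
// conversion, so the stream provider itself never has to be subclassed.
public interface IStreamDataAdapter
{
    // Write side: application events -> queue message.
    QueueMessage ToQueueMessage<T>(Guid streamGuid, string streamNamespace, IEnumerable<T> events);

    // Read side: queue message -> stream identity and application events.
    (Guid StreamGuid, string StreamNamespace, List<T> Events) FromQueueMessage<T>(QueueMessage message);
}

// Example application-specific encoding: the events as a JSON array in the body,
// and the stream identity in message properties, where non-Orleans consumers can read it.
public sealed class JsonStreamDataAdapter : IStreamDataAdapter
{
    public QueueMessage ToQueueMessage<T>(Guid streamGuid, string streamNamespace, IEnumerable<T> events)
    {
        byte[] body = JsonSerializer.SerializeToUtf8Bytes(new List<T>(events));
        var properties = new Dictionary<string, string>
        {
            ["streamGuid"] = streamGuid.ToString(),
            ["streamNamespace"] = streamNamespace ?? string.Empty,
        };
        return new QueueMessage(body, properties);
    }

    public (Guid StreamGuid, string StreamNamespace, List<T> Events) FromQueueMessage<T>(QueueMessage message)
    {
        Guid streamGuid = Guid.Parse(message.Properties["streamGuid"]);
        string streamNamespace = message.Properties["streamNamespace"];
        // Deserialize returns null only for a JSON "null" body; treat that as no events.
        List<T> events = JsonSerializer.Deserialize<List<T>>(message.Body) ?? new List<T>();
        return (streamGuid, streamNamespace, events);
    }
}
```

With this shape, an application that needs a different wire format would supply its own `IStreamDataAdapter` implementation, and a round trip through `ToQueueMessage` and `FromQueueMessage` should give back the original stream identity and events.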

jason-bragg commented 6 years ago

This is an example of what one would need to do to customize how data is written to and read from EventHub when using EventHub streams.

The areas marked REPLACE would need to be replaced with application-specific code.

public class CustomDataStreamAdapterFactory : EventHubAdapterFactory
{
    // All of these are already set in the base class but are not accessible here,
    // so we need to duplicate them.
    private readonly EventHubOptions ehOptions;
    private readonly StreamCacheEvictionOptions evictionOptions;
    private readonly StreamStatisticOptions statisticOptions;
    private readonly SerializationManager serializationManager;
    private EventHubClient client;

    public CustomDataStreamAdapterFactory(string name, IOptionsSnapshot<EventHubOptions> ehOptions, IOptionsSnapshot<EventHubReceiverOptions> receiverOptions,
        IOptionsSnapshot<EventHubStreamCachePressureOptions> cacheOptions, IOptionsSnapshot<StreamCacheEvictionOptions> evictionOptions, IOptionsSnapshot<StreamStatisticOptions> statisticOptions, 
        IServiceProvider serviceProvider, SerializationManager serializationManager, ITelemetryProducer telemetryProducer, ILoggerFactory loggerFactory)
        : base(name, ehOptions.Get(name), receiverOptions.Get(name), cacheOptions.Get(name), evictionOptions.Get(name), statisticOptions.Get(name), serviceProvider, serializationManager, telemetryProducer, loggerFactory)
    {
        this.ehOptions = ehOptions.Get(name);
        this.evictionOptions = evictionOptions.Get(name);
        this.statisticOptions = statisticOptions.Get(name);
        this.serializationManager = serializationManager;
    }

    // We need custom EventHub client initialization code because the base class client is private,
    // and a client is needed to write events in QueueMessageBatchAsync.
    protected override void InitEventHubClient()
    {
        var connectionStringBuilder = new EventHubsConnectionStringBuilder(this.ehOptions.ConnectionString)
        {
            EntityPath = this.ehOptions.Path
        };
        this.client = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
    }

    // We need to override this to change the way data is written to EventHub, rather than just having
    // a component that handles it, as we do on the read side (the cache data adapter).
    public override Task QueueMessageBatchAsync<T>(Guid streamGuid, string streamNamespace, IEnumerable<T> events, StreamSequenceToken token,
        Dictionary<string, object> requestContext)
    {
        if (token != null)
        {
            throw new NotImplementedException("EventHub stream provider currently does not support non-null StreamSequenceToken.");
        }
        // REPLACE - Encode event data as you need
        EventData eventData = EventHubBatchContainer.ToEventData(this.serializationManager, streamGuid, streamNamespace, events, requestContext);
        // REPLACE - Encode event data as you need

        return this.client.SendAsync(eventData, streamGuid.ToString());
    }

    // We could customize the cache without writing an entire queue adapter factory, but there is no point:
    // we need to customize the adapter factory anyway for the QueueMessageBatchAsync call, so
    // we may as well customize the cache here rather than via the configurator.
    protected override IEventHubQueueCacheFactory CreateCacheFactory(EventHubStreamCachePressureOptions options)
    {
        var eventHubPath = this.ehOptions.Path;
        var sharedDimensions = new EventHubMonitorAggregationDimensions(eventHubPath);
        return new CustomCacheFactory(options, this.evictionOptions, this.statisticOptions, this.serializationManager, sharedDimensions);
    }

    /// <summary>
    /// Custom cache data adapter which converts EventData read from the queue into CachedEventHubMessages to be kept in the cache
    /// </summary>
    private class CachedDataAdapter : EventHubDataAdapter, ICacheDataAdapter<EventData, CachedEventHubMessage>
    {
        public CachedDataAdapter(string partitionKey, IObjectPool<FixedSizeBuffer> bufferPool, SerializationManager serializationManager)
            : base(serializationManager, bufferPool)
        {
        }

        StreamPosition ICacheDataAdapter<EventData, CachedEventHubMessage>.GetStreamPosition(EventData queueMessage)
        {
            // REPLACE - get stream position from EventData
            return base.GetStreamPosition(queueMessage);
            // REPLACE - get stream position from EventData
        }

        StreamPosition ICacheDataAdapter<EventData, CachedEventHubMessage>.QueueMessageToCachedMessage(ref CachedEventHubMessage cachedMessage, EventData queueMessage, DateTime dequeueTimeUtc)
        {
            // REPLACE - convert EventData to CachedEventHubMessage
            return base.QueueMessageToCachedMessage(ref cachedMessage, queueMessage, dequeueTimeUtc);
            // REPLACE - convert EventData to CachedEventHubMessage
        }
    }

    /// <summary>
    /// Custom EventHub queue cache factory, which creates the queue cache using the CachedDataAdapter
    /// </summary>
    private class CustomCacheFactory : EventHubQueueCacheFactory
    {
        public CustomCacheFactory(
            EventHubStreamCachePressureOptions cacheOptions,
            StreamCacheEvictionOptions evictionOptions,
            StreamStatisticOptions statisticOptions,
            SerializationManager serializationManager, EventHubMonitorAggregationDimensions sharedDimensions,
            Func<EventHubCacheMonitorDimensions, ILoggerFactory, ITelemetryProducer, ICacheMonitor> cacheMonitorFactory = null,
            Func<EventHubBlockPoolMonitorDimensions, ILoggerFactory, ITelemetryProducer, IBlockPoolMonitor> blockPoolMonitorFactory = null)
            : base(cacheOptions, evictionOptions, statisticOptions, serializationManager, sharedDimensions, cacheMonitorFactory, blockPoolMonitorFactory)
        {
        }

        protected override IEventHubQueueCache CreateCache(string partition, StreamStatisticOptions statisticOptions, IStreamQueueCheckpointer<string> checkpointer,
                    ILoggerFactory loggerFactory, IObjectPool<FixedSizeBuffer> bufferPool, string blockPoolId, TimePurgePredicate timePurge,
                    SerializationManager serializationManager, EventHubMonitorAggregationDimensions sharedDimensions, ITelemetryProducer telemetryProducer)
        {
            var cacheMonitorDimensions = new EventHubCacheMonitorDimensions(sharedDimensions, partition, blockPoolId);
            ICacheMonitor cacheMonitor = this.CacheMonitorFactory(cacheMonitorDimensions, loggerFactory, telemetryProducer);
            ILogger cacheLogger = loggerFactory.CreateLogger($"{typeof(EventHubQueueCache).FullName}.{sharedDimensions.EventHubPath}.{partition}");
            ICacheDataAdapter<EventData, CachedEventHubMessage> dataAdapter = new CachedDataAdapter(partition, bufferPool, serializationManager);
            IEvictionStrategy<CachedEventHubMessage> evictionStrategy = new EventHubCacheEvictionStrategy(cacheLogger, timePurge, cacheMonitor, statisticOptions.StatisticMonitorWriteInterval);

            return new EventHubQueueCache(checkpointer, dataAdapter, EventHubDataComparer.Instance, cacheLogger, evictionStrategy, cacheMonitor, statisticOptions.StatisticMonitorWriteInterval);
        }
    }

    public static new IQueueAdapterFactory Create(IServiceProvider services, string name)
    {
        var factory = ActivatorUtilities.CreateInstance<CustomDataStreamAdapterFactory>(services, name);
        factory.Init();
        return factory;
    }
}

Once the above adapter factory is implemented, users would then need to configure their silos and clients to use the custom adapter, something like:

Silo:

hostBuilder
    .AddPersistentStreams(StreamProviderName, CustomDataStreamAdapterFactory.Create, configurator => configurator
        .Configure<EventHubOptions>(ob => ob.Configure(options =>
        {
            options.ConnectionString = TestDefaultConfiguration.EventHubConnectionString;
            options.ConsumerGroup = EHConsumerGroup;
            options.Path = EHPath;
        }))
        .UseStaticClusterConfigDeploymentBalancer()
        .ConfigureComponent<AzureTableStreamCheckpointerOptions, IStreamQueueCheckpointerFactory>(EventHubCheckpointerFactory.CreateFactory, builder => builder
            .Configure(options =>
            {
                options.ConnectionString = TestDefaultConfiguration.DataConnectionString;
            })));

Client:

clientBuilder
    .AddPersistentStreams(StreamProviderName, CustomDataStreamAdapterFactory.Create, b => b
        .Configure<EventHubOptions>(ob => ob.Configure(options =>
        {
            options.ConnectionString = TestDefaultConfiguration.EventHubConnectionString;
            options.ConsumerGroup = EHConsumerGroup;
            options.Path = EHPath;
        })));

Also, this is awful.

iannovic commented 3 years ago

Hi @jason-bragg! I have found your commentary on issues in this GitHub repository really helpful. Thanks for leaving such great detail. Is this still the path of least resistance for reading external EventHub events?

We are looking to adopt Orleans at my company. We want to use Orleans to process IoT Hub events.

jason-bragg commented 3 years ago

No, @iannovic, we've made some progress on this. It's still not where I would like it to be, but it's better. You should be able to implement a data adapter component and register it in the container without specializing anything else. Please see https://github.com/dotnet/orleans/pull/5580.
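To illustrate the composition approach described above, here is a hypothetical C# sketch (not the API introduced by that pull request) that keeps the receiving side sealed and moves all knowledge of the message format into an adapter looked up by provider name. `IQueueDataAdapter`, `DeviceTelemetryAdapter`, `AdapterRegistry`, `QueueReceiver`, and the `deviceId` payload field are all invented for illustration, and `AdapterRegistry` merely stands in for the dependency injection container. It targets the IoT Hub case from the question, where messages are produced outside Orleans, so the stream key has to come from the payload itself.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical contract: turns a raw queue payload into a stream key and an event.
public interface IQueueDataAdapter
{
    (string StreamKey, JsonElement Event) Decode(byte[] body);
}

// Example for messages produced outside Orleans (e.g. by IoT devices): the stream
// key is read from a hypothetical "deviceId" field in the JSON payload itself.
public sealed class DeviceTelemetryAdapter : IQueueDataAdapter
{
    public (string StreamKey, JsonElement Event) Decode(byte[] body)
    {
        using JsonDocument document = JsonDocument.Parse(Encoding.UTF8.GetString(body));
        JsonElement root = document.RootElement;
        string deviceId = root.GetProperty("deviceId").GetString()
            ?? throw new FormatException("deviceId must be a non-null string");
        // Clone so the returned element stays valid after the document is disposed.
        return (deviceId, root.Clone());
    }
}

// Stand-in for the dependency injection container: one adapter per provider name.
public sealed class AdapterRegistry
{
    private readonly Dictionary<string, IQueueDataAdapter> adapters = new();

    public void Register(string providerName, IQueueDataAdapter adapter) => adapters[providerName] = adapter;

    public IQueueDataAdapter Resolve(string providerName) => adapters[providerName];
}

// Generic, sealed receiver: a new message format means registering a new
// adapter, not subclassing this class.
public sealed class QueueReceiver
{
    private readonly IQueueDataAdapter adapter;

    public QueueReceiver(string providerName, AdapterRegistry registry)
    {
        adapter = registry.Resolve(providerName);
    }

    public IEnumerable<(string StreamKey, JsonElement Event)> Receive(IEnumerable<byte[]> rawMessages)
    {
        foreach (byte[] body in rawMessages)
        {
            yield return adapter.Decode(body);
        }
    }
}
```

Supporting a different message format would then mean registering another adapter under the provider's name, rather than subclassing the receiver or the adapter factory as in the original EventHub example.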