I was playing around with the PerformanceTest example and wanted to try the multithreaded non-library mode. If I understood the config correctly this is achieved by setting the OwnedThreads stream scheduler.
However, if I do this a simple GroupAggregate will cause the Cache()call to throw in StreamableIO.cs:179 with a NotImplementedException.
I included the full code to reproduce the issue below including a dump of the config. Is there something I am missing that I need to do in order to use Cache() in multithreaded mode or is this a bug?
I have tried multiple configuration combinations for the OwnedThreads scheduler but all result in the same error, same with using intervals instead of StartEdge events. I encountered the same issues for equi-joins as well, but the grouping example is simpler.
If caching is not inteded for multithreaded modes is there a way to achieve similar, efficient in-memory caching of the results that I could use to measuer the performance of my queries?
And are there other settings in the config I should fine-tune to achieve the best throughput for multi-threaded execution?
Thanks!
public static void Main(string[] args)
{
Config.StreamScheduler = StreamScheduler.OwnedThreads(2);
var rand = new Random();
var dataset =
Observable.Range(0, 100000)
.Select(e => StreamEvent.CreateStart(1, new { field1 = e % 1000, field2 = rand.Next(1, 100) }))
.ToStreamable()
.SetProperty().IsConstantDuration(true, StreamEvent.InfinitySyncTime)
.Cache();
var query = dataset.Where(d => d.field1 < 950).GroupAggregate(
d => d.field1,
d => d.Sum(d => d.field2),
d => d.Count(),
(key, sum, count) => new { key.Key, sum, count });
// Run the query
var sw = new Stopwatch();
GC.Collect();
GC.WaitForPendingFinalizers();
sw.Start();
var result = query.Cache();
GC.Collect();
GC.WaitForPendingFinalizers();
sw.Stop();
var count = 0ul;
result.Count().ToStreamEventObservable().ForEachAsync(r => count += r.Payload).Wait();
Console.WriteLine("Query Took {0} ms on average, had {1} rows", sw.ElapsedMilliseconds / repeatCount, count);
I was playing around with the PerformanceTest example and wanted to try the multithreaded non-library mode. If I understood the config correctly this is achieved by setting the OwnedThreads stream scheduler. However, if I do this a simple GroupAggregate will cause the
Cache()
call to throw inStreamableIO.cs:179
with aNotImplementedException
. I included the full code to reproduce the issue below including a dump of the config. Is there something I am missing that I need to do in order to useCache()
in multithreaded mode or is this a bug?I have tried multiple configuration combinations for the
OwnedThreads
scheduler but all result in the same error, same with using intervals instead of StartEdge events. I encountered the same issues for equi-joins as well, but the grouping example is simpler.If caching is not inteded for multithreaded modes is there a way to achieve similar, efficient in-memory caching of the results that I could use to measuer the performance of my queries? And are there other settings in the config I should fine-tune to achieve the best throughput for multi-threaded execution?
Thanks!
Config: