Closed rysavy-ondrej closed 6 months ago
Flow replay was implemented and tested.
The bottleneck seems to be in the interaction with DB (enricher and context saver).
The parallelization of operation in the reactive workflow can be done by using the following pattern:
using System.Reactive.Concurrency;
using System.Diagnostics;
var sw = new Stopwatch();
sw.Start();
int SomeCpuIntensiveOperation(int x)
{
Console.WriteLine($"[{sw.ElapsedMilliseconds}ms] LongRunningTask {x} %{Environment.CurrentManagedThreadId}");
Thread.Sleep(TimeSpan.FromSeconds(x));
return x;
}
var maxConcurrent = 4; // Maximum number of concurrent threads
var scheduler = TaskPoolScheduler.Default;
Observable.Range(1, 5)
.Do(x => Console.WriteLine($"[{sw.ElapsedMilliseconds}ms] Queued {x} %{Environment.CurrentManagedThreadId}"))
.Select(item =>
Observable.Start(() =>
{
// CPU-intensive work here
return SomeCpuIntensiveOperation(item);
}, scheduler))
.Merge(maxConcurrent)
.Subscribe(x => Console.WriteLine($"[{sw.ElapsedMilliseconds}ms] Completed {x} %{Environment.CurrentManagedThreadId}"));
The pipeline implementation has been rewritten to TPL instead of RX. This gives better control over element processing.
Measure performance and do possible optimizations: