akkadotnet / akka.net

Canonical actor model implementation for .NET with local + distributed actors in C# and F#.
http://getakka.net

Akka.Streams: `MergeSubstreams` returns wrong type #5381

Open Aaronontheweb opened 2 years ago

Aaronontheweb commented 2 years ago

Version Information

Version of Akka.NET? v1.4.28
Which Akka.NET Modules? Akka.Streams

Describe the bug

var source = Source
    .Cycle(() => Enumerable.Range(1, 100).Cast<long>().GetEnumerator())
    .WireTapMaterialized(FlowRateMonitor("outbound/s"), Keep.None)
    .Async()
    .GroupBy(10, l => l % 10)
    .Sum((l, l1) => l + l1)
    .MergeSubstreams()
    .WireTapMaterialized(FlowRateMonitor("outbound/s"), Keep.None);

MergeSubstreams returns an IFlow<TIn, TMat>, rather than a Flow<TIn, TMat> - this breaks our ability to continue to use the Akka.Streams builder interface to work with its output.

Expected behavior

Should return a Flow<TIn, TMat>.

Actual behavior

Returns an IFlow<TIn, TMat>.

ismaelhamed commented 2 years ago

This would work:

var source = Source.From(Enumerable.Range(1, 100))
        .Select(i => (long)i)
        .WireTapMaterialized(Sink.ForEach<long>(l => Output.WriteLine(l.ToString())), Keep.Right)
        .Async()
        .GroupBy(10, l => l % 10)
        .Sum((l, l1) => l + l1)
        .MergeSubstreams()
        .WireTapMaterialized(Sink.ForEach<long>(l => Output.WriteLine($"Sum: {l}")), Keep.Right);

var task = ((Source<long, Task<Done>>)source)
    .RunWith(Sink.First<long>(), Materializer);

task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();

leonardo-lurci-deltatre commented 1 month ago

@ismaelhamed where does .WireTapMaterialized come from?

ltouro commented 1 month ago

@ismaelhamed where does .WireTapMaterialized come from?

https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka.Streams/Dsl/FlowOperations.cs/#L1909

leonardo-lurci-deltatre commented 1 month ago

thank you!

Unfortunately, the snippet @ismaelhamed posted above ("This would work: ...") does not work for me: .WireTapMaterialized is an extension method on Flow<TIn, TOut, TMat>, but .MergeSubstreams returns IFlow<TOut, TMat>. Am I doing something wrong?
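
For readers following the thread, the mechanics behind this question can be reproduced without Akka.NET at all. The sketch below is only an illustration: `IFlowLike`, `SourceLike`, `Merge`, and `Tap` are hypothetical stand-ins invented for this example, not Akka.Streams types or methods. It assumes, as the cast in the earlier snippet suggests, that the object returned at runtime is still the concrete type, so a downcast applied directly to the interface-typed result makes the extension method visible again.

```csharp
using System;

// Hypothetical stand-ins; these are NOT the real Akka.Streams types.
public interface IFlowLike<out TOut>
{
    TOut Value { get; }
}

public sealed class SourceLike<TOut> : IFlowLike<TOut>
{
    public SourceLike(TOut value) => Value = value;

    public TOut Value { get; }

    // Declared to return the interface, like the operator discussed in this issue.
    public IFlowLike<TOut> Merge() => this;
}

public static class SourceLikeExtensions
{
    // Defined on the concrete type only, so it cannot be called on IFlowLike<TOut>.
    public static SourceLike<TOut> Tap<TOut>(this SourceLike<TOut> source, Action<TOut> onElement)
    {
        onElement(source.Value);
        return source;
    }
}

public static class Demo
{
    public static long Run()
    {
        var source = new SourceLike<long>(42L);

        // source.Merge().Tap(...) would not compile here:
        // Merge() is statically typed as IFlowLike<long>, which has no Tap.
        IFlowLike<long> merged = source.Merge();

        // Downcasting right after Merge() restores the fluent API on the concrete type.
        long seen = 0;
        ((SourceLike<long>)merged).Tap(v => seen = v);
        return seen;
    }
}
```

In the thread's terms, this would correspond to applying the cast immediately after `.MergeSubstreams()` rather than after the final `.WireTapMaterialized(...)`. Which concrete type to cast to in the real pipeline depends on the upstream stage and its materialized value, and is not confirmed here.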