louthy / language-ext

C# functional language extensions - a base class library for functional programming
MIT License

`repeat` capturing free variables #1065

Closed mozesa closed 2 years ago

mozesa commented 2 years ago

Discussed in https://github.com/louthy/language-ext/discussions/1064

Originally posted by **mozesa** June 23, 2022

```csharp
using System.Text;
using LanguageExt;
using LanguageExt.Common;
using LanguageExt.Effects.Traits;
using LanguageExt.Sys;
using LanguageExt.Sys.Traits;
using static LanguageExt.Prelude;

namespace ConsoleApp;

public class QueueExample<RT>
    where RT : struct, HasCancel<RT>, HasConsole<RT>, HasDirectory<RT>, HasFile<RT>
{
    public static Aff<RT, Unit> Main()
    {
        var content = Encoding.ASCII.GetBytes("test\0test\0test\0");
        var memStream = new MemoryStream(100);
        memStream.Write(content, 0, content.Length);
        memStream.Seek(0, SeekOrigin.Begin);

        return repeat(
                   // from _51 in SuccessEff(unit)
                   from ln in (
                       from data in Eff(memStream.ReadByte)
                       from _ in guard(data != -1, Errors.Cancelled)
                       select data).FoldUntil(string.Empty, (s, ch) => s + (char)ch, ch => ch == '\0')
                   from _52 in Console<RT>.writeLine(ln)
                   select unit)
               | @catch(exception => Console<RT>.writeLine(exception.Message));
    }
}
```

Result when the line `// from _51 in SuccessEff(unit)` is left commented out:

```text
test
test
test
test
test
test
cancelled
```

Result when the line `from _51 in SuccessEff(unit)` is enabled:

```text
test
test
test
cancelled
```

louthy commented 2 years ago

Fixed in v4.2.8

mozesa commented 2 years ago

```csharp
private static Producer<RT, string, Unit> Reading(string ip) =>
    from client in Eff(() => new TcpClient())
    from _ in client.ConnectAsync(ip, 23).ToUnit().ToAff()
    let stream = client.GetStream()
    let reader = new StreamReader(stream)
    from _43 in repeat(
        from data in reader.ReadLineAsync().ToAff()
        from _1 in Producer.yield<RT, string>(data)
        select unit)
    select unit;
```

The problem is that the `reader.ReadLineAsync().ToAff()` line captures the variable. Interestingly, `Aff(() => reader.ReadLineAsync().ToValue())` solves the issue.

Is this enough, or should I create an MRE?

Am I misusing something, given that I keep running into captured-variable issues?

Thanks for your support in advance.

louthy commented 2 years ago

`ToAff()` can't re-call `reader.ReadLineAsync()`; it can only convert the result of calling `reader.ReadLineAsync()` (once) to an `Aff`. And so you are baking in the result value for good. Usually you'd only use `ToAff()` with a pure value, or something that you want to effectively become a constant. The only way to create something that re-runs each time the expression is evaluated is to lift a lambda, which is what you do with `Aff(() => ...)`. This is just how C# works, so there's no bug here, just different use-cases. Remember also that LINQ is a nested set of lambdas, and so after the first `from ...` everything below it is within a lambda.

Even then you need to be careful about what you lift into the lambda: `reader` in this case is instantiated outside of the `repeat`. So, you need to consider what it is you want to be repeated.
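The difference between converting a value that has already been computed and lifting a lambda can be seen without language-ext at all. The sketch below is only an analogy built on plain .NET tasks: `CaptureDemo`, `ReadCaptured` and `ReadDeferred` are hypothetical names, and nothing here is the library's implementation. Re-awaiting a task that was created once yields the same line every time, whereas invoking a lambda performs a fresh read on each pass.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class CaptureDemo
{
    // Eager: ReadLineAsync() is called once, before the loop. The loop only
    // re-awaits that same task, so the first line is returned every time.
    public static async Task<List<string>> ReadCaptured(TextReader reader, int count)
    {
        var captured = reader.ReadLineAsync();
        var lines = new List<string>();
        for (var i = 0; i < count; i++)
            lines.Add(await captured ?? "<end>");
        return lines;
    }

    // Deferred: the lambda is invoked on every iteration, so each pass
    // performs a new read and advances the reader.
    public static async Task<List<string>> ReadDeferred(TextReader reader, int count)
    {
        Func<Task<string>> readLine = async () => await reader.ReadLineAsync() ?? "<end>";
        var lines = new List<string>();
        for (var i = 0; i < count; i++)
            lines.Add(await readLine());
        return lines;
    }

    public static async Task Main()
    {
        const string text = "one\ntwo\nthree";
        // prints "one,one,one"
        Console.WriteLine(string.Join(",", await ReadCaptured(new StringReader(text), 3)));
        // prints "one,two,three"
        Console.WriteLine(string.Join(",", await ReadDeferred(new StringReader(text), 3)));
    }
}
```

Note that the deferred version still shares one `reader` across iterations, which is the second point above: the lambda decides what is re-run, not what is re-created.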

btw, what you're attempting already exists (that is the yielding of strings from a stream). So, you only need to provide the behaviour of opening the stream. Here's an example (I've not tested it, but it should work! It also cleans up the disposables afterwards)

```csharp
public class Example<RT>
    where RT : struct, HasCancel<RT>, HasTextRead<RT>
{
    static Producer<RT, TextReader, Unit> TcpConnect(string ip) =>
        from client in use<RT, TcpClient>(Eff(() => new TcpClient()))
        from _1     in client.ConnectAsync(ip, 23).ToUnit().ToAff()
        from reader in use<RT, StreamReader>(SuccessEff(new StreamReader(client.GetStream())))
        from _2     in Producer.yield<RT, TextReader>(reader)
        select unit;

    static Producer<RT, string, Unit> Reading(string ip) =>
        TcpConnect(ip) | TextRead<RT>.readLine;
}
```

mozesa commented 2 years ago

I can't thank you enough!

Update

```csharp
static Producer<RT, string, Unit> Reading(string ip) =>
    TcpConnect(ip) | TextRead<RT>.readLine;
```

should be

```csharp
static Effect<RT, Unit> Reading(string ip) =>
    TcpConnect(ip) | TextRead<RT>.readLine | writeLine;
```

where `writeLine` is

```csharp
static Consumer<RT, string, Unit> writeLine =>
    from l in awaiting<string>()
    from _ in Console<RT>.writeLine(l)
    select unit;
```

mozesa commented 2 years ago

@louthy Sticking with your example, how is it possible to cancel the TcpClient?

Thanks for your help in advance.

louthy commented 2 years ago

It will be disposed automatically (and therefore closed) upon the effect completing.

mozesa commented 2 years ago

That's clear, but how can I force the completion of an effect? How do I shut down the producer?

louthy commented 2 years ago

Producers and Pipes obviously have to be composed with Consumers to produce an `Effect<RT, A>`. Effects are entirely self-enclosed systems. They will continue to run until they complete, raise an error, or are cancelled.

For example, here's an effect that writes the lines to the screen until a line is "exit". It uses guards to test the predicate and then raise the error if it is true:

```csharp
public class Example<RT>
    where RT : struct, HasCancel<RT>, HasTextRead<RT>, HasConsole<RT>
{
    static Producer<RT, TextReader, Unit> TcpConnect(string ip) =>
        from client in use<RT, TcpClient>(Eff(() => new TcpClient()))
        from _1 in client.ConnectAsync(ip, 23).ToUnit().ToAff()
        from reader in use<RT, StreamReader>(SuccessEff(new StreamReader(client.GetStream())))
        from _2 in Producer.yield<RT, TextReader>(reader)
        select unit;

    static Producer<RT, string, Unit> Reading(string ip) =>
        TcpConnect(ip) | TextRead<RT>.readLine;

    private static Consumer<RT, string, Unit> Writing =>
        from ln in awaiting<string>()
        from _1 in guardnot(ln == "exit", Error.New("exit")).ToAff<RT>()
        from _2 in Console<RT>.writeLine(ln)
        select unit;

    static Effect<RT, Unit> YourEffect(string ip) =>
        Reading(ip) | Writing;
}
```

(I need to make guards work properly with Proxy, but until then you can call ToAff<RT>() to make the guard into a type that can work with Proxy).

If you need to cancel the Effect from the outside, i.e. not from within the Effect stream itself, then first you need to see what you get when you call `RunEffect()`:

```csharp
Aff<RT, Unit> effect = YourEffect("127.0.0.1").RunEffect();
```

That returned Aff<RT, Unit> is the whole effect encapsulated in a re-runnable Aff. So, calling effect.Run(runtime) means that the runtime argument must have a CancellationToken which you can then cancel in the normal .NET way.
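For readers unfamiliar with "the normal .NET way", here is a minimal sketch of cooperative cancellation using only the base class library. `CancelDemo` and `RunUntilCancelled` are hypothetical stand-ins for a long-running effect. How the token source is wired into a language-ext runtime depends on the runtime type and is not shown here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelDemo
{
    // Hypothetical stand-in for a long-running effect: it loops until the
    // token is cancelled, then reports how many iterations it completed.
    public static async Task<int> RunUntilCancelled(CancellationToken token)
    {
        var ticks = 0;
        try
        {
            while (true)
            {
                // Task.Delay throws an OperationCanceledException once the token is cancelled
                await Task.Delay(10, token);
                ticks++;
            }
        }
        catch (OperationCanceledException)
        {
            return ticks;
        }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        var running = RunUntilCancelled(cts.Token);

        await Task.Delay(100); // let the work run for a while
        cts.Cancel();          // request cancellation from the outside

        await running;
        // prints "stopped after cancellation: True"
        Console.WriteLine($"stopped after cancellation: {running.IsCompletedSuccessfully}");
    }
}
```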

The other way is that you may then include effect in a larger Aff expression:

```csharp
from x in SuccessEff(1)
from _ in YourEffect("127.0.0.1")
from y in SuccessEff(2)
select x + y;
```

As soon as the `from y ...` starts, the Effect and all of its resources will have automatically been cleaned up.
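The ordering described here (resources released before the next step begins) resembles what plain C# `using` declarations give you when the resources are scoped to a single method. The sketch below is an analogy only, not how `use` is implemented. `DisposeDemo`, `Tracked` and `Stream` are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class DisposeDemo
{
    // Hypothetical resource that records when it is opened and disposed.
    private sealed class Tracked : IDisposable
    {
        private readonly string _name;
        private readonly List<string> _log;

        public Tracked(string name, List<string> log)
        {
            _name = name;
            _log = log;
            _log.Add($"open {_name}");
        }

        public void Dispose() => _log.Add($"dispose {_name}");
    }

    // Stand-in for the streaming step: both resources live only inside it.
    private static void Stream(List<string> log)
    {
        using var client = new Tracked("client", log);
        using var reader = new Tracked("reader", log);
        log.Add("streaming");
    } // reader, then client, are disposed here, before the caller continues

    public static List<string> Run()
    {
        var log = new List<string> { "x" };
        Stream(log);
        log.Add("y");
        return log;
    }

    // prints "x -> open client -> open reader -> streaming -> dispose reader -> dispose client -> y"
    public static void Main() =>
        Console.WriteLine(string.Join(" -> ", Run()));
}
```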

One final method is to fork the effect, so that it runs independently of its parent expression:

```csharp
from cancel in fork(YourEffect("127.0.0.1"))
from _1     in Console<RT>.readKey
from _2     in cancel
select unit;
```

This will allow the effect to run in its own thread. The parent thread will wait for any key to be pressed in the console, and will then run the `cancel` effect, which will shut down the forked effect (and in the process clean up all of the resources).
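The shape of this pattern (start the work independently, get back a handle that stops it) can be sketched with plain tasks. This is a rough analogy under stated assumptions, not the library's `fork`: `ForkDemo.Fork` is a hypothetical helper, and a short delay stands in for waiting on a key press.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ForkDemo
{
    // Hypothetical helper: starts the work on the thread pool and returns
    // the running task together with an action that cancels it.
    public static (Task Running, Action Cancel) Fork(Func<CancellationToken, Task> work)
    {
        var cts = new CancellationTokenSource();
        var running = Task.Run(async () =>
        {
            try
            {
                await work(cts.Token);
            }
            catch (OperationCanceledException)
            {
                // cancellation is the expected way for the forked work to stop
            }
        });
        return (running, cts.Cancel);
    }

    public static async Task Main()
    {
        var (running, cancel) = Fork(async token =>
        {
            while (true)
                await Task.Delay(10, token); // stand-in for the streaming effect
        });

        await Task.Delay(50); // stand-in for waiting on a key press
        cancel();             // shut the forked work down
        await running;

        // prints "forked work stopped"
        Console.WriteLine("forked work stopped");
    }
}
```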