SodiumFRP / sodium

Sodium - Functional Reactive Programming (FRP) Library for multiple languages
http://sodium.nz/

Double firing of stream in the same transaction multiply applies function. #151

Closed by greyson 6 years ago

greyson commented 6 years ago

We have an application which collects a pool and then submits a request when that pool satisfies a threshold (set by an external stimulus of some kind). Submitting, of course, removes from the pool.

Unfortunately, it appears that the pool is being deducted from more than once in a transaction. I have written a test case which demonstrates this. Logically, I would assume that adding 10 to the pool and the resulting immediate deduction from the pool would be merged into a single (x = x + 10 - 10), which would result in x = 0.

The assertion in this test shows x = -10.

The submitPooledAmount stream should only be triggered once, but it appears to be triggered twice, applying itself twice to the pool. The duplicate is only filtered out at the end, so listen sees just one firing.

Either that or I've done something incredibly dumb here and am operating under some false assumption. I'm sorry that the test case is so dense, but it's the smallest, clearest amount of code I could cobble together to replicate the problem.


public class TestMultiFire
{
   @Test
   public void poolDoubleSubtraction()
   {
      CellSink< Integer > threshold = new CellSink<>( 10 );
      StreamSink< Integer > addPoolSink = new StreamSink<>();

      Tuple2< Stream< Integer >, Cell< Integer > > dat = Transaction.run( () ->
      {
         StreamLoop< Integer > submitPooledAmount = new StreamLoop<>();

         // Ways that the pool is modified.
         Stream< UnaryOperator< Integer > > pool_addByInput = addPoolSink
            .map( i -> x -> x + i );
         Stream< UnaryOperator< Integer > > pool_removeByUsage = submitPooledAmount
            .map( i -> x -> x - i );

         // The current level of the pool
         Cell< Integer > pool = pool_addByInput
            .merge( pool_removeByUsage, ( f, g ) -> x -> g.apply( f.apply( x ) ) )
            .accum( 0, Function::apply );

         // The current input changes combined with the pool as a stream
         Stream< Integer > input_byAdded = Stream.filterOptional( pool_addByInput
            .snapshot( pool, threshold, ( f, x, t ) -> f.apply( x ) >= t
               ? Optional.of( f.apply( x ) )
               : Optional.empty() ) );

         // Simple rising edge on pool threshold satisfaction.
         Stream< Integer > input_bySatisfaction = Stream
            .filterOptional( Operational.updates( pool )
               .snapshot( pool, threshold, ( neu, alt, t ) -> neu >= t && alt < t
                  ? Optional.of( neu )
                  : Optional.empty() ) );

         submitPooledAmount.loop( input_byAdded.merge( input_bySatisfaction, Math::max ) );

         return new Tuple2<>( submitPooledAmount, pool );
      });

      Stream< Integer > input = dat.a;
      Cell< Integer > pool = dat.b;

      LinkedList< Integer > submissions = new LinkedList<>();
      Listener listener = input.listen( submissions::add );

      // Add amount which can be immediately used based on threshold.
      // Pool should remain zero after the transaction is complete.
      addPoolSink.send( 10 );

      assertEquals( 1, submissions.size() );
      assertEquals( Integer.valueOf( 10 ), submissions.get( 0 ) );
      assertEquals( Integer.valueOf( 0 ), pool.sample() );
   }
}
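
To make the expected and observed pool values concrete, here is a plain-Java sketch of the arithmetic only. It does not use Sodium; the class and method names (PoolArithmetic, expectedPool, doubleRemovalPool) are hypothetical, and the "removal applied twice" case is just the hypothesis from the report above, not a confirmed description of what the library does.

```java
import java.util.function.UnaryOperator;

public class PoolArithmetic {
    // Same composition as the merge function in the test case: apply f, then g.
    public static UnaryOperator<Integer> compose(UnaryOperator<Integer> f,
                                                 UnaryOperator<Integer> g) {
        return x -> g.apply(f.apply(x));
    }

    // Expected behaviour: one addition and one removal collapse into one update.
    public static int expectedPool(int start, int amount) {
        UnaryOperator<Integer> add = x -> x + amount;
        UnaryOperator<Integer> remove = x -> x - amount;
        return compose(add, remove).apply(start);
    }

    // Hypothesised behaviour (assumption): the removal is applied a second time.
    public static int doubleRemovalPool(int start, int amount) {
        UnaryOperator<Integer> add = x -> x + amount;
        UnaryOperator<Integer> remove = x -> x - amount;
        return compose(compose(add, remove), remove).apply(start);
    }

    public static void main(String[] args) {
        System.out.println(expectedPool(0, 10));      // prints 0
        System.out.println(doubleRemovalPool(0, 10)); // prints -10
    }
}
```

With a starting pool of 0 and an input of 10, a single removal gives 0, while a second removal gives the -10 that the failing assertion reports.
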
greyson commented 6 years ago

Note that this particular test case can be "fixed" by changing input_bySatisfaction to

         // Simple rising edge on pool threshold satisfaction.
         Stream< Integer > input_bySatisfaction = Stream.filterOptional(
            Operational.updates( threshold )
               .snapshot( pool, (t, x) -> x >= t
                  ? Optional.of( x )
                  : Optional.empty() ) );

But that still does not explain why the firing of both input_bySatisfaction and input_byAdded in the same transaction would not be properly merged by their maximum value.

jam40jeff commented 6 years ago

This is not a legal FRP circuit because there is a dependency cycle. In fact, the C# code throws an appropriate exception on the line:

submitPooledAmount.loop( input_byAdded.merge( input_bySatisfaction, Math::max ) );

to indicate where the dependency cycle is created. There is an issue open (#139) to implement this logic in the Java code as well.

To implement a feedback loop in Sodium, you must send the next value of the feedback loop through in a new transaction. I have added tests to the C# repository showing that the current way will break and a way to get around the dependency cycle.
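
The idea of pushing the feedback value through in a new transaction can be sketched with a toy model in plain Java. This is not Sodium code and says nothing about Sodium's internals: DeferredFeedback and its methods are hypothetical names, and each queued Runnable merely stands in for one transaction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DeferredFeedback {
    private final int threshold;
    private int pool = 0;
    private final List<Integer> submissions = new ArrayList<>();
    // Each queued Runnable stands in for one separate transaction (toy model).
    private final Deque<Runnable> transactions = new ArrayDeque<>();

    public DeferredFeedback(int threshold) {
        this.threshold = threshold;
    }

    // External input: one transaction that only adds to the pool.
    public void add(int amount) {
        transactions.add(() -> {
            pool += amount;
            if (pool >= threshold) {
                final int submitted = pool;
                // The feedback is not applied here; it is queued so that the
                // deduction happens in a later, separate transaction.
                transactions.add(() -> {
                    submissions.add(submitted);
                    pool -= submitted;
                });
            }
        });
        drain();
    }

    // Run queued transactions one at a time until none remain.
    private void drain() {
        Runnable next;
        while ((next = transactions.poll()) != null) {
            next.run();
        }
    }

    public int pool() {
        return pool;
    }

    public List<Integer> submissions() {
        return submissions;
    }
}
```

Calling `add(10)` on `new DeferredFeedback(10)` leaves the pool at 0 with a single submission of 10: the addition and the deduction never happen in the same step, so there is no same-instant cycle between the pool and the submission.
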

greyson commented 6 years ago

Ouch. I depend heavily on at least three loops like this one, although this was the only one giving me issues (as far as I am aware). I guess I'll have to go back through the codebase and make some strategic additions of Operational.defer().

Thank you for the reply.

greyson commented 6 years ago

Is it necessary to have the listener? How would you keep that from getting garbage collected, other than passing it up from the (very) deep parts of your code?

jam40jeff commented 6 years ago

You're right, I should have used Operational.Defer, which would not have forced me to manage the listener manually. I have updated the test.

ziriax commented 6 years ago

I don't understand. Isn't the only reason CellLoop and StreamLoop exist to allow dependency cycles? What kind of cycles are not allowed, then?

clinuxrulz commented 6 years ago

Generally ones where a cell or stream depends recursively on itself at the same instant in time. Sort of like ones that make no sense mathematically. E.g.

CellLoop<Integer> cla = new CellLoop<>();
Cell<Integer> ca = cla.map((Integer a) -> a + 1);
cla.loop(ca);

Is completely invalid. It's like saying to someone in math class that x = x + 1, now solve for x. No solution.

It probably helps to think of CellLoop<> and StreamLoop<> as providing forward references to things you need to use.

E.g.

StreamLoop<Integer> sla = new StreamLoop<>();
Cell<Integer> ca = sla.hold(0);
sla.loop(sEvent.snapshot(ca).map((Integer a) -> a + 1));

Is completely valid.

And

CellLoop<Integer> cla = new CellLoop<>();
// hold(0) turns the stream back into a Cell, which is what loop() expects here.
cla.loop(sEvent.snapshot(cla).map((Integer a) -> a + 1).hold(0));

Is completely valid. They do not go against any math laws.

Long story short: if you know how the ranks work internally, anything that causes the rank to skyrocket to infinity is invalid.
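
The rank intuition can be sketched with a toy dependency graph in plain Java. This is only an illustration, not Sodium's internal implementation: RankSketch is a hypothetical class, rank is taken to mean "1 + the highest rank among same-instant dependencies", and the model assumes that hold and snapshot add no same-instant edge from a cell back to its consumer (consistent with the examples above being called valid).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;

public class RankSketch {
    // node -> nodes it depends on within the same instant (toy model).
    private final Map<String, List<String>> sameInstantDeps = new HashMap<>();

    public void dependsOn(String node, String... deps) {
        sameInstantDeps.computeIfAbsent(node, k -> new ArrayList<>())
                       .addAll(Arrays.asList(deps));
    }

    // Returns the rank, or empty when a cycle means no finite rank exists.
    public OptionalInt rank(String node) {
        return rank(node, new HashSet<>());
    }

    private OptionalInt rank(String node, Set<String> inProgress) {
        if (!inProgress.add(node)) {
            return OptionalInt.empty(); // node is its own ancestor: a cycle
        }
        int best = 0;
        List<String> deps = sameInstantDeps.getOrDefault(node, Collections.<String>emptyList());
        for (String dep : deps) {
            OptionalInt r = rank(dep, inProgress);
            if (!r.isPresent()) {
                return OptionalInt.empty();
            }
            best = Math.max(best, r.getAsInt() + 1);
        }
        inProgress.remove(node);
        return OptionalInt.of(best);
    }

    public static void main(String[] args) {
        // Models cla.loop(cla.map(a -> a + 1)): cla and ca depend on each other.
        RankSketch invalid = new RankSketch();
        invalid.dependsOn("ca", "cla");
        invalid.dependsOn("cla", "ca");
        System.out.println(invalid.rank("cla").isPresent()); // prints false

        // Models sla.loop(sEvent.snapshot(ca)...): only sEvent feeds sla.
        RankSketch valid = new RankSketch();
        valid.dependsOn("sla", "sEvent");
        System.out.println(valid.rank("sla").getAsInt()); // prints 1
    }
}
```

In the first graph every attempt to assign a rank to cla requires a higher rank for ca and vice versa, which is the "skyrockets to infinity" case; in the second, the chain ends at sEvent and the rank is finite.
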