croservices / cro-core

The heart of the Cro library for building distributed systems in Raku, including pipeline composition and TCP support.
https://cro.services/
Artistic License 2.0
27 stars 9 forks source link

Possible other type of pipeline object required? #21

Open jonathanstowe opened 3 years ago

jonathanstowe commented 3 years ago

This isn't so much an issue as an RFC, or maybe it will reveal a grave error in my understanding of Cro or a lack in the documentation.

I'm looking at creating a service that encapsulates what should be considered an opaque implementation of a messaging service where one object manages the connection state and the subscriptions to messages, so I wind up with something like the following:

use Cro::Service;
use Cro::Message;
use Cro::Source;
use Cro::Types;
use Cro::Connection;
use Cro::Connector;
use Cro::Transform;
use Cro::Sink;
use Cro;

class Test::Client {

    has Supply $!supply;

    has @!topics = <one two three>;

    method connect() {
        $!supply = supply {
            whenever Supply.interval(0.5) {
                emit { topic => @!topics.pick, message => DateTime.now.Str };
            }
        }
        Promise.kept: True;
    }

    method subscribe(Str $topic) {
        $!supply.grep(*<topic> eq $topic)
    }
}

class Cro::Test does Cro::Service {

    class ClientConnection does Cro::Connection {
        has Test::Client $.client is required handles <connect subscribe>;

        method produces() { }

        method incoming() {
            supply {
                emit self;
            }
        }
    }

    class BrokerConnection does Cro::Source {

        method produces() { ClientConnection }

        method incoming( ) {
            my $client = Test::Client.new;
            supply {
                whenever $client.connect {
                    emit ClientConnection.new(:$client);
                }
            }
        }
    }

    class Message does Cro::Message {
        has Str     $.topic;
        has Str     $.message;
    }

    class Subscription does Cro::Transform {
        has Str $.topic is required;

        method consumes() { ClientConnection }
        method produces() { Message }

        method transformer( Supply:D $incoming --> Supply ) {
            supply {
                whenever $incoming -> $connection {
                    whenever $connection.subscribe($!topic) -> %message {
                        emit Message.new(|%message);
                    }
                }
            }
        }
    }

    class Consumer does Cro::Sink {

        has Callable $.consume is required;

        method consumes() { Message }

        method sinker ( Supply:D $messages ) {
            supply {
                whenever $messages -> $message {
                    $!consume.($message);
                }
            }
        }
    }
}

my $service = Cro.compose(
    Cro::Test::BrokerConnection.new,
    Cro::Test::Subscription.new(topic => 'two'),
    Cro::Test::Consumer.new( consume => -> $m { say "GOT IT ", $m.raku }),
    );

$service.start;

react {
    whenever signal(SIGINT) {
        $service.stop;
        done;
    }
}

Where the Test::Client is just a mock of the actual thing. The actual intent is to have multiple Subscriptions each associated with possibly multiple Consumer (which would be handled by some SubscriptionSet and ConsumerSet Transformers in a similar way to which Cro::HTTP works.)

This all works fine (the actual thing not so much because of some scoping issue,) but it seems to me that the ClientConnection is all wrong, it's necessary because the Source has to produce something for the other components to consume (of course if one was aiming for a single Subscription then the BrokerConnection could deal with the subscription and emit the Message itself and omit the ClientConnection and Subscription altogether,) but really it is a facade for the underlying connection provided by the Source so doesn't need to have the produces or incoming as the next stages in the pipeline will simply receive the object and call the (possibly delegated,) methods on that.

So I think there needs to be an additional role which is basically like a Cro::Message but with less semantic baggage which can be passed between a Source and, say, a Transform which just implements behaviour specific to the application.