davidmoten / state-machine

Finite state machine class generator for java, exports graphml, supports immutability!
Apache License 2.0

How to push a stream of JSON messages through state-machine? #1

Open davidmoten opened 7 years ago

davidmoten commented 7 years ago

From email:

David- Thank you so much for contributing to the code for RxJava2.

I have an application where I am receiving a single Flowable stream of JSON-formatted events that come from a number of different sources. Each event has the sourceID in it. These events are asynchronous. There can be a few events for a given source or a continual (never-ending) stream of events. I cannot know the specific sourceID beforehand, and there could be events from a thousand different sources intermingled in the single stream.

Question: If I need to evaluate the state of a given source using a Finite State Machine, do you have any ideas about adapting your state-machine so it can be used safely in such an environment? I could see trying to keep the state of a given source in a hashmap, but cannot quite envision how to fold your state machine into that scheme. Do any other approaches come to mind?

Kindest regards,

Ewin Barnett Columbia, Missouri.

davidmoten commented 7 years ago

Hi Ewin

This sort of conversation is much better to put on github issues so other people can see now and later and also I can use markdown for formatting. Do you mind if we move the conversation there? We don't have to if you don't want to of course.

Ok you are getting a single Flowable stream that is the merge of all the source streams. Presumably the sources can be categorized such that any source is of type T and emits events that happen to a T with unique id in the T domain.

The first step is to make a state machine definition like:

https://github.com/davidmoten/state-machine/blob/master/state-machine-test-definition/src/main/java/com/github/davidmoten/fsm/example/StateMachineDefinitions.java

Then you define a Processor like the createProcessor method in:

https://github.com/davidmoten/state-machine/blob/master/state-machine-test/src/test/java/com/github/davidmoten/fsm/rx/ProcessorTest.java

To use the Rx side, you also specify the signals when you build the Processor:

Observable<Signal<?, String>> signals = ...
Processor<String> processor = Processor //
    .behaviour(Microwave.class, behaviour) //
    .processingScheduler(Schedulers.immediate()) //
    .signalScheduler(signalScheduler) //
    .signals(signals) //
    .build();

then subscribe to the processor:

processor.observable().subscribe(subscriber);

By the way, Flowable is RxJava 2; the equivalent in RxJava 1 is Observable. This project supports RxJava 1 at the moment, and I'll probably migrate to RxJava 2 this week seeing as you're asking about it.

Suppose now you have an Observable named json, which is a stream of JSON messages as you've described. Then signals is constructed like this:

Observable<Signal<?, String>> signals = json.map(s -> {
    Class<?> cls = ...;   // from json
    String id = ...;      // from json
    Event<?> event = ...; // from json
    return Signal.create(cls, id, event);
});

I think we need a fully worked example in the docs/tests for this. Does that help?

davidmoten commented 7 years ago

From Ewin:

First, yes. Happy to move the conversation to GitHub.

Second. Thank you so much for your generous answer!

Third: Let me work my way through your answer, pondering much along the way. And yes, it would be great to develop a working example,

Cheers! --Ewin

davidmoten commented 7 years ago

I've created an example at https://github.com/davidmoten/state-machine/blob/master/state-machine-test/src/test/java/com/github/davidmoten/fsm/rx/StreamingTest.java which accepts JSON stream and pushes the events through the state machine using RxJava. Have a look at that.

davidmoten commented 7 years ago

RxJava2 migration done in branch rx2. Let me know if you'd be happy to be cutting edge and use that version. If so then I can release to Maven Central.

davidmoten commented 7 years ago

version 0.2 released to Maven Central supporting RxJava 2 (Flowables).

vlxewin commented 7 years ago

David-

Super. I will take a look.
BTW, where do I fetch java.util.function.Supplier from? My quest to locate it has been fruitless. Must be my male refrigerator blindness.

--Ewin

davidmoten commented 7 years ago

It's part of Java 8.

Dave

vlxewin commented 7 years ago

Yes, yes. Brain fart question. --Ewin

vlxewin commented 7 years ago

David - Question on how to adapt my states and events to your architecture.

"As you can see the definition is pretty concise. This is largely because of the advisable constraint that any one State can only be arrived at via one Event type."

I have asynchronous status events from a number of different units that are received by this IOT box. There are several sequences. Most have only one event. One has more than one event in the sequence.

One event is "powerup" (PU event). Another event is "powerup-complete-unit-ready".(PUCR event) Another is "fault" (F). Another event is a chain of several events that each follow each other in a fixed format (DiagA, DiagB, DiagC events).

After each of these sequences, the State for that unitID is "ready for input". That seems to violate your architecture because I can arrive at this State from a PU, a PUCR, a F and a DiagC event.

Can you give any guidance about how to approach defining this?

--Ewin

vlxewin commented 7 years ago

David-

Please let me add that the units I am monitoring are remote and may suffer from an internet/power outage. So, if I start to receive events for a sequence that is supposed to have three events (like DiagA, DiagB and DiagC), but I only receive DiagA, I need to have a timeout that can take me to an ErrorReport state, and from there back to the "ready for input" state. This only adds to the number of States that route me back to the single "ready for input" State, which appears to violate your architecture.

I didn't yet say that the unit emits a heartbeat event every 60 seconds and the "ready for input" state has a 70 second idle timer to alert when a heartbeat is not received. But this is why I must use a FSM as I am sure you can see.

Cheers! --Ewin

davidmoten commented 7 years ago

There are at least a couple of approaches that you can use:

If you use inheritance the critical thing is that the onEntry procedure for the ReadyForInput state should not have

if event is of type blah
  do this
else if event is of type blah2 
  do that
...

That sort of treatment should ideally be extracted into extra states and events.

To use extra states, suppose you want

A -> R via E1
B -> R via E2

As you've noted that violates the rule so you can create extra states A', B':

A -> A' via E1
B -> B' via E2
A' -> R via E3
B' -> R via E3

The onEntry procedures for A', B' look very simple:

/onEntry
send E3 to self

I'd be glad (in fact quite interested) to review the full state diagram in a form like above if you like.

davidmoten commented 7 years ago

BTW the fact that PowerUp and Fault events both take us to ReadyForInput looks like a prime candidate for extracting intermediate states especially as I imagine that a Fault event might normally involve some sort of side-effect for a lot of systems (like logging etc).

State1 -> PoweringUp via PowerUp
State2 -> FaultOccurred via Fault
PoweringUp -> ReadyForInput via Ready
FaultOccurred -> ReadyForInput via Ready

FaultOccurred onEntry procedure:

/entry
log failure
send Ready to self

PoweringUp onEntry procedure:

/entry
send Ready to self
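
As a rough illustration of what "send Ready to self" amounts to, here is a stand-alone Java (14+) sketch that queues self-sent events and drains them after the current transition finishes. It does not use the state-machine library; `UnitMachine`, its enum constants and the in-memory `log` list are hypothetical, and silently ignoring events that have no matching transition is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-alone sketch; it does not use the state-machine library.
final class UnitMachine {
    enum State { STATE1, STATE2, POWERING_UP, FAULT_OCCURRED, READY_FOR_INPUT }
    enum Event { POWER_UP, FAULT, READY }

    private State state;
    private final Deque<Event> pending = new ArrayDeque<>(); // events waiting to be processed
    final List<String> log = new ArrayList<>();              // stands in for real logging

    UnitMachine(State initial) { this.state = initial; }

    State state() { return state; }

    // Delivers an external event, then drains any events the machine sent to itself.
    void signal(Event event) {
        pending.add(event);
        while (!pending.isEmpty()) {
            State next = transition(state, pending.poll());
            if (next != null) { // assumption: events with no matching transition are ignored
                state = next;
                onEntry(next);
            }
        }
    }

    private static State transition(State s, Event e) {
        if (s == State.STATE1 && e == Event.POWER_UP) return State.POWERING_UP;
        if (s == State.STATE2 && e == Event.FAULT) return State.FAULT_OCCURRED;
        if ((s == State.POWERING_UP || s == State.FAULT_OCCURRED) && e == Event.READY) {
            return State.READY_FOR_INPUT;
        }
        return null;
    }

    private void onEntry(State entered) {
        switch (entered) {
            case FAULT_OCCURRED -> { log.add("failure"); pending.add(Event.READY); } // log, then send Ready to self
            case POWERING_UP -> pending.add(Event.READY);                            // send Ready to self
            default -> { }                                                           // no entry procedure
        }
    }
}
```

With this shape, `new UnitMachine(UnitMachine.State.STATE2).signal(UnitMachine.Event.FAULT)` passes through FaultOccurred (where the side effect lives) and ends in ReadyForInput, which is only ever entered via Ready.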

vlxewin commented 7 years ago

David-

You are very kind to reply. Let me work up a little better example today and I will post it. And yes, the Fault event does have some side effects as do a few other events, but I wanted to give you as simple an example as possible.

Cheers!

--Ewin

vlxewin commented 7 years ago

David-

I am happy to take you up on your offer. I have attached a graphic to help show the states and transitions.

Kindest regards,

Ewin

Simplified Event Flow.

The following is a simplified flow of events for the purpose of arriving at a design that can be implemented using David Moten's State Machine project.

We have an IOT application where we are monitoring some remote units. There can be up to a few hundred remote units at any one time. The remote unit can emit some data or alarm events asynchronously. It also can emit one of several sequences of events in response to a query command that we send to it from a separate process that is processing human input.

Here are the events:

1. A data event is emitted in response to a command sent to the unit. In essence the data event is asynchronous, at least as far as this state machine is concerned. It is a sequence of one, after which we would return to the Idle state to await any other events.

2. A Diagnostic event that is asynchronously emitted. It is a sequence of either two or three events. Event DiagA could be followed by an InstrumentFailed event or it could be followed by a DiagB event and a DiagC event. There might be more than one DiagB event but the sequence always ends with a DiagC event. In either case, we would return to Idle to await new events.

3. An alarm event that is asynchronously emitted. It is a sequence of one, after which we would return to the Idle state. The alarm event causes some logging activity.

The Event Sequences would be:

1) Idle → Data → Idle.

2) Idle → DiagA → InstrumentFailed → Idle.

3) Idle → DiagA → DiagB → DiagC → Idle

4) Idle → DiagA → DiagB → DiagB → DiagC → Idle (any number of DiagB events)

5) Idle → Alarm → Idle

Case 1:

Idle → Data via Idle-Data

Data (persists data) → Idle via Data-Idle

Case 2:

Idle → DiagA via Idle-DiagA

DiagA (persists some data) → InstrumentFail via DiagA-IFail

InstrumentFail → Idle via IFail-Idle

Case 3:

Idle → DiagA via Idle-DiagA

DiagA → DiagB (persists data) via DiagA-DiagB

DiagB → DiagC (persists data) via DiagB-DiagC

DiagC → Idle (persists data) via DiagC-Idle

Case 4:

Idle → DiagA via Idle-DiagA

DiagA → DiagB (persists data) via DiagA-DiagB

DiagB → DiagB (persists data) via AdditionalDiagB (loopback)

DiagB → DiagC (persists data) via DiagB-DiagC

DiagC → Idle (persists data) via DiagC-Idle

Case 5:

Idle → Alarm via Idle-Alarm

Alarm (persists data) → Idle via Alarm-Idle.

Constraint: In the case of a sequence that has more than one event, I need to have a timeout so that if I don't receive the next event in sequence, I can go to error state where I emit a log entry and then return to Idle.

This is a contrived schemata so that any answer that is developed can be openly shared with others. I am grateful for your help.

davidmoten commented 7 years ago

Thanks, I'll take a stab at it.

The first thing I'm looking for is independence. The alarm event needs to be logged no matter what else is going on and presumably doesn't directly affect any other interaction with the device. The alarm event certainly may indicate something worrying has happened to the device but cancellation of other conversations with the device might be expected to happen via timeout rather than as a consequence of the alarm. So based on that assumption I treat alarm events completely separately.

The command conversation looks to be modelled by a standard looking state machine for DeviceCommand (I model each command conversation separately because they could presumably happen concurrently and I assume that each Data response is returned with a unique id to associate it with the command).

Symbology wise I represent a transition like this:

State1 -> State2: Event

DeviceCommand state machine

Transitions:

Initial State -> CommandSent: Command
CommandSent -> HasResponse: Data
HasResponse -> Terminal State: Done
CommandSent -> TimedOut: Timeout
TimedOut -> Terminal State: Done

onEntry procedures:

CommandSent:

send Timeout to self in N seconds
send Command to device

HasResponse:

cancel delayed signal to self 
persist Data
send Done to self

TimedOut:

persist timeout result
send Done to self
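
To make the timer handling concrete, here is a minimal stand-alone sketch of the DeviceCommand machine in plain Java. It does not use the classes the state-machine library generates; `DeviceCommandMachine`, the in-memory `persisted` list and the empty `sendToDevice` placeholder are all hypothetical, and a `ScheduledExecutorService` is just one assumed way of implementing "send Timeout to self in N seconds". For brevity the self-sent Done is folded into a direct call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch of the DeviceCommand machine.
final class DeviceCommandMachine {
    enum State { INITIAL, COMMAND_SENT, HAS_RESPONSE, TIMED_OUT, TERMINAL }

    private final ScheduledExecutorService scheduler;
    private final long timeoutMillis;
    private final List<String> persisted = new ArrayList<>(); // stands in for real persistence
    private State state = State.INITIAL;
    private ScheduledFuture<?> delayedTimeout;

    DeviceCommandMachine(ScheduledExecutorService scheduler, long timeoutMillis) {
        this.scheduler = scheduler;
        this.timeoutMillis = timeoutMillis;
    }

    synchronized State state() { return state; }

    synchronized List<String> persisted() { return List.copyOf(persisted); }

    // Event Command: Initial State -> CommandSent
    synchronized void command(String command) {
        if (state != State.INITIAL) return;
        state = State.COMMAND_SENT;
        // onEntry: send Timeout to self in N seconds, then send Command to device
        delayedTimeout = scheduler.schedule(this::timeout, timeoutMillis, TimeUnit.MILLISECONDS);
        sendToDevice(command);
    }

    // Event Data: CommandSent -> HasResponse, then Done -> Terminal State
    synchronized void data(String data) {
        if (state != State.COMMAND_SENT) return;
        state = State.HAS_RESPONSE;
        delayedTimeout.cancel(false);  // onEntry: cancel delayed signal to self
        persisted.add("data:" + data); // onEntry: persist Data
        done();                        // onEntry: send Done to self
    }

    // Event Timeout: CommandSent -> TimedOut, then Done -> Terminal State
    synchronized void timeout() {
        if (state != State.COMMAND_SENT) return; // a late timeout after Data is ignored
        state = State.TIMED_OUT;
        persisted.add("timeout");      // onEntry: persist timeout result
        done();                        // onEntry: send Done to self
    }

    private void done() { state = State.TERMINAL; }

    private void sendToDevice(String command) {
        // placeholder: real device I/O would go here
    }
}
```

The guard at the top of each method means a Data event that arrives after the timeout, or a timeout that fires just after Data, is dropped rather than applied to a machine that has already finished.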

With regard to the Diagnostic events I assume that any sequence is uniquely identified so that we can handle two concurrently emitted sequences. If so then I would expect a state machine for each sequence that would look as below.

DeviceDiagnosticSequence state machine

Transitions:

Initial State -> DiagA Received: DiagA
DiagA Received -> DiagB Received: DiagB
DiagB Received -> DiagB Received: DiagB
DiagB Received -> DiagC Received: DiagC
DiagA Received -> Instrument Failed: InstrumentFailure
DiagC Received -> Terminal State: Done
DiagA Received -> Timed Out: Timeout
DiagB Received -> Timed Out: Timeout
DiagC Received -> Timed Out: Timeout
Timed Out -> Terminal State: Done
Instrument Failed -> Terminal State: Done

onEntry procedures:

DiagA Received:

persist DiagA
send Timeout to self in N seconds

DiagB Received:

cancel delayed signal to self
persist DiagB
send Timeout to self in N seconds

DiagC Received:

cancel delayed signal to self
persist DiagC
send Done to self

Timed Out:

persist timeout info
send Done to self

Instrument Failed:

cancel delayed signal to self
persist InstrumentFailure
send Done to self
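
The transition list for DeviceDiagnosticSequence can also be written down as a pure function, which makes it easy to replay event sequences against it. This is a hedged plain-Java sketch, not the code the state-machine library would generate: `DiagnosticSequence`, `next` and `run` are hypothetical names, Timeout and Done are treated as ordinary events (how they get scheduled or self-sent is outside the sketch), and leaving the state unchanged for events that do not apply is an assumption.

```java
import java.util.List;

// Hypothetical stand-alone sketch of the DeviceDiagnosticSequence transitions.
final class DiagnosticSequence {
    enum State {
        INITIAL, DIAG_A_RECEIVED, DIAG_B_RECEIVED, DIAG_C_RECEIVED,
        INSTRUMENT_FAILED, TIMED_OUT, TERMINAL
    }

    enum Event { DIAG_A, DIAG_B, DIAG_C, INSTRUMENT_FAILURE, TIMEOUT, DONE }

    // Returns the next state, or the current state when the event does not apply.
    static State next(State s, Event e) {
        switch (s) {
            case INITIAL:
                return e == Event.DIAG_A ? State.DIAG_A_RECEIVED : s;
            case DIAG_A_RECEIVED:
                if (e == Event.DIAG_B) return State.DIAG_B_RECEIVED;
                if (e == Event.INSTRUMENT_FAILURE) return State.INSTRUMENT_FAILED;
                return e == Event.TIMEOUT ? State.TIMED_OUT : s;
            case DIAG_B_RECEIVED:
                if (e == Event.DIAG_B) return State.DIAG_B_RECEIVED; // loopback
                if (e == Event.DIAG_C) return State.DIAG_C_RECEIVED;
                return e == Event.TIMEOUT ? State.TIMED_OUT : s;
            case DIAG_C_RECEIVED:
                if (e == Event.DONE) return State.TERMINAL;
                return e == Event.TIMEOUT ? State.TIMED_OUT : s;
            case INSTRUMENT_FAILED:
            case TIMED_OUT:
                return e == Event.DONE ? State.TERMINAL : s;
            default:
                return s; // TERMINAL absorbs every event
        }
    }

    // Replays a whole event sequence from the initial state.
    static State run(List<Event> events) {
        State s = State.INITIAL;
        for (Event e : events) {
            s = next(s, e);
        }
        return s;
    }
}
```

Replaying DiagA, DiagB, DiagB, DiagC, Done or DiagA, InstrumentFailure, Done both end in the terminal state, while DiagA followed by Timeout stops in Timed Out until Done arrives.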

Bear in mind that what I've put above are short lived state machines that spring into existence when prompted to by an event that uniquely identifies a new device command or new device diagnostic sequence and then die when something terminal happens to them like a timeout or a finalising event.

If all the incoming command and diagnostic events from all devices were in one incoming stream then the stream would be grouped by commandId/diagnosticSequenceId, and the member streams would then be processed synchronously against new state machine instances as described above.
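
The grouping idea can be pictured without any Rx machinery as a map from id to the current state of that id's machine. The sketch below is plain Java and purely illustrative: `PerIdRouter`, `accept` and `liveCount` are hypothetical names, the transition function is supplied by the caller, and the class is single-threaded by assumption (events are handed to it one at a time). In an RxJava pipeline the same step would more likely be expressed with `groupBy` on the incoming stream.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical router: one short-lived machine state per id, created on the
// first event for that id and dropped once the terminal state is reached.
final class PerIdRouter<S, E> {
    private final Map<String, S> live = new HashMap<>();
    private final S initial;
    private final S terminal;
    private final BiFunction<S, E, S> transition;

    PerIdRouter(S initial, S terminal, BiFunction<S, E, S> transition) {
        this.initial = initial;
        this.terminal = terminal;
        this.transition = transition;
    }

    // Applies one event to the machine for the given id and returns its new state.
    S accept(String id, E event) {
        S current = live.getOrDefault(id, initial); // unseen id: start from the initial state
        S next = transition.apply(current, event);
        if (next.equals(terminal)) {
            live.remove(id);                        // the machine dies on a terminal state
        } else {
            live.put(id, next);
        }
        return next;
    }

    // Number of machines that have started but not yet terminated.
    int liveCount() {
        return live.size();
    }
}
```

Because finished machines are removed from the map, memory use tracks the number of conversations in flight rather than the total number of ids ever seen.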

I think it helps mentally to distinguish event names from state names and I usually achieve this by using the odd verb in the state name like Has or Received.

vlxewin commented 7 years ago

David-

Let me code up one of these and get your input.

--Ewin