lf-lang / lingua-franca

Intuitive concurrent programming in any language
https://www.lf-lang.org
Other
240 stars 63 forks source link

Federates wait for STP before exiting in decentralized coordination #2373

Closed Depetrol closed 3 months ago

Depetrol commented 4 months ago

In decentralized execution, when STP(STAA) is set, the federation doesn't exit immediately after request_stop(). It waits until the STAA has passed and then exit.

Here's a minimal example to reproduce:

target Python {
    coordination: decentralized
  }

  preamble {=
    import time
  =}

  reactor Client(STP_offset = {= FOREVER =}) {
    input server_message
    output client_message

    reaction(startup) {=
      print("Client Startup!")
    =}

    reaction(server_message) -> client_message {=
      val = server_message.value
      time.sleep(0.1)
      val += 1
      print("client:", val)
      if val==13:
          print("client done")
          request_stop()
      if val<13:
          client_message.set(val)
    =} STP(1000s) {=
      print("Client STP Violated!")
      exit(1)
    =}
  }

  reactor Server(STP_offset = {= FOREVER =}) {
    output server_message
    input client_message

    reaction(startup) -> server_message {=
      print("Server Startup!")
      server_message.set(0)
    =}

    reaction(client_message) -> server_message {=
      val = client_message.value
      time.sleep(0.1)
      val += 1
      print("server:", val)
      if val==12:
          print("server done")
          server_message.set(val)
          request_stop()
      if val<12:
          server_message.set(val)
    =} STP(1000s) {=
      print("Server STP Violated!")
      exit(1)
    =}
  }

  federated reactor(STP_offset = {= FOREVER =}) {
    client = new Client()
    server = new Server()
    server.server_message -> client.server_message
    client.client_message -> server.client_message after 0
  }

Output logs:

...
server: 10
client: 11
server: 12
server done
DEBUG: RTI: Received message type 10 from federate 1.
DEBUG: RTI handling stop_request from federate 1.
LOG: RTI received from federate 1 a MSG_TYPE_STOP_REQUEST message with tag (0, 7).
LOG: RTI forwarded to federates MSG_TYPE_STOP_REQUEST with tag (0, 7).
DEBUG: RTI: Received message type 11 from federate 0.
LOG: RTI received from federate 0 STOP reply tag (0, 7).
LOG: RTI sent to federates MSG_TYPE_STOP_GRANTED with tag (0, 7)
client: 13
client done
DEBUG: Reading from socket 5 failed with error: `Resource temporarily unavailable`. Will try again.
DEBUG: Reading from socket 4 failed with error: `Resource temporarily unavailable`. Will try again.
DEBUG: Reading from socket 5 failed with error: `Resource temporarily unavailable`. Will try again.
...

When the STP is set to 10s, the federation exits in about ~10s. When it is set to a large value, the federation doesn't exit in reasonable time.

Depetrol commented 3 months ago

Solved by #2394 and correctly shutting down the reactions with the following code:

target Python {
  coordination: decentralized
}

preamble {=
  import time
=}

reactor Client(STP_offset = {= FOREVER =}) {
  input server_message
  output client_message

  reaction(startup) {=
    print("Client Startup!")
  =}

  reaction(server_message) -> client_message {=
    val = server_message.value
    time.sleep(0.1)
    val += 1
    print("client:", val)
    if val==13:
        print("client done")
        request_stop()
    client_message.set(val)
  =} STP(1000s) {=
    print("Client STP Violated!")
    exit(1)
  =}
}

reactor Server(STP_offset = {= FOREVER =}) {
  output server_message
  input client_message

  reaction(startup) -> server_message {=
    print("Server Startup!")
    server_message.set(0)
  =}

  reaction(client_message) -> server_message {=
    val = client_message.value
    time.sleep(0.1)
    val += 1
    print("server:", val)
    if val==12:
        print("server done")
        server_message.set(val)
        request_stop()
    server_message.set(val)
  =} STP(1000s) {=
    print("Server STP Violated!")
    exit(1)
  =}
}

federated reactor(STP_offset = {= FOREVER =}) {
  client = new Client()
  server = new Server()
  server.server_message -> client.server_message
  client.client_message -> server.client_message after 0
}