akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/

[Failure] MQTT streaming: publish twice with a QoS of 1 so that the second is queued #1723

Open ennru opened 5 years ago

ennru commented 5 years ago

"MQTT client connector should publish twice with a QoS of 1 so that the second is queued."

https://travis-ci.com/akka/alpakka/jobs/202157323#L890

Stages didn't stop in time

activeShells (actor: akka://mqtt-spec/system/StreamSupervisor-3/flow-105-0-ignoreSink):
  GraphInterpreterShell(
  logics: [
    akka.stream.impl.fusing.GraphStages$IgnoreSink$@6db721cf attrs: [Name(ignoreSink), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    LazyFlow attrs: [Name(lazyFlow), Name(lazyFlow), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    MapAsync(10,akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$$Lambda$10779/1372930626@6f1ddca5) attrs: [Name(mapAsync), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Log attrs: [LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Map(akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$$Lambda$10778/686177776@6ecec637) attrs: [Name(map), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.stream.alpakka.mqtt.streaming.impl.MqttFrameStage@6ebfc6b2 attrs: [LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    TerminationWatcher attrs: [Name(terminationWatcher), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    Watch(Actor[akka://mqtt-spec/user/client-connector-20#2106597224]) attrs: [Name(watch), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), LogLevels(LogLevel(4),LogLevel(4),LogLevel(4)), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.stream.alpakka.mqtt.streaming.scaladsl.CoupledTerminationBidi@3ad22bf3 attrs: [InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    MapAsyncUnordered(1,docs.scaladsl.MqttSessionSpec$$Lambda$11258/1117482543@3b2b30f8) attrs: [Name(mapAsyncUnordered), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)],
    akka.stream.impl.QueueSource@23cca6d4 attrs: [Name(queueSource), InputBuffer(4,16), SupervisionStrategy(<function1>), Dispatcher(akka.actor.default-dispatcher)]
  ],
  connections: [
    Connection(0, akka.stream.impl.fusing.GraphStages$IgnoreSink$@6db721cf, MapAsync(10,akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$$Lambda$10779/1372930626@6f1ddca5), ShouldPush)
    Connection(1, LazyFlow, akka.stream.impl.QueueSource@23cca6d4, Closed)
    Connection(2, MapAsync(10,akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$$Lambda$10779/1372930626@6f1ddca5), Log, Closed)
    Connection(3, Log, Map(akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$$Lambda$10778/686177776@6ecec637), Closed)
    Connection(4, Map(akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$$Lambda$10778/686177776@6ecec637), akka.stream.alpakka.mqtt.streaming.impl.MqttFrameStage@6ebfc6b2, Closed)
    Connection(5, akka.stream.alpakka.mqtt.streaming.impl.MqttFrameStage@6ebfc6b2, TerminationWatcher, Closed)
    Connection(6, TerminationWatcher, Watch(Actor[akka://mqtt-spec/user/client-connector-20#2106597224]), Closed)
    Connection(7, Watch(Actor[akka://mqtt-spec/user/client-connector-20#2106597224]), akka.stream.alpakka.mqtt.streaming.scaladsl.CoupledTerminationBidi@3ad22bf3, Closed)
    Connection(8, akka.stream.alpakka.mqtt.streaming.scaladsl.CoupledTerminationBidi@3ad22bf3, LazyFlow, Closed)
    Connection(9, akka.stream.alpakka.mqtt.streaming.scaladsl.CoupledTerminationBidi@3ad22bf3, MapAsyncUnordered(1,docs.scaladsl.MqttSessionSpec$$Lambda$11258/1117482543@3b2b30f8), Closed)
    Connection(10, MapAsyncUnordered(1,docs.scaladsl.MqttSessionSpec$$Lambda$11258/1117482543@3b2b30f8), akka.stream.alpakka.mqtt.streaming.scaladsl.CoupledTerminationBidi@3ad22bf3, Closed
  ]
)
dot format graph for deadlock analysis:
================================================================
digraph waits {
  N0 [label="akka.stream.impl.fusing.GraphStages$IgnoreSink$@6db721cf"];
  N1 [label="LazyFlow"];
  N2 [label="MapAsync(10,akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$$Lambda$10779/1372930626@6f1ddca5)"];
  N3 [label="Log"];
  N4 [label="Map(akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$$Lambda$10778/686177776@6ecec637)"];
  N5 [label="akka.stream.alpakka.mqtt.streaming.impl.MqttFrameStage@6ebfc6b2"];
  N6 [label="TerminationWatcher"];
  N7 [label="Watch(Actor[akka://mqtt-spec/user/client-connector-20#2106597224])"];
  N8 [label="akka.stream.alpakka.mqtt.streaming.scaladsl.CoupledTerminationBidi@3ad22bf3"];
  N9 [label="MapAsyncUnordered(1,docs.scaladsl.MqttSessionSpec$$Lambda$11258/1117482543@3b2b30f8)"];
  N10 [label="akka.stream.impl.QueueSource@23cca6d4"];
  N0 -> N2 [label=shouldPush, color=red];
  N1 -> N10 [style=dotted, label=closed, dir=both];
  N2 -> N3 [style=dotted, label=closed, dir=both];
  N3 -> N4 [style=dotted, label=closed, dir=both];
  N4 -> N5 [style=dotted, label=closed, dir=both];
  N5 -> N6 [style=dotted, label=closed, dir=both];
  N6 -> N7 [style=dotted, label=closed, dir=both];
  N7 -> N8 [style=dotted, label=closed, dir=both];
  N8 -> N1 [style=dotted, label=closed, dir=both];
  N8 -> N9 [style=dotted, label=closed, dir=both];
  N9 -> N8 [style=dotted, label=closed, dir=both];
}
================================================================

raboof commented 4 years ago

Failed again in https://travis-ci.com/akka/alpakka/jobs/283306789

ennru commented 4 years ago

https://travis-ci.com/akka/alpakka/jobs/283306862#L715 in #2118

ennru commented 4 years ago

https://travis-ci.com/github/akka/alpakka/jobs/384666580#L649