akka / akka-samples

Akka Sample Projects
http://akka.io

akka-sample-kafka-to-sharding-scala doesn't work as expected #219

Open evgkarasev opened 4 years ago

evgkarasev commented 4 years ago

Versions used

val AkkaVersion = "2.6.6"
val AlpakkaKafkaVersion = "2.0.3"
val AkkaManagementVersion = "1.0.5"
val AkkaHttpVersion = "10.1.11"
val KafkaVersion = "2.4.0"
val LogbackVersion = "1.2.3"

Expected Behavior

[info] [2020-01-16 09:48:51,040] [INFO] [akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition [1] assigned to current node. Updating shard allocation
[info] [2020-01-16 09:48:51,040] [INFO] [akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition [25] assigned to current node. Updating shard allocation
[info] [2020-01-16 09:48:51,043] [INFO] [akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition [116] assigned to current node. Updating shard allocation

After the producer is started, the single processor node should log messages like these:

[info] [2020-01-16 09:51:38,672] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 29->45
[info] [2020-01-16 09:51:38,672] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 29 to cluster sharding
[info] [2020-01-16 09:51:38,673] [INFO] [sample.sharding.kafka.UserEvents$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/system/sharding/user-processing/75/29] - user 29 purchase cat t-shirt, quantity 0, price 8874
[info] [2020-01-16 09:51:39,702] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 60->111
[info] [2020-01-16 09:51:39,703] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 60 to cluster sharding
[info] [2020-01-16 09:51:39,706] [INFO] [sample.sharding.kafka.UserEvents$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/system/sharding/user-processing/2/60] - user 60 purchase cat t-shirt, quantity 2, price 9375
[info] [2020-01-16 09:51:40,732] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 75->1
[info] [2020-01-16 09:51:40,732] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 75 to cluster sharding

Actual Behavior

[info] [2020-07-08 18:56:40,587] [INFO] [akka.actor.RepointableActorRef] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/system/kafka-consumer-1] - Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://KafkaToSharding/system/Materializers/StreamSupervisor-0/$$a#1729317594] to Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Relevant logs

~/ScalaProjects/akka-sample-kafka-to-sharding-scala $ sbt "processor / run 2551 8551 8081"
[info] Loading global plugins from /home/eugene/.sbt/1.0/plugins
[info] Loading settings for project akka-sample-kafka-to-sharding-scala-build from plugins.sbt ...
[info] Loading project definition from /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/project
[info] Loading settings for project akka-sample-kafka-to-sharding from build.sbt ...
[info] Set current project to akka-sample-kafka-to-sharding (in build file:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/)
[info] Compiling 2 protobuf files to /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main,/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main,/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main,/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Compiling schema /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/users.proto
[info] Compiling schema /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/user-events.proto
protoc-jar: protoc version: 3.7.1, detected platform: linux-x86_64 (linux/amd64)
protoc-jar: embedded: bin/3.7.1/protoc-3.7.1-linux-x86_64.exe
protoc-jar: executing: [/tmp/protocjar5741868647544472609/bin/protoc.exe, --plugin=protoc-gen-scala_0=/tmp/protocbridge4871978644069516449, --plugin=protoc-gen-akka-grpc-scaladsl-trait_1=/tmp/protocbridge4453544094864461486, --plugin=protoc-gen-akka-grpc-scaladsl-client_2=/tmp/protocbridge2444707610667080398, --plugin=protoc-gen-akka-grpc-scaladsl-server_3=/tmp/protocbridge1023303769398452281, --scala_0_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, --akka-grpc-scaladsl-trait_1_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, --akka-grpc-scaladsl-client_2_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, --akka-grpc-scaladsl-server_3_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external, /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/users.proto, /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/user-events.proto]le / protocGenerate 0s
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external: warning: directory does not exist.
[info] Generating Akka gRPC service interface for sample.sharding.kafka.UserService
[info] Generating Akka gRPC client for sample.sharding.kafka.UserService
[info] Generating Akka gRPC service handler for sample.sharding.kafka.UserService
[info] Compiling protobuf
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Compiling 15 Scala sources to /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/classes ...
[warn] /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main/sample/sharding/kafka/UserServiceClient.scala:36:91: class ActorMaterializer in package stream is deprecated (since 2.6.0): The Materializer now has all methods the ActorMaterializer used to have
[warn] private val clientState = new ClientState(settings, akka.event.Logging(mat.asInstanceOf[ActorMaterializer].system, this.getClass))
[warn] ^
[warn] one warning found
[info] running (fork) sample.sharding.kafka.Main 2551 8551 8081
[info] [2020-07-08 18:55:32,397] [INFO] [akka.event.slf4j.Slf4jLogger] [KafkaToSharding-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[error] SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
[error] SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
[error] SLF4J: See also http://www.slf4j.org/codes.html#replay
[info] [2020-07-08 18:55:32,610] [INFO] [akka.remote.artery.tcp.ArteryTcpTransport] [KafkaToSharding-akka.actor.default-dispatcher-3] [ArteryTcpTransport(akka://KafkaToSharding)] - Remoting started with transport [Artery tcp]; listening on address [akka://KafkaToSharding@127.0.0.1:2551] with UID [1857486658713966714]
[info] [2020-07-08 18:55:32,629] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Starting up, Akka version [2.6.6] ...
[info] [2020-07-08 18:55:32,739] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[info] [2020-07-08 18:55:32,739] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Started up successfully
[info] [2020-07-08 18:55:32,786] [INFO] [akka.cluster.sbr.SplitBrainResolver] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/system/cluster/core/daemon/downingProvider] - SBR started. Config: stableAfter: 20000 ms, strategy: KeepMajority, selfUniqueAddress: UniqueAddress(akka://KafkaToSharding@127.0.0.1:2551,1857486658713966714), selfDc: default
[info] [2020-07-08 18:55:33,314] [INFO] [akka.management.internal.HealthChecksImpl] [KafkaToSharding-akka.actor.default-dispatcher-14] [HealthChecksImpl(akka://KafkaToSharding)] - Loading readiness checks List(NamedHealthCheck(cluster-membership,akka.management.cluster.scaladsl.ClusterMembershipCheck))
[info] [2020-07-08 18:55:33,315] [INFO] [akka.management.internal.HealthChecksImpl] [KafkaToSharding-akka.actor.default-dispatcher-14] [HealthChecksImpl(akka://KafkaToSharding)] - Loading liveness checks List()
[info] [2020-07-08 18:55:33,401] [WARN] [akka.stream.Materializer] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.stream.Log(akka://KafkaToSharding/system/Materializers/StreamSupervisor-1)] - [outbound connection to [akka://KafkaToSharding@127.0.0.1:2552], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[info] [2020-07-08 18:55:33,401] [WARN] [akka.stream.Materializer] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.stream.Log(akka://KafkaToSharding/system/Materializers/StreamSupervisor-1)] - [outbound connection to [akka://KafkaToSharding@127.0.0.1:2552], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[info] [2020-07-08 18:55:33,416] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-14] [AkkaManagement(akka://KafkaToSharding)] - Binding Akka Management (HTTP) endpoint to: 127.0.0.1:8551
[info] [2020-07-08 18:55:33,479] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-14] [AkkaManagement(akka://KafkaToSharding)] - Including HTTP management routes for ClusterHttpManagementRouteProvider
[info] [2020-07-08 18:55:33,521] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-14] [AkkaManagement(akka://KafkaToSharding)] - Including HTTP management routes for HealthCheckRoutes
[info] [2020-07-08 18:55:34,053] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-21] [AkkaManagement(akka://KafkaToSharding)] - Bound Akka Management (HTTP) endpoint to: 127.0.0.1:8551
[info] [2020-07-08 18:55:34,498] [INFO] [akka.actor.ActorSystemImpl] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.actor.ActorSystemImpl(KafkaToSharding)] - Retrieved 128 partitions for topic 'user-events'
[info] [2020-07-08 18:55:34,498] [INFO] [akka.actor.typed.ActorSystem] [KafkaToSharding-akka.actor.default-dispatcher-5] [] - Message extractor created. Initializing sharding
[info] [2020-07-08 18:55:34,520] [INFO] [akka.cluster.sharding.typed.scaladsl.ClusterSharding] [KafkaToSharding-akka.actor.default-dispatcher-14] [ClusterSharding(akka://KafkaToSharding)] - Starting Shard Region [user-processing]...
[info] [2020-07-08 18:55:34,549] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/user] - Sharding has started
[info] [2020-07-08 18:55:34,556] [INFO] [akka.cluster.sharding.ShardRegion] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processing] - user-processing: Idle entities will be passivated after [2.000 min]
[info] [2020-07-08 18:55:37,891] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-5] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Node [akka://KafkaToSharding@127.0.0.1:2551] is JOINING itself (with roles [dc-default]) and forming new cluster
[info] [2020-07-08 18:55:37,892] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-5] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[info] [2020-07-08 18:55:37,898] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-5] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Leader is moving node [akka://KafkaToSharding@127.0.0.1:2551] to [Up]
[info] [2020-07-08 18:55:37,903] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/user] - Member has joined the cluster
[info] [2020-07-08 18:55:37,904] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/user] - Sharding started and joined cluster. Starting event processor
[info] [2020-07-08 18:55:37,908] [INFO] [akka.cluster.sbr.SplitBrainResolver] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding/system/cluster/core/daemon/downingProvider] - This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[info] [2020-07-08 18:55:37,917] [INFO] [akka.cluster.singleton.ClusterSingletonManager] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator] - Singleton manager starting singleton actor [akka://KafkaToSharding/system/sharding/user-processingCoordinator/singleton]
[info] [2020-07-08 18:55:37,920] [INFO] [akka.cluster.singleton.ClusterSingletonManager] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator] - ClusterSingletonManager state change [Start -> Oldest]
[info] [2020-07-08 18:55:37,936] [INFO] [akka.cluster.sharding.DDataShardCoordinator] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator/singleton/coordinator] - ShardCoordinator was moved to the active state State(Map())
[info] [2020-07-08 18:55:37,942] [INFO] [akka.kafka.internal.SingleSourceLogic] [KafkaToSharding-akka.actor.default-dispatcher-21] [SingleSourceLogic(akka://KafkaToSharding)] - [117ff] Starting. StageActor Actor[akka://KafkaToSharding/system/Materializers/StreamSupervisor-0/$$a#1729317594]
[info] [2020-07-08 18:55:42,312] [INFO] [akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing] - Consumer group 'user-processing' assigned topic partitions to cluster member 'akka://KafkaToSharding@127.0.0.1:2551': [user-events-16,user-events-29,user-events-44,user-events-101,user-events-112,user-events-94,user-events-114,user-events-83,user-events-0,user-events-13,user-events-75,user-events-8,user-events-72,user-events-37,user-events-42,user-events-108,user-events-34,user-events-64,user-events-58,user-events-61,user-events-113,user-events-87,user-events-119,user-events-77,user-events-71,user-events-95,user-events-50,user-events-24,user-events-124,user-events-53,user-events-47,user-events-20,user-events-106,user-events-122,user-events-27,user-events-104,user-events-7,user-events-115,user-events-3,user-events-15,user-events-4,user-events-23,user-events-19,user-events-88,user-events-84,user-events-11,user-events-68,user-events-38,user-events-105,user-events-118,user-events-33,user-events-49,user-events-67,user-events-100,user-events-62,user-events-111,user-events-121,user-events-127,user-events-41,user-events-54,user-events-57,user-events-78,user-events-10,user-events-14,user-events-2,user-events-6,user-events-117,user-events-123,user-events-107,user-events-98,user-events-126,user-events-81,user-events-21,user-events-5,user-events-32,user-events-97,user-events-96,user-events-74,user-events-66,user-events-103,user-events-40,user-events-110,user-events-26,user-events-85,user-events-79,user-events-69,user-events-48,user-events-93,user-events-63,user-events-92,user-events-56,user-events-55,user-events-45,user-events-18,user-events-82,user-events-120,user-events-31,user-events-35,user-events-39,user-events-99,user-events-90,user-events-109,user-events-125,user-events-76,user-events-80,user-events-43,user-events-12,user-events-1,user-events-25,user-events-116,user-events-17,user-events-28,user-events-70,user-events-36,user-events-89,user-events-73,user-events-51,user-events-60,user-events-102,user-events-9,user-events-65,user-events-91,user-events-30,user-events-59,user-events-86,user-events-46,user-events-52,user-events-22]
[info] [2020-07-08 18:55:42,443] [ERROR] [akka.dispatch.Dispatcher] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.dispatch.Dispatcher] - Unsupported access to ActorContext from the outside of Actor[akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing#-748606125]. No message is currently processed by the actor, but ActorContext was called from Thread[KafkaToSharding-akka.actor.default-dispatcher-14,5,main].
[info] java.lang.UnsupportedOperationException: Unsupported access to ActorContext from the outside of Actor[akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing#-748606125]. No message is currently processed by the actor, but ActorContext was called from Thread[KafkaToSharding-akka.actor.default-dispatcher-14,5,main].
[info] at akka.actor.typed.internal.ActorContextImpl.checkCurrentActorThread(ActorContextImpl.scala:317)
[info] at akka.actor.typed.internal.ActorContextImpl.checkCurrentActorThread$(ActorContextImpl.scala:305)
[info] at akka.actor.typed.internal.adapter.ActorContextAdapter.checkCurrentActorThread(ActorContextAdapter.scala:49)
[info] at akka.actor.typed.internal.ActorContextImpl.log(ActorContextImpl.scala:161)
[info] at akka.actor.typed.internal.ActorContextImpl.log$(ActorContextImpl.scala:160)
[info] at akka.actor.typed.internal.adapter.ActorContextAdapter.log(ActorContextAdapter.scala:49)
[info] at akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$.$anonfun$apply$4(KafkaClusterSharding.scala:314)
[info] at akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$.$anonfun$apply$4$adapted(KafkaClusterSharding.scala:312)
[info] at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:447)
[info] at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:56)
[info] at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:93)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info] at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
[info] at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:93)
[info] at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
[info] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
[info] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
[info] at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
[info] at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
[info] at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
[info] [2020-07-08 18:55:52,896] [WARN] [akka.remote.artery.Association] [KafkaToSharding-akka.actor.default-dispatcher-14] [Association(akka://KafkaToSharding)] - Outbound control stream to [akka://KafkaToSharding@127.0.0.1:2552] failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://KafkaToSharding@127.0.0.1:2552] did not complete within 20000 ms
[info] [2020-07-08 18:56:40,565] [INFO] [akka.kafka.internal.SingleSourceLogic] [KafkaToSharding-akka.actor.default-dispatcher-5] [SingleSourceLogic(akka://KafkaToSharding)] - [117ff] Completing
[info] [2020-07-08 18:56:40,566] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/user/kafka-event-processor] - Consumer stopped Failure(java.lang.UnsupportedOperationException: Unsupported access to ActorContext from the outside of Actor[akka://KafkaToSharding/user/kafka-event-processor#1200911144]. No message is currently processed by the actor, but ActorContext was called from Thread[KafkaToSharding-akka.actor.default-dispatcher-14,5,main].)
[info] [2020-07-08 18:56:40,569] [INFO] [akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing] - Consumer group 'user-processing' revoked topic partitions from cluster member 'akka://KafkaToSharding@127.0.0.1:2551': [user-events-16,user-events-29,user-events-44,user-events-101,user-events-112,user-events-94,user-events-114,user-events-83,user-events-0,user-events-13,user-events-75,user-events-8,user-events-72,user-events-37,user-events-42,user-events-108,user-events-34,user-events-64,user-events-58,user-events-61,user-events-113,user-events-87,user-events-119,user-events-77,user-events-71,user-events-95,user-events-50,user-events-24,user-events-124,user-events-53,user-events-47,user-events-20,user-events-106,user-events-122,user-events-27,user-events-104,user-events-7,user-events-115,user-events-3,user-events-15,user-events-4,user-events-23,user-events-19,user-events-88,user-events-84,user-events-11,user-events-68,user-events-38,user-events-105,user-events-118,user-events-33,user-events-49,user-events-67,user-events-100,user-events-62,user-events-111,user-events-121,user-events-127,user-events-41,user-events-54,user-events-57,user-events-78,user-events-10,user-events-14,user-events-2,user-events-6,user-events-117,user-events-123,user-events-107,user-events-98,user-events-126,user-events-81,user-events-21,user-events-5,user-events-32,user-events-97,user-events-96,user-events-74,user-events-66,user-events-103,user-events-40,user-events-110,user-events-26,user-events-85,user-events-79,user-events-69,user-events-48,user-events-93,user-events-63,user-events-92,user-events-56,user-events-55,user-events-45,user-events-18,user-events-82,user-events-120,user-events-31,user-events-35,user-events-39,user-events-99,user-events-90,user-events-109,user-events-125,user-events-76,user-events-80,user-events-43,user-events-12,user-events-1,user-events-25,user-events-116,user-events-17,user-events-28,user-events-70,user-events-36,user-events-89,user-events-73,user-events-51,user-events-60,user-events-102,user-events-9,user-events-65,user-events-91,user-events-30,user-events-59,user-events-86,user-events-46,user-events-52,user-events-22]
[info] [2020-07-08 18:56:40,587] [INFO] [akka.actor.RepointableActorRef] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/system/kafka-consumer-1] - Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://KafkaToSharding/system/Materializers/StreamSupervisor-0/$$a#1729317594] to Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Reproducible Test Case

Please provide a PR with a failing test.

If the issue is more complex or requires configuration, please provide a link to a project that reproduces the issue.

raboof commented 4 years ago

Thanks for the report!

This is a problem with the combination of alpakka-kafka and the ActorContext protection feature added in Akka 2.6.6 (https://github.com/akka/akka/pull/27112).

This particular problem has been fixed in alpakka-kafka in https://github.com/akka/alpakka-kafka/pull/1138/files, and a similar problem in the example has been fixed in #218.

For now, the workaround is to downgrade to Akka 2.6.5, but we hope to release alpakka-kafka 2.0.4 soon, which will also fix the issue.
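For readers unfamiliar with the ActorContext protection, here is a minimal sketch of the kind of code that trips it and one common way to avoid it, assuming Akka Typed 2.6.x on the classpath. `LookupActor`, its messages, and `fetch` are hypothetical names made up for illustration; this is not the code from the sample or from alpakka-kafka, and not necessarily how either fix was implemented.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.Future
import scala.util.{Failure, Success}

// Hypothetical actor, for illustration only.
object LookupActor {
  sealed trait Command
  final case class Lookup(id: String) extends Command
  private final case class LookupDone(id: String, result: String) extends Command
  private final case class LookupFailed(id: String, cause: Throwable) extends Command

  // Hypothetical asynchronous call, standing in for any Future-returning API.
  private def fetch(id: String): Future[String] =
    Future.successful(s"value-for-$id")

  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { ctx =>
      Behaviors.receiveMessage {
        case Lookup(id) =>
          // Problematic since Akka 2.6.6: the callback runs outside the actor's
          // message processing, so touching ctx there is rejected with
          // UnsupportedOperationException.
          //   fetch(id).foreach(r => ctx.log.info(s"got $r"))(ctx.executionContext)

          // Safer: convert the Future's outcome into a message to self and
          // use ctx only while that message is being processed.
          ctx.pipeToSelf(fetch(id)) {
            case Success(result) => LookupDone(id, result)
            case Failure(cause)  => LookupFailed(id, cause)
          }
          Behaviors.same

        case LookupDone(id, result) =>
          ctx.log.info(s"Lookup for $id returned $result")
          Behaviors.same

        case LookupFailed(id, cause) =>
          ctx.log.error(s"Lookup for $id failed", cause)
          Behaviors.same
      }
    }
}
```

The stack trace in the report has the same shape as the commented-out line: `ActorContextImpl.log` is called from a `Promise$Transformation` (a Future callback) rather than from the actor's own message handling.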

evgkarasev commented 4 years ago

Thank you for the prompt response!

Actually, downgrading to Akka 2.6.5 doesn't work: the cluster doesn't start. As a working workaround, I can confirm that simply commenting out all ctx.log.info("...") calls in the processor module is enough. That keeps the complete use case scenario in a working state.
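If losing the log output is undesirable, a possible variation on this workaround is to log through a plain SLF4J logger that is not tied to any ActorContext. The sketch below assumes only `slf4j-api` on the classpath (the sample already uses Logback); `EventForwarder` and `forward` are hypothetical names, not part of the sample.

```scala
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, for illustration only.
object EventForwarder {
  // Class-level logger: obtained without an ActorContext, so it can be
  // called from Future callbacks and stream stages on any thread.
  private val log: Logger = LoggerFactory.getLogger(getClass)

  // Logs from inside a Future body, i.e. off the actor's processing thread,
  // then returns the entity id unchanged.
  def forward(entityId: String)(implicit ec: ExecutionContext): Future[String] =
    Future {
      log.info(s"Forwarding message for entity $entityId to cluster sharding")
      entityId
    }
}
```

The trade-off, as far as I know, is that such a logger does not carry the actor-specific MDC values (for example the actor path) that `ctx.log` adds, so the log lines would show less context than the ones under "Expected Behavior".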

marciomarinho commented 4 years ago

I can confirm the sample does not work. I have just checked out the project and run all the steps according to the README.md.

Versions used:

val AkkaVersion = "2.6.8"
val AlpakkaKafkaVersion = "2.0.4"
val AkkaManagementVersion = "1.0.5"
val AkkaHttpVersion = "10.1.11"
val EmbeddedKafkaVersion = "2.4.1.1"
val LogbackVersion = "1.2.3"

Tested with JDKs:

openjdk version "11.0.7" 2020-04-14 LTS
OpenJDK Runtime Environment Corretto-11.0.7.10.1 (build 11.0.7+10-LTS)
OpenJDK 64-Bit Server VM Corretto-11.0.7.10.1 (build 11.0.7+10-LTS, mixed mode)

java version "1.8.0_261"
Java(TM) SE Runtime Environment (build 1.8.0_261-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.261-b12, mixed mode)


sbt "kafka / run"

[info] Loading settings for project akka-sample-kafka-to-sharding-scala-build from plugins.sbt ...
[info] Loading project definition from /Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/project
[info] Loading settings for project akka-sample-kafka-to-sharding from build.sbt ...
[info] Set current project to akka-sample-kafka-to-sharding (in build file:/Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/)
[info] Compiling 1 Scala source to /Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/kafka/target/scala-2.13/classes ...
[info] running sample.sharding.embeddedkafka.KafkaBroker
09:29:11.668 INFO  [run-main-0          ] s.s.embeddedkafka.KafkaBroker$        Kafka running: localhost:9092
09:29:11.668 INFO  [run-main-0          ] s.s.embeddedkafka.KafkaBroker$        Topic 'user-events' with 128 partitions created

sbt "processor / run 2551 8551 8081"

marciomarinho@MacBook-Pro-3 akka-sample-kafka-to-sharding-scala % sbt "processor / run 2551 8551 8081"
[info] Loading settings for project akka-sample-kafka-to-sharding-scala-build from plugins.sbt ...
[info] Loading project definition from /Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/project
[info] Loading settings for project akka-sample-kafka-to-sharding from build.sbt ...
[info] Set current project to akka-sample-kafka-to-sharding (in build file:/Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/)
[info] Compiling 15 Scala sources to /Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/classes ...
[warn] /Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main/sample/sharding/kafka/UserServiceClient.scala:36:91: class ActorMaterializer in package stream is deprecated (since 2.6.0): The Materializer now has all methods the ActorMaterializer used to have
[warn]   private val clientState = new ClientState(settings, akka.event.Logging(mat.asInstanceOf[ActorMaterializer].system, this.getClass))
[warn]                                                                                           ^
[warn] one warning found
[info] running (fork) sample.sharding.kafka.Main 2551 8551 8081
[error] [jetty-alpn-agent] Could not find a matching alpn-boot JAR for Java version: 11.0.7
[error] SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
[error] SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
[error] SLF4J: See also http://www.slf4j.org/codes.html#replay
[info] [2020-08-28 09:29:34,390] [INFO] [akka.event.slf4j.Slf4jLogger] [KafkaToSharding-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[info] [2020-08-28 09:29:34,614] [INFO] [akka.remote.artery.tcp.ArteryTcpTransport] [KafkaToSharding-akka.actor.default-dispatcher-3] [ArteryTcpTransport(akka://KafkaToSharding)] - Remoting started with transport [Artery tcp]; listening on address [akka://KafkaToSharding@127.0.0.1:2551] with UID [1441903511253970268]
[info] [2020-08-28 09:29:34,629] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Starting up, Akka version [2.6.8] ...
[info] [2020-08-28 09:29:34,746] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[info] [2020-08-28 09:29:34,747] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Started up successfully
[info] [2020-08-28 09:29:34,795] [INFO] [akka.cluster.sbr.SplitBrainResolver] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/system/cluster/core/daemon/downingProvider] - SBR started. Config: stableAfter: 20000 ms, strategy: KeepMajority, selfUniqueAddress: UniqueAddress(akka://KafkaToSharding@127.0.0.1:2551,1441903511253970268), selfDc: default
[info] [2020-08-28 09:29:35,188] [INFO] [akka.management.internal.HealthChecksImpl] [KafkaToSharding-akka.actor.default-dispatcher-5] [HealthChecksImpl(akka://KafkaToSharding)] - Loading readiness checks List(NamedHealthCheck(cluster-membership,akka.management.cluster.scaladsl.ClusterMembershipCheck))
[info] [2020-08-28 09:29:35,189] [INFO] [akka.management.internal.HealthChecksImpl] [KafkaToSharding-akka.actor.default-dispatcher-5] [HealthChecksImpl(akka://KafkaToSharding)] - Loading liveness checks List()
[info] [2020-08-28 09:29:35,254] [WARN] [akka.stream.Materializer] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka.stream.Log(akka://KafkaToSharding/system/Materializers/StreamSupervisor-1)] - [outbound connection to [akka://KafkaToSharding@127.0.0.1:2552], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[info] [2020-08-28 09:29:35,254] [WARN] [akka.stream.Materializer] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka.stream.Log(akka://KafkaToSharding/system/Materializers/StreamSupervisor-1)] - [outbound connection to [akka://KafkaToSharding@127.0.0.1:2552], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[info] [2020-08-28 09:29:35,255] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-26] [AkkaManagement(akka://KafkaToSharding)] - Binding Akka Management (HTTP) endpoint to: 127.0.0.1:8551
[info] [2020-08-28 09:29:35,298] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-26] [AkkaManagement(akka://KafkaToSharding)] - Including HTTP management routes for ClusterHttpManagementRouteProvider
[info] [2020-08-28 09:29:35,332] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-26] [AkkaManagement(akka://KafkaToSharding)] - Including HTTP management routes for HealthCheckRoutes
[info] [2020-08-28 09:29:35,686] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-5] [AkkaManagement(akka://KafkaToSharding)] - Bound Akka Management (HTTP) endpoint to: 127.0.0.1:8551
[info] [2020-08-28 09:29:35,963] [INFO] [akka.actor.ActorSystemImpl] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(KafkaToSharding)] - Retrieved 128 partitions for topic 'user-events'
[info] [2020-08-28 09:29:35,963] [INFO] [akka.actor.typed.ActorSystem] [KafkaToSharding-akka.actor.default-dispatcher-27] [] - Message extractor created. Initializing sharding
[info] [2020-08-28 09:29:35,982] [INFO] [akka.cluster.sharding.typed.scaladsl.ClusterSharding] [KafkaToSharding-akka.actor.default-dispatcher-3] [ClusterSharding(akka://KafkaToSharding)] - Starting Shard Region [user-processing]...
[info] [2020-08-28 09:29:36,006] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-30] [akka://KafkaToSharding/user] - Sharding has started
[info] [2020-08-28 09:29:36,012] [INFO] [akka.cluster.sharding.ShardRegion] [KafkaToSharding-akka.actor.default-dispatcher-30] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processing] - user-processing: Idle entities will be passivated after [2.000 min]
[info] [2020-08-28 09:29:39,906] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-27] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Node [akka://KafkaToSharding@127.0.0.1:2551] is JOINING itself (with roles [dc-default]) and forming new cluster
[info] [2020-08-28 09:29:39,907] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-27] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[info] [2020-08-28 09:29:39,913] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-27] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Leader is moving node [akka://KafkaToSharding@127.0.0.1:2551] to [Up]
[info] [2020-08-28 09:29:39,918] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-30] [akka://KafkaToSharding/user] - Member has joined the cluster
[info] [2020-08-28 09:29:39,918] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-30] [akka://KafkaToSharding/user] - Sharding started and joined cluster. Starting event processor
[info] [2020-08-28 09:29:39,920] [INFO] [akka.cluster.sbr.SplitBrainResolver] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/system/cluster/core/daemon/downingProvider] - This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[info] [2020-08-28 09:29:39,926] [INFO] [akka.cluster.singleton.ClusterSingletonManager] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator] - Singleton manager starting singleton actor [akka://KafkaToSharding/system/sharding/user-processingCoordinator/singleton]
[info] [2020-08-28 09:29:39,926] [INFO] [akka.cluster.singleton.ClusterSingletonManager] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator] - ClusterSingletonManager state change [Start -> Oldest]
[info] [2020-08-28 09:29:39,939] [INFO] [akka.cluster.sharding.DDataShardCoordinator] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator/singleton/coordinator] - ShardCoordinator was moved to the active state State(Map())
[info] [2020-08-28 09:29:39,944] [INFO] [akka.kafka.internal.SingleSourceLogic] [KafkaToSharding-akka.actor.default-dispatcher-27] [SingleSourceLogic(akka://KafkaToSharding)] - [0af7c] Starting. StageActor Actor[akka://KafkaToSharding/system/Materializers/StreamSupervisor-0/$$a#-690489210]
[info] [2020-08-28 09:29:44,368] [INFO] [akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$] [KafkaToSharding-akka.actor.default-dispatcher-5] [] - Consumer group 'user-processing' assigned topic partitions to cluster member 'akka://KafkaToSharding@127.0.0.1:2551': [user-events-16,user-events-29,user-events-44,user-events-101,user-events-112,user-events-94,user-events-114,user-events-83,user-events-0,user-events-13,user-events-75,user-events-8,user-events-72,user-events-37,user-events-42,user-events-108,user-events-34,user-events-64,user-events-58,user-events-61,user-events-113,user-events-87,user-events-119,user-events-77,user-events-71,user-events-95,user-events-50,user-events-24,user-events-124,user-events-53,user-events-47,user-events-20,user-events-106,user-events-122,user-events-27,user-events-104,user-events-7,user-events-115,user-events-3,user-events-15,user-events-4,user-events-23,user-events-19,user-events-88,user-events-84,user-events-11,user-events-68,user-events-38,user-events-105,user-events-118,user-events-33,user-events-49,user-events-67,user-events-100,user-events-62,user-events-111,user-events-121,user-events-127,user-events-41,user-events-54,user-events-57,user-events-78,user-events-10,user-events-14,user-events-2,user-events-6,user-events-117,user-events-123,user-events-107,user-events-98,user-events-126,user-events-81,user-events-21,user-events-5,user-events-32,user-events-97,user-events-96,user-events-74,user-events-66,user-events-103,user-events-40,user-events-110,user-events-26,user-events-85,user-events-79,user-events-69,user-events-48,user-events-93,user-events-63,user-events-92,user-events-56,user-events-55,user-events-45,user-events-18,user-events-82,user-events-120,user-events-31,user-events-35,user-events-39,user-events-99,user-events-90,user-events-109,user-events-125,user-events-76,user-events-80,user-events-43,user-events-12,user-events-1,user-events-25,user-events-116,user-events-17,user-events-28,user-events-70,user-events-36,user-events-89,user-events-73,user-events-51,user-events-60,user-events-102,user-events-9,user-events-65,user-events-91,user-events-30,user-events-59,user-events-86,user-events-46,user-events-52,user-events-22]
[info] [2020-08-28 09:29:44,429] [INFO] [akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$] [KafkaToSharding-akka.actor.default-dispatcher-5] [] - Completed consumer group 'user-processing' assignment of topic partitions to cluster member 'akka://KafkaToSharding@127.0.0.1:2551': [user-events-16,user-events-29,user-events-44,user-events-101,user-events-112,user-events-94,user-events-114,user-events-83,user-events-0,user-events-13,user-events-75,user-events-8,user-events-72,user-events-37,user-events-42,user-events-108,user-events-34,user-events-64,user-events-58,user-events-61,user-events-113,user-events-87,user-events-119,user-events-77,user-events-71,user-events-95,user-events-50,user-events-24,user-events-124,user-events-53,user-events-47,user-events-20,user-events-106,user-events-122,user-events-27,user-events-104,user-events-7,user-events-115,user-events-3,user-events-15,user-events-4,user-events-23,user-events-19,user-events-88,user-events-84,user-events-11,user-events-68,user-events-38,user-events-105,user-events-118,user-events-33,user-events-49,user-events-67,user-events-100,user-events-62,user-events-111,user-events-121,user-events-127,user-events-41,user-events-54,user-events-57,user-events-78,user-events-10,user-events-14,user-events-2,user-events-6,user-events-117,user-events-123,user-events-107,user-events-98,user-events-126,user-events-81,user-events-21,user-events-5,user-events-32,user-events-97,user-events-96,user-events-74,user-events-66,user-events-103,user-events-40,user-events-110,user-events-26,user-events-85,user-events-79,user-events-69,user-events-48,user-events-93,user-events-63,user-events-92,user-events-56,user-events-55,user-events-45,user-events-18,user-events-82,user-events-120,user-events-31,user-events-35,user-events-39,user-events-99,user-events-90,user-events-109,user-events-125,user-events-76,user-events-80,user-events-43,user-events-12,user-events-1,user-events-25,user-events-116,user-events-17,user-events-28,user-events-70,user-events-36,user-events-89,user-events-73,user-events-51,user-events-60,user-events-102,user-events-9,user-events-65,user-events-91,user-events-30,user-events-59,user-events-86,user-events-46,user-events-52,user-events-22]
[info] [2020-08-28 09:29:54,875] [WARN] [akka.remote.artery.Association] [KafkaToSharding-akka.actor.default-dispatcher-5] [Association(akka://KafkaToSharding)] - Outbound control stream to [akka://KafkaToSharding@127.0.0.1:2552] failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://KafkaToSharding@127.0.0.1:2552] did not complete within 20000 ms

sbt "producer / run"

marciomarinho@MacBook-Pro-3 akka-sample-kafka-to-sharding-scala % sbt "producer / run"
[info] Loading settings for project akka-sample-kafka-to-sharding-scala-build from plugins.sbt ...
[info] Loading project definition from /Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/project
[info] Loading settings for project akka-sample-kafka-to-sharding from build.sbt ...
[info] Set current project to akka-sample-kafka-to-sharding (in build file:/Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/)
[info] Compiling 5 Scala sources to /Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/producer/target/scala-2.13/classes ...
[info] running sharding.kafka.producer.UserEventProducer
[INFO] [08/28/2020 09:32:28.256] [UserEventProducer-akka.actor.default-dispatcher-5] [UserEventProducer(akka://UserEventProducer)] Sending message to user 125
[INFO] [08/28/2020 09:32:29.260] [UserEventProducer-akka.actor.default-dispatcher-4] [UserEventProducer(akka://UserEventProducer)] Sending message to user 56
[INFO] [08/28/2020 09:32:30.282] [UserEventProducer-akka.actor.default-dispatcher-4] [UserEventProducer(akka://UserEventProducer)] Sending message to user 48
[INFO] [08/28/2020 09:32:31.302] [UserEventProducer-akka.actor.default-dispatcher-5] [UserEventProducer(akka://UserEventProducer)] Sending message to user 55
[INFO] [08/28/2020 09:32:32.323] [UserEventProducer-akka.actor.default-dispatcher-5] [UserEventProducer(akka://UserEventProducer)] Sending message to user 134
[INFO] [08/28/2020 09:32:33.342] [UserEventProducer-akka.actor.default-dispatcher-4] [UserEventProducer(akka://UserEventProducer)] Sending message to user 122
[INFO] [08/28/2020 09:32:34.363] [UserEventProducer-akka.actor.default-dispatcher-4] [UserEventProducer(akka://UserEventProducer)] Sending message to user 199
[INFO] [08/28/2020 09:32:35.381] [UserEventProducer-akka.actor.default-dispatcher-4] [UserEventProducer(akka://UserEventProducer)] Sending message to user 17
[INFO] [08/28/2020 09:32:36.401] [UserEventProducer-akka.actor.default-dispatcher-5] [UserEventProducer(akka://UserEventProducer)] Sending message to user 137
[INFO] [08/28/2020 09:32:37.422] [UserEventProducer-akka.actor.default-dispatcher-4] [UserEventProducer(akka://UserEventProducer)] Sending message to user 60
[INFO] [08/28/2020 09:32:38.440] [UserEventProducer-akka.actor.default-dispatcher-5] [UserEventProducer(akka://UserEventProducer)] Sending message to user 48
[INFO] [08/28/2020 09:32:39.461] [UserEventProducer-akka.actor.default-dispatcher-5] [UserEventProducer(akka://UserEventProducer)] Sending message to user 199
[INFO] [08/28/2020 09:32:40.480] [UserEventProducer-akka.actor.default-dispatcher-5] [UserEventProducer(akka://UserEventProducer)] Sending message to user 70
[INFO] [08/28/2020 09:32:41.500] [UserEventProducer-akka.actor.default-dispatcher-5] [UserEventProducer(akka://UserEventProducer)] Sending message to user 15
[INFO] [08/28/2020 09:32:42.520] [UserEventProducer-akka.actor.default-dispatcher-4] [UserEventProducer(akka://UserEventProducer)] Sending message to user 11
[INFO] [08/28/2020 09:32:43.541] [UserEventProducer-akka.actor.default-dispatcher-4] [UserEventProducer(akka://UserEventProducer)] Sending message to user 108
[INFO] [08/28/2020 09:32:44.561] [UserEventProducer-akka.actor.default-dispatcher-5] [UserEventProducer(akka://UserEventProducer)] Sending message to user 165

After starting the producer, I get the following error on the processor:

[info] [2020-08-28 09:32:28,372] [INFO] [akka.kafka.internal.SingleSourceLogic] [KafkaToSharding-akka.actor.default-dispatcher-5] [SingleSourceLogic(akka://KafkaToSharding)] - [0af7c] Completing
[info] [2020-08-28 09:32:28,373] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-30] [akka://KafkaToSharding/user/kafka-event-processor] - Consumer stopped Failure(java.lang.UnsupportedOperationException: Unsupported access to ActorContext from the outside of Actor[akka://KafkaToSharding/user/kafka-event-processor#-911055316]. No message is currently processed by the actor, but ActorContext was called from Thread[KafkaToSharding-akka.actor.default-dispatcher-30,5,main].)
[info] [2020-08-28 09:32:28,375] [INFO] [akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$] [KafkaToSharding-akka.actor.default-dispatcher-5] [] - Consumer group 'user-processing' revoked topic partitions from cluster member 'akka://KafkaToSharding@127.0.0.1:2551': [user-events-16,user-events-29,user-events-44,user-events-101,user-events-112,user-events-94,user-events-114,user-events-83,user-events-0,user-events-13,user-events-75,user-events-8,user-events-72,user-events-37,user-events-42,user-events-108,user-events-34,user-events-64,user-events-58,user-events-61,user-events-113,user-events-87,user-events-119,user-events-77,user-events-71,user-events-95,user-events-50,user-events-24,user-events-124,user-events-53,user-events-47,user-events-20,user-events-106,user-events-122,user-events-27,user-events-104,user-events-7,user-events-115,user-events-3,user-events-15,user-events-4,user-events-23,user-events-19,user-events-88,user-events-84,user-events-11,user-events-68,user-events-38,user-events-105,user-events-118,user-events-33,user-events-49,user-events-67,user-events-100,user-events-62,user-events-111,user-events-121,user-events-127,user-events-41,user-events-54,user-events-57,user-events-78,user-events-10,user-events-14,user-events-2,user-events-6,user-events-117,user-events-123,user-events-107,user-events-98,user-events-126,user-events-81,user-events-21,user-events-5,user-events-32,user-events-97,user-events-96,user-events-74,user-events-66,user-events-103,user-events-40,user-events-110,user-events-26,user-events-85,user-events-79,user-events-69,user-events-48,user-events-93,user-events-63,user-events-92,user-events-56,user-events-55,user-events-45,user-events-18,user-events-82,user-events-120,user-events-31,user-events-35,user-events-39,user-events-99,user-events-90,user-events-109,user-events-125,user-events-76,user-events-80,user-events-43,user-events-12,user-events-1,user-events-25,user-events-116,user-events-17,user-events-28,user-events-70,user-events-36,user-events-89,user-events-73,user-events-51,user-events-60,user-events-102,user-events-9,user-events-65,user-events-91,user-events-30,user-events-59,user-events-86,user-events-46,user-events-52,user-events-22]
[info] [2020-08-28 09:32:28,395] [INFO] [akka.actor.RepointableActorRef] [KafkaToSharding-akka.actor.default-dispatcher-30] [akka://KafkaToSharding/system/kafka-consumer-1] - Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://KafkaToSharding/system/Materializers/StreamSupervisor-0/$$a#-690489210] to Actor[akka://KafkaToSharding/system/kafka-consumer-1#429195228] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://KafkaToSharding/system/kafka-consumer-1#429195228] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

The consumer then stops and the processor does nothing further.
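
The exception in the log above says that ActorContext was called from a dispatcher thread while the actor was not processing a message. That usually happens when a Future callback closes over the context. Below is a minimal sketch of the usual Akka Typed way to avoid it, `pipeToSelf`. The names `Command`, `ConsumerStopped`, `ProcessorSketch` and `streamDone` are made up for illustration; this is not the sample's actual code and I have not verified it against the sample.

```scala
import akka.Done
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

// Hypothetical protocol, not the one used by the sample.
sealed trait Command
final case class ConsumerStopped(result: Try[Done]) extends Command

object ProcessorSketch {
  // `streamDone` stands in for the Future that completes when the Kafka stream ends.
  def apply(streamDone: Future[Done]): Behavior[Command] =
    Behaviors.setup { ctx =>
      // pipeToSelf turns the Future's completion into a message to this actor,
      // so the context is only used while the actor is processing that message.
      ctx.pipeToSelf(streamDone)(result => ConsumerStopped(result))

      Behaviors.receiveMessage {
        case ConsumerStopped(Success(_)) =>
          ctx.log.info("Consumer stopped normally")
          Behaviors.stopped
        case ConsumerStopped(Failure(ex)) =>
          ctx.log.error("Consumer stopped with failure", ex)
          Behaviors.stopped
      }
    }
}
```

The point of the sketch is only that the logging happens inside `receiveMessage`, not inside an `onComplete` callback running on another thread.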


I can also confirm that the workaround of downgrading Akka to 2.6.5 produces the following error, which prevents the cluster from starting:

marciomarinho@MacBook-Pro-3 akka-sample-kafka-to-sharding-scala % sbt "processor / run 2551 8551 8081"
[info] Loading settings for project akka-sample-kafka-to-sharding-scala-build from plugins.sbt ...
[info] Loading project definition from /Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/project
[info] Loading settings for project akka-sample-kafka-to-sharding from build.sbt ...
[info] Set current project to akka-sample-kafka-to-sharding (in build file:/Users/marciomarinho/projects/akka/akka-samples/akka-sample-kafka-to-sharding-scala/)
[info] running (fork) sample.sharding.kafka.Main 2551 8551 8081
[error] [jetty-alpn-agent] Could not find a matching alpn-boot JAR for Java version: 11.0.7
[error] SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
[error] SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
[error] SLF4J: See also http://www.slf4j.org/codes.html#replay
[info] [2020-08-28 09:49:37,261] [INFO] [akka.event.slf4j.Slf4jLogger] [KafkaToSharding-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[info] [2020-08-28 09:49:37,442] [INFO] [akka.remote.artery.tcp.ArteryTcpTransport] [KafkaToSharding-akka.actor.default-dispatcher-3] [ArteryTcpTransport(akka://KafkaToSharding)] - Remoting started with transport [Artery tcp]; listening on address [akka://KafkaToSharding@127.0.0.1:2551] with UID [-4790482410178823478]
[info] [2020-08-28 09:49:37,455] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Starting up, Akka version [2.6.6] ...
[info] [2020-08-28 09:49:37,571] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[info] [2020-08-28 09:49:37,571] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Started up successfully
[info] [2020-08-28 09:49:37,619] [INFO] [akka.cluster.sbr.SplitBrainResolver] [KafkaToSharding-akka.actor.default-dispatcher-6] [akka://KafkaToSharding/system/cluster/core/daemon/downingProvider] - SBR started. Config: stableAfter: 20000 ms, strategy: KeepMajority, selfUniqueAddress: UniqueAddress(akka://KafkaToSharding@127.0.0.1:2551,-4790482410178823478), selfDc: default
[info] [2020-08-28 09:49:37,621] [WARN] [akka.util.ManifestInfo] [KafkaToSharding-akka.actor.default-dispatcher-6] [ManifestInfo(akka://KafkaToSharding)] - You are using version 2.6.6 of Akka, but it appears you (perhaps indirectly) also depend on older versions of related artifacts. You can solve this by adding an explicit dependency on version 2.6.6 of the [akka-discovery, akka-stream-typed] artifacts to your project. See also: https://doc.akka.io/docs/akka/current/common/binary-compatibility-rules.html#mixed-versioning-is-not-allowed
[error] Exception in thread "main" java.lang.IllegalStateException: You are using version 2.6.6 of Akka, but it appears you (perhaps indirectly) also depend on older versions of related artifacts. You can solve this by adding an explicit dependency on version 2.6.6 of the [akka-discovery, akka-stream-typed] artifacts to your project. See also: https://doc.akka.io/docs/akka/current/common/binary-compatibility-rules.html#mixed-versioning-is-not-allowed
[error]     at akka.util.ManifestInfo.checkSameVersion(ManifestInfo.scala:217)
[error]     at akka.util.ManifestInfo.checkSameVersion(ManifestInfo.scala:195)
[error]     at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:1024)
[error]     at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:1013)
[error]     at akka.actor.ActorSystemImpl._start(ActorSystem.scala:1013)
[error]     at akka.actor.ActorSystemImpl.start(ActorSystem.scala:1036)
[error]     at akka.actor.typed.ActorSystem$.createInternal(ActorSystem.scala:290)
[error]     at akka.actor.typed.ActorSystem$.apply(ActorSystem.scala:204)
[error]     at sample.sharding.kafka.Main$.init(Main.scala:51)
[error]     at sample.sharding.kafka.Main$.main(Main.scala:32)
[error]     at sample.sharding.kafka.Main.main(Main.scala)
[info] [2020-08-28 09:49:37,627] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Exiting completed
[info] [2020-08-28 09:49:37,628] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Shutting down...
[info] [2020-08-28 09:49:37,632] [INFO] [akka.actor.LocalActorRef] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding/system/cluster/core/daemon] - Message [akka.cluster.ClusterUserAction$Leave] to Actor[akka://KafkaToSharding/system/cluster/core/daemon#-1463608750] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] [2020-08-28 09:49:37,632] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Successfully shut down
[info] [2020-08-28 09:49:37,668] [INFO] [akka.remote.RemoteActorRefProvider$RemotingTerminator] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding@127.0.0.1:2551/system/remoting-terminator] - Shutting down remote daemon.
[info] [2020-08-28 09:49:37,669] [INFO] [akka.remote.RemoteActorRefProvider$RemotingTerminator] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding@127.0.0.1:2551/system/remoting-terminator] - Remote daemon shut down; proceeding with flushing remote transports.
[info] [2020-08-28 09:49:38,024] [ERROR] [akka.remote.artery.Encoder] [KafkaToSharding-akka.actor.default-dispatcher-3] [Encoder(akka://KafkaToSharding)] - Failed to serialize message [akka.remote.artery.OutboundHandshake$HandshakeReq].
[info] java.lang.IllegalArgumentException: Serializer identifier [32] of [akka.serialization.jackson.JacksonJsonSerializer] is not unique. It is also used by [akka.serialization.jackson.JacksonJsonSerializer].
[info]  at akka.serialization.Serialization.$anonfun$serializerByIdentity$1(Serialization.scala:515)
[info]  at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:636)
[info]  at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:632)
[info]  at scala.collection.AbstractIterable.foldLeft(Iterable.scala:921)
[info]  at akka.serialization.Serialization.<init>(Serialization.scala:508)
[info]  at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:18)
[info]  at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:14)
[info]  at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:1145)
[info]  at akka.actor.ExtensionId.apply(Extension.scala:78)
[info]  at akka.actor.ExtensionId.apply$(Extension.scala:77)
[info]  at akka.serialization.SerializationExtension$.apply(SerializationExtension.scala:14)
[info]  at akka.remote.artery.Encoder$$anon$1.serialization(Codecs.scala:81)
[info]  at akka.remote.artery.Encoder$$anon$1.onPush(Codecs.scala:123)
[info]  at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
[info]  at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
[info]  at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
[info]  at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
[info]  at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
[info]  at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
[info]  at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:784)
[info]  at akka.actor.Actor.aroundReceive(Actor.scala:535)
[info]  at akka.actor.Actor.aroundReceive$(Actor.scala:533)
[info]  at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
[info]  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
[info]  at akka.actor.ActorCell.invoke(ActorCell.scala:547)
[info]  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
[info]  at akka.dispatch.Mailbox.run(Mailbox.scala:231)
[info]  at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
[info]  at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
[info]  at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
[info]  at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
[info]  at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
[info]  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
[info] [2020-08-28 09:49:38,024] [ERROR] [akka.remote.artery.Encoder] [KafkaToSharding-akka.actor.default-dispatcher-3] [Encoder(akka://KafkaToSharding)] - Failed to serialize message [akka.remote.artery.OutboundHandshake$HandshakeReq].
[info] java.lang.IllegalArgumentException: Serializer identifier [32] of [akka.serialization.jackson.JacksonJsonSerializer] is not unique. It is also used by [akka.serialization.jackson.JacksonJsonSerializer].
[info]  at akka.serialization.Serialization.$anonfun$serializerByIdentity$1(Serialization.scala:515)
[info]  at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:636)
[info]  at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:632)
[info]  at scala.collection.AbstractIterable.foldLeft(Iterable.scala:921)
[info]  at akka.serialization.Serialization.<init>(Serialization.scala:508)
[info]  at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:18)
[info]  at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:14)
[info]  at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:1145)
[info]  at akka.actor.ExtensionId.apply(Extension.scala:78)
[info]  at akka.actor.ExtensionId.apply$(Extension.scala:77)
[info]  at akka.serialization.SerializationExtension$.apply(SerializationExtension.scala:14)
[info]  at akka.remote.artery.Encoder$$anon$1.serialization(Codecs.scala:81)
[info]  at akka.remote.artery.Encoder$$anon$1.onPush(Codecs.scala:123)
[info]  at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
[info]  at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
[info]  at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
[info]  at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
[info]  at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
[info]  at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
[info]  at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:784)
[info]  at akka.actor.Actor.aroundReceive(Actor.scala:535)
[info]  at akka.actor.Actor.aroundReceive$(Actor.scala:533)
[info]  at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
[info]  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
[info]  at akka.actor.ActorCell.invoke(ActorCell.scala:547)
[info]  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
[info]  at akka.dispatch.Mailbox.run(Mailbox.scala:231)
[info]  at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
[info]  at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
[info]  at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
[info]  at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
[info]  at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
[info]  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
[info] [2020-08-28 09:49:38,033] [INFO] [akka.remote.RemoteActorRefProvider$RemotingTerminator] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding@127.0.0.1:2551/system/remoting-terminator] - Remoting shut down.
[error] Nonzero exit code returned from runner: 1
[error] (processor / Compile / run) Nonzero exit code returned from runner: 1
[error] Total time: 3 s, completed 28 Aug. 2020, 9:49:38 am
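
The ManifestInfo message in the log above suggests adding explicit dependencies on version 2.6.6 of the `akka-discovery` and `akka-stream-typed` artifacts. A minimal sketch of what that could look like in the sample's `build.sbt`, assuming it goes into the processor project's dependencies (I have not confirmed that this is enough to make the sample start):

```scala
// build.sbt fragment (sketch): pin the two artifacts named in the ManifestInfo
// message to the Akka version reported in the log, so all Akka modules match.
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-discovery"    % "2.6.6",
  "com.typesafe.akka" %% "akka-stream-typed" % "2.6.6"
)
```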

Downgrading Akka to 2.6.6 produces the same behaviour as 2.6.8.