Jarrah-libremfg opened 2 weeks ago
Do you have more logs from the restate-server? You can get more logs from the Kafka component by setting the following environment variable: RUST_LOG="info,restate_ingress_kafka=trace".
Could the problem perhaps be that you're trying to create a subscription before the service is actually registered? In that case, subscription creation will return an error.
Do you have more logs from the restate-server? You can get more logs from the Kafka component by setting the following environment variable: RUST_LOG="info,restate_ingress_kafka=trace".
Even with that, there aren't many relevant logs. The following is from a test run; there are a number of info-level registration logs above it.
[runtime] | 2024-11-12T21:57:07.924005Z INFO restate_invoker_impl::invocation_task::service_protocol_runner
[runtime] | Executing invocation at deployment
[runtime] | invocation.id: inv_17UMsllpxV0Q2QAaOy3gzSuAQKiG4h7wUp
[runtime] | deployment.address: http://172.18.0.1:29080/
[runtime] | deployment.service_protocol_version: 1
[runtime] | path: /invoke/WorkflowSpecification/Set
[runtime] | on rt:pp-6
[runtime] | 2024-11-12T21:57:07.948503Z INFO restate_ingress_http::handler::service_handler
[runtime] | Processing ingress request
[runtime] | on rs:ingress-16
[runtime] | in restate_ingress_http::handler::service_handler::ingress
[runtime] | restate.invocation.id: inv_17UMsllpxV0Q3w5MYku7MqOADl68iEpGg1
[runtime] | restate.invocation.target: message_start/{key}/Enable
[runtime] | 2024-11-12T21:57:07.954056Z INFO restate_invoker_impl::invocation_task::service_protocol_runner
[runtime] | Executing invocation at deployment
[runtime] | invocation.id: inv_17UMsllpxV0Q3w5MYku7MqOADl68iEpGg1
[runtime] | deployment.address: http://172.18.0.1:29080/
[runtime] | deployment.service_protocol_version: 1
[runtime] | path: /invoke/message_start/Enable
[runtime] | on rt:pp-6
[runtime] | 2024-11-12T21:57:07.967561Z DEBUG restate_ingress_kafka::subscription_controller::task_orchestrator
[runtime] | Spawning the consumer task for subscription id sub_16hfvhIEXwevdkyKeNrv1Vn
[runtime] | on rs:worker-13
[runtime] | 2024-11-12T21:57:07.967672Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Starting consumer for topics ["message-start"] with configuration ClientConfig { conf_map: {"enable.auto.commit": "true", "enable.auto.offset.store": "false", "group.id": "sub_16hfvhIEXwevdkyKeNrv1Vn", "client.id": "restate", "metadata.broker.list": "PLAINTEXT://redpanda:29092"}, log_level: Debug }
[runtime] | on rs:worker-13
If I attach my own listener to that Kafka topic, I can see a message arrive on it shortly after the subscription is created, but nothing in the logs or in the service's execution indicates that the Restate consumer received it.
Could the problem perhaps be that you're trying to create a subscription before the service is actually registered? In that case, subscription creation will return an error.
Not unless the deployment process can take longer than 2 seconds after the HTTP call replies. The very first thing the test code does is start and register the service. I've added pauses in case there is some delay after registration, with no change.
It's probably worth noting that I call other parts of the service to load data before creating the subscription, and those calls succeed. So Restate definitely knows the service is there and how to call it.
I've added a bunch of my own logging to try to work out where the issue is. It seems that on the first run, no message ever comes out of the rdkafka::consumer::StreamConsumer.
Failing logs:
[runtime] | 2024-11-13T04:23:08.070524Z INFO restate_invoker_impl::invocation_task::service_protocol_runner
[runtime] | Executing invocation at deployment
[runtime] | invocation.id: inv_17UMsllpxV0Q5cfVpSh5n187gfEkFaMKpX
[runtime] | deployment.address: http://172.18.0.1:29080/
[runtime] | deployment.service_protocol_version: 1
[runtime] | path: /invoke/message_start/Enable
[runtime] | on rt:pp-6
[runtime] | 2024-11-13T04:23:08.125171Z DEBUG restate_ingress_kafka::subscription_controller::task_orchestrator
[runtime] | Spawning the consumer task for subscription id sub_12x4St13qmf4QBVUXCEcdjj
[runtime] | on rs:worker-0
[runtime] | 2024-11-13T04:23:08.125240Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Starting consumer for topics ["message-start"] with configuration ClientConfig { conf_map: {"client.id": "restate", "metadata.broker.list": "PLAINTEXT://redpanda:29092", "group.id": "sub_12x4St13qmf4QBVUXCEcdjj", "enable.auto.commit": "true", "enable.auto.offset.store": "false"}, log_level: Debug }
[runtime] | on rs:worker-0
[runtime] | 2024-11-13T04:23:08.126405Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Creating consumer with topics [
[runtime] | "message-start",
[runtime] | ]
[runtime] | on rs:worker-0
[runtime] | 2024-11-13T04:23:08.126533Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Starting listener loop for topics: [
[runtime] | "message-start",
[runtime] | ]
[runtime] | on rs:worker-0
Running exactly the same test a second time results in:
[runtime] | 2024-11-13T04:24:05.121916Z INFO restate_invoker_impl::invocation_task::service_protocol_runner
[runtime] | Executing invocation at deployment
[runtime] | invocation.id: inv_17UMsllpxV0Q02UYrhb54uozzoaMAjTxqV
[runtime] | deployment.address: http://172.18.0.1:29080/
[runtime] | deployment.service_protocol_version: 1
[runtime] | path: /invoke/message_start/Enable
[runtime] | on rt:pp-6
[runtime] | 2024-11-13T04:24:05.133712Z DEBUG restate_ingress_kafka::subscription_controller::task_orchestrator
[runtime] | Spawning the consumer task for subscription id sub_14NOwbMfb9oSsTPpLxEYhJn
[runtime] | on rs:worker-10
[runtime] | 2024-11-13T04:24:05.133772Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Starting consumer for topics ["message-start"] with configuration ClientConfig { conf_map: {"metadata.broker.list": "PLAINTEXT://redpanda:29092", "group.id": "sub_14NOwbMfb9oSsTPpLxEYhJn", "client.id": "restate", "enable.auto.offset.store": "false", "enable.auto.commit": "true"}, log_level: Debug }
[runtime] | on rs:worker-10
[runtime] | 2024-11-13T04:24:05.134379Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Creating consumer with topics [
[runtime] | "message-start",
[runtime] | ]
[runtime] | on rs:worker-10
[runtime] | 2024-11-13T04:24:05.134529Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Starting listener loop for topics: [
[runtime] | "message-start",
[runtime] | ]
[runtime] | on rs:worker-10
[runtime] | 2024-11-13T04:24:05.654514Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Received message
[runtime] | on rs:worker-0
[runtime] | 2024-11-13T04:24:05.654551Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Received Message on topic = message-start, partition = 0, offset = 1
[runtime] | on rs:worker-0
[runtime] | 2024-11-13T04:24:05.654577Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Received message
[runtime] | on rs:worker-10
[runtime] | 2024-11-13T04:24:05.654603Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Received Message on topic = message-start, partition = 0, offset = 1
[runtime] | on rs:worker-10
[runtime] | 2024-11-13T04:24:05.654636Z INFO restate_ingress_kafka::consumer_task
[runtime] | Processing Kafka ingress request
[runtime] | on rs:worker-0
[runtime] | in restate_ingress_kafka::consumer_task::kafka_ingress_consume
[runtime] | otel.name: "kafka_ingress_consume"
[runtime] | messaging.system: "kafka"
[runtime] | messaging.operation: "receive"
[runtime] | messaging.source.name: "message-start"
[runtime] | messaging.destination.name: service://message_start_service/Handle
My reading of this is that the subscription is created the first time, but nothing is delivered on it. The second time a subscription is created, everything starts flowing correctly. Both runs create new subscriptions, as the admin API shows:
{
  "subscriptions": [
    {
      "id": "sub_12x4St13qmf4QBVUXCEcdjj",
      "source": "kafka://redpanda/message-start",
      "sink": "service://message_start_service/Handle",
      "options": {
        "client.id": "restate",
        "group.id": "sub_12x4St13qmf4QBVUXCEcdjj"
      }
    },
    {
      "id": "sub_14NOwbMfb9oSsTPpLxEYhJn",
      "source": "kafka://redpanda/message-start",
      "sink": "service://message_start_service/Handle",
      "options": {
        "client.id": "restate",
        "group.id": "sub_14NOwbMfb9oSsTPpLxEYhJn"
      }
    }
  ]
}
I have another hypothesis, which actually comes from our own Kafka tests: https://github.com/restatedev/sdk-test-suite/blob/main/src/main/kotlin/dev/restate/sdktesting/tests/KafkaIngress.kt#L99
The subscription process is completely asynchronous: after Restate accepts the subscription it replies with 200, and then it takes some time before the consumer group is created in Kafka. As a result, you may be sending a Kafka record before the consumer group starts (essentially, step 3 of your test runs before step 2 has created the consumer group). By default, a new consumer group starts from the latest record rather than the first, so you won't see records that were published before the consumer group actually started. Most likely, in the second test attempt you do receive the request, but via the first subscription rather than the second one created in the later attempt.
Unfortunately, the log lines you posted lack some details needed to verify this hypothesis (I need to fix that on our side), but one thing you can do (and it's perhaps good practice in tests anyway) is to set auto.offset.reset to earliest, as shown here: https://docs.restate.dev/operate/invocation#managing-kafka-subscriptions
I've just tried a few things to test that hypothesis. Some good news: adding the auto.offset.reset option does make the test work. Sadly, it also has the side effect of replaying all messages whenever a subscription is created on an existing topic, which breaks our runtime goal.
The subscription process is completely asynchronous: after Restate accepts the subscription it replies with 200, and then it takes some time before the consumer group is created in Kafka. As a result, you may be sending a Kafka record before the consumer group starts (essentially, step 3 of your test runs before step 2 has created the consumer group).
To test this theory, I added a 15-second pause between requesting the subscription and sending the message. That did seem to make it work correctly.
Most likely, in the second test attempt you do receive the request, but via the first subscription rather than the second one created in the later attempt.
I don't believe this is what is happening. If it were, I'd expect that deleting the first subscription and then running the test would result in a failure (no messages received). However, if I start everything, run the test twice so that it passes, then delete all subscriptions and re-run the test, it still passes. It looks to me more like there is a slower startup the first time a new subscription is created, and I was hitting that on the cold-start attempt.
Unfortunately, the log lines you posted lack some details needed to verify this hypothesis (I need to fix that on our side)
Here is the output with your additional logging added:
[runtime] | 2024-11-14T03:25:59.740571Z DEBUG restate_ingress_kafka::subscription_controller::task_orchestrator
[runtime] | Spawning the consumer task for subscription id sub_17Jj4g7zV5F3OChvjyNImRz
[runtime] | on rs:worker-12
[runtime] | 2024-11-14T03:25:59.740661Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Starting consumer for topics ["message-start"] with configuration ClientConfig { conf_map: {"client.id": "restate", "enable.auto.offset.store": "false", "metadata.broker.list": "PLAINTEXT://redpanda:29092", "group.id": "sub_17Jj4g7zV5F3OChvjyNImRz", "enable.auto.commit": "true"}, log_level: Debug }
[runtime] | restate.subscription.id: sub_17Jj4g7zV5F3OChvjyNImRz
[runtime] | messaging.consumer.group.name: "sub_17Jj4g7zV5F3OChvjyNImRz"
[runtime] | on rs:worker-12
[runtime] | 2024-11-14T03:25:59.741857Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Creating consumer with topics [
[runtime] | "message-start",
[runtime] | ]
[runtime] | on rs:worker-12
[runtime] | 2024-11-14T03:25:59.742573Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Assigned topic/partitions/offset: TPL {}
[runtime] | restate.subscription.id: sub_17Jj4g7zV5F3OChvjyNImRz
[runtime] | messaging.consumer.group.name: "sub_17Jj4g7zV5F3OChvjyNImRz"
[runtime] | on rs:worker-12
[runtime] | 2024-11-14T03:25:59.742598Z DEBUG restate_ingress_kafka::consumer_task
[runtime] | Starting listener loop for topics: [
[runtime] | "message-start",
[runtime] | ]
[runtime] | on rs:worker-12
Thanks for your help on this by the way.
Some good news: adding the auto.offset.reset option does make the test work. Sadly, it also has the side effect of replaying all messages whenever a subscription is created on an existing topic, which breaks our runtime goal.
Agreed, for production this probably doesn't make sense. But in tests that involve two processes, consumer startup is generally asynchronous (that is true with the "usual" Kafka APIs too), so in that case auto.offset.reset is probably legitimate. Alternatively, use a long sleep, or even better, have the test wait until the consumer group exists according to the Kafka admin API.
Not sure if there's anything else actionable in this task now :)
Description
When using Restate subscriptions with the Restate Go SDK, messages don't seem to trigger execution until the deployed application is restarted.
Reproduction
message_start_service/Handle function is never triggered.
Attempted fixes
In my case, I have a test that performs all of these steps automatically. I've tried adding pauses to ensure messages aren't being sent too early. I've also tried sending multiple messages in case the first one always gets skipped. Neither of these has resolved the issue.
If I run the test a second time without rebuilding the Restate/Kafka deployment (effectively repeating step 2 onwards), everything works perfectly.