dapr / components-contrib

Community driven, reusable components for distributed apps
Apache License 2.0

[Proposal] [JetStream] Add JetStream Parallel Processing Support #3115

Open Admiralkheir opened 10 months ago

Admiralkheir commented 10 months ago

Hi all, We have been using nats streaming for a long time. With the end of support in Nats Streaming, we decided to switch to JetStream. As a result of our tests, we are opening issues related to the situations we noticed. (#3079 resolved ty for effort)

Expected Behavior

The sidecar should deliver events to the app without limiting concurrency on its side; it should not wait for a success or failure response to the previous event before forwarding the next one. We control this limit with the Max Ack Pending parameter on the JetStream side. If NATS does not receive an ack in time, it redelivers the event itself.

Actual Behavior and Problem

We noticed the sidecar sends events to the app one by one: it sends one event, waits for a success or failure response, and only then sends the next. The JetStream component already exposes the Max Ack Pending parameter (the maximum unacked message count) for exactly this purpose. We set it to 5, and the subscription log from the `nats consumer info` command shows JetStream delivering 5 messages to the sidecar, but the sidecar still forwards them to the app one at a time, waiting for each to be processed. I think this behaviour defeats parallelism.

According to the JetStream docs:

Defines the maximum number of messages, without an acknowledgement, that can be outstanding
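The parallelism the reporter expects could be sketched as a bounded-concurrency dispatch loop: the subscriber forwards up to `maxAckPending` messages to the app concurrently and frees a slot only when a handler finishes (i.e. when the message would be acked), instead of serializing on every response. The sketch below is a minimal, self-contained Go illustration of that pattern; `dispatch` and its simulated handler are hypothetical stand-ins, not the component's actual API:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// dispatch forwards n simulated messages to a handler with at most
// maxAckPending of them in flight at once, and returns the peak number
// of concurrent handlers observed. The buffered channel plays the role
// of the unacknowledged-message budget.
func dispatch(n, maxAckPending int) int {
	sem := make(chan struct{}, maxAckPending)
	var wg sync.WaitGroup
	var mu sync.Mutex
	inFlight, peak := 0, 0

	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks once maxAckPending messages are unacked
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			time.Sleep(5 * time.Millisecond) // simulated app handler

			mu.Lock()
			inFlight--
			mu.Unlock()
			<-sem // "ack": frees a pending slot
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak in-flight handlers:", dispatch(20, 5))
}
```

With this shape, the sidecar would never have more than `maxAckPending` unacked deliveries outstanding, matching the budget JetStream already enforces server-side.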

jetstream-component.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: jetstream-pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://user:generaluser123@localhost:4222"
  - name: name
    value: "dfapijetstreamtest"
  - name: durableName
    value: "durable"
  - name: queueGroupName
    value: "queueGroup"
  - name: deliverPolicy
    value: all
  - name: ackWait
    value: 30s
  - name: maxDeliver
    value: 5
  - name: replicas
    value: 3
  - name: maxAckPending
    value: 5
  - name: ackPolicy
    value: "explicit"

Dapr's log:

time="2023-09-06T12:35:13.7631402+03:00" level=info msg="starting Dapr Runtime -- version 1.11.2 -- commit 83ca1abb11ffe34211db55dcd36d96b94252827a" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.7631402+03:00" level=info msg="log level set to: debug" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.7631402+03:00" level=info msg="metrics server started on :9090/" app_id=app instance=G4YS8S3 scope=dapr.metrics type=log ver=1.11.2
time="2023-09-06T12:35:13.7658914+03:00" level=debug msg="Loading config from file: C:\\Users\\tugay.ersoy\\.dapr\\config.yaml" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.768659+03:00" level=debug msg="Found 0 resiliency configurations in resources path" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.768659+03:00" level=info msg="Resiliency configuration loaded" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.768659+03:00" level=info msg="standalone mode configured" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.768659+03:00" level=info msg="app id: app" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.768659+03:00" level=info msg="mTLS is disabled. Skipping certificate request and tls validation" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.7692033+03:00" level=info msg="Dapr trace sampler initialized: DaprTraceSampler(P=1.000000)" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8092052+03:00" level=info msg="local service entry announced: app -> 192.168.1.38:61814" app_id=app component="mdns (nameResolution/v1)" instance=G4YS8S3 scope=dapr.contrib type=log ver=1.11.2
time="2023-09-06T12:35:13.8092052+03:00" level=info msg="Initialized name resolution to mdns" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8092052+03:00" level=debug msg="the current OS does not support pluggable components feature, skipping initialization" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8092052+03:00" level=info msg="Loading components…" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.812492+03:00" level=debug msg="Found component: jetstream-pubsub (pubsub.jetstream/v1)" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.812492+03:00" level=debug msg="Found component: statestore (state.redis/v1)" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8130294+03:00" level=debug msg="Loading component: jetstream-pubsub (pubsub.jetstream/v1)" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8325498+03:00" level=debug msg="Connected to nats at nats://user:generaluser123@localhost:4222" app_id=app component="jetstream-pubsub (pubsub.jetstream/v1)" instance=G4YS8S3 scope=dapr.contrib type=log ver=1.11.2
time="2023-09-06T12:35:13.8325498+03:00" level=debug msg="JetStream initialization complete" app_id=app component="jetstream-pubsub (pubsub.jetstream/v1)" instance=G4YS8S3 scope=dapr.contrib type=log ver=1.11.2
time="2023-09-06T12:35:13.8325498+03:00" level=info msg="Component loaded: jetstream-pubsub (pubsub.jetstream/v1)" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8325498+03:00" level=debug msg="Loading component: statestore (state.redis/v1)" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8325498+03:00" level=info msg="Waiting for all outstanding components to be processed" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8462464+03:00" level=info msg="Using 'statestore' as actor state store" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8469734+03:00" level=info msg="Component loaded: statestore (state.redis/v1)" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8469734+03:00" level=info msg="All outstanding components processed" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8469734+03:00" level=info msg="Loading endpoints" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8486245+03:00" level=info msg="Waiting for all outstanding http endpoints to be processed" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8486245+03:00" level=info msg="All outstanding http endpoints processed" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8486245+03:00" level=info msg="gRPC proxy enabled" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8491736+03:00" level=info msg="gRPC server listening on TCP address: :50001" app_id=app instance=G4YS8S3 scope=dapr.runtime.grpc.api type=log ver=1.11.2
time="2023-09-06T12:35:13.8491736+03:00" level=info msg="Enabled gRPC tracing middleware" app_id=app instance=G4YS8S3 scope=dapr.runtime.grpc.api type=log ver=1.11.2
time="2023-09-06T12:35:13.8491736+03:00" level=info msg="Enabled gRPC metrics middleware" app_id=app instance=G4YS8S3 scope=dapr.runtime.grpc.api type=log ver=1.11.2
time="2023-09-06T12:35:13.8491736+03:00" level=info msg="Registering workflow engine for gRPC endpoint: [::]:50001" app_id=app instance=G4YS8S3 scope=dapr.runtime.grpc.api type=log ver=1.11.2
time="2023-09-06T12:35:13.8491736+03:00" level=info msg="API gRPC server is running on port 50001" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8497119+03:00" level=info msg="enabled metrics http middleware" app_id=app instance=G4YS8S3 scope=dapr.runtime.http type=log ver=1.11.2
time="2023-09-06T12:35:13.8497119+03:00" level=info msg="enabled tracing http middleware" app_id=app instance=G4YS8S3 scope=dapr.runtime.http type=log ver=1.11.2
time="2023-09-06T12:35:13.8502417+03:00" level=info msg="HTTP server listening on TCP address: :3500" app_id=app instance=G4YS8S3 scope=dapr.runtime.http type=log ver=1.11.2
time="2023-09-06T12:35:13.8502417+03:00" level=info msg="http server is running on port 3500" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8502417+03:00" level=info msg="The request body size parameter is: 4" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8507812+03:00" level=info msg="gRPC server listening on TCP address: :61814" app_id=app instance=G4YS8S3 scope=dapr.runtime.grpc.internal type=log ver=1.11.2
time="2023-09-06T12:35:13.8507812+03:00" level=info msg="Enabled gRPC tracing middleware" app_id=app instance=G4YS8S3 scope=dapr.runtime.grpc.internal type=log ver=1.11.2
time="2023-09-06T12:35:13.8507812+03:00" level=info msg="Enabled gRPC metrics middleware" app_id=app instance=G4YS8S3 scope=dapr.runtime.grpc.internal type=log ver=1.11.2
time="2023-09-06T12:35:13.8507812+03:00" level=info msg="internal gRPC server is running on port 61814" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:13.8507812+03:00" level=info msg="application protocol: http. waiting on port 5000.  This will block until the app is listening on that port." app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
You're up and running! Dapr logs will appear here.

time="2023-09-06T12:35:19.5627856+03:00" level=info msg="application discovered on port 5000" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:19.679386+03:00" level=info msg="Application configuration loaded" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:19.6797729+03:00" level=info msg="actor runtime started. actor idle timeout: 1h0m0s. actor scan interval: 30s" app_id=app instance=G4YS8S3 scope=dapr.runtime.actor type=log ver=1.11.2
time="2023-09-06T12:35:19.6797729+03:00" level=info msg="Configuring workflow engine with actors backend" app_id=app instance=G4YS8S3 scope=dapr.runtime.wfengine type=log ver=1.11.2
time="2023-09-06T12:35:19.6797729+03:00" level=info msg="Registering component for dapr workflow engine..." app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:19.6797729+03:00" level=info msg="initializing Dapr workflow component" app_id=app component="dapr (workflow.dapr/v1)" instance=G4YS8S3 scope=dapr.contrib type=log ver=1.11.2
time="2023-09-06T12:35:19.6797729+03:00" level=debug msg="try to connect to placement service: dns:///localhost:6050" app_id=app instance=G4YS8S3 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.2
time="2023-09-06T12:35:19.7224238+03:00" level=debug msg="established connection to placement service at dns:///localhost:6050" app_id=app instance=G4YS8S3 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.2
time="2023-09-06T12:35:19.7245676+03:00" level=debug msg="placement order received: lock" app_id=app instance=G4YS8S3 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.2
time="2023-09-06T12:35:19.7245676+03:00" level=debug msg="placement order received: update" app_id=app instance=G4YS8S3 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.2
time="2023-09-06T12:35:19.7245676+03:00" level=info msg="placement tables updated, version: 0" app_id=app instance=G4YS8S3 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.2
time="2023-09-06T12:35:19.7245676+03:00" level=debug msg="placement order received: unlock" app_id=app instance=G4YS8S3 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.2
time="2023-09-06T12:35:19.7584539+03:00" level=debug msg="app responded with subscriptions [{jetstream-pubsub WeatherForecast.*  map[] [0xc000d04c60] [] 0xc000baf760}]" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:19.762355+03:00" level=info msg="app is subscribed to the following topics: [WeatherForecast.*] through pubsub=jetstream-pubsub" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:19.762355+03:00" level=debug msg="subscribing to topic='WeatherForecast.*' on pubsub='jetstream-pubsub'" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:19.9197597+03:00" level=debug msg="nats: subscribed to subject WeatherForecast.* with queue group queueGroup" app_id=app component="jetstream-pubsub (pubsub.jetstream/v1)" instance=G4YS8S3 scope=dapr.contrib type=log ver=1.11.2
time="2023-09-06T12:35:19.9219125+03:00" level=info msg="dapr initialized. Status: Running. Init Elapsed 6153ms" app_id=app instance=G4YS8S3 scope=dapr.runtime type=log ver=1.11.2
time="2023-09-06T12:35:19.9240077+03:00" level=debug msg="Processing JetStream message WeatherForecast.new/{1 1}" app_id=app component="jetstream-pubsub (pubsub.jetstream/v1)" instance=G4YS8S3 scope=dapr.contrib type=log ver=1.11.2
time="2023-09-06T12:35:25.1461735+03:00" level=debug msg="Processing JetStream message WeatherForecast.new/{2 2}" app_id=app component="jetstream-pubsub (pubsub.jetstream/v1)" instance=G4YS8S3 scope=dapr.contrib type=log ver=1.11.2

Note

Dapr version: 1.11.2
JetStream (NATS Server) version: 2.9.21

#1669 was a similar issue in the past; @yaron2 helped on that one, so it may help here too.

HeikoS-Trumpf commented 10 months ago

@Jarema , maybe you can have a quick look?

Jarema commented 9 months ago

Yes, I will take a look at this one. Sorry for the late response.

Jarema commented 9 months ago

In the context of Dapr, does it make sense to implement this feature as a BulkPublish interface implementation, @berndverst?

That way we would still have access to the ack result for every message, but handled asynchronously (publish everything and wait for all acks to come back).
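The "publish everything, then wait for all acks" idea can be sketched independently of the NATS client: fire each publish concurrently, record each per-message result, and return only once every ack (or error) has come back. The `publish` callback below is a hypothetical stand-in for an async broker publish, not nats.go's actual API:

```go
package main

import (
	"fmt"
	"sync"
)

// ackResult pairs a message index with its publish outcome.
type ackResult struct {
	idx int
	err error
}

// publishAll sends every message concurrently and blocks until all acks
// (or errors) are in, returning per-message results in index order.
func publishAll(msgs []string, publish func(string) error) []ackResult {
	results := make([]ackResult, len(msgs))
	var wg sync.WaitGroup
	for i, m := range msgs {
		wg.Add(1)
		go func(i int, m string) {
			defer wg.Done()
			results[i] = ackResult{idx: i, err: publish(m)}
		}(i, m)
	}
	wg.Wait() // wait for every ack to come back
	return results
}

func main() {
	res := publishAll([]string{"a", "b", "c"}, func(string) error { return nil })
	fmt.Println("acks received:", len(res))
}
```

This keeps the BulkPublish contract (a result per message) while removing the per-message round-trip wait.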

Admiralkheir commented 5 months ago

Hello, is there any news on this issue? According to the log emitted by the Dapr 1.12.4 runtime, NATS Streaming will be removed in version 1.13, but we could not fully switch to JetStream because of this parallel-processing issue and our applications' dependency on the component. For your information, @yaron2:

time="2024-02-02T11:36:37.0821027+03:00" level=warning msg="⚠️ The NATS Streaming PubSub component is deprecated due to the deprecation of NATS Server, and will be removed from Dapr 1.13" app_id=servicea component="serviceapublisher (pubsub.natsstreaming/v1)" instance=G4YS8S3 scope=dapr.contrib type=log ver=1.12.4

yaron2 commented 5 months ago

> Hello, is there any news on this issue? According to the log emitted by the Dapr 1.12.4 runtime, NATS Streaming will be removed in version 1.13, but we could not fully switch to JetStream because of this parallel-processing issue and our applications' dependency on the component.

This component will indeed be removed in 1.13. Dapr's support policy covers the previous two releases, so you have time to switch your apps to JetStream. I highly advise moving away from NATS Streaming, as the product itself is considered legacy.

yaron2 commented 5 months ago

@Admiralkheir we can certainly enable this for 1.14, the next version, which means you can stay on 1.12.x since it will still be a supported version.

Admiralkheir commented 4 months ago

@yaron2 thank you, we will be waiting.

Admiralkheir commented 2 months ago

Hello, is there any development on this issue?

Jarema commented 2 months ago

I'll try to find time to pick it up in May.

Admiralkheir commented 1 month ago

Hello again, is there any development on this issue? @Jarema

Jarema commented 1 month ago

I'm still planning to work on this, but unfortunately my schedule shifted.