dapr / dotnet-sdk

NullReferenceException when trying to sub with streaming subscriptions #1412

Open Aimless321 opened 3 days ago

Aimless321 commented 3 days ago

Expected Behavior

Receive messages in the registered handler when using streaming subscriptions.

Actual Behavior

Running version 1.14 of the Dapr runtime and using the master branch of the .NET SDK, I receive a NullReferenceException when trying to subscribe to a topic using the new Messaging Client.

The call to messagingClient.SubscribeAsync never returns and produces an error, which I traced back to a NullReferenceException on response.EventMessage in PublishSubscribeReceiver.cs, line 252.

== APP ==       System.NullReferenceException: Object reference not set to an instance of an object.
== APP ==          at Dapr.Messaging.PublishSubscribe.PublishSubscribeReceiver.FetchDataFromSidecarAsync(AsyncDuplexStreamingCall`2 stream, ChannelWriter`1 channelWriter, CancellationToken cancellationToken)
== APP ==          at Dapr.Messaging.PublishSubscribe.PublishSubscribeReceiver.FetchDataFromSidecarAsync(AsyncDuplexStreamingCall`2 stream, ChannelWriter`1 channelWriter, CancellationToken cancellationToken)
== APP ==          at Dapr.Messaging.PublishSubscribe.PublishSubscribeReceiver.SubscribeAsync(CancellationToken cancellationToken)
== APP ==          at Dapr.Messaging.PublishSubscribe.DaprPublishSubscribeGrpcClient.SubscribeAsync(String pubSubName, String topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken)
== APP ==          at Worker.Services.WorkerService.ExecuteAsync(CancellationToken stoppingToken) in C:\Users\WesleyB\Projects\StrykerOrchestrator\Worker\Services\WorkerService.cs:line 37
== APP ==          at Microsoft.Extensions.Hosting.Internal.Host.TryExecuteBackgroundServiceAsync(BackgroundService backgroundService)

In Wireshark I can see the gRPC call to the sidecar, and I also get log output from the sidecar:

time="2024-11-22T13:54:48.3234217+01:00" level=info msg="Subscribing to pubsub 'task-queue' topic 'myTopic'" app_id=worker scope=dapr.runtime.pubsub.streamer type=log ver=1.14.4
time="2024-11-22T13:54:55.3082658+01:00" level=info msg="Unsubscribed from pubsub 'task-queue' topic 'myTopic'" app_id=worker scope=dapr.runtime.pubsub.streamer type=log ver=1.14.4

Even in Redis I can see a subscriber group, which reports that the messages have been delivered.

Steps to Reproduce the Problem

pubsub.yaml:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: task-queue
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: consumerID
    value: "{appId}"
  - name: concurrency
    value: "1"
  - name: maxInFlightMessages
    value: "1"
  - name: logReceiveMessageErrors
    value: "true"

With this component in place, run the streaming subscription example.

Aimless321 commented 15 hours ago

Digging into it some more: it seems this code receives some kind of initial response in which response.EventMessage is null.

Adding a simple null check:

if (response.EventMessage is null)
{
    continue;
}

lets me receive events correctly in the delegate.
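
To show the idea outside the SDK, here is a minimal, self-contained sketch of a receive loop that skips responses without an event message. The types EventMessage and StreamResponse, and the FakeStream helper, are hypothetical stand-ins I made up for illustration; they are not the real Dapr classes, and the actual fix in PublishSubscribeReceiver.cs may well look different.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-ins for the generated gRPC types; NOT the real Dapr classes.
public sealed record EventMessage(string Data);
public sealed record StreamResponse(EventMessage? EventMessage);

public static class ReceiveLoopSketch
{
    // Forwards event messages to the channel and skips responses that carry none.
    public static async Task PumpAsync(
        IAsyncEnumerable<StreamResponse> responses,
        ChannelWriter<EventMessage> writer,
        CancellationToken cancellationToken = default)
    {
        await foreach (var response in responses.WithCancellation(cancellationToken))
        {
            if (response.EventMessage is null)
            {
                // Some kind of initial response with no event payload: nothing to forward.
                continue;
            }

            await writer.WriteAsync(response.EventMessage, cancellationToken);
        }

        // The simulated stream has ended, so no more messages will be written.
        writer.Complete();
    }

    // Simulated stream (assumption): one empty response followed by two events.
    public static async IAsyncEnumerable<StreamResponse> FakeStream()
    {
        yield return new StreamResponse(null);
        await Task.Yield();
        yield return new StreamResponse(new EventMessage("first"));
        yield return new StreamResponse(new EventMessage("second"));
    }
}
```

Without the null check, the first simulated response would cause the same kind of NullReferenceException as soon as the loop dereferenced its EventMessage.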

Also, the DaprPublishSubscribeClient.SubscribeAsync method is blocking; I'm not sure whether that is intended. It only returns after the subscription has been closed.
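
Until that is clarified, one possible workaround on the caller side is to start the call without awaiting it right away and to keep the returned Task around. The sketch below only illustrates that pattern: LongRunningSubscribeAsync is a hypothetical stand-in that completes when its token is cancelled, not the SDK method, and I am assuming the real call behaves similarly.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BackgroundSubscribeSketch
{
    // Hypothetical stand-in for a subscribe call that only completes
    // once the subscription has ended (here: when the token is cancelled).
    public static async Task LongRunningSubscribeAsync(CancellationToken cancellationToken)
    {
        try
        {
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Treat cancellation as a normal end of the subscription.
        }
    }

    // Starts the subscription without awaiting it, continues, then shuts it down.
    // Returns whether the subscription was still running after it was started.
    public static async Task<bool> RunAsync()
    {
        using var cts = new CancellationTokenSource();

        // Not awaited yet, so the caller is not held up here.
        Task subscription = LongRunningSubscribeAsync(cts.Token);

        bool stillRunning = !subscription.IsCompleted;

        cts.Cancel();        // request shutdown
        await subscription;  // observe completion and surface any exception

        return stillRunning;
    }
}
```

In a BackgroundService, the stoppingToken passed to ExecuteAsync could play the role of cts.Token here.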