dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License
15.05k stars 4.69k forks source link

System.Threading.Tasks.Dataflow.DataflowBlock.ReceiveAsync does not use custom scheduler #83159

Open cretz opened 1 year ago

cretz commented 1 year ago

Description

ReceiveAsync creates a ReceiveTarget at https://github.com/dotnet/runtime/blob/14123c989d6de088ac0ea24bc1d7f1d6b507f06f/src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs#L1073 that does not allow any configuration of a task scheduler like https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-specify-a-task-scheduler-in-a-dataflow-block would have one believe.

Reproduction Steps

Create block, e.g. var block = new BufferBlock<bool>(new() { TaskScheduler = TaskScheduler.Current }); and confirm that block.ReceiveAsync() does not use it..

Expected behavior

Should use configured task scheduler or at least be well documented that there is no configuration of task scheduling for some dataflow extension methods.

Actual behavior

Surprising behavior of using unconfigurable global default scheduler. While global default may be sensible default, should either allow customization or document the limitation.

Regression?

No response

Known Workarounds

None, have to not use the extension methods.

Configuration

No response

Other information

No response

ghost commented 1 year ago

Tagging subscribers to this area: @dotnet/area-system-threading-tasks See info in area-owners.md if you want to be subscribed.

Issue Details
### Description `ReceiveAsync` creates a `ReceiveTarget` at https://github.com/dotnet/runtime/blob/14123c989d6de088ac0ea24bc1d7f1d6b507f06f/src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs#L1073 that does not allow any configuration of a task scheduler like https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-specify-a-task-scheduler-in-a-dataflow-block would have one believe. ### Reproduction Steps Create block, e.g. `var block = new BufferBlock(new() { TaskScheduler = TaskScheduler.Current });` and confirm that `block.ReceiveAsync()` does not use it.. ### Expected behavior Should use configured task scheduler or at least be well documented that there is no configuration of task scheduling for some dataflow extension methods. ### Actual behavior Surprising behavior of using unconfigurable global default scheduler. While global default may be sensible default, should either allow customization or document the limitation. ### Regression? _No response_ ### Known Workarounds None, have to not use the extension methods. ### Configuration _No response_ ### Other information _No response_
Author: cretz
Assignees: -
Labels: `area-System.Threading.Tasks`
Milestone: -
stephentoub commented 1 year ago

Which tasks aren't being scheduled to the block's scheduler that you'd expect to be?

cretz commented 1 year ago

SendAsync causes it for a receive target on ReceiveAsync. I can explain my use case.

I am making a deterministic manual task scheduler for Temporal workflows at https://github.com/temporalio/sdk-dotnet. I already have a custom event source listener that checks that all tasks are created on my scheduler when running with my scheduler as the current scheduler to prevent anyone from using the default scheduler in workflows. And of course https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca2008 catches some (though I request they actually do use Current).

But this triggered my detector because the task for SendAsync/ReceiveAsync schedules a task on a different scheduler. https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-specify-a-task-scheduler-in-a-dataflow-block implies I can customize the scheduler for dataflow block use. Here's the stack:

Temporalio.Exceptions.InvalidWorkflowOperationException: Task scheduled during workflow run was not scheduled on workflow scheduler
   at System.Environment.get_StackTrace()
   at Temporalio.Worker.WorkflowTaskEventListener.OnEventWritten(EventWrittenEventArgs eventData) in c:\work\tem\sdk-dotnet\temporal-sdk-dotnet\src\Temporalio\Worker\WorkflowTaskEventListener.cs:line 120
   at System.Diagnostics.Tracing.EventSource.DispatchToAllListeners(EventWrittenEventArgs eventCallbackArgs)
   at System.Diagnostics.Tracing.EventSource.WriteToAllListeners(EventWrittenEventArgs eventCallbackArgs, Int32 eventDataCount, EventData* data)
   at System.Diagnostics.Tracing.EventSource.WriteEventWithRelatedActivityIdCore(Int32 eventId, Guid* relatedActivityId, Int32 eventDataCount, EventData* data)
   at System.Threading.Tasks.TplEventSource.TaskScheduled(Int32 OriginatingTaskSchedulerID, Int32 OriginatingTaskID, Int32 TaskID, Int32 CreatingTaskID, Int32 TaskCreationOptions, Int32 appDomain)
   at System.Threading.Tasks.Task.FireTaskScheduledIfNeeded(TaskScheduler ts)
   at System.Threading.Tasks.TaskScheduler.InternalQueueTask(Task task)
   at System.Threading.Tasks.Task.ScheduleAndStart(Boolean needsProtection)
   at System.Threading.Tasks.Task.InternalStartNew(Task creatingTask, Delegate action, Object state, CancellationToken cancellationToken, TaskScheduler scheduler, TaskCreationOptions options, InternalTaskOptions internalOptions)
   at System.Threading.Tasks.TaskFactory.StartNew(Action`1 action, Object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, TaskScheduler scheduler)
   at System.Threading.Tasks.Dataflow.DataflowBlock.ReceiveTarget`1.CleanupAndComplete(ReceiveCoreByLinkingCleanupReason reason)
   at System.Threading.Tasks.Dataflow.DataflowBlock.ReceiveTarget`1.System.Threading.Tasks.Dataflow.ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock`1 source, Boolean consumeToAccept)
   at System.Threading.Tasks.Dataflow.Internal.SourceCore`1.OfferToTargets(ITargetBlock`1 linkToTarget)
   at System.Threading.Tasks.Dataflow.Internal.SourceCore`1.OfferMessagesLoopCore()
   at System.Threading.Tasks.Dataflow.Internal.SourceCore`1.<>c.<OfferAsyncIfNecessary_Slow>b__44_0(Object thisSourceCore)
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.<>c.<.cctor>b__272_0(Object obj)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
   at System.Threading.Tasks.Task.ExecuteEntry()
   at System.Threading.Tasks.TaskScheduler.TryExecuteTask(Task task)

I can understand if this won't be supported, but maybe at least some docs saying dataflow extension calls cannot use custom scheduling.

cretz commented 1 year ago

Specifically https://github.com/dotnet/runtime/blob/5f94bffeff62f4b767a311a4505d6d40d86279d9/src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs#L1323-L1329

ericstj commented 1 year ago

@stephentoub what's your thought on this one. Is there an issue here with the scheduling or should we just document this?

stephentoub commented 9 months ago

@stephentoub what's your thought on this one. Is there an issue here with the scheduling or should we just document this?

This is behaving as designed, so from that perspective, it would just be further clarification as needed in the docs. Factoring in a different scheduler would require new APIs / overloads; if that's what's desired, it'd be good to go through the API proposal route.

I can explain my use case.

Thanks for the details. If you'd like to propose a new overload, please feel free to open an API issue for that.

cretz commented 9 months ago

:+1: I don't have a concrete proposal, so just some docs clarity would be ideal (thanks!)