Hi @aramdata, can you share some of your code, including how you register the MassTransit dependencies and your IRtmpServerStreamEventHandler implementation?
I apologize for the delay.
liveStreamingServer = LiveStreamingServerBuilder.Create()
    .ConfigureRtmpServer(options =>
    {
        options.AddStandaloneServices();
        options.AddFlv();
        options.AddVideoCodecFilter(builder => builder.Include(VideoCodec.AVC));
        options.AddAudioCodecFilter(builder => builder.Include(AudioCodec.AAC));
        options.AddStreamProcessor(options =>
        {
            if (liveStreamingConfiguration.UseS3Storage)
                options.AddHlsUploader(uploaderOptions =>
                {
                    uploaderOptions.AddHlsStorageEventHandler<HlsStorageEventListener>();
                    uploaderOptions.AddAmazonS3(AmazonS3Client, liveBucket.BucketName, x => x.ObjectPathResolver = new HlsTransmuxerObjectPathResolver());
                });
        })
        .AddAdaptiveHlsTranscoder(config =>
        {
            config.FFmpegPath = ExecutableFinder.FindExecutableFromPATH("ffmpeg")!;
            config.AudioEncodingArguments = "-c:a copy";
            config.OutputPathResolver = new HlsTransmuxerOutputPathResolver();
            config.DownsamplingFilters = [
                new DownsamplingFilter(
                    Name: "720p",
                    Height: 720,
                    MaxVideoBitrate: "3000k",
                    MaxAudioBitrate: "256k"
                ),
                new DownsamplingFilter(
                    Name: "480p",
                    Height: 480,
                    MaxVideoBitrate: "1500k",
                    MaxAudioBitrate: "128k"
                )
            ];
        });
        options.Services.AddMassTransit();
        options.AddStreamEventHandler<HookApiForRtmpStream>();
    })
    .ConfigureLogging(options => options.AddSerilog())
    .Build();
private static void AddMassTransit(this IServiceCollection services)
{
    services.AddMassTransit(x =>
    {
        Assembly infrastructureAssembly = Assembly.GetExecutingAssembly();
        Assembly[] assemblyCollection = [infrastructureAssembly];
        x.AddConsumers(assemblyCollection);
        x.AddSagaStateMachines(assemblyCollection);
        x.AddSagas(assemblyCollection);
        x.AddActivities(assemblyCollection);
        x.SetKebabCaseEndpointNameFormatter();
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host(xxxx);
            cfg.ConfigureEndpoints(context);
        });
    });
}
Hook service implementation
public class HookApiForRtmpStream(IBus bus, ILogger<HookApiForRtmpStream> logger) : IRtmpServerStreamEventHandler
{
    public async ValueTask OnRtmpStreamPublishedAsync(IEventContext context, uint clientId, string streamPath, IReadOnlyDictionary<string, string> streamArguments)
    {
        string streamKey = StreamServerExtension.GetStreamKeyFromStreamPath(streamPath);
        await bus.Publish(new LiveStreamStartedIntegrationEvent(streamKey));
        logger.LogInformation("stream [{streamkey}] published event raised", streamKey);
    }

    public async ValueTask OnRtmpStreamUnpublishedAsync(IEventContext context, uint clientId, string streamPath)
    {
        string streamKey = StreamServerExtension.GetStreamKeyFromStreamPath(streamPath);
        await bus.Publish(new LiveStreamStopedIntegrationEvent(streamKey));
        logger.LogInformation("stream [{streamkey}] unPublished event raised", streamKey);
    }

    public ValueTask OnRtmpStreamMetaDataReceivedAsync(
        IEventContext context, uint clientId, string streamPath, IReadOnlyDictionary<string, object> metaData)
        => ValueTask.CompletedTask;

    public ValueTask OnRtmpStreamSubscribedAsync(
        IEventContext context, uint clientId, string streamPath, IReadOnlyDictionary<string, string> streamArguments)
        => ValueTask.CompletedTask;

    public ValueTask OnRtmpStreamUnsubscribedAsync(IEventContext context, uint clientId, string streamPath)
        => ValueTask.CompletedTask;
}
I should add that this problem occurs only in a Docker container.
Hi @aramdata, I can reproduce the issue when running the app in a Docker container.
I have tried adding a Task.Yield() as shown below, and the event can then be published to RabbitMQ. But I don't think this is a proper solution.
public async ValueTask OnRtmpStreamUnpublishedAsync(IEventContext context, uint clientId, string streamPath)
{
    await Task.Yield();
    string streamKey = StreamServerExtension.GetStreamKeyFromStreamPath(streamPath);
    await bus.Publish(new LiveStreamStopedIntegrationEvent(streamKey));
    logger.LogInformation("stream [{streamkey}] unPublished event raised", streamKey);
}
My guess is that bus.Publish is a blocking call, which causes a deadlock: the original thread waits for bus.Publish to complete, while bus.Publish in turn waits for the original thread to become available.
Thanks for your answer. Why does it occur only in a Docker container?
Is it related to the separate IoC container and service collection?
I have just tried the script below. If the thread pool resources are limited, MassTransit cannot even start correctly. This suggests a deadlock in which the single thread is waiting for a task that can only be completed by another thread, but no other thread is available.
ThreadPool.SetMinThreads(1, 1);
ThreadPool.SetMaxThreads(1, 1);

var builder = Host.CreateApplicationBuilder();
builder.Services.AddMassTransit(x =>
{
    Assembly infrastructureAssembly = Assembly.GetExecutingAssembly();
    Assembly[] assemblyCollection = [infrastructureAssembly];
    x.AddConsumers(assemblyCollection);
    x.AddSagaStateMachines(assemblyCollection);
    x.AddSagas(assemblyCollection);
    x.AddActivities(assemblyCollection);
    x.SetKebabCaseEndpointNameFormatter();
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", h =>
        {
            h.Username("user");
            h.Password("password");
        });
        cfg.ConfigureEndpoints(context);
    });
});

await builder.Build().RunAsync();
However, the live streaming server itself still works without any issue under the same limits:
ThreadPool.SetMinThreads(1, 1);
ThreadPool.SetMaxThreads(1, 1);

using var liveStreamingServer = LiveStreamingServerBuilder.Create()
    .ConfigureLogging(options => options.AddConsole())
    .Build();

await liveStreamingServer.RunAsync(new IPEndPoint(IPAddress.Any, 1935));
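To check how constrained the pool actually is inside a given container, the limits can be logged at startup. This is only a minimal sketch using the standard ThreadPool and Environment APIs; DescribeThreadPool is a hypothetical helper name, not part of live-streaming-server-net or MassTransit. Also note that, according to the .NET documentation, ThreadPool.SetMaxThreads returns false and changes nothing when asked for fewer threads than there are processors, so its return value is worth checking when reproducing the starvation with scripts like the ones above.

```csharp
using System;
using System.Threading;

// Hypothetical helper: one-line snapshot of thread pool limits and current load.
static string DescribeThreadPool()
{
    ThreadPool.GetMinThreads(out var minWorker, out var minIo);
    ThreadPool.GetMaxThreads(out var maxWorker, out var maxIo);
    return $"cpus={Environment.ProcessorCount} " +
           $"worker(min={minWorker}, max={maxWorker}) " +
           $"io(min={minIo}, max={maxIo}) " +
           $"threads={ThreadPool.ThreadCount} pending={ThreadPool.PendingWorkItemCount}";
}

// Log once at startup and compare the output on the dev machine and in the container.
Console.WriteLine(DescribeThreadPool());
```

A pending count that keeps growing while the thread count stays flat would be consistent with the starvation described in this thread, though it does not prove it.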
Because a Docker container has limited CPU resources, it also has a limited number of worker threads.
The deadlock is not related to the IoC container; it is more likely an issue in the MassTransit library.
You may try using await Task.Yield() or await Task.Run(() => bus.Publish(new LiveStreamStopedIntegrationEvent(streamKey)));
But this may still run into the same issue under severe thread starvation, so I suggest that you do one of the following:
1. Use Task.Run or Task.Factory.StartNew to publish the event, with a retry mechanism for timeouts.
2. Use a dedicated thread pool for publishing the event.
3. Use the transactional outbox feature of MassTransit, which first stores the event in the database and then publishes it in the background.
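The first suggestion could look roughly like the sketch below. PublishWithRetryAsync and all of its parameters are hypothetical names introduced here for illustration; they are not part of MassTransit or live-streaming-server-net. The sketch retries on TimeoutException (the error reported in this thread) and on cancellation. It assumes the publish call either honours the token or eventually fails by itself; a call that blocks forever without observing the token would not be interrupted by this helper.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs `publish` on the thread pool and retries timed-out attempts.
static async Task PublishWithRetryAsync(
    Func<CancellationToken, Task> publish,
    int maxAttempts,
    TimeSpan attemptTimeout,
    TimeSpan retryDelay)
{
    for (var attempt = 1; ; attempt++)
    {
        // Each attempt gets its own cancellation deadline.
        using var cts = new CancellationTokenSource(attemptTimeout);
        try
        {
            // Task.Run keeps any synchronous part of publish off the caller's thread.
            await Task.Run(() => publish(cts.Token), cts.Token);
            return;
        }
        catch (Exception ex) when (
            (ex is TimeoutException || ex is OperationCanceledException) &&
            attempt < maxAttempts)
        {
            // Back off briefly before the next attempt; the last failure is rethrown.
            await Task.Delay(retryDelay);
        }
    }
}
```

In the handler from this thread it might be called as await PublishWithRetryAsync(ct => bus.Publish(new LiveStreamStopedIntegrationEvent(streamKey), ct), 3, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1)); the attempt count and timings are placeholders to tune.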
There were also some side issues. First, while it was busy sending events for a newly started stream, the processing of other streams was also disrupted by the MassTransit timeout. Second, any disruption inside OnRtmpStreamPublishedAsync also disrupts the main task of receiving and processing the user's stream, which is not acceptable for my use case.
All I need from this implementation is to publish a few events to other services; the main work, receiving and processing the stream, should have top priority and remain stable.
I will probably use the first or second method. I also have several consumers, for example one that interrupts a stream, so the same problem may occur there too.
This is because bus.Publish is blocking the worker thread, which disrupts the execution of other tasks.
It seems that rabbitmq-dotnet-client will provide asynchronous APIs in version 7.0.0. If MassTransit adopts the asynchronous APIs, it should resolve the issue.
interface ICustomThreadPool : IDisposable
{
    Task Run(Action work);
    Task<T> Run<T>(Func<T> work);
}

class CustomThreadPool : ICustomThreadPool
{
    private readonly Thread[] _threads;
    private readonly ConcurrentQueue<WorkItemBase> _workItems;
    private readonly SemaphoreSlim _signal;
    private readonly CancellationTokenSource _cts;
    private bool _disposing;

    public CustomThreadPool(int threadCount)
    {
        _workItems = new ConcurrentQueue<WorkItemBase>();
        _signal = new SemaphoreSlim(0);
        _cts = new CancellationTokenSource();
        _threads = Enumerable.Range(0, threadCount)
            .Select(_ => new Thread(Worker))
            .ToArray();

        foreach (var thread in _threads)
        {
            thread.IsBackground = true;
            thread.Start();
        }
    }

    public async Task Run(Action work)
    {
        if (_disposing)
            throw new ObjectDisposedException(nameof(CustomThreadPool));

        var tcs = new TaskCompletionSource();
        _workItems.Enqueue(new WorkItem(work, tcs));
        _signal.Release();
        await tcs.Task;
    }

    public async Task<T> Run<T>(Func<T> work)
    {
        if (_disposing)
            throw new ObjectDisposedException(nameof(CustomThreadPool));

        var tcs = new TaskCompletionSource<T>();
        _workItems.Enqueue(new WorkItem<T>(work, tcs));
        _signal.Release();
        return await tcs.Task;
    }

    private void Worker()
    {
        while (!_cts.IsCancellationRequested)
        {
            try
            {
                _signal.Wait(_cts.Token);
            }
            catch (OperationCanceledException) { }

            while (_workItems.TryDequeue(out var workItem))
            {
                workItem.Work();
            }
        }
    }

    public void Dispose()
    {
        if (_disposing)
            return;

        _disposing = true;
        _cts.Cancel();

        foreach (var thread in _threads)
            thread.Join();

        _signal.Dispose();
        _cts.Dispose();
    }

    private abstract class WorkItemBase
    {
        public abstract void Work();
    }

    private class WorkItem(Action work, TaskCompletionSource tcs) : WorkItemBase
    {
        private readonly Action _work = work;
        private readonly TaskCompletionSource _tcs = tcs;

        public override void Work()
        {
            try
            {
                _work();
                _tcs.SetResult();
            }
            catch (Exception ex)
            {
                _tcs.SetException(ex);
            }
        }
    }

    private class WorkItem<T>(Func<T> work, TaskCompletionSource<T> tcs) : WorkItemBase
    {
        private readonly Func<T> _work = work;
        private readonly TaskCompletionSource<T> _tcs = tcs;

        public override void Work()
        {
            try
            {
                _tcs.SetResult(_work());
            }
            catch (Exception ex)
            {
                _tcs.SetException(ex);
            }
        }
    }
}
class HookApiForRtmpStream(IBus bus, ICustomThreadPool customThreadPool, ILogger<HookApiForRtmpStream> logger) : IRtmpServerStreamEventHandler
{
    public async ValueTask OnRtmpStreamPublishedAsync(IEventContext context, uint clientId, string streamPath, IReadOnlyDictionary<string, string> streamArguments)
    {
        await customThreadPool.Run(() => bus.Publish(new LiveStreamStartedIntegrationEvent(streamPath)).Wait());
    }

    public async ValueTask OnRtmpStreamUnpublishedAsync(IEventContext context, uint clientId, string streamPath)
    {
        await customThreadPool.Run(() => bus.Publish(new LiveStreamStopedIntegrationEvent(streamPath)).Wait());
    }

    public ValueTask OnRtmpStreamMetaDataReceivedAsync(
        IEventContext context, uint clientId, string streamPath, IReadOnlyDictionary<string, object> metaData)
        => ValueTask.CompletedTask;

    public ValueTask OnRtmpStreamSubscribedAsync(
        IEventContext context, uint clientId, string streamPath, IReadOnlyDictionary<string, string> streamArguments)
        => ValueTask.CompletedTask;

    public ValueTask OnRtmpStreamUnsubscribedAsync(IEventContext context, uint clientId, string streamPath)
        => ValueTask.CompletedTask;
}
This works well for me: it offloads the blocking call to separate, dedicated threads.
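If the handler does not need to wait for the publish at all, a queue between the handler and the publisher is another way to keep stream processing from depending on the broker. This is a different technique from the dedicated thread pool above, sketched with System.Threading.Channels; PumpAsync and CreateEventQueue are hypothetical names, and the sketch assumes that dropping the oldest queued event during a long broker outage is acceptable. The pump here runs on the regular thread pool, so a blocking publish can still occupy a worker thread; it just no longer holds up the handler.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical pump: drains queued events and publishes them one by one.
// A failed publish is reported through onError and does not stop the loop.
static async Task PumpAsync<T>(
    ChannelReader<T> reader,
    Func<T, Task> publish,
    Action<Exception> onError)
{
    await foreach (var message in reader.ReadAllAsync())
    {
        try
        {
            await publish(message);
        }
        catch (Exception ex)
        {
            onError(ex);
        }
    }
}

// Bounded so a broker outage cannot grow memory without limit;
// when the queue is full, the oldest event is dropped.
static Channel<T> CreateEventQueue<T>(int capacity) =>
    Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
    {
        FullMode = BoundedChannelFullMode.DropOldest,
        SingleReader = true
    });
```

With this in place, a handler would only call queue.Writer.TryWrite(...) with the event and return immediately, while something at startup runs PumpAsync(queue.Reader, e => bus.Publish(e), ex => ...) once, for example with a Channel<object>.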
Perfect, thank you.
I have a problem, and I am not sure whether it is caused by live-streaming-server-net. Description of the problem: I wrote an implementation of IRtmpServerStreamEventHandler that sends an event to RabbitMQ when a stream starts and ends. The RabbitMQ client is MassTransit, and I use the IBus interface. The OnRtmpStreamPublishedAsync method sends the event correctly, but in OnRtmpStreamUnpublishedAsync, IBus.Publish throws a TimeoutException when sending the event. Interestingly, the next call to OnRtmpStreamPublishedAsync then fails with the same error. I registered MassTransit with ILiveStreamingServer's internal IServiceCollection. I don't have this problem when I register it with the application's main IServiceCollection. Could the problem come from ILiveStreamingServer and its internal DI?
I develop on Windows, and in development mode I don't have this problem. In production, in a container environment, I have the problem with both an internal RabbitMQ and a cloud RabbitMQ.