datalust / seq-api

HTTP API client for Seq
https://datalust.co/seq
Apache License 2.0

Can't get the streaming example to work #131

Closed: joost00719 closed this issue 8 months ago

joost00719 commented 8 months ago

I've been trying to get the streaming example to work. However, there is no Select method available on the ObservableStream, and the stream does not have a GetAwaiter, so it can't be awaited.

Code:

using Newtonsoft.Json.Linq;
using Seq.Api;
using Serilog.Formatting.Compact.Reader;

namespace SeqTest
{
    internal class Program
    {
        static async Task Main(string[] args)
        {
            var filter = "@Level = 'Error'";

            var connection = new SeqConnection("http://localhost:5341");

            using (var stream = await connection.Events.StreamAsync<JObject>(filter: filter))
            using (stream.Select(jObject => LogEventReader.ReadFromJObject(jObject))
                         .Subscribe(evt => Log.Write(evt)))
            {
                await stream;
            }
        }
    }
}

Help will be much appreciated!

nblumhardt commented 8 months ago

Hi! You'll need to install the System.Reactive NuGet package for these:

dotnet add package system.reactive

and add:

using System.Reactive.Linq;

Let me know if this does the trick!
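
For reference, here is a sketch of how the program from the question might look with that change applied. It assumes the System.Reactive, Seq.Api, and Serilog.Formatting.Compact.Reader packages are installed and that a Seq server is listening on http://localhost:5341. Printing to the console instead of calling Log.Write is a substitution made here so that the output is visible without configuring a Serilog logger.

```csharp
using System;
using System.Reactive.Linq;               // Supplies Select(...) and GetAwaiter() for IObservable<T>
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
using Seq.Api;
using Serilog.Formatting.Compact.Reader;

namespace SeqTest
{
    internal class Program
    {
        static async Task Main(string[] args)
        {
            var filter = "@Level = 'Error'";

            // Assumed local server address, as in the question
            var connection = new SeqConnection("http://localhost:5341");

            using (var stream = await connection.Events.StreamAsync<JObject>(filter: filter))
            using (stream.Select(jObject => LogEventReader.ReadFromJObject(jObject))
                         // Console output replaces Log.Write(evt) for illustration only
                         .Subscribe(evt => Console.WriteLine($"{evt.Timestamp:o} [{evt.Level}] {evt.RenderMessage()}")))
            {
                // Awaiting the stream completes when the stream ends
                await stream;
            }
        }
    }
}
```

Both compile errors from the question come from the same place: Select and the awaiter are extension methods on IObservable<T> defined in System.Reactive.Linq, so they only become visible once the package is referenced and the using directive is present.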

joost00719 commented 8 months ago

Hey, thanks for the feedback! You might wanna add that to the readme.md.

However, I took a different approach. My code now looks like this, and it works (still a work in progress, though):

    public class SeqService(SeqConnection seqConnection) : IDisposable
    {
        private readonly Dictionary<IObserver<EventEntity>, IDisposable> references = new();
        private ObservableStream<EventEntity> stream;

        public void Dispose()
        {
            foreach (var observer in references)
            {
                observer.Value?.Dispose();
            }

            references.Clear();

            stream?.Dispose();
            seqConnection.Dispose();
        }

        public async Task Subscribe(IObserver<EventEntity> observer)
        {
            stream ??= await seqConnection.Events.StreamAsync<EventEntity>();

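            // Keep the subscription handle so Unsubscribe and Dispose can find and dispose it
            references[observer] = stream.Subscribe(observer);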
        }

        public bool Unsubscribe(IObserver<EventEntity> observer)
        {
            if (references.TryGetValue(observer, out var disposable))
            {
                disposable.Dispose();
                references.Remove(observer);

                if (!references.Any())
                {
                    stream?.Dispose(); // No need to keep the stream open if there are no observers
                    stream = null;
                }

                return true;
            }

            return false;
        }
    }