edjCase / JsonRpc

Json Rpc Router for Asp.Net Core
MIT License

Nats support #114

Open zykovkirill opened 1 year ago

zykovkirill commented 1 year ago

Will there be NATS support in the future? For example:

using System;
using System.Threading.Tasks;
using JsonRpc.Core;
using JsonRpc.Router;
using JsonRpc.Router.Abstractions;
using JsonRpc.Router.Defaults;

namespace TecNotificationProcessing.NatsProcessing
{
    // Entry point for processing a JSON-RPC request outside the HTTP pipeline.
    public interface IJRpcService
    {
        Task<RpcResponse> ProcessRequestAsync(RpcRequest request);
    }

    public class JRpcService : IJRpcService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly IRpcInvoker _invoker;
        private readonly IRouteContext _routeContext;

        public JRpcService(IRpcRouteProvider routeProvider, IServiceProvider serviceProvider, IRpcInvoker invoker)
        {
            _invoker = invoker ?? throw new ArgumentNullException(nameof(invoker));
            _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
            _routeContext = new DefaultRouteContext(null, null, routeProvider ?? throw new ArgumentNullException(nameof(routeProvider)));
        }

        // Hands the request to the router's invoker using the default path.
        public async Task<RpcResponse> ProcessRequestAsync(RpcRequest request)
        {
            return await _invoker.InvokeRequestAsync(request, RpcPath.Default, _routeContext, _serviceProvider);
        }
    }
}

Gekctek commented 1 year ago

@zykovkirill Can you elaborate? I'm not familiar with NATS.

amoraller commented 1 year ago

We have an alternative channel for JSON-RPC. We want to resolve the IRpcRequestHandler interface and invoke Task<bool> HandleRequestAsync(Stream requestBody, Stream responseBody); to process the JSON-RPC protocol. I think IRpcRequestHandler and the other interfaces should be public so they can be used without the ASP.NET Core infrastructure.
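To make the request concrete, here is a minimal sketch of the adapter such a channel would need: a byte[]-in, byte[]-out callback wrapped around a stream-based handler. IStreamRequestHandler, EchoHandler and ByteArrayRpcAdapter are hypothetical names made up for illustration; the interface only mirrors the HandleRequestAsync signature quoted above and is not the library's own type.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical stand-in that mirrors the HandleRequestAsync signature quoted above.
// It is NOT the library's IRpcRequestHandler.
public interface IStreamRequestHandler
{
    Task<bool> HandleRequestAsync(Stream requestBody, Stream responseBody);
}

// Toy handler that copies the request into the response, so the adapter
// can be exercised without a real JSON-RPC pipeline.
public sealed class EchoHandler : IStreamRequestHandler
{
    public async Task<bool> HandleRequestAsync(Stream requestBody, Stream responseBody)
    {
        await requestBody.CopyToAsync(responseBody);
        return true;
    }
}

// Adapts a stream-based handler to a byte[]-in, byte[]-out callback,
// the shape a message-bus request/reply subscription usually expects.
public sealed class ByteArrayRpcAdapter
{
    private readonly IStreamRequestHandler handler;

    public ByteArrayRpcAdapter(IStreamRequestHandler handler)
    {
        this.handler = handler ?? throw new ArgumentNullException(nameof(handler));
    }

    public async Task<byte[]> ProcessAsync(byte[] request)
    {
        using var requestStream = new MemoryStream(request ?? throw new ArgumentNullException(nameof(request)));
        using var responseStream = new MemoryStream();
        await this.handler.HandleRequestAsync(requestStream, responseStream);
        // ToArray returns the whole buffer regardless of the stream position.
        return responseStream.ToArray();
    }
}
```

With a public handler interface, ProcessAsync could be passed directly as the subscription callback of whatever transport is in use, with no dependency on the HTTP pipeline.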

zykovkirill commented 1 year ago

I managed to use NATS with the help of reflection:

public async Task<byte[]> ProcessRequestAsync(byte[] npt)
{
    string path = "jrpcapi";
    var routeContext = new RpcContext(_serviceProvider, path);

    using MemoryStream requestStream = new MemoryStream(npt);
    using MemoryStream responseStream = new MemoryStream();

    // _reue is defined elsewhere; it is assumed to hold the types of the EdjCase.JsonRpc.Router assembly.
    var handlerType = _reue.First(c => c.FullName == "EdjCase.JsonRpc.Router.Abstractions.IRpcRequestHandler");
    var accessorType = _reue.First(c => c.FullName == "EdjCase.JsonRpc.Router.Abstractions.IRpcContextAccessor");

    using var scope = _serviceProvider.CreateScope();

    // Set the RPC context through the non-public accessor.
    var contextAccessor = scope.ServiceProvider.GetRequiredService(accessorType);
    var setMethod = accessorType.GetMethods().First(m => m.Name == "Set");
    setMethod.Invoke(contextAccessor, new object[] { routeContext });

    // Resolve the non-public handler from the same scope and invoke it.
    var handler = scope.ServiceProvider.GetRequiredService(handlerType);
    var handleMethod = handlerType.GetMethods().First(m => m.Name == "HandleRequestAsync");
    await (Task<bool>)handleMethod.Invoke(handler, new object[] { requestStream, responseStream });

    return responseStream.ToArray();
}

using var sub = await _natsConnection.SubscribeRequestAsync<byte[], byte[]>(_config.Value.NatsChannel, _jRpcService.ProcessRequestAsync);

zykovkirill commented 1 year ago

@zykovkirill Can you elaborate? I'm not familiar with NATS.

In short, it is a data bus: https://docs.nats.io/nats-concepts/what-is-nats I use this library in my project. This one is the most popular (C#). https://stackoverflow.com/questions/63418503/does-grpc-vs-nats-or-kafka-make-any-sense

Similar question https://github.com/edjCase/JsonRpc/issues/37

zykovkirill commented 1 year ago

We have an alternative channel for JSON-RPC. We want to resolve the IRpcRequestHandler interface and invoke its Task<bool> HandleRequestAsync(Stream requestBody, Stream responseBody); to process the JSON-RPC protocol. I think IRpcRequestHandler and the other interfaces should be public so they can be used without the ASP.NET Core infrastructure.

I agree with you.

zykovkirill commented 1 year ago

Example

using EdjCase.JsonRpc.Router.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.IO;
using System.Threading.Tasks;

namespace EdjCase.JsonRpc.Router
{
    /// <summary>
    /// Custom Invoker 
    /// </summary>
    public class RpcCustomInvoker
    {
        private readonly IServiceProvider serviceProvider;
        public RpcCustomInvoker(IServiceProvider services)
        {
            this.serviceProvider = services ?? throw new ArgumentNullException(nameof(services));
        }

        public async Task<byte[]> ProcessRequestAsync(byte[] request, string controllerName)
        {

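            // Strip the "Controller" suffix by length, so an earlier occurrence of the word is left alone.
            const string suffix = "Controller";
            if (controllerName.EndsWith(suffix, StringComparison.Ordinal))
            {
                controllerName = controllerName.Substring(0, controllerName.Length - suffix.Length);
            }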

            var routeContext = new RpcContext(this.serviceProvider, controllerName);

            using MemoryStream requestStream = new MemoryStream(request);
            using MemoryStream responseStream = new MemoryStream();

            using var scope = this.serviceProvider.CreateScope();
            var rpcContextAccessor = scope.ServiceProvider.GetRequiredService<IRpcContextAccessor>();
            rpcContextAccessor.Set(routeContext);

            var rpcRequestHandler = scope.ServiceProvider.GetRequiredService<IRpcRequestHandler>();
            await rpcRequestHandler.HandleRequestAsync(requestStream, responseStream);

            responseStream.Position = 0L;
            return responseStream.ToArray();

        }

    }
}

NATS service

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using AlterNats;
using Core;
using EdjCase.JsonRpc.Router;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Test
{
    public class NatsService : BackgroundService
    {
        private readonly ILogger _logger;
        private readonly NatsConnection _natsConnection;
        private readonly IOptions<NatsConfiguration> _config;
        private readonly RpcCustomInvoker _invoker;
        public NatsService(ILogger<NatsService> logger,
            IOptions<NatsConfiguration> config,
            INatsCommand command,
            RpcCustomInvoker invoker)
        {
            _config = config;
            _logger = logger;
            _natsConnection = (NatsConnection)command;
            _invoker = invoker;
        }
        private const string Error = "Error while working with NATS";
        public bool HealthState { get; private set; } = false;

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var connectionDisconnected = Observable.FromEventPattern<string>(a => _natsConnection.ConnectionDisconnected += a, a => _natsConnection.ConnectionDisconnected -= a).Select(p => Unit.Default);
            var reconnectFailed = Observable.FromEventPattern<string>(a => _natsConnection.ReconnectFailed += a, a => _natsConnection.ReconnectFailed -= a).Select(p => Unit.Default);

            using var _1 = reconnectFailed.Merge(connectionDisconnected).Subscribe(_ => HealthState = false);

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {

                    using var sub = await _natsConnection.SubscribeRequestAsync<byte[], byte[]>(_config.Value.NatsChannel, Handle);
                    HealthState = true;
                    await Observable.Merge(Observable.FromAsync(() => stoppingToken.AsTaskAsync())).FirstOrDefaultAsync();

                }
                catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
                {
                    _logger.LogError(ex, Error);
                    await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
                }
            }
        }

        /// <summary>
        /// Request handler.
        /// </summary>
        /// <param name="rpcRequest">The request.</param>
        private async Task<byte[]> Handle(byte[] rpcRequest)
        {
            // Log the payload size; interpolating a byte[] would only print its type name.
            _logger.LogInformation("Request received ({Length} bytes)", rpcRequest.Length);
            return await _invoker.ProcessRequestAsync(rpcRequest, nameof(JrpcApiController));
        }
    }
}

Gekctek commented 1 year ago

I have put up a PR with a potential fix for your situation in #115. Take a look and let me know what you think, @zykovkirill.

zykovkirill commented 1 year ago

I have put up a PR with a potential fix for your situation in #115. Take a look and let me know what you think, @zykovkirill.

It works well, thanks.

zykovkirill commented 1 year ago

In the near future I will put together a proposal for integrating a NATS client into EdjCase.JsonRpc.Client. Preliminary questions:
1) Make the RpcClient constructor public.
2) Make IRpcTransportClient public.
3) Analyze the code for flexibility.
4) Make alternative implementations: NatsClientBuilder and NatsRpcTransportClient.

Gekctek commented 1 year ago

@zykovkirill Version 5.1.5 has the PR changes in it.

As for your questions, if you can supply me with some code for a NatsClientBuilder and NatsRpcTransportClient, I can integrate them. I'm just not familiar with NATS. Then we can go back and forth on any feedback.
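As a starting point for that back and forth, a client-side transport over a request/reply bus might be shaped roughly like the sketch below. Everything in it is an assumption for illustration: RequestReplyAsync is a hypothetical delegate standing in for whatever request/reply call the NATS client exposes, and BusRpcTransport is a made-up class, not the library's IRpcTransportClient or the proposed NatsRpcTransportClient.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical delegate standing in for a bus request/reply call:
// send a payload to a subject and await the reply bytes.
public delegate Task<byte[]> RequestReplyAsync(string subject, byte[] payload, CancellationToken cancellationToken);

// Hypothetical transport; NOT the library's IRpcTransportClient.
public sealed class BusRpcTransport
{
    private readonly RequestReplyAsync requestReply;
    private readonly string subject;
    private readonly TimeSpan timeout;

    public BusRpcTransport(RequestReplyAsync requestReply, string subject, TimeSpan timeout)
    {
        this.requestReply = requestReply ?? throw new ArgumentNullException(nameof(requestReply));
        this.subject = subject ?? throw new ArgumentNullException(nameof(subject));
        this.timeout = timeout;
    }

    // Sends an already serialized JSON-RPC request and returns the raw reply.
    public async Task<byte[]> SendAsync(byte[] serializedRequest, CancellationToken cancellationToken = default)
    {
        if (serializedRequest == null) throw new ArgumentNullException(nameof(serializedRequest));

        // Link the caller's token with a timeout. This only takes effect
        // if the underlying request/reply call honors the token.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(this.timeout);
        return await this.requestReply(this.subject, serializedRequest, cts.Token);
    }
}
```

Keeping the bus call behind a delegate means the transport can be unit tested with a fake, and the actual NATS client call can be plugged in by whoever knows that API.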