fanliang11 / surging

Surging is a microservice engine that provides a lightweight, high-performance, modular RPC request pipeline. It supports the event-based asynchronous pattern and reactive programming. The service engine supports the HTTP, TCP, WS, gRPC, Thrift, MQTT, UDP, and DNS protocols. It uses ZooKeeper and Consul as service registries, offers hash, random, polling, and fair-polling load-balancing algorithms, and has built-in service governance to keep RPC communication reliable. The engine also includes diagnostics and link tracing for protocol and middleware calls, and integrates with SkyWalking distributed APM.
MIT License

Is there a limit on the size of the data Surging returns? #34

Open daofenglg opened 6 years ago

daofenglg commented 6 years ago

Server started successfully, 2018/1/3 17:36:13.

    fail: Surging.Core.DotNetty.DotNettyServerMessageListener[0]
    An error occurred while communicating with server [::ffff:127.0.0.1]:34691.
    System.Net.Sockets.SocketException (0x80004005): An existing connection was forcibly closed by the remote host.
       at DotNetty.Transport.Channels.Sockets.SocketChannelAsyncOperation.Validate()
       at DotNetty.Transport.Channels.Sockets.AbstractSocketByteChannel.SocketByteChannelUnsafe.FinishRead(SocketChannelAsyncOperation operation)

    fail: Surging.Core.DotNetty.DotNettyServerMessageListener[0]
    An error occurred while communicating with server [::ffff:127.0.0.1]:57473.
    System.Net.Sockets.SocketException (0x80004005): An existing connection was forcibly closed by the remote host.
       at DotNetty.Transport.Channels.Sockets.SocketChannelAsyncOperation.Validate()
       at DotNetty.Transport.Channels.Sockets.AbstractSocketByteChannel.SocketByteChannelUnsafe.FinishRead(SocketChannelAsyncOperation operation)

fanliang11 commented 6 years ago

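Surging uses Netty, and Netty sits on top of sockets. As I recall, Socket.ReceiveBufferSize defaults to 1024 * 8 bytes; if a message is larger than that, Netty should split it into several transmissions. To change the transfer size limit, you can set bootstrap.Option(ChannelOption.SoRcvbuf, 1024 * 100) on the Netty bootstrap. The next version of Surging will add configuration for this and move these options into the configuration file.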
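The idea that a large message arrives as several socket reads and is put back together by the receiver can be shown without DotNetty. The sketch below is a minimal illustration under stated assumptions: it uses a 4-byte big-endian length prefix, and the names `FrameReassembler`, `Encode`, `Feed`, and `FramingDemo` are hypothetical, not Surging or DotNetty APIs. It is not how DotNetty is implemented internally; it only shows why the size of a single read need not cap the size of a message.

```csharp
using System;
using System.Buffers.Binary;
using System.Collections.Generic;

// Hypothetical helper, not part of Surging or DotNetty.
public sealed class FrameReassembler
{
    private readonly List<byte> _pending = new List<byte>();

    // Prefix a payload with its length as a 4-byte big-endian integer.
    public static byte[] Encode(byte[] payload)
    {
        var frame = new byte[4 + payload.Length];
        BinaryPrimitives.WriteInt32BigEndian(frame.AsSpan(0, 4), payload.Length);
        payload.CopyTo(frame, 4);
        return frame;
    }

    // Accept one chunk, as a single socket read might deliver it, and return
    // every payload that is now complete. Partial data stays buffered.
    public List<byte[]> Feed(byte[] chunk)
    {
        _pending.AddRange(chunk);
        var complete = new List<byte[]>();
        while (_pending.Count >= 4)
        {
            var header = new[] { _pending[0], _pending[1], _pending[2], _pending[3] };
            int length = BinaryPrimitives.ReadInt32BigEndian(header);
            if (length < 0)
                throw new InvalidOperationException("negative length field: " + length);
            if (_pending.Count < 4 + length)
                break; // the rest of this frame has not arrived yet
            complete.Add(_pending.GetRange(4, length).ToArray());
            _pending.RemoveRange(0, 4 + length);
        }
        return complete;
    }
}

public static class FramingDemo
{
    // Deliver one payload of payloadSize bytes in pieces of chunkSize bytes
    // and return how many payload bytes were reassembled.
    public static int Run(int payloadSize, int chunkSize)
    {
        byte[] frame = FrameReassembler.Encode(new byte[payloadSize]);
        var reassembler = new FrameReassembler();
        int received = 0;
        for (int offset = 0; offset < frame.Length; offset += chunkSize)
        {
            int n = Math.Min(chunkSize, frame.Length - offset);
            var chunk = new byte[n];
            Array.Copy(frame, offset, chunk, 0, n);
            foreach (byte[] payload in reassembler.Feed(chunk))
                received += payload.Length;
        }
        return received;
    }
}
```

Calling `FramingDemo.Run(50000, 8192)` feeds a 50,000-byte payload through in 8 KB pieces and returns 50000, since the reassembler only emits a payload once all of its bytes have arrived.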

daofenglg commented 6 years ago

Got it, thank you.

daofenglg commented 6 years ago

    private static Bootstrap GetBootstrap()
    {
        var bootstrap = new Bootstrap();
        bootstrap
            .Channel<TcpSocketChannel>()
            .Option(ChannelOption.SoRcvbuf, 1024 * 100)
            .Option(ChannelOption.TcpNodelay, true)
            .Option(ChannelOption.Allocator, PooledByteBufferAllocator.Default)
            .Group(new MultithreadEventLoopGroup(1));

        return bootstrap;
    }

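After adding bootstrap.Option(ChannelOption.SoRcvbuf, 1024 * 100) the problem is still there. When I request 30-40 records, with a payload of a little over 30 KB, it works. When I increase the request to 50 records, it fails with a request timeout, and the gateway never receives the data returned by the server.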

Exception:

    Exception caught: "System.OperationCanceledException" ("Request timed out") in Surging.Core.CPlatform.dll.
    Exception caught: "System.OperationCanceledException" ("Request timed out") in Surging.Core.CPlatform.dll.    202.49 s    [17920] Worker thread

fanliang11 commented 6 years ago

@daofenglg Did you add it on the client side? I mean the code in this file: DotNettyTransportClientFactory.

daofenglg commented 6 years ago

Yes.

`/// <summary>
/// A DotNetty-based transport client factory.
/// </summary>
public class DotNettyTransportClientFactory : ITransportClientFactory, IDisposable
{
    #region Field

    private readonly ITransportMessageEncoder _transportMessageEncoder;
    private readonly ITransportMessageDecoder _transportMessageDecoder;
    private readonly ILogger<DotNettyTransportClientFactory> _logger;
    private readonly IServiceExecutor _serviceExecutor;
    private readonly ConcurrentDictionary<EndPoint, Lazy<ITransportClient>> _clients = new ConcurrentDictionary<EndPoint, Lazy<ITransportClient>>();
    private readonly Bootstrap _bootstrap;

    private static readonly AttributeKey<IMessageSender> messageSenderKey = AttributeKey<IMessageSender>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(IMessageSender));
    private static readonly AttributeKey<IMessageListener> messageListenerKey = AttributeKey<IMessageListener>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(IMessageListener));
    private static readonly AttributeKey<EndPoint> origEndPointKey = AttributeKey<EndPoint>.ValueOf(typeof(DotNettyTransportClientFactory), nameof(EndPoint));

    #endregion Field

    #region Constructor

    public DotNettyTransportClientFactory(ITransportMessageCodecFactory codecFactory, ILogger<DotNettyTransportClientFactory> logger)
        : this(codecFactory, logger, null)
    {
    }

    public DotNettyTransportClientFactory(ITransportMessageCodecFactory codecFactory, ILogger<DotNettyTransportClientFactory> logger, IServiceExecutor serviceExecutor)
    {
        _transportMessageEncoder = codecFactory.GetEncoder();
        _transportMessageDecoder = codecFactory.GetDecoder();
        _logger = logger;
        _serviceExecutor = serviceExecutor;
        _bootstrap = GetBootstrap();

        _bootstrap.Handler(new ActionChannelInitializer<ISocketChannel>(c =>
        {
            var pipeline = c.Pipeline;
            pipeline.AddLast(new LengthFieldPrepender(4));
            pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
            pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder));
            pipeline.AddLast(new DefaultChannelHandler(this));
        }));
    }

    #endregion Constructor

    #region Implementation of ITransportClientFactory

    /// <summary>
    /// Creates a client.
    /// </summary>
    /// <param name="endPoint">The endpoint.</param>
    /// <returns>The transport client instance.</returns>
    public ITransportClient CreateClient(EndPoint endPoint)
    {
        var key = endPoint;
        if (_logger.IsEnabled(LogLevel.Debug))
            _logger.LogDebug($"Preparing to create a client for server address: {key}.");
        try
        {
            return _clients.GetOrAdd(key
                , k => new Lazy<ITransportClient>(() =>
                {
                    var bootstrap = _bootstrap;
                    var channel = bootstrap.ConnectAsync(k).Result;
                    var messageListener = new MessageListener();
                    channel.GetAttribute(messageListenerKey).Set(messageListener);
                    var messageSender = new DotNettyMessageClientSender(_transportMessageEncoder, channel);
                    channel.GetAttribute(messageSenderKey).Set(messageSender);
                    channel.GetAttribute(origEndPointKey).Set(k);
                    var client = new TransportClient(messageSender, messageListener, _logger, _serviceExecutor);
                    return client;
                }
                )).Value;
        }
        catch
        {
            _clients.TryRemove(key, out var value);
            throw;
        }
    }

    #endregion Implementation of ITransportClientFactory

    #region Implementation of IDisposable

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        foreach (var client in _clients.Values.Where(i => i.IsValueCreated))
        {
            (client.Value as IDisposable)?.Dispose();
        }
    }

    #endregion Implementation of IDisposable

    private static Bootstrap GetBootstrap()
    {
        var bootstrap = new Bootstrap();
        bootstrap
            .Channel<TcpSocketChannel>()
            .Option(ChannelOption.SoRcvbuf, 1024 * 100)
            .Option(ChannelOption.TcpNodelay, true)
            .Option(ChannelOption.Allocator, PooledByteBufferAllocator.Default)
            .Group(new MultithreadEventLoopGroup(1));

        return bootstrap;
    }

    protected class DefaultChannelHandler : ChannelHandlerAdapter
    {
        private readonly DotNettyTransportClientFactory _factory;

        public DefaultChannelHandler(DotNettyTransportClientFactory factory)
        {
            this._factory = factory;
        }

        #region Overrides of ChannelHandlerAdapter

        public override void ChannelInactive(IChannelHandlerContext context)
        {
            _factory._clients.TryRemove(context.Channel.GetAttribute(origEndPointKey).Get(), out var value);
        }

        public override void ChannelRead(IChannelHandlerContext context, object message)
        {
            var transportMessage = message as TransportMessage;

            var messageListener = context.Channel.GetAttribute(messageListenerKey).Get();
            var messageSender = context.Channel.GetAttribute(messageSenderKey).Get();
            messageListener.OnReceived(messageSender, transportMessage);
        }

        #endregion Overrides of ChannelHandlerAdapter
    }
}`
fanliang11 commented 6 years ago

Try also setting Socket.SendBufferSize, using the following code: .Option(ChannelOption.SoSndbuf, 1024 * 100)

daofenglg commented 6 years ago

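Still not working. I stepped through the program: the server does return the data, roughly 40 KB, but the client never receives it. The breakpoint at Surging.Core\Surging.Core.DotNetty\DotNettyTransportClientFactory.cs -> ChannelRead is never hit. The program ends up waiting at TransportClient -> RegisterResultCallbackAsync -> var result = await task.Task; and finally exits through WithCancellation because of the timeout.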

fanliang11 commented 6 years ago

Don't step through it in the debugger; this is asynchronous. First check whether the call works at all.

daofenglg commented 6 years ago

I removed the breakpoints, and it still does not return normally.

daofenglg commented 6 years ago

I added this method to the DefaultChannelHandler class:

    public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
    {
        Console.WriteLine("Exception: " + exception);
        context.CloseAsync();
    }

The exception it caught was:

    negative pre-adjustment length field: -32120
       at DotNetty.Codecs.LengthFieldBasedFrameDecoder.Decode(IChannelHandlerContext context, IByteBuffer input)
       at DotNetty.Codecs.LengthFieldBasedFrameDecoder.Decode(IChannelHandlerContext context, IByteBuffer input, List`1 output)
       at DotNetty.Codecs.ByteToMessageDecoder.CallDecode(IChannelHandlerContext context, IByteBuffer input, List`1 output)
       at DotNetty.Codecs.ByteToMessageDecoder.ChannelRead(IChannelHandlerContext context, Object message)
       at DotNetty.Transport.Channels.AbstractChannelHandlerContext.InvokeChannelRead(Object msg)
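A side observation on the number in that exception, offered as arithmetic only and not as a diagnosis of the DotNetty bug: -32120 is exactly what 33416 becomes when its low 16 bits are read as a signed integer (33416 - 65536 = -32120), and 33416 bytes is a little over 32 KB, which fits the report that payloads of a little over 30 KB worked while larger ones failed. The sketch below uses hypothetical names (`LengthFieldDemo`, `AsSigned16`, `IsValidFrameLength`) and is not DotNetty code; it only shows the wrap-around and the kind of check that rejects a negative frame length.

```csharp
using System;

// Hypothetical illustration, not DotNetty code.
public static class LengthFieldDemo
{
    // Reinterpret the low 16 bits of a length as a signed 16-bit value.
    // Lengths above 32767 wrap around to negative numbers.
    public static short AsSigned16(int length)
    {
        return unchecked((short)length);
    }

    // A decoder-style sanity check: a negative value can never be a
    // valid frame length, so the frame must be rejected.
    public static bool IsValidFrameLength(long length)
    {
        return length >= 0;
    }
}
```

`LengthFieldDemo.AsSigned16(33416)` yields -32120, while `LengthFieldDemo.AsSigned16(32767)` stays 32767, so in this illustration only lengths above 32767 would trip a negative-length check.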

fanliang11 commented 6 years ago

I'll test it tonight and get back to you tomorrow.

daofenglg commented 6 years ago

OK, thank you. I appreciate the effort.

fanliang11 commented 6 years ago

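No trouble at all. I'm very happy that people are using it. I know of several users, and they have all been raising issues over the past few days.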

896722590 commented 6 years ago

@fanliang11 I'm following this issue too. I'm working with gridded data that is still about 4 MB after LZ4 compression. Should I change options.Limits.MaxRequestBufferSize, options.Limits.MaxResponseBufferSize, or the body size?

fanliang11 commented 6 years ago

This is a bug in DotNetty 0.4.7 that has been fixed in the latest version; in my tests, sending 40 MB succeeds. This is the issue I opened earlier: https://github.com/Azure/DotNetty/issues/341