fanliang11 / surging

Surging is a microservice engine that provides a lightweight, high-performance, modular RPC request pipeline. It supports the Event-based Asynchronous Pattern and reactive programming. The service engine supports the HTTP, TCP, WS, gRPC, Thrift, MQTT, UDP, and DNS protocols. It uses ZooKeeper and Consul as service registries and integrates hash, random, polling, and fair polling as load-balancing algorithms. Built-in service governance ensures reliable RPC communication. The engine also includes diagnostics and link tracing for protocol and middleware calls, and integrates with the SkyWalking distributed APM.
MIT License

TCP network components that support configuring properties #424

Open fanliang11 opened 2 years ago

fanliang11 commented 2 years ago

The following code creates a TCP network component with configuration properties and configures a rule engine script to decode messages:

    // Script parser: a rule engine script decodes incoming messages.
    var config = new Dictionary<string, object>();
    config.Add("script", @"parser.Fixed(4).Handler(""function(buffer){
            var buf = BytesUtils.Slice(buffer,1,4);
            parser.Fixed(buffer.ReadableBytes).Result(buf);
        }"").Handler(""function(buffer){parser.Result(buffer).Complete();}"")");
    var network = serviceProvider.ServiceProvoider.GetInstances<INetworkProvider<TcpServerProperties>>().CreateNetwork(new TcpServerProperties
    {
        ParserType = PayloadParserType.Script,
        PayloadType = PayloadType.String,
        Host = "127.0.0.1",
        Port = 322,
        ParserConfiguration = config
    });
    network.StartAsync();

The following code creates a TCP network component with configuration properties and uses the direct parser to decode messages:

    var network1 = serviceProvider.ServiceProvoider.GetInstances<INetworkProvider<TcpServerProperties>>().CreateNetwork(new TcpServerProperties
    {
        ParserType = PayloadParserType.Direct,
        PayloadType = PayloadType.String,
        Host = "127.0.0.1",
        Port = 321
    });
    network1.StartAsync();

Two other payload parser types are also available: fixed length and delimited.
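
To make the difference between these two parser types concrete, here is a minimal, self-contained sketch of fixed-length and delimited framing over a raw byte array. It does not use the Surging API: the class `FramingSketch` and the methods `SplitFixed` and `SplitDelimited` are hypothetical names chosen for illustration, and UTF-8 payloads are an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not part of Surging: illustrates the two framing strategies.
public static class FramingSketch
{
    // Fixed-length framing: every frame is exactly frameLength bytes.
    // Trailing bytes that do not fill a whole frame are not returned.
    public static List<string> SplitFixed(byte[] data, int frameLength)
    {
        if (frameLength <= 0) throw new ArgumentOutOfRangeException(nameof(frameLength));
        var frames = new List<string>();
        for (int offset = 0; offset + frameLength <= data.Length; offset += frameLength)
        {
            frames.Add(Encoding.UTF8.GetString(data, offset, frameLength));
        }
        return frames;
    }

    // Delimited framing: a frame ends wherever the delimiter byte sequence occurs.
    // Bytes after the last delimiter form an incomplete frame and are not returned.
    public static List<string> SplitDelimited(byte[] data, byte[] delimiter)
    {
        if (delimiter.Length == 0) throw new ArgumentException("Delimiter must not be empty.", nameof(delimiter));
        var frames = new List<string>();
        int start = 0;
        int i = 0;
        while (i + delimiter.Length <= data.Length)
        {
            if (MatchesAt(data, i, delimiter))
            {
                frames.Add(Encoding.UTF8.GetString(data, start, i - start));
                i += delimiter.Length;
                start = i;
            }
            else
            {
                i++;
            }
        }
        return frames;
    }

    // Returns true when the delimiter bytes appear in data starting at index.
    private static bool MatchesAt(byte[] data, int index, byte[] delimiter)
    {
        for (int k = 0; k < delimiter.Length; k++)
        {
            if (data[index + k] != delimiter[k]) return false;
        }
        return true;
    }
}
```

For example, `FramingSketch.SplitFixed(Encoding.UTF8.GetBytes("abcdefghij"), 4)` yields the frames `abcd` and `efgh`, leaving the trailing `ij` unframed, while `FramingSketch.SplitDelimited(Encoding.UTF8.GetBytes("one\r\ntwo\r\nthr"), Encoding.UTF8.GetBytes("\r\n"))` yields `one` and `two`. A real parser would buffer the leftover bytes until more data arrives instead of discarding them.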

Create a TCP service for message processing:

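    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using DotNetty.Buffers;
    // Surging namespaces (TcpBehavior, ITcpService, IDeviceProvider, ...) are omitted here.

    public class TcpService : TcpBehavior, ITcpService
    {
        private readonly IDeviceProvider _deviceProvider;

        public TcpService(IDeviceProvider deviceProvider)
        {
            _deviceProvider = deviceProvider;
        }

        public override void Load(string clientId, TcpServerProperties tcpServerProperties)
        {
            // Connection state of the device; not used further in this example.
            var deviceStatus = _deviceProvider.IsConnected(clientId);

            // Handle each decoded payload. The returned Task is not awaited,
            // so exceptions thrown by ParserBuffer are not observed here.
            this.Parser.HandlePayload().Subscribe(buffer => ParserBuffer(buffer));
        }

        public override void DeviceStatusProcess(DeviceStatus status, string clientId, TcpServerProperties tcpServerProperties)
        {
            // No device status handling in this example.
            //throw new NotImplementedException();
        }

        public async Task ParserBuffer(IByteBuffer buffer)
        {
            // Read the payload record by record, using the record length supplied by the parser.
            // Note: on .NET Core and later, "gb2312" is only available after registering
            // CodePagesEncodingProvider (System.Text.Encoding.CodePages package).
            List<string> result = new List<string>();
            while (buffer.ReadableBytes > 0)
            {
                result.Add(buffer.ReadString(this.Parser.GetNextFixedRecordLength(),
                    Encoding.GetEncoding("gb2312")));
            }

            // Alternative: read the whole payload at once.
            //  var str = buffer.ReadString(buffer.ReadableBytes, Encoding.GetEncoding("gb2312"));

            // Reply to the client, then release the received buffer.
            var byteBuffer = Unpooled.Buffer();
            byteBuffer.WriteString("\r\n", Encoding.UTF8);
            byteBuffer.WriteString("processing complete", Encoding.UTF8);
            await Sender.SendAndFlushAsync(byteBuffer);
            buffer.Release();
            //  await Sender.SendAndFlushAsync("message received", Encoding.UTF8);
            this.Parser.Close();
        }
    }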

Script parser test (image)

Direct parser test (image)