jgauffin / Griffin.Framework

Application framework for Business Applications
http://griffinframework.net

ChannelTcpListener MessageReceived not triggering #52

Closed. CallMeAreks closed this issue 9 years ago

CallMeAreks commented 9 years ago

I added the project via NuGet and copied and pasted the "Server" example as shown on the project website.

When I connect using a TCP client for Android, I see the log entry for the connection event, but when I send text I get nothing at all: no exceptions, and the MessageReceived callback never executes.

I tried setting the encoder/decoder to StringEncoder/StringDecoder (since text is the only thing I need), but nothing changes.

I then downloaded the project and added it as a normal reference. While debugging the ProcessReadBytes method of StringDecoder, I noticed that it never gets past this:

    if (_bytesLeftForCurrentMsg != 0) continue;

I also noticed that the _bytesLeftForCurrentMsg variable has the value 1969648996, which seems weird since I sent a short message...

Is there a known issue about this? What am I doing wrong?
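
A note on the value above: 1969648996 is 0x75667564, which is exactly what the four ASCII bytes 'd', 'u', 'f', 'u' look like when read as a little-endian 32-bit integer. That would be consistent with the decoder treating the first four bytes of the raw text as a length header and then waiting for roughly 1.9 GB of payload, although the thread does not confirm how StringDecoder frames its messages. The following is a minimal sketch under that assumption. `LengthHeaderDemo`, `ReadLittleEndianInt32` and `Frame` are hypothetical helpers written for illustration and are not part of Griffin.Framework.

```csharp
using System;
using System.Text;

public static class LengthHeaderDemo
{
    // Interprets the first four bytes of a buffer as a little-endian Int32.
    public static int ReadLittleEndianInt32(byte[] buffer)
    {
        return buffer[0] | (buffer[1] << 8) | (buffer[2] << 16) | (buffer[3] << 24);
    }

    // Assumed framing (not confirmed by the thread):
    // a 4-byte little-endian payload length followed by the UTF-8 payload.
    public static byte[] Frame(string text)
    {
        byte[] payload = Encoding.UTF8.GetBytes(text);
        int length = payload.Length;
        byte[] framed = new byte[4 + length];
        framed[0] = (byte)(length & 0xFF);
        framed[1] = (byte)((length >> 8) & 0xFF);
        framed[2] = (byte)((length >> 16) & 0xFF);
        framed[3] = (byte)((length >> 24) & 0xFF);
        Buffer.BlockCopy(payload, 0, framed, 4, length);
        return framed;
    }

    public static void Main()
    {
        // Raw text with no header: the first four characters become the "length".
        byte[] raw = Encoding.ASCII.GetBytes("dufu");
        Console.WriteLine(ReadLittleEndianInt32(raw)); // 1969648996

        // Framed text: the header carries the real payload length.
        byte[] framed = Frame("hello");
        Console.WriteLine(ReadLittleEndianInt32(framed)); // 5
    }
}
```

If the assumption holds, a client that writes plain text to the socket would need either to prepend such a header or to be paired with a decoder that does not expect one.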

Tochemey commented 9 years ago

You are not implementing IMessagingListener correctly. The MessageReceived event does fire when a connected client sends a message.

CallMeAreks commented 9 years ago

Can you give me a practical example please? I don't quite understand.

Tochemey commented 9 years ago

Hello Alejandro, I will provide some samples that you can study. They are extracted from a project I built with the network library. Looking at your issue, I think you need to write your own implementation of ChannelTcpListener, as I do in the samples. The samples contain the following classes: CustomListener, CustomServer, ClientContext, and CommandAsyncEvent (a way to send commands asynchronously).

CustomListener.cs

    public class CustomListener : IMessagingListener {
        private Logger _log = LogManager.GetCurrentClassLogger();
        private readonly ConcurrentStack<ITcpChannel> _channels = new ConcurrentStack<ITcpChannel>();
        private IBufferSlicePool _bufferPool;
        private ChannelTcpListenerConfiguration _configuration;
        private TcpListener _listener;
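        // Default to no-op handlers so that OnMessage never invokes a null delegate.
        private MessageHandler _messageReceived = delegate { };
        private MessageHandler _messageSent = delegate { };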

        /// <summary>
        /// </summary>
        /// <param name="configuration"></param>
        public CustomListener(ChannelTcpListenerConfiguration configuration) {
            if (configuration == null) throw new ArgumentNullException("configuration");

            Configure(configuration);
            ChannelFactory = new TcpChannelFactory();
        }

        /// <summary>
        /// </summary>
        public CustomListener() {
            Configure(new ChannelTcpListenerConfiguration(() => new CustomDecoder(), () => new CustomEncoder()));
            ChannelFactory = new TcpChannelFactory();
        }

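        // Default to no-op handlers so that raising these events never throws when nobody has subscribed.
        public event EventHandler<ClientConnectedEventArgs> ClientConnected = delegate { };

        public event EventHandler<ClientDisconnectedEventArgs> ClientDisconnected = delegate { };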

        /// <summary>
        ///     An internal error occurred
        /// </summary>
        public event EventHandler<ErrorEventArgs> ListenerError = delegate { };

        public ITcpChannelFactory ChannelFactory { get; set; }

        /// <summary>
        ///     Port that the server is listening on.
        /// </summary>
        /// <remarks>
        ///     You can use port <c>0</c> in <see cref="Start" /> to let the OS assign a port. This method will then give you the
        ///     assigned port.
        /// </remarks>
        public int LocalPort
        {
            get
            {
                if (_listener == null)
                    return -1;

                return ((IPEndPoint) _listener.LocalEndpoint).Port;
            }
        }

        /// <summary>
        ///     Delegate invoked when a new message is received
        /// </summary>
        public MessageHandler MessageReceived { get { return _messageReceived; } set { _messageReceived = value ?? delegate { }; } }

        /// <summary>
        ///     Delegate invoked when a message has been sent to the remote end point
        /// </summary>
        public MessageHandler MessageSent { get { return _messageSent; } set { _messageSent = value ?? delegate { }; } }

        /// <summary>
        ///     Start this listener.
        /// </summary>
        /// <remarks>
        ///     This also pre-configures 20 channels that can be used and reused during the lifetime of
        ///     this listener.
        /// </remarks>
        /// <param name="address">Address to accept connections on</param>
        /// <param name="port">Port to use. Set to <c>0</c> to let the OS decide which port to use. </param>
        /// <seealso cref="LocalPort" />
        public virtual void Start(IPAddress address, int port) {
            if (port < 0)
                throw new ArgumentOutOfRangeException("port", port, "Port must be 0 or more.");
            if (_listener != null)
                throw new InvalidOperationException("Already listening.");

            _listener = new TcpListener(address, port);
            _listener.AllowNatTraversal(true);
            _listener.Start();

            for (var i = 0; i < 20; i++) {
                var decoder = _configuration.DecoderFactory();
                var encoder = _configuration.EncoderFactory();
                var channel = ChannelFactory.Create(_bufferPool.Pop(), encoder, decoder);
                _channels.Push(channel);
            }

            _listener.BeginAcceptSocket(OnAcceptSocket, null);
        }

        /// <summary>
        ///     Stop the listener.
        /// </summary>
        public virtual async void Stop() {
            // Let us do some cleaning
            foreach (ITcpChannel tcpChannel in _channels) await tcpChannel.CloseAsync();
            _channels.Clear();
            _listener.Stop();
        }

        /// <summary>
        ///     To allow the sub classes to configure this class in their constructors.
        /// </summary>
        /// <param name="configuration"></param>
        protected void Configure(ChannelTcpListenerConfiguration configuration) {
            _bufferPool = configuration.BufferPool;
            _configuration = configuration;
        }

        /// <summary>
        ///     A client has connected (nothing has been sent or received yet)
        /// </summary>
        /// <param name="channel">Channel which we created for the remote socket.</param>
        /// <returns></returns>
        protected virtual ClientConnectedEventArgs OnClientConnected(ITcpChannel channel) {
            var args = new ClientConnectedEventArgs(channel);
            ClientConnected(this, args);
            return args;
        }

        /// <summary>
        ///     A client has disconnected
        /// </summary>
        /// <param name="channel">Channel representing the client that disconnected</param>
        /// <param name="exception">
        ///     Exception which was used to detect disconnect (<c>SocketException</c> with status
        ///     <c>Success</c> is created for graceful disconnects)
        /// </param>
        protected virtual void OnClientDisconnected(ITcpChannel channel, Exception exception) { ClientDisconnected(this, new ClientDisconnectedEventArgs(channel, exception)); }

        /// <summary>
        ///     Receive a new message from the specified client
        /// </summary>
        /// <param name="source">Channel for the client</param>
        /// <param name="msg">Message (as decoded by the specified <see cref="IMessageDecoder" />).</param>
        protected virtual void OnMessage(ITcpChannel source, object msg) { _messageReceived(source, msg); }

        private void OnAcceptSocket(IAsyncResult ar) {
            try {
                var socket = _listener.EndAcceptSocket(ar);
                socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, 1);
                socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger, new LingerOption(true, 1));
                ITcpChannel channel;
                if (!_channels.TryPop(out channel)) {
                    var decoder = _configuration.DecoderFactory();
                    var encoder = _configuration.EncoderFactory();
                    channel = ChannelFactory.Create(_bufferPool.Pop(), encoder, decoder);
                }

                channel.Disconnected += OnChannelDisconnect;
                channel.MessageReceived += OnMessage;
                channel.Assign(socket);

                var args = OnClientConnected(channel);
                if (!args.MayConnect) {
                    if (args.Response != null)
                        channel.Send(args.Response);
                    channel.Close();
                    // No early return here: fall through so that the listener keeps accepting new connections.
                }
            }
            catch (Exception exception) {
                ListenerError(this, new ErrorEventArgs(exception));
            }

            if (_listener != null) _listener.BeginAcceptSocket(OnAcceptSocket, null);
        }

        private void OnChannelDisconnect(ITcpChannel source, Exception exception) {
            OnClientDisconnected(source, exception);
            source.Cleanup();
            _channels.Push(source);
        }
    }

CustomServer.cs


    public class CustomServer {
        private readonly Logger _log = LogManager.GetCurrentClassLogger();
        private readonly ConcurrentDictionary<string, ClientContext> _clients = new ConcurrentDictionary<string, ClientContext>();
        private readonly CustomListener _listener;
        public event EventHandler<InboundClientEventArgs> ClientReady;

        public CustomServer() {
            var config = new ChannelTcpListenerConfiguration(() => new CustomDecoder(), () => new CustomEncoder());
            _listener = new CustomListener(config);
            _listener.MessageReceived += OnClientMessage;
            _listener.ClientConnected += OnClientConnect;
            _listener.ClientDisconnected += OnClientDisconnect;
        }

        /// <summary>
        ///     Port that we got assigned (or specified)
        /// </summary>
        public int LocalPort { get { return _listener.LocalPort; } }

        public void Start(IPAddress address, int port) { _listener.Start(address, port); }

        public void Stop() {
            _listener.Stop();
            _clients.Clear();
        }

        private async void OnClientConnect(object sender, ClientConnectedEventArgs e) {
            ITcpChannel channel = e.Channel;
            ClientContext context = new ClientContext(ref channel);
            try {
                var added = _clients.TryAdd(channel.ChannelId, context);
                if (added) {
                    bool ready = true; // placeholder: do some checks here to get the connected client ready. It is up to you.
                    if (!ready) {
                        await context.Close();
                        return;
                    }
                }
                if (ClientReady != null) ClientReady(this, new InboundClientEventArgs(context));
            }
            catch (Exception) {
                if (channel != null) channel.Close();
            }
        }

        private void OnClientDisconnect(object sender, ClientDisconnectedEventArgs e) {
            ITcpChannel channel = e.Channel;
            Exception exception = e.Exception;
            ClientContext context;
            if (exception != null) {
                if (_clients.TryGetValue(channel.ChannelId, out context)) context.Error(channel, exception);
                return;
            }

            try { _clients.TryRemove(channel.ChannelId, out context); }
            catch (ArgumentNullException) {
                // ignored
            }
        }

        private void OnClientMessage(ITcpChannel channel, object message) {
            try {
                ClientContext context;
                var fetched = _clients.TryGetValue(channel.ChannelId, out context);
                if (fetched) context.MessageReceived(channel, message);
            }
            catch (Exception) {
                // ignored
            }
        }
    }
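
The samples construct a CustomDecoder and a CustomEncoder that are not shown in this thread. Since the original question was about a client that sends plain text, one possible shape for such a decoder is a line-based one that buffers bytes until a newline arrives. The sketch below shows only that buffering logic. `LineDecoder` is a hypothetical stand-in, it does not implement Griffin's IMessageDecoder, and the newline-delimited protocol is an assumption rather than something the thread specifies.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for the CustomDecoder referenced in the samples.
// It does not implement Griffin's IMessageDecoder; it only shows the buffering logic.
public sealed class LineDecoder
{
    private readonly List<byte> _pending = new List<byte>();

    // Invoked once per complete line (without the trailing "\n" or "\r\n").
    public Action<string> MessageReceived = delegate { };

    // Feed whatever the socket delivered; TCP may split or merge messages arbitrarily.
    public void ProcessReadBytes(byte[] buffer, int offset, int count)
    {
        for (int i = offset; i < offset + count; i++)
        {
            if (buffer[i] != (byte)'\n')
            {
                _pending.Add(buffer[i]);
                continue;
            }

            // A newline completes the message; drop an optional preceding '\r'.
            int length = _pending.Count;
            if (length > 0 && _pending[length - 1] == (byte)'\r') length--;
            string line = Encoding.UTF8.GetString(_pending.ToArray(), 0, length);
            _pending.Clear();
            MessageReceived(line);
        }
    }
}

public static class LineDecoderDemo
{
    // Runs the decoder over a sequence of chunks and collects the decoded lines.
    public static List<string> Decode(params string[] chunks)
    {
        var lines = new List<string>();
        var decoder = new LineDecoder();
        decoder.MessageReceived = lines.Add;
        foreach (string chunk in chunks)
        {
            byte[] bytes = Encoding.UTF8.GetBytes(chunk);
            decoder.ProcessReadBytes(bytes, 0, bytes.Length);
        }
        return lines;
    }

    public static void Main()
    {
        // Two messages arriving split across three reads.
        foreach (string line in Decode("hel", "lo\r\nwor", "ld\n"))
            Console.WriteLine(line); // prints "hello" then "world"
    }
}
```

Buffering at the byte level, rather than decoding each chunk separately, keeps multi-byte UTF-8 characters intact when a read boundary falls in the middle of one.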

ClientContext.cs


    public delegate void ErrorHandler(ITcpChannel channel, Exception exception);

    public class ClientContext {
        private readonly ITcpChannel _channel;
        private readonly Logger _log = LogManager.GetCurrentClassLogger();
        private readonly ConcurrentQueue<object> _readItems = new ConcurrentQueue<object>();
        private readonly SemaphoreSlim _readSemaphore = new SemaphoreSlim(0, 1);
        private readonly ConcurrentQueue<CommandAsyncEvent> _requestsQueue;
        private readonly SemaphoreSlim _sendCompletedSemaphore = new SemaphoreSlim(0, 1);
        private readonly SemaphoreSlim _sendQueueSemaphore = new SemaphoreSlim(1, 1);
        private ConnectedCall _connectedCall;
        private Exception _sendException;

        public ClientContext(ref ITcpChannel channel) {
            _channel = channel;
            _channel.MessageSent += OnMessageSent;
            _channel.ChannelFailure += OnError;
            _requestsQueue = new ConcurrentQueue<CommandAsyncEvent>();
            Error += OnError;
            MessageReceived += OnMessageReceived;
        }

        /// <summary>
        ///     Can be compared with a session in a web server.
        /// </summary>
        public IChannelData ChannelData { get { return _channel.Data; } }

        /// <summary>
        ///     Connection State
        /// </summary>
        public bool Connected { get { return _channel != null && _channel.IsConnected; } }

        /// <summary>
        ///     Something failed during processing.
        /// </summary>
        public ErrorHandler Error { get; private set; }

        /// <summary>
        ///     Channel received a new message
        /// </summary>
        public MessageHandler MessageReceived { get; private set; }

        /// <summary>
        ///     Address to the currently connected client.
        /// </summary>
        public EndPoint RemoteEndPoint { get { return _channel.RemoteEndpoint; } }

        /// <summary>
        ///     Close()
        /// </summary>
        public async Task Close() {
            if (_channel == null)
                return;
            _connectedCall = null;
            await _channel.CloseAsync();
        }

        public async Task<object> Send(object command) {
            if (!Connected)
                return null;
            // Send command
            EnqueueCommand(command);
            await SendAsync(command);
            await _readSemaphore.WaitAsync(TimeSpan.FromMilliseconds(-1), CancellationToken.None);
            object item;
            var gotItem = _readItems.TryDequeue(out item);
            if (!gotItem)
                throw new ChannelException("Was signalled that something has been received, but found nothing in the in queue");
            // signal so that more items can be read directly
            if (_readItems.Count > 0)
                _readSemaphore.Release();

            if (item == null) return null;
            if (_requestsQueue.Count <= 0) return null;
            CommandAsyncEvent event2;
            if (!_requestsQueue.TryDequeue(out event2)) return null;
            if (event2 == null) return null;
            if (!event2.Command.Equals(command)) return null;
            event2.Complete(item);
            return await event2.Task;

        }

        protected CommandAsyncEvent EnqueueCommand(object command) {
            var event2 = new CommandAsyncEvent(command);
            _requestsQueue.Enqueue(event2);
            return event2;
        }

        /// <summary>
        ///     SendAsync(). It is used internally to send command to FreeSwitch
        /// </summary>
        /// <param name="message">the command to send</param>
        /// <returns>Async task</returns>
        protected async Task SendAsync(object message) {
            if (message == null) throw new ArgumentNullException("message");
            if (_sendException != null) {
                var ex = _sendException;
                _sendException = null;
                throw new AggregateException(ex);
            }
            await _sendQueueSemaphore.WaitAsync();
            _channel.Send(message);
            await _sendCompletedSemaphore.WaitAsync();
            _sendQueueSemaphore.Release();
        }

        private void OnError(ITcpChannel channel, Exception exception) {
            if (exception != null) {
                SocketException socketException = exception as SocketException;
                if (socketException != null) {
                    var soke = socketException;
                    if (soke.SocketErrorCode == SocketError.Shutdown) {
                        _log.Warn("Channel [#{0}--Address={1}] Socket already closed", channel.ChannelId, channel.RemoteEndpoint);
                        return;
                    }
                }
            }
            _log.Error(exception);
        }

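        private void OnMessageReceived(ITcpChannel channel, object message) {
            // Here we validate the channel.
            if (!channel.ChannelId.Equals(_channel.ChannelId)) return;

            // Handle the decoded message here. Note that Send() waits on _readSemaphore and then
            // dequeues from _readItems, so replies must be enqueued and signalled here for Send() to complete.
        }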

        private void OnMessageSent(ITcpChannel channel, object message) { _sendCompletedSemaphore.Release(); }
    }

CommandAsyncEvent.cs


    /// <summary>
    /// Used to send command
    /// </summary>
    public class CommandAsyncEvent {
        private readonly object _command;
        private readonly TaskCompletionSource<object> _source;

        public CommandAsyncEvent(object command)
        {
            _command = command;
            _source = new TaskCompletionSource<object>();
        }

        /// <summary>
        /// The FreeSwitch command to send
        /// </summary>
        public object Command { get { return _command; } }

        /// <summary>
        /// The response
        /// </summary>
        public Task<object> Task { get { return _source.Task; } }

        public void Complete(object response) { _source.TrySetResult(response); }

    }
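
CommandAsyncEvent is a thin wrapper around TaskCompletionSource<object>: the sender awaits Task, and whichever code receives the reply calls Complete. Below is a minimal, self-contained sketch of that pattern using only the .NET base class library. `PendingCommand`, `PendingCommandDemo` and the simulated delayed reply are hypothetical stand-ins for illustration and are not Griffin.Framework types.

```csharp
using System;
using System.Threading.Tasks;

// Same shape as CommandAsyncEvent: a command plus a task that completes with its reply.
public sealed class PendingCommand
{
    private readonly TaskCompletionSource<object> _source = new TaskCompletionSource<object>();

    public PendingCommand(object command) { Command = command; }

    public object Command { get; private set; }

    // Awaited by the sender; completes when Complete() is called.
    public Task<object> Task { get { return _source.Task; } }

    // TrySetResult ignores repeated calls, so a duplicate reply does not throw.
    public void Complete(object response) { _source.TrySetResult(response); }
}

public static class PendingCommandDemo
{
    public static async Task<object> SendAndWait(PendingCommand pending)
    {
        // Simulate a reply arriving later on another thread,
        // the way a MessageReceived callback would deliver it.
        var simulatedReply = Task.Run(async () =>
        {
            await Task.Delay(50);
            pending.Complete("reply to " + pending.Command);
        });

        // The caller is suspended here until Complete() has been called.
        return await pending.Task;
    }

    public static void Main()
    {
        var pending = new PendingCommand("status");
        object reply = SendAndWait(pending).GetAwaiter().GetResult();
        Console.WriteLine(reply); // reply to status
    }
}
```

The design choice worth noting is that the awaiting side never polls: the receive path owns the TaskCompletionSource and completes it exactly once.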

Tochemey commented 9 years ago

Hello Alejandro, please let us know whether the solution provided works for you, so that we know whether we can close the issue.

CallMeAreks commented 9 years ago

I used a different library that worked out of the box for me. Haven't tried your solution yet but I'm very grateful for your kindness and willingness to help me.

I'll close the issue then.

Once again thank you very much!