warmcat / libwebsockets

canonical libwebsockets.org networking library
https://libwebsockets.org
Other
4.78k stars 1.49k forks source link

Problem connecting to two clients at different address/port using lws #1029

Closed readysteadygo2006 closed 7 years ago

readysteadygo2006 commented 7 years ago

Hi,

I'm trying to use lws to connect to two clients, I want to have a different callback for each client, so I was using the protocol mechanism to achieve this, however the connectivity achieved is intermittent. Specifically if both servers are available when the code is run, then connections will be made, but if a server is terminated and then recreated, no attempt will be made to reconnect to the terminated server, as the code never detects that the connection has terminated.

I have two attempts, the original using lws_client_connect, but then tried lws_client_connect_via_info as lws_client_connect is depreciated. Both give the same result, can you help!

First(lws_client_connect): void lws_mobile_thread(void in) { uint32_t target = ((uint32_t)in); char* targetAddress; uint32_t targetPort;

struct lws_context_creation_info info;
memset( &info, 0, sizeof(info) );

info.port = CONTEXT_PORT_NO_LISTEN;
info.protocols = protocols;
info.gid = -1;
info.uid = -1;

struct lws_context *context = lws_create_context( &info );

time_t old = 0;
while( 1 )
{
    struct timeval tv;
    gettimeofday( &tv, NULL );

    /* Connect if we are not connected to the server. */
    if( !web_socket_mobile && tv.tv_sec != old )
    {
        cout << "connecting: " << sysConfig.mobileIP << ":" << sysConfig.mobilePort << endl;
        web_socket_mobile = lws_client_connect( /* context = */ context,
                                         /* address =*/  sysConfig.mobileIP,
                                         /* port = */ sysConfig.mobilePort,
                                         /* ssl_connection = */ 0,
                                         /* path = */ "/",
                                         /* host = */ lws_canonical_hostname( context ),
                                         /* origin = */ "mobile",
                                         /* protocol = */ protocols[PROTOCOL_MOBILE].name,
                                         /* ietf_version_or_minus_one = */ -1 );
    }

    if( !web_socket_server && tv.tv_sec != old )
    {
        cout << "connecting: " << sysConfig.serverIP << ":" << sysConfig.serverPort << endl;
        web_socket_server = lws_client_connect( /* context = */ context,
                                         /* address =*/  sysConfig.serverIP,
                                         /* port = */ sysConfig.serverPort,
                                         /* ssl_connection = */ 0,
                                         /* path = */ "/",
                                         /* host = */ lws_canonical_hostname( context ),
                                         /* origin = */ "server",
                                         /* protocol = */ protocols[PROTOCOL_SERVER].name,
                                         /* ietf_version_or_minus_one = */ -1 );
    }

    if( tv.tv_sec != old )
    {
        //Send a random number to the server every second. 
        lws_callback_on_writable( web_socket_server );
        lws_callback_on_writable( web_socket_mobile );
        old = tv.tv_sec;
    }

    lws_service( context, 250 );
}

lws_context_destroy( context );

return 0;

}

second attempt (lws_client_connect_via_info):

void lws_mobile_thread(void in) { uint32_t target = ((uint32_t)in); char* targetAddress; uint32_t targetPort; unsigned int rl_dumb = 0, rl_mirror = 0, do_ws = 1, pp_secs = 0, do_multi = 0;

struct lws_context_creation_info info;
struct lws_client_connect_info connInfo;
memset( &info, 0, sizeof(info) );

info.port = CONTEXT_PORT_NO_LISTEN;
info.protocols = protocols;
info.gid = -1;
info.uid = -1;

struct lws_context *context = lws_create_context( &info );

time_t old = 0;
while( 1 )
{
    struct timeval tv;
    gettimeofday( &tv, NULL );

    /* Connect if we are not connected to the server. */
    cout << "web_socket_mobile: " << web_socket_mobile << endl;
    if( !web_socket_mobile && tv.tv_sec != old )
    {
        cout << "connecting: " << sysConfig.mobileIP << ":" << sysConfig.mobilePort << endl;

        connInfo.port = sysConfig.mobilePort;
        connInfo.path = "/";
        connInfo.address = sysConfig.mobileIP;
        connInfo.host = "192.168.1.110";
        connInfo.origin = "192.168.1.110";
        connInfo.protocol = protocols[PROTOCOL_MOBILE].name;
        connInfo.userdata = NULL;
        connInfo.context = context;
        connInfo.pwsi = &web_socket_mobile;

        /* Create initial connection. */
        lws_client_connect_via_info(&connInfo);

    //  web_socket_mobile = lws_client_connect( /* context = */ context,
    //                                   /* address =*/  sysConfig.mobileIP,
    //                                   /* port = */ sysConfig.mobilePort,
    //                                   /* ssl_connection = */ 0,
    //                                    path =  "/",
    //                                   /* host = */ lws_canonical_hostname( context ),
    //                                   /* origin = */ "mobile",
    //                                   /* protocol = */ protocols[PROTOCOL_MOBILE].name,
    //                                   /* ietf_version_or_minus_one = */ -1 );
    }

    cout << "web_socket_server: " << web_socket_server << endl;
    if( !web_socket_server && tv.tv_sec != old )
    {
        cout << "connecting: " << sysConfig.serverIP << ":" << sysConfig.serverPort << endl;

        connInfo.port = sysConfig.serverPort;
        connInfo.path = "/";
        connInfo.address = sysConfig.serverIP;
        connInfo.host = "192.168.1.110";
        connInfo.origin = "192.168.1.110";
        connInfo.protocol = protocols[PROTOCOL_SERVER].name;
        connInfo.userdata = NULL;
        connInfo.context = context;
        connInfo.pwsi = &web_socket_server;

        /* Create initial connection. */
        lws_client_connect_via_info(&connInfo);

        // web_socket_server = lws_client_connect( /* context = */ context,
        //                                  /* address =*/  sysConfig.serverIP,
        //                                  /* port = */ sysConfig.serverPort,
        //                                  /* ssl_connection = */ 0,
        //                                   path =  "/",
        //                                  /* host = */ lws_canonical_hostname( context ),
        //                                  /* origin = */ "server",
        //                                  /* protocol = */ protocols[PROTOCOL_SERVER].name,
        //                                  /* ietf_version_or_minus_one = */ -1 );
    }

//  if( tv.tv_sec != old )
//  {
//      //Send a random number to the server every second. 
//      lws_callback_on_writable( web_socket_server );
//      lws_callback_on_writable( web_socket_mobile );
//      old = tv.tv_sec;
//  }

    lws_service( context, 500 );
}

lws_context_destroy( context );

return 0;

}

The common callback and protocol definitions are shown attached below:

static int callback_mobile( struct lws wsi, enum lws_callback_reasons reason, void user, void in, size_t len ) { int n; string output; int maxCount = 0; char outputCStr; int outputLen; string *cmdString;

if (reason == LWS_CALLBACK_CLIENT_RECEIVE) {
    cmdString = new string((char*)in);
}

switch( reason )
{
    case LWS_CALLBACK_CLIENT_ESTABLISHED:
        lws_callback_on_writable( wsi );
        break;

    case LWS_CALLBACK_CLIENT_RECEIVE:
        lwsl_notice("callback_mobile: in LWS_CALLBACK_CLIENT_RECEIVE\n");
        lwsl_notice("callback_mobile: %s\n", (char*)in);
        cmdQueueMobile.push(*cmdString);
        lws_rx_flow_control(wsi, 0);
        lws_callback_on_writable(wsi);

        break;

    case LWS_CALLBACK_CLIENT_WRITEABLE:
    {
        lwsl_notice("callback_mobile: in LWS_CALLBACK_CLIENT_WRITEABLE:%d\n", dataQueueMobile.size());
        if((dataQueueMobile.size() > 0)) {
            output = dataQueueMobile.front();
            dataQueueMobile.pop();
            outputCStr = output.c_str();
            outputLen = strlen(outputCStr);
            lwsl_notice("callback_mobile: %s\n", (char*)outputCStr);
            unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + MAX_ECHO_PAYLOAD + LWS_SEND_BUFFER_POST_PADDING];
            unsigned char *p = &buf[LWS_SEND_BUFFER_PRE_PADDING];
            size_t n = sprintf( (char *)p, "%s", outputCStr );
            n = lws_write(wsi, p, n, LWS_WRITE_TEXT);
            if (n < 0) {
                lwsl_err("ERROR %d writing to socket, hanging up\n", n);
                return 1;
            }
        }
        lws_rx_flow_control(wsi, 1);
        // unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + MAX_ECHO_PAYLOAD + LWS_SEND_BUFFER_POST_PADDING];
        // unsigned char *p = &buf[LWS_SEND_BUFFER_PRE_PADDING];
        // size_t n = sprintf( (char *)p, "%u", rand() );
        // lws_write( wsi, p, n, LWS_WRITE_TEXT );
        break;
    }

    case LWS_CALLBACK_CLOSED:
    case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
        lwsl_notice("***callback_mobile: in LWS_CALLBACK_CLOSED\n");
        web_socket_mobile = NULL;
        break;

    default:
        break;
}

return 0;

}

static int callback_server( struct lws wsi, enum lws_callback_reasons reason, void user, void in, size_t len ) { int n; string output; int maxCount = 0; char outputCStr; int outputLen; string *cmdString;

if (reason == LWS_CALLBACK_CLIENT_RECEIVE) {
    cmdString = new string((char*)in);
}

switch( reason )
{
    case LWS_CALLBACK_CLIENT_ESTABLISHED:
        lws_callback_on_writable( wsi );
        break;

    case LWS_CALLBACK_CLIENT_RECEIVE:
        lwsl_notice("callback_server: in LWS_CALLBACK_CLIENT_RECEIVE\n");
        lwsl_notice("callback_server: %s\n", (char*)in);
        cmdQueueServer.push(*cmdString);
        lws_rx_flow_control(wsi, 0);
        lws_callback_on_writable(wsi);

        break;

    case LWS_CALLBACK_CLIENT_WRITEABLE:
    {
        lwsl_notice("callback_server: in LWS_CALLBACK_CLIENT_WRITEABLE: %d\n", dataQueueServer.size());
        if((dataQueueServer.size() > 0)) {
            output = dataQueueServer.front();
            dataQueueServer.pop();
            outputCStr = output.c_str();
            outputLen = strlen(outputCStr);
            lwsl_notice("callback_server: %s\n", (char*)outputCStr);
            unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + MAX_ECHO_PAYLOAD + LWS_SEND_BUFFER_POST_PADDING];
            unsigned char *p = &buf[LWS_SEND_BUFFER_PRE_PADDING];
            size_t n = sprintf( (char *)p, "%s", outputCStr );
            n = lws_write(wsi, p, n, LWS_WRITE_TEXT);
            if (n < 0) {
                lwsl_err("callback_server:ERROR %d writing to socket, hanging up\n", n);
                return 1;
            }
        }
        lws_rx_flow_control(wsi, 1);
        // unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + MAX_ECHO_PAYLOAD + LWS_SEND_BUFFER_POST_PADDING];
        // unsigned char *p = &buf[LWS_SEND_BUFFER_PRE_PADDING];
        // size_t n = sprintf( (char *)p, "%u", rand() );
        // lws_write( wsi, p, n, LWS_WRITE_TEXT );
        break;
    }

    case LWS_CALLBACK_CLOSED:
    case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
        lwsl_notice("***callback_server: in LWS_CALLBACK_CLOSED\n");
        web_socket_server = NULL;
        break;

    default:
        break;
}

return 0;

}

enum protocols { PROTOCOL_SERVER = 0, PROTOCOL_MOBILE = 1, PROTOCOL_COUNT };

static struct lws_protocols protocols[] = { { "server-protocol", callback_server, 0, MAX_ECHO_PAYLOAD, }, { "mobile-protocol", callback_mobile, 0, MAX_ECHO_PAYLOAD, }, { NULL, NULL, 0, 0 } / terminator / };

lws-team commented 7 years ago

You dumped a lot of code, but basically you're saying with your implementation

if a server is terminated and then recreated, no attempt will be made to reconnect to the terminated server, as the code never detects that the connection has terminated

However with the test client, I can see the CLOSE notification comes and it attempts to reconnect, successfully if I start the server again. To reproduce, in one terminal window

 $ libwebsockets-test-server

and in another

 $ libwebsockets-test-client 127.0.0.1 -l

The -l makes the test client create "longlived" client sessions to make it easier to see if the client is hanging up and reconnecting or reacting to the server going down.

The client will say

[2017/09/22 05:45:25:9942] NOTICE: dumb: connecting
[2017/09/22 05:45:25:9942] NOTICE: lws_client_connect_2: 0x211cb50: address 127.0.0.1
[2017/09/22 05:45:25:9945] NOTICE: mirror: connecting
[2017/09/22 05:45:25:9945] NOTICE: lws_client_connect_2: 0x21dc7f0: address 127.0.0.1
[2017/09/22 05:45:25:9947] NOTICE: lws_client_connect_2: 0x211cb50: address 127.0.0.1
[2017/09/22 05:45:25:9947] NOTICE: lws_client_connect_2: 0x21dc7f0: address 127.0.0.1
[2017/09/22 05:45:25:9956] NOTICE: checking client ext permessage-deflate
[2017/09/22 05:45:25:9957] NOTICE: instantiating client ext permessage-deflate
[2017/09/22 05:45:25:9957] ERR:  Capping pmd rx to 128
[2017/09/22 05:45:25:9957] NOTICE: lws_extension_callback_pm_deflate: option set: idx 1, (null), len 0
[2017/09/22 05:45:25:9957] NOTICE: mirror: LWS_CALLBACK_CLIENT_ESTABLISHED

and then stay like that, until you ^C the server

[2017/09/22 05:45:29:6755] NOTICE: dumb: LWS_CALLBACK_CLOSED
[2017/09/22 05:45:29:6756] NOTICE: dumb: connecting
[2017/09/22 05:45:29:6756] NOTICE: lws_client_connect_2: 0x211cb50: address 127.0.0.1
[2017/09/22 05:45:29:6758] NOTICE: mirror: LWS_CALLBACK_CLOSED mirror_lifetime=469730
[2017/09/22 05:45:29:6758] NOTICE: lws_client_connect_2: 0x211cb50: address 127.0.0.1
[2017/09/22 05:45:29:6758] NOTICE: Connect failed errno=111
[2017/09/22 05:45:29:6758] ERR: CLIENT_CONNECTION_ERROR: dumb: connect failed
(etc)

First confirm the test server / client act the same on your platform. Then if so, align your code with those until your code also acts the same.

lws-team commented 7 years ago

Well, if you find anything I should do something about, let me know and I will reopen.