edisona / rcf-cpp

Automatically exported from code.google.com/p/rcf-cpp

Problems with NamedPipe client/server #32

Open GoogleCodeExporter opened 9 years ago

GoogleCodeExporter commented 9 years ago
Below is the source code of the server and client using NamedPipeEndpoint.

With only one client, everything works fine. But when I start a second 
client, either client.connect() or ssPtr->beginSubscribe() fails with the 
following exceptions:

On calling client.connect() in the client code:
Failed to connect: [25] Server side object not found. Service: I_TestCMD. 
Interface: I_TestCMD.

and on calling ssPtr->beginSubscribe():
Server side object not found. Service: I_RequestSubscription. 
Interface: I_RequestSubscription.

The problem does not occur when using TcpEndpoint.

///////////////////////////////
// Server source code
// server.cpp

class testCMD
{
public:

    DWORD connect()
    {
        cout << "server connect has been called" << endl;
        return 0;
    }

    BOOL processCmd(RCF::ByteBuffer s)
    {
        cout << s.getPtr() << endl;
        return TRUE;
    }
    void Notify(const std::string &s)
    {
    }
};

RCF_BEGIN(I_TestCMD, "I_TestCMD")
    RCF_METHOD_R1(BOOL, processCmd, RCF::ByteBuffer)
    RCF_METHOD_R0(DWORD, connect)
    RCF_METHOD_V1(void, Notify, const std::string &)
RCF_END(I_TestCMD)

class RCF_SERVER 
{
public:
    RCF_SERVER(const std::string &pipeName);
    ~RCF_SERVER();

    static void clientConnect(RCF::RcfSession & rcfSession, 
        const std::string & publisherName)
    {
        cout << "Client has been connected." << endl;
    }

    static void clientDisconnect(RCF::RcfSession & rcfSession, 
        const std::string & publisherName)
    {
        cout << "Client has been disconnected" << endl;
    }

    void publish(const std::string &s)
    {
        mPublishingServicePtr->publish( (I_TestCMD *) 0).Notify(s);
    }

private:
    RCF::PublishingServicePtr mPublishingServicePtr;
    RCF::NamedPipeEndpoint *ep;
    RCF::RcfServer *server;
};

RCF_SERVER::RCF_SERVER(const std::string &pipeName)
{
    ep = new RCF::NamedPipeEndpoint(util::toTstring(pipeName));

    server = new RCF::RcfServer(*ep);
    server->getServerTransport().setConnectionLimit(100);
    std::size_t targetThreadCount = 2;
    std::size_t maxThreadCount = 100;
    boost::uint32_t threadIdleTimeoutMs = 30*1000;

    RCF::DynamicThreadPoolPtr dynamicThreadPoolPtr( 
        new RCF::DynamicThreadPool(
        targetThreadCount, 
        maxThreadCount,
        threadIdleTimeoutMs) );

    server->setPrimaryThreadManager(dynamicThreadPoolPtr);

    RCF::PublishingServicePtr psPtr(new RCF::PublishingService());
    server->addService(psPtr);
    mPublishingServicePtr = psPtr;

    testCMD *cmds = new testCMD();
    server->bind<I_TestCMD>(*cmds);
    server->start();

    psPtr->setOnConnectCallback( 
        boost::bind(clientConnect, _1, _2) );

    psPtr->setOnDisconnectCallback( 
        boost::bind(clientDisconnect, _1, _2) );

    psPtr->beginPublish<I_TestCMD>();
}

RCF_SERVER::~RCF_SERVER()
{
    delete server;
}

int _tmain(int argc, _TCHAR* argv[])
{
    RCF::RcfInitDeinit rcfinitDeinit;

    RCF_SERVER server("TestPipe");
    int iCounter = 0;
    while(1)
    {
        if(iCounter % 5 == 0)
        {
            server.publish("Hello there!!!");
            Sleep(1000);
        }
        iCounter++;

        Sleep(1000);
    }

    return 0;
}

///////////////////////////////
// Client source code
// client.cpp

class testCMD
{
public:

    DWORD connect()
    {
        cout << "client connect has been called" << endl;
        return 0;
    }

    BOOL processCmd(RCF::ByteBuffer s)
    {
        cout << s.getPtr() << endl;
        return TRUE;
    }
    void Notify(const std::string &s)
    {
        cout << "Client was notified with: " << s << endl;
    }
};

RCF_BEGIN(I_TestCMD, "I_TestCMD")
    RCF_METHOD_R1(BOOL, processCmd, RCF::ByteBuffer);
    RCF_METHOD_R0(DWORD, connect);
    RCF_METHOD_V1(void, Notify, const std::string &)
RCF_END(I_TestCMD)

int _tmain(int argc, _TCHAR* argv[])
{
    RCF::RcfInitDeinit rcfinitDeinit;

#ifdef BOOST_WINDOWS
    RCF::tstring pipeName = RCF_T("TestPipe");
#else
    RCF::tstring pipeName = RCF_TEMP_DIR "TestPipe";
#endif

    RCF::setDefaultConnectTimeoutMs(3000);
    RCF::NamedPipeEndpoint *ep = new RCF::NamedPipeEndpoint(pipeName);

    RcfClient<I_TestCMD> client(*ep);
    testCMD cmds;

    DWORD res = 0;
tryconnect_again:
    try
    {
        res = client.connect();  // exception may occur here when running more than one client
        cout << "Received handle: " << res << endl;
    }
    catch(const RCF::Exception &e)
    {
        Sleep(3000);
        cout << "Failed to connect: [" << e.getErrorId() << "] " << e.getErrorString() << endl;
        goto tryconnect_again;
    }

    RCF::RcfServer subscriber(*ep);
    RCF::SubscriptionServicePtr ssPtr(new RCF::SubscriptionService());
    subscriber.addService(ssPtr);
    subscriber.start();

try_subscribe_again:
    try
    {
        ssPtr->beginSubscribe<I_TestCMD>(cmds, *ep); // exception may occur here when running more than one client
        cout << "Subscribed OK" << endl;
    }
    catch(const RCF::Exception &e)
    {
        Sleep(500);
        cout << e.getErrorString() << endl;
        goto try_subscribe_again;
    }

    while(1)
    {
        Sleep(1000);  
        try
        {      // trying to flood the server
            res = client.connect();
            cout << "connected again" << endl;
        }
        catch(const RCF::Exception &e)
        {
            cout << "Failed to connect: [" << e.getErrorId() << "] " << e.getErrorString() << endl;
            goto tryconnect_again;
        }
    }

    return 0;
}

Original issue reported on code.google.com by zoltan.b...@gmail.com on 20 Dec 2010 at 3:43

GoogleCodeExporter commented 9 years ago
The subscribing RcfServer on your client should not be listening on any 
pipe. What's happening is that your second client is making calls to the 
server in the first client, rather than to the publishing server.

Change your client code to look like this:

    //RCF::RcfServer subscriber(*ep);
    RCF::RcfServer subscriber( RCF::NamedPipeEndpoint("") );

With that change, you should be fine.
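
To make the failure mode concrete, here is a toy model in plain C++ (no RCF 
types). It assumes that a call landing on a server with no binding for the 
requested interface is rejected with an "object not found" style error, 
which is what the messages in the report suggest. The Listener struct and 
its call() method are hypothetical names used only for illustration.

```cpp
#include <set>
#include <string>

// Toy model only, not RCF code: a "listener" is any server accepting
// connections on a pipe, together with the interface names bound on it.
struct Listener
{
    std::set<std::string> boundInterfaces;

    // Mimics the dispatch step: the call succeeds only if the requested
    // interface is bound on the listener that received the connection.
    std::string call(const std::string & interfaceName) const
    {
        if (boundInterfaces.count(interfaceName) != 0)
        {
            return "OK";
        }
        return "Server side object not found. Interface: " + interfaceName;
    }
};
```

In this model the publishing server has I_TestCMD bound and the first 
client's subscriber has nothing bound. If both listen on the same pipe name, 
a second client whose connection reaches the subscriber gets the error 
rather than "OK", which matches the symptom described.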

Original comment by jarl.lin...@gmail.com on 21 Dec 2010 at 4:42

GoogleCodeExporter commented 9 years ago
Thanks, that did the trick.

Quick question: if I want to run publisher/subscriber communication over SSL 
using OpenSSL, do I have to set up an additional certificate for the 
subscription server on the client side?

Thanks.

Original comment by zoltan.b...@gmail.com on 21 Dec 2010 at 7:04

GoogleCodeExporter commented 9 years ago
BTW, is it possible at all to restrict communication between publisher and 
subscriber strictly to SSL?
Thanks.

Original comment by zoltan.b...@gmail.com on 21 Dec 2010 at 12:48

GoogleCodeExporter commented 9 years ago
Currently publish/subscribe doesn't support transport filters. If you need to 
make calls from a server to a client over an encrypted connection, you can use 
server-to-client callbacks, and configure SSL on the callback connection.

There is a section on server-to-client callbacks in the User Guide:

http://deltavsoft.com/RcfUserGuide/1.2/rcf_user_guide/Bidirectional.html#rcf_user_guide.Bidirectional.ServerCallbacks

Original comment by jarl.lin...@gmail.com on 22 Dec 2010 at 12:37

GoogleCodeExporter commented 9 years ago
Sorry, that link is old... It should be:

http://deltavsoft.com/w/RcfUserGuide/1.3/rcf_user_guide/ServerCallbacks.html

Original comment by jarl.lin...@gmail.com on 22 Dec 2010 at 7:19

GoogleCodeExporter commented 9 years ago
1. Do you have any plans to provide transport filters for publisher/subscriber 
services?

2. Can you please recommend the correct way of tracking client connections? I 
want my server to be able to track how many active connections it has and the 
status of those connections (for example by pinging the clients, without 
adding a server implementation to the clients). I looked at Ping and 
PingBackService, but if I'm not mistaken, those only check the connection 
from the client side?

Thanks.

Original comment by zoltan.b...@gmail.com on 22 Dec 2010 at 1:41

GoogleCodeExporter commented 9 years ago

1) Pub/sub with transport filters is on the list of features to implement, but 
I can't say when it will be done.

2) On the server side, you can keep track of client connections by using client 
disconnect notifications:

http://deltavsoft.com/w/RcfUserGuide/1.3/rcf_user_guide/FAQ.html#rcf_user_guide.FAQ.Programming.ClientDisconnect

So your application can be notified whenever clients connect and disconnect, 
and keep track of the number of active connections, and any data you want to 
associate with them.
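
As a rough sketch of the bookkeeping side only, assuming C++11 and that your 
connect and disconnect notifications can call into a shared object: the 
ConnectionTracker class below is hypothetical and not part of RCF. How you 
hook it up to the notifications is covered in the FAQ entry linked above.

```cpp
#include <cstddef>
#include <map>
#include <mutex>
#include <string>

// Hypothetical helper, not part of RCF: counts live connections and keeps
// one piece of application data per connection.
class ConnectionTracker
{
public:
    // Call from the connect notification. Returns the id assigned to
    // the new connection.
    std::size_t onConnect(const std::string & clientInfo)
    {
        std::lock_guard<std::mutex> lock(mMutex);
        std::size_t id = mNextId++;
        mConnections[id] = clientInfo;
        return id;
    }

    // Call from the disconnect notification. Unknown ids are ignored.
    void onDisconnect(std::size_t id)
    {
        std::lock_guard<std::mutex> lock(mMutex);
        mConnections.erase(id);
    }

    // Number of connections currently considered active.
    std::size_t activeCount() const
    {
        std::lock_guard<std::mutex> lock(mMutex);
        return mConnections.size();
    }

private:
    mutable std::mutex mMutex;  // notifications may arrive on several threads
    std::size_t mNextId = 1;
    std::map<std::size_t, std::string> mConnections;
};
```

The mutex is there because the server in this thread runs a dynamic thread 
pool, so notifications can presumably arrive on different threads.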

Normally you can't ping a client connection from the server side. If the client 
process closes the connection (which will happen if the client crashes or 
exits), then the server detects it and destroys the corresponding RcfSession, 
which your application can hook into, as above. 

There is also a SessionTimeoutService that will kill off client connections 
that have been idle for some time.
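
For illustration, the general idea behind idle reaping can be sketched in 
plain C++11 as below. This is not how SessionTimeoutService is implemented 
and it uses no RCF types. The IdleReaper class and its methods are 
hypothetical names.

```cpp
#include <chrono>
#include <cstddef>
#include <map>
#include <vector>

// Hypothetical sketch of idle-connection reaping, independent of RCF.
class IdleReaper
{
public:
    typedef std::chrono::steady_clock Clock;

    explicit IdleReaper(Clock::duration idleTimeout)
        : mIdleTimeout(idleTimeout)
    {
    }

    // Record activity on a connection (on connect and on every request).
    void touch(std::size_t id, Clock::time_point now)
    {
        mLastActivity[id] = now;
    }

    // Remove and return every connection that has been idle for at least
    // the configured timeout as of 'now'.
    std::vector<std::size_t> sweep(Clock::time_point now)
    {
        std::vector<std::size_t> expired;
        for (auto it = mLastActivity.begin(); it != mLastActivity.end(); )
        {
            if (now - it->second >= mIdleTimeout)
            {
                expired.push_back(it->first);
                it = mLastActivity.erase(it);
            }
            else
            {
                ++it;
            }
        }
        return expired;
    }

    // Number of connections still being tracked.
    std::size_t size() const
    {
        return mLastActivity.size();
    }

private:
    Clock::duration mIdleTimeout;
    std::map<std::size_t, Clock::time_point> mLastActivity;
};
```

A periodic timer would call sweep() and close whatever ids it returns. The 
time is passed in as a parameter so the logic can be exercised without 
actually waiting.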

If you need to actually ping clients from your server, you would need to use 
server-to-client callback connections.

Original comment by jarl.lin...@gmail.com on 31 Dec 2010 at 2:13