ow2-proactive / programming

ProActive Programming library
http://proactive.activeeon.com/
GNU Affero General Public License v3.0
7 stars 19 forks source link

When using PAMR, if the router is restarted, an existing client never reconnects; there should be an exception that notifies the application #1105

Closed activeeon-bot closed 8 years ago

activeeon-bot commented 12 years ago

Original issue created by Vladimir Bodnartchouk on 26, Sep 2012 at 11:30 AM - PROACTIVE-1231


Since the router does not support client reconnection after restart, the application hangs.

To reproduce the problem:

1- Start a router

2- Start a client app

3- Kill router

4- The client app logs the following:

    69159@optimus - [WARN pamr.client] PAMR Router localhost/127.0.0.1:33647 is unreachable (Connection refused: connect). Will try to estalish a new tunnel in 2 seconds
    69159@optimus - [WARN pamr.client] PAMR Router localhost/127.0.0.1:33647 is unreachable (Connection refused: connect). Will try to estalish a new tunnel in 2 seconds
    69159@optimus - [WARN pamr.client] PAMR Router localhost/127.0.0.1:33647 is unreachable (Connection refused: connect). Will try to estalish a new tunnel in 4 seconds

5- Restart the router

6- The client app hangs with the following log:

    69159@optimus - [WARN pamr.client] PAMR Router localhost/127.0.0.1:33647 is unreachable (Router ID does not match. The router has probably been restarted. Disconnecting...). Will try to estalish a new tunnel in 32 seconds

From this point on, if the application tries to create a new active object, it gets a NullPointerException:

    java.lang.NullPointerException
        at org.objectweb.proactive.api.PAActiveObject.newActive(PAActiveObject.java:438)
        at org.objectweb.proactive.api.PAActiveObject.newActive(PAActiveObject.java:157)

If the application initiates a communication between existing active objects, it hangs, because no exception is reported.

As a temporary, rough, brute-force workaround, an application that needs to be notified of such a failure can dynamically replace the PAMR client socket factory with a custom one. That factory intercepts the exception whose message starts with "Router ID does not match" and executes user code.

Example:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;

import org.objectweb.proactive.Body;
import org.objectweb.proactive.InitActive;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.core.remoteobject.AbstractRemoteObjectFactory;
import org.objectweb.proactive.extensions.pamr.client.AgentImpl;
import org.objectweb.proactive.extensions.pamr.client.Tunnel;
import org.objectweb.proactive.extensions.pamr.remoteobject.PAMRRemoteObjectFactory;
import org.objectweb.proactive.extensions.pamr.remoteobject.util.socketfactory.PAMRSocketFactorySPI;

/**
 * Example active object showing how an application can detect that the PAMR router has been
 * restarted.
 *
 * <p>{@link #main} replaces, by reflection, the private {@code socketFactory} of the local
 * {@link AgentImpl} with a wrapper. The wrapper performs a router handshake on every new socket
 * and recognizes the "Router ID does not match" failure, giving the application a hook to run its
 * own code. This depends on private internals of {@link AgentImpl} ({@code socketFactory},
 * {@code routerHandshake}); it is a workaround, not a supported API.
 */
public class Main implements Serializable, InitActive {

    /** Prefix of the handshake error message reported when the router has been restarted. */
    private static final String ROUTER_ID_MISMATCH = "Router ID does not match";

    /** No-arg constructor; presumably needed by {@code PAActiveObject.newActive} — confirm. */
    public Main() {}

    /**
     * Prints this active object's URL, then calls {@link #toto()} through its own stub.
     *
     * @param body the body of this active object
     */
    @Override
    public void initActivity(Body body) {
        System.out.println("Main.initActivity() url " + body.getUrl());
        // Auto call through our own stub so that no half bodies are involved.
        Main self = (Main) PAActiveObject.getStubOnThis();
        self.toto();
    }

    /** Prints a trace line; invoked on this object's own stub from {@link #initActivity}. */
    public void toto() {
        System.out.println("Main.toto()");
    }

    /**
     * Installs the router-restart detector, then creates one {@code Main} active object.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            installRouterRestartDetector();
        } catch (Exception e) {
            System.out.println("Main.main() Failed to get the local message routing agent " + e.getMessage());
            e.printStackTrace();
        }

        try {
            PAActiveObject.newActive(Main.class, new Object[] {});
            System.out.println("Main.main()");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Wraps the local PAMR agent's socket factory so that every new socket is first used for a
     * manual router handshake. A handshake failing with {@link #ROUTER_ID_MISMATCH} is the hook
     * where user code can react to a router restart.
     *
     * <p>All reflective lookups happen here, so a missing private member fails at install time
     * rather than on every reconnection attempt.
     *
     * @throws Exception if the PAMR factory/agent cannot be obtained or a private member is missing
     */
    private static void installRouterRestartDetector() throws Exception {
        PAMRRemoteObjectFactory factory =
                (PAMRRemoteObjectFactory) AbstractRemoteObjectFactory.getRemoteObjectFactory("pamr");
        final AgentImpl agent = (AgentImpl) factory.getAgent();

        final Field socketFactoryField = AgentImpl.class.getDeclaredField("socketFactory");
        socketFactoryField.setAccessible(true);

        // Resolve on AgentImpl itself (like the field above): getDeclaredMethod only sees methods
        // declared by the exact class it is called on, so agent.getClass() could miss it.
        final Method routerHandshake = AgentImpl.class.getDeclaredMethod("routerHandshake", Tunnel.class);
        routerHandshake.setAccessible(true);

        final PAMRSocketFactorySPI target = (PAMRSocketFactorySPI) socketFactoryField.get(agent);
        socketFactoryField.set(agent, new PAMRSocketFactorySPI() {
            @Override
            public Socket createSocket(String host, int port) throws IOException {
                final Socket targetSocket = target.createSocket(host, port);
                Tunnel tunnel = new Tunnel(targetSocket);
                try {
                    // Initiate a manual handshake to find out whether the router was restarted.
                    routerHandshake.invoke(agent, tunnel);
                } catch (InvocationTargetException e) {
                    shutdownQuietly(tunnel);
                    Throwable cause = e.getTargetException();
                    String msg = cause.getMessage();
                    if (msg != null && msg.startsWith(ROUTER_ID_MISMATCH)) {
                        System.out.println(" BINGO !");

                        // EXECUTE USER CODE

                    }
                    // Keep the handshake failure as the cause instead of dropping it.
                    throw new IOException(msg, cause);
                } catch (Exception e) {
                    shutdownQuietly(tunnel);
                    throw new IOException(e);
                }
                return targetSocket;
            }

            @Override
            public String getAlias() {
                return target.getAlias();
            }
        });
    }

    /** Shuts {@code tunnel} down, ignoring failures: we are already reporting another error. */
    private static void shutdownQuietly(Tunnel tunnel) {
        try {
            tunnel.shutdown();
        } catch (Exception ignored) {
            // Best effort only; the original failure is what matters to the caller.
        }
    }
}
activeeon-bot commented 12 years ago

Original comment posted by Vladimir Bodnartchouk on 26, Sep 2012 at 18:09 PM


The following example shows how to automatically reconnect to a scheduler after a PAMR router restart (and a scheduler restart) without restarting the client application. This can be useful for long-running applications that act as scheduler clients:

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicLong;

import org.objectweb.proactive.Body;
import org.objectweb.proactive.InitActive;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.core.body.BodyMap;
import org.objectweb.proactive.core.body.LocalBodyStore;
import org.objectweb.proactive.core.remoteobject.AbstractRemoteObjectFactory;
import org.objectweb.proactive.extensions.pamr.client.AgentImpl;
import org.objectweb.proactive.extensions.pamr.client.Tunnel;
import org.objectweb.proactive.extensions.pamr.protocol.MagicCookie;
import org.objectweb.proactive.extensions.pamr.remoteobject.PAMRRemoteObjectFactory;
import org.objectweb.proactive.extensions.pamr.remoteobject.util.socketfactory.PAMRSocketFactorySPI;
import org.ow2.proactive.authentication.crypto.Credentials;
import org.ow2.proactive.scheduler.common.Scheduler;
import org.ow2.proactive.scheduler.common.SchedulerAuthenticationInterface;
import org.ow2.proactive.scheduler.common.SchedulerConnection;

public class Main {

public static void main(String(link: ) args) {
    try {
        PAMRRemoteObjectFactory f = (PAMRRemoteObjectFactory) AbstractRemoteObjectFactory
                .getRemoteObjectFactory("pamr");
        final AgentImpl agent = (AgentImpl) f.getAgent();

        final Field privateField = AgentImpl.class
                .getDeclaredField("socketFactory");
        privateField.setAccessible(true);

        final Field routerIdField = AgentImpl.class
                .getDeclaredField("routerID");
        routerIdField.setAccessible(true);

        final Field agentIdField = AgentImpl.class
                .getDeclaredField("agentID");
        agentIdField.setAccessible(true);

        final Field requestIDGeneratorField = AgentImpl.class
                .getDeclaredField("requestIDGenerator");
        requestIDGeneratorField.setAccessible(true);

        final Field magicCookieField = AgentImpl.class
                .getDeclaredField("magicCookie");
        magicCookieField.setAccessible(true);

        final Field tunnelField = AgentImpl.class.getDeclaredField("t");
        tunnelField.setAccessible(true);

        final PAMRSocketFactorySPI target = (PAMRSocketFactorySPI) privateField
                .get(agent);
        privateField.set(agent, new PAMRSocketFactorySPI() {
            @Override
            public Socket createSocket(String host, int port)
                    throws IOException {

                final Socket targetSocket = target.createSocket(host, port);
                try {
                    routerIdField.set(agent, Long.MIN_VALUE);
                    agentIdField.set(agent, null);
                    AtomicLong al = (AtomicLong) requestIDGeneratorField
                            .get(agent);
                    al.set(0);
                    magicCookieField.set(agent, new MagicCookie());
                } catch (Exception eee) {
                }

                Tunnel tunnel = new Tunnel(targetSocket);
                try {
                    // Initiate a manual handhsake
                    Method m = agent.getClass().getDeclaredMethod(
                            "routerHandshake", Tunnel.class);
                    System.out.println("createSocket() --> " + m.getName());
                    m.setAccessible(true);
                    m.invoke(agent, tunnel);
                    // Set new tunnel
                    tunnelField.set(agent, tunnel);
                    System.out.println("MY NEW ID IS:  --> "
                            + agent.getAgentID());
                } catch (Throwable e) {
                    try {
                        tunnel.shutdown();
                    } catch (Exception ee) {
                    }
                    throw new IOException(e);
                }
                throw new IOException(
                        "dummy exception to slip legacy reconnect");
            }

            @Override
            public String getAlias() {
                return target.getAlias();
            }
        });

    } catch (Exception e) {
        e.printStackTrace();
    }

    SchedulerAuthenticationInterface auth = null;
    Scheduler scheduler = null;

    while (true) {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            System.out.println("Main.main() ---> " + scheduler.getStatus());
        } catch (Exception ee) {
            try {
                try {
                    LocalBodyStore.getInstance().clearAllContexts();
                    BodyMap bm = LocalBodyStore.getInstance()
                            .getLocalHalfBodies();
                    Field privateField = bm.getClass().getDeclaredField(
                            "idToBodyMap");
                    privateField.setAccessible(true);
                    Hashtable ht = (Hashtable) privateField.get(bm);
                    ht.clear();
                } catch (Exception ex) {
                    System.out
                            .println("Main.main() unable to clen half bodies"
                                    + ex.getMessage());
                }
                auth = SchedulerConnection.join("pamr://1");
                System.out.println("--- join ok");
                scheduler = auth
                        .login(Credentials
                                .getCredentials("H:<BR/>Temp<BR/>workspace<BR/>Releases<BR/>ProActiveScheduling-3.2.0_server<BR/>config<BR/>authentication<BR/>rm.cred"));
                System.out.println("--- login ok");
            } catch (Exception eee) {
                System.out
                        .println("Main.main() could not reconnect to the scheduler: "
                                + eee.getMessage());
            }
            try {
                System.out.println("Main.main() ---> REGET "
                        + scheduler.getStatus());
            } catch (Exception rege) {
                rege.printStackTrace();
            }
        }
    }

}

}