Atmosphere / wasync

WebSockets with fallback transports client library for Node.js, Android and Java
http://async-io.org
161 stars 47 forks source link

Atmosphere streaming problem #145

Open zsenyeg opened 7 years ago

zsenyeg commented 7 years ago

Hi there!

I have a problem using wasync in my project. Project architecture and configuration:

Server side: WebSphere 8.5 - Atmosphere Framework 2.3.8, protocol STREAMING, because WS is not currently supported by WAS. The server side is a standard @ManagedService:

import org.atmosphere.config.service.Disconnect;
import org.atmosphere.config.service.ManagedService;
import org.atmosphere.config.service.Ready;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Atmosphere managed service mounted at {@code /atmosphere/push}.
 *
 * <p>Logs client connections and disconnections and, when a client becomes
 * ready, broadcasts a JSON-serialized {@code ConnectionStateChangeEvent}
 * carrying that client's uuid.
 */
@ManagedService(path = "/atmosphere/push")
public class AllexaPushService {
    /** Class-wide logger; static so it is created once rather than per service instance. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AllexaPushService.class);

    /** Shared JSON serializer; created once and reused for every broadcast. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Invoked when a client resource is ready: logs the connection and
     * broadcasts a connection-state event for that resource.
     *
     * <p>Any failure while serializing or broadcasting is logged and not rethrown.
     *
     * @param r the resource of the client that has just connected
     */
    @Ready
    public void onReady(final AtmosphereResource r) {
        LOGGER.info("Client {} connected.", r.uuid());
        try {
            final String payload =
                    OBJECT_MAPPER.writeValueAsString(new ConnectionStateChangeEvent(r.uuid(), true));
            // The resource is passed as second argument so the broadcast targets this client.
            r.getBroadcaster().broadcast(payload, r);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Invoked on disconnect: logs whether the connection was cancelled or was
     * closed by the client. Nothing is logged when neither flag is set.
     *
     * @param event the disconnect event carrying the affected resource
     */
    @Disconnect
    public void onDisconnect(AtmosphereResourceEvent event) {
        if (event.isCancelled()) {
            LOGGER.info("Client {} unexpectedly disconnected", event.getResource().uuid());
        } else if (event.isClosedByClient()) {
            LOGGER.info("Client {} closed the connection", event.getResource().uuid());
        }
    }
}

web.xml

    <!-- Registers org.atmosphere.cpr.AtmosphereServlet: loaded on startup, async-supported.
         The init-params below cap the broadcaster thread pools, set useWebSocket to false,
         select Servlet30CometSupport as the async support class, configure the heartbeat
         frequency and max inactive activity, and list TrackMessageSizeInterceptor under
         AtmosphereInterceptor.disable. -->
    <servlet>
        <description>AtmosphereServlet</description>
        <servlet-name>AtmosphereServlet</servlet-name>
        <servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.maxAsyncWriteThreads</param-name>
            <param-value>5</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.maxProcessingThreads</param-name>
            <param-value>2</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.disableOnStateEvent</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.useWebSocket</param-name>
            <param-value>false</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.asyncSupport</param-name>
            <param-value>org.atmosphere.container.Servlet30CometSupport</param-value>
        </init-param>
        <!-- Value is in seconds, per the parameter name. -->
        <init-param>
            <param-name>org.atmosphere.interceptor.HeartbeatInterceptor.heartbeatFrequencyInSeconds</param-name>
            <param-value>20</param-value>
        </init-param>
        <!-- NOTE(review): unit not stated here; presumably milliseconds (30 minutes). Confirm. -->
        <init-param>
            <param-name>org.atmosphere.cpr.CometSupport.maxInactiveActivity</param-name>
            <param-value>1800000</param-value>
        </init-param>
        <init-param>
            <param-name>com.ibm.ws.webcontainer.async-supported</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.AtmosphereInterceptor.disable</param-name>
            <param-value>org.atmosphere.client.TrackMessageSizeInterceptor</param-value>
        </init-param>
        <!-- The CDI object factory setting below is intentionally left commented out. -->
        <!-- 
        <init-param>
            <param-name>org.atmosphere.cpr.objectFactory</param-name>
            <param-value>org.atmosphere.cdi.CDIObjectFactory</param-value>
        </init-param>
         -->
        <load-on-startup>1</load-on-startup>
        <async-supported>true</async-supported>
    </servlet>

Client side wasync related code:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.atmosphere.wasync.ClientFactory;
import org.atmosphere.wasync.Decoder;
import org.atmosphere.wasync.Event;
import org.atmosphere.wasync.Function;
import org.atmosphere.wasync.Request.METHOD;
import org.atmosphere.wasync.Socket;
import org.atmosphere.wasync.impl.AtmosphereClient;
import org.atmosphere.wasync.impl.AtmosphereRequest.AtmosphereRequestBuilder;
import org.atmosphere.wasync.impl.DefaultOptions;
import org.atmosphere.wasync.impl.DefaultOptionsBuilder;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ning.http.client.AsyncHttpClient;

import hu.danubius.allexa.be.business.helper.HttpClientFactory;
import hu.danubius.allexa.be.business.http.AllexaHttpClientBean;
import hu.danubius.allexa.be.dto.websocket.event.ConnectionStateChangeEvent;
import hu.danubius.allexa.util.helper.ObjectMapperFactory;

/**
 * Client-side wasync setup: builds an Atmosphere request and options, registers
 * logging handlers for the socket events, and opens the socket.
 */
public class PushServiceTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushServiceTest.class);

    // Socket created in init(); kept as a field so it outlives the method call.
    private Socket socket;
    // Options built in init() and used to create the socket.
    private DefaultOptions options;

    /**
     * Configures the request, options and event handlers, then opens the socket.
     *
     * <p>Any exception thrown during setup is caught and logged; it is not rethrown.
     */
    public void init() {
    try {
        // HTTP client with a 5000 connect-timeout argument and a custom user agent.
        AsyncHttpClient asyncHttpClient = HttpClientFactory.createHttpClient(5000, "wAsync/2.0");

        AtmosphereClient client = ClientFactory.getDefault().newClient(AtmosphereClient.class);
        final AtmosphereRequestBuilder requestBuilder = client.newRequestBuilder();
        requestBuilder.method(METHOD.GET);
        // Target URI is elided in this snippet.
        String uri = "...";
        requestBuilder.uri(uri);
        requestBuilder.header("Accept", "application/json").header("Accept-Charset", AllexaHttpClientBean.CHARSET);
        // NOTE(review): the transport requested here is LONG_POLLING; confirm this is the
        // transport actually intended for this client.
        requestBuilder.transport(org.atmosphere.wasync.Request.TRANSPORT.LONG_POLLING);

        DefaultOptionsBuilder optBuilder = client.newOptionsBuilder();
        optBuilder.reconnect(true);
        // NOTE(review): meaning of the boolean second argument is not visible here; verify
        // against the wasync OptionsBuilder documentation.
        optBuilder.runtime(asyncHttpClient, false);
        optBuilder.pauseBeforeReconnectInSeconds(10);
        // NOTE(review): -1 presumably means "unlimited attempts"; confirm.
        optBuilder.reconnectAttempts(-1);
        this.options = optBuilder.build();
        // Decoder turning the raw String payload into an Object; its body is elided in this snippet.
        requestBuilder.decoder(new Decoder<String, Object>() {
        @Override
        public Object decode(Event event, String data) {
            ...
        }
        });

        this.socket = client.create(this.options);
        // MESSAGE handler: logs the uuid carried by the received ConnectionStateChangeEvent.
        this.socket.on(Event.MESSAGE, new Function<ConnectionStateChangeEvent>() {
        @Override
        public void on(ConnectionStateChangeEvent response) {
            LOGGER.info("Connection with uuid: {} was success to central server", response.getUuid());
        }
        });
        // The remaining handlers only log that the corresponding event fired.
        socket.on(Event.HEADERS, new Function<String>() {
        @Override
        public void on(String t) {
            LOGGER.info("EVENT:HEADERS");
        }
        });

        // Handler registered without an Event, typed on Throwable: logs the failure with its stack trace.
        socket.on(new Function<Throwable>() {
        @Override
        public void on(Throwable t) {
            LOGGER.info("MESSAGE:THROWABLE");
            LOGGER.error(t.getMessage(), t);
        }
        });
        socket.on(Event.REOPENED, new Function<String>() {
        @Override
        public void on(String t) {
            LOGGER.info("EVENT:REOPENED");
        }
        });
        socket.on(Event.TRANSPORT, new Function<String>() {
        @Override
        public void on(String t) {
            LOGGER.info("EVENT:TRANSPORT");
        }
        });
        socket.on(Event.STATUS, new Function<String>() {
        @Override
        public void on(String t) {
            LOGGER.info("EVENT:STATUS");
        }
        });
        socket.on(Event.OPEN, new Function<String>() {
        @Override
        public void on(String t) {
            LOGGER.info("EVENT:OPEN");
        }
        });
        socket.on(Event.CLOSE, new Function<String>() {
        @Override
        public void on(String t) {
            LOGGER.info("EVENT:CLOSE");
        }
        });
        socket.on(Event.ERROR, new Function<String>() {
        @Override
        public void on(String s) {
            LOGGER.info("EVENT:ERROR");
        }
        });
        // All handlers are registered before the socket is opened with the built request.
        socket.open(requestBuilder.build());
    } catch (Exception e) {
        LOGGER.error("Cannot initialize online state provider: {}", e.getMessage(), e);
    }
    }
}

AsyncHttpClient config:

import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.Realm;

/**
 * Factory for preconfigured {@link AsyncHttpClient} instances that use
 * preemptive HTTP Basic authentication.
 */
public class HttpClientFactory {
    /**
     * Creates an {@link AsyncHttpClient} with Basic authentication, redirect
     * following, no request retries, and the given connect timeout and user agent.
     *
     * @param connectionTimeOut value passed to the config builder's connect timeout
     *                          (NOTE(review): presumably milliseconds; confirm)
     * @param userAgent         value sent as the client's user agent
     * @return a new client built from this configuration
     */
    public static final AsyncHttpClient createHttpClient(int connectionTimeOut, String userAgent) {
        // Credentials are elided ("...") in this snippet.
        Realm realm = new Realm.RealmBuilder()
                .setPrincipal("...")
                .setPassword("...")
                .setUsePreemptiveAuth(true)
                .setScheme(Realm.AuthScheme.BASIC)
                .build();

        AsyncHttpClientConfig.Builder ccBuilder = new AsyncHttpClientConfig.Builder();
        // Only set the realm here; the config is built once, after all settings are applied.
        ccBuilder.setRealm(realm);
        // NOTE(review): accepting any certificate disables server certificate validation;
        // acceptable for a test setup only. Confirm before using against production.
        ccBuilder.setAcceptAnyCertificate(true);
        ccBuilder.setFollowRedirect(true);
        // NOTE(review): -1 presumably disables the request timeout so the long-lived
        // connection is never cut by the client; with no timeout a silently dropped
        // network link may go undetected. Verify against the AsyncHttpClient docs.
        ccBuilder.setRequestTimeout(-1);
        ccBuilder.setConnectTimeout(connectionTimeOut);
        ccBuilder.setUserAgent(userAgent);
        ccBuilder.setMaxRequestRetry(0);
        return new AsyncHttpClient(ccBuilder.build());
    }
}

The AsyncHttpClient version is the one pulled in as a Maven dependency of wasync.

My problem is that if I turn off Wi-Fi, or pull out the network cable on the client machine, wasync cannot detect the network loss: it does nothing and simply stops receiving messages from the server, even after the network connection comes back.

Is this a streaming-related problem, or is it related to my configuration? How can I detect that the network connection is not available? Maybe I should write a ping-like scheduler and detect the socket I/O exception?

Any help would be life saving for me.

Cheers, Zsolt

jfarcand commented 7 years ago

@zsenyeg Can you put a breakpoint at https://github.com/Atmosphere/wasync/blob/2b3e5e708497140ef3e888700ab337fc348e07fe/wasync/src/main/java/org/atmosphere/wasync/transport/StreamTransport.java#L284-L284 and debug from there?

zsenyeg commented 7 years ago

Hey!

Thanks for your suggestion. I've tried debugging the problem: the close method is not invoked when the Wi-Fi connection is lost.

Another strange - or expected - behaviour is that when the central server is down, automatic reconnection does not work. Here's the warning from the log:

2017-05-24 11:03:42 - [New I/O boss #9] - WARN  o.a.wasync.transport.StreamTransport - StreamTransport notified with exception java.net.ConnectException: Connection refused: no further information: was.example.com/127.0.0.1:9443 for request : https://was.example.com:9443/allexa-be-gj-expert/atmosphere/push
2017-05-24 11:03:42 - [New I/O boss #9] - WARN  o.a.wasync.transport.StreamTransport - 
java.net.ConnectException: Connection refused: no further information: was.example.com/127.0.0.1:9443
    at com.ning.http.client.providers.netty.request.NettyConnectListener.onFutureFailure(NettyConnectListener.java:131) [async-http-client-1.9.28.jar:na]
    at com.ning.http.client.providers.netty.request.NettyConnectListener.operationComplete(NettyConnectListener.java:143) [async-http-client-1.9.28.jar:na]
    at org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:409) [netty-3.10.3.Final.jar:na]
    at org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:400) [netty-3.10.3.Final.jar:na]
    at org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:362) [netty-3.10.3.Final.jar:na]
    at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:109) [netty-3.10.3.Final.jar:na]
    at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) [netty-3.10.3.Final.jar:na]
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [netty-3.10.3.Final.jar:na]
    at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) [netty-3.10.3.Final.jar:na]
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.10.3.Final.jar:na]
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.10.3.Final.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102]
Caused by: java.net.ConnectException: Connection refused: no further information: was.example.com/127.0.0.1:9443
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_102]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_102]
    at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) [netty-3.10.3.Final.jar:na]
    at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) [netty-3.10.3.Final.jar:na]
    ... 8 common frames omitted
2017-05-24 11:03:42 - [New I/O boss #9] - INFO  h.d.a.b.b.OnlineStateProviderBean - MESSAGE:THROWABLE

Is it intended that there is no automatic reconnection after the on(Throwable) call? When the central server is up and the application is down, the log shows the following message, and automatic reconnection works:

2017-05-24 11:03:31 - [New I/O worker #5] - ERROR h.d.a.b.b.OnlineStateProviderBean - Unexpected message Error 404: com.ibm.ws.webcontainer.servlet.exception.NoTargetForURIException: No target servlet configured for uri: /allexa-be-gj-expert/atmosphere/push

What am I doing wrong?

Cheers, Zsolt