MeltwaterArchive / datasift-java

Java client to interface with DataSift
http://www.datasift.com/
MIT License
10 stars 7 forks source link

.IllegalStateException: unexpected message type: UnpooledUnsafeDirectByteBuf #71

Closed zcourts closed 7 years ago

zcourts commented 10 years ago

See https://github.com/datasift/datasift-java/issues/47#issuecomment-58490775

The stack trace is: 
2014-10-01 18:00:52.158 [nioEventLoopGroup-2-2] ERROR c.s.e.datasift.stream.ErrorHandler -Error in Datasift stream\
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: UnpooledUnsafeDirectByteBuf\
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:193) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:657) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:715) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:650) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:657) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:715) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:705) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:740) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:241) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.higgs.ws.client.WebSocketStream.send(WebSocketStream.java:51) ~[ws-client-0.0.8-1.jar:na]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.Defau\
t io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at com.datasift.client.stream.StreamingData.pushUnsentSubscriptions(StreamingData.java:239) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData.subscribe(StreamingData.java:225) [datasift-java-3.0.0-Beta3.5.jar:na]\
at com.datasift.client.stream.StreamingData$2.operationComplete(StreamingData.java:246) [datasift-java-3.0.0-Beta3.5.jar:na]\
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise$LateListeners.run(DefaultPromise.java:847) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.DefaultPromise$LateListenerNotifier.run(DefaultPromise.java:875) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:370) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) [netty-all-4.0.20.Final.jar:4.0.20.Final]\
at java.lang.Thread.run(Thread.java:722) [na:1.7.0_07]\
Caused by: java.lang.IllegalStateException: unexpected message type: UnpooledUnsafeDirectByteBuf\
nishant-kumar12 commented 9 years ago

I am having this for last 1 month. I keep on updating csdl and then do reconnect. I face this issue 1/5 times My code looks like try{ //trying to unsubscribe datasiftClient.liveStream().unsubscribe(currentStream); HttpRequestBuilder.restart(); //TODO need to check if we need to wait here till http client restarts. } catch(Exception e) { e.printStackTrace(); } }

    datasiftClient.liveStream().subscribe(new StreamSubscription(stream) {
        public void onDataSiftLogMessage(DataSiftMessage di) {
            //di.isWarning() is also available
            System.out.println((di.isError() ? "Error" : di.isInfo() ? "Info" : "Warning") + ":\n" + di);
        }

        public void onMessage(Interaction i) {
            System.out.println("INTERACTION:\n" + i);

            //deserializing incoming data to DataSiftData
            Gson gson = new GsonBuilder().excludeFieldsWithModifiers(Modifier.TRANSIENT).create();
            DataSiftData data = gson.fromJson(i.toString(), DataSiftData.class);

            if(data.interaction.type.compareTo("facebook_page") == 0 && data.interaction.subtype.compareTo("page_like") == 0)
            {
                //ignoring the page like as we dont have any information of who liked the page.
                Log.debug("ignoring page like");
            }
            else{
                //updating interaction author.id with interaction.author.idstr
                data.interaction.author.idStr = String.valueOf(data.interaction.author.id); 
                PostData postWithAlertObj = new PostData();
                postWithAlertObj.setPostData(data);
                postWithAlertObj.setProfileID(data.interaction.author.idStr);
                postWithAlertObj.setInteractionID(data.interaction.id);

                queue.offer(postWithAlertObj);
            }
        }
    });
zcourts commented 9 years ago

Why?

HttpRequestBuilder.restart();

Just doing

datasiftClient.liveStream().unsubscribe(currentStream);

is enough to unsubscribe from a stream. If you still receive interactions for a stream after unsubscribing then this is probably a bug (bare in mind that when you unsubscribe there may have been interactions already matched and need to be delivered).

zcourts commented 9 years ago

@nishant-kumar12 I'll try to reproduce this issue again. This time using HttpRequestBuilder.restart(); to see if that helps reproduce the exception.