brainlag / JavaNSQClient

Fast Java client for NSQ
MIT License
90 stars 57 forks source link

fail to run message.requeue(); #45

Open gzliudan opened 6 years ago

gzliudan commented 6 years ago

nsq version:1.0 os:centos 7.4

codes: public static void main( String[] args ) { NSQLookup lookup = new DefaultNSQLookup(); lookup.addLookupAddress("192.168.1.228", 4161); NSQConsumer consumer = new NSQConsumer(lookup, "TestTopic", "dusti", (message) -> { System.out.println("received: " + new String(message.getMessage())); message.finished(); message.requeue(); });

    consumer.start();
}

below is run time messages: 17:00:37.080 INFO Created connection: knowledgebase:4150 - Connection. 17:00:37.100 INFO IdentifyResponse: {"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250} - NSQFeatureDetectionHandler.channelRead0 17:00:37.101 INFO reinstall LengthFieldBasedFrameDecoder - NSQFeatureDetectionHandler.eject 17:00:37.102 INFO Server identification: {"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250} - Connection. received: test8 received: test9 十二月 28, 2017 5:00:53 下午 io.netty.util.concurrent.SingleThreadEventExecutor runAllTasks 警告: A task raised an exception. java.lang.IllegalStateException: Queue full at java.util.AbstractQueue.add(AbstractQueue.java:98) at com.github.brainlag.nsq.Connection.incoming(Connection.java:129) at com.github.brainlag.nsq.netty.NSQHandler.lambda$channelRead0$3(NSQHandler.java:41) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:402) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:748)

Andy320 commented 6 years ago

I came across the same problem when receiving messages from nsq.

The java code below

` protected MsgConsumer(MsgConfig config, String topic) {

    NSQLookup lookup = new DefaultNSQLookup();
    lookup.addLookupAddress(config.getHost(), config.getPort());
    logger.info("start to init nsq consumer, host={},port={},topic={}", config.getHost(), config.getPort(), topic);

    NSQConsumer consumer = new NSQConsumer(lookup, topic, channel,new NSQMessageCallback (){
        @Override
        public void message(NSQMessage message) {
            try {
                message.finished();
                String msg = new String(message.getMessage(), "UTF-8");
                System.out.println("接收 rev:"+msg);
                if (Strings.isNullOrEmpty(msg)) {
                    logger.info("consume msg is empty.");
                    return;
                }
                onMessage(msg);

            } catch (Exception e) {
                message.requeue();
                logger.error("failed to consume message({}), cause: {}",
                        message, Throwables.getStackTraceAsString(e));
            }
        }
    });
    consumer.start();

}

`

The exception below {"date":"2018-02-06T16:44:43.231","traceId":"null","sequenceId":"null","level":"WARN","appName":"box-service","class":"io.netty.util.concurrent.SingleThreadEventExecutor","method":"warn","line":"151","message":"A task raised an exception. java.lang.IllegalStateException: Queue full at java.util.AbstractQueue.add(AbstractQueue.java:98) at com.github.brainlag.nsq.Connection.incoming(Connection.java:129) at com.github.brainlag.nsq.netty.NSQHandler.lambda$channelRead0$3(NSQHandler.java:41) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:402) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:745)

hankchan101 commented 5 years ago

how can I fix it?