amazon-archives / cloudwatch-logs-subscription-consumer

A specialized Amazon Kinesis stream reader (based on the Amazon Kinesis Connector Library) that can help you deliver data from Amazon CloudWatch Logs to any other system in near real-time using a CloudWatch Logs Subscription Filter.

Errors using consumer with ES 2.0 #8

Open DanHoerst opened 8 years ago

DanHoerst commented 8 years ago

Hi - I'm attempting to use the consumer with our ES 2.0 cluster and am receiving errors. I know there were some breaking changes with the Java API in ES 2.0, so I'm assuming that is the issue here, but I'm hoping to give as much info as possible in order to get this resolved.

The consumer was compiled as instructed in the Readme with default configuration other than the ES cluster details and TRACE logging.

The error I am receiving on the consumer side is:

2015-11-13 19:53:47,418 INFO  transport - [Blockbuster] failed to get local cluster state for [#transport#-1][localhost][inet[elasticsearch-int.x.com/10.xx.xx.xx:9300]], disconnecting...
org.elasticsearch.transport.RemoteTransportException: Failed to deserialize exception response from stream
Caused by: org.elasticsearch.transport.TransportSerializationException: Failed to deserialize exception response from stream
    at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:178)
    at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:130)
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: Unsupported version: 1
    at org.elasticsearch.common.io.ThrowableObjectInputStream.readStreamHeader(ThrowableObjectInputStream.java:46)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
    at org.elasticsearch.common.io.ThrowableObjectInputStream.<init>(ThrowableObjectInputStream.java:38)
    at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:175)
    ... 23 more

The error that I am receiving on the Elasticsearch side is:

[2015-11-13 19:56:03,117][WARN ][transport.netty          ] [Mister X] exception caught on transport layer [[id: 0x45211071, /10.xx.xx.xx:26266 => /10.xx.xx.xx:9300]], closing connection
java.lang.IllegalStateException: Message not fully read (request) for requestId [24], action [cluster/state], readerIndex [34] vs expected [49]; resetting
        at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:120)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:75)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Both the ES cluster and the consumer are using Java 1.7.0_91.

Please let me know if there is anything else I can add.

Thanks.

gamefundas commented 8 years ago

Is there a resolution for this? I am seeing the same failure with ES 2.x.

danielcbright commented 8 years ago

I was having the same issue and have now gotten a little further. I'm not a Java programmer, so pardon my ignorance. I updated the pom.xml to use a newer version of elasticsearch, as shown:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.0.0</version>
</dependency>

This caused a new issue during my Maven install:

[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.2:compile (default-compile) on project cloudwatch-logs-subscription-consumer: Compilation failure: Compilation failure:
[ERROR] /opt/cwlsc/src/main/java/com/amazonaws/services/logs/connectors/elasticsearch/CloudWatchLogsElasticsearchDocument.java:[19,38] package org.elasticsearch.common.lang3 does not exist
[ERROR] /opt/cwlsc/src/main/java/com/amazonaws/services/logs/connectors/elasticsearch/CloudWatchLogsElasticsearchDocument.java:[79,21] cannot find symbol
[ERROR] symbol:   variable StringUtils
[ERROR] location: class com.amazonaws.services.logs.connectors.elasticsearch.CloudWatchLogsElasticsearchDocument
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException

I found out that the org.apache.commons:commons-lang3 artifact, which is included in versions 1.6.x - 1.7.x of the elasticsearch mvn artifact (see its pom.xml), is not included in 2.x.x (see the 2.0.0 pom.xml).

So I added this dependency to the pom.xml for cloudwatch-logs-subscription-consumer:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.3.2</version>
</dependency>

And I changed src/main/java/com/amazonaws/services/logs/connectors/elasticsearch/CloudWatchLogsElasticsearchDocument.java to import StringUtils from org.apache.commons.lang3 instead of from the elasticsearch package:

/*import org.elasticsearch.common.lang3.StringUtils;*/
import org.apache.commons.lang3.StringUtils;

After doing this, I can do a mvn clean install without issues.

The new issue I've run into is this:

2016-01-21 13:36:59,309 ERROR Worker - Worker.run caught exception, sleeping for 200 milli seconds!
java.lang.RuntimeException: java.lang.NoClassDefFoundError: org/elasticsearch/common/settings/ImmutableSettings
        at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessorFactory.createProcessor(KinesisConnectorRecordProcessorFactory.java:53)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorFactoryAdapter.createProcessor(V1ToV2RecordProcessorFactoryAdapter.java:36)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.createOrGetShardConsumer(Worker.java:486)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:329)
        at com.amazonaws.services.kinesis.connectors.KinesisConnectorExecutorBase.run(KinesisConnectorExecutorBase.java:95)
        at com.amazonaws.services.logs.connectors.samples.elasticsearch.ElasticsearchConnector.main(ElasticsearchConnector.java:38)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoClassDefFoundError: org/elasticsearch/common/settings/ImmutableSettings
        at com.amazonaws.services.kinesis.connectors.elasticsearch.ElasticsearchEmitter.<init>(ElasticsearchEmitter.java:106)
        at com.amazonaws.services.logs.connectors.samples.elasticsearch.ElasticsearchPipeline.getEmitter(ElasticsearchPipeline.java:38)
        at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessorFactory.createProcessor(KinesisConnectorRecordProcessorFactory.java:46)
        ... 11 more

It looks like ElasticsearchEmitter.java in the amazon-kinesis-connectors library imports ImmutableSettings, and that's what's breaking.

I'm going to add some logic to differentiate between 1.x and 2.x and test locally. If it works, I'll make a PR back to amazon-kinesis-connectors with the fix.
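For anyone trying the same approach, here is a minimal sketch of how a 1.x vs 2.x check could be done with plain reflection. The class name EsApiProbe and its methods are hypothetical and are not part of amazon-kinesis-connectors. The only fact taken from the trace above is that org.elasticsearch.common.settings.ImmutableSettings cannot be found when the 2.0.0 jar is on the classpath.

```java
// Hypothetical helper, not part of amazon-kinesis-connectors or Elasticsearch.
final class EsApiProbe {

    // Class reported as missing in the NoClassDefFoundError above
    // when running against the elasticsearch 2.0.0 artifact.
    static final String LEGACY_SETTINGS_CLASS =
            "org.elasticsearch.common.settings.ImmutableSettings";

    private EsApiProbe() {
    }

    // Returns true if the named class can be located on the classpath.
    // The class is looked up without being initialized.
    public static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, EsApiProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Assumption: presence of ImmutableSettings is treated as a marker
    // for the 1.x client API; its absence means 2.x or no ES jar at all.
    public static boolean looksLikeEs1x() {
        return isClassPresent(LEGACY_SETTINGS_CLASS);
    }

    public static void main(String[] args) {
        System.out.println(looksLikeEs1x()
                ? "ImmutableSettings found: 1.x-style client API"
                : "ImmutableSettings not found: 2.x-style API or no Elasticsearch jar");
    }
}
```

A probe like this only tells you which API is available at runtime. Code that references ImmutableSettings directly still has to live in a separate class that is loaded only when the probe succeeds, otherwise the same NoClassDefFoundError comes back.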

+@DVassallo for visibility, he's been great at helping me through this struggle :)

danielcbright commented 8 years ago

OK, I just spent all day getting this to work. It is a two-part problem. The first part is that the amazon-kinesis-connectors library needs to be updated to support elasticsearch 2.0. I've created a PR based on my work today: https://github.com/awslabs/amazon-kinesis-connectors/pull/63.

The second part is that the current cloudwatch-logs-subscription-consumer uses org.elasticsearch.common.lang3, which WAS a shaded copy of org.apache.commons.lang3 in elasticsearch 1.x. In 2.x the shaded packages are gone and you call the classes directly. A new PR with examples of how to make it work for 2.x has been made here: https://github.com/awslabs/cloudwatch-logs-subscription-consumer/pull/11.