logstash-plugins / logstash-codec-avro

A logstash codec plugin for decoding and encoding Avro records
Apache License 2.0

Logstash Error #15

Open jconlon opened 8 years ago

jconlon commented 8 years ago

Getting the following error while trying to decode Avro Kafka payloads:

���T.2016-02-18T21:04:14.182MadisonMadison._sp80UM-LEeWzy4JC2MRxcg._dM83cNa1EeWNn7pTE75EYA jconlon@mudshark"Condor Industries"Condor Industries._iqWIsM9sEeWzy4JC2MRxcg2cdo://mudshark:2036/repo1 {:exception=>#<NoMethodError: undefined method `decode' for #<Array:0x61a3ee90>>, :backtrace=>["/opt/elastic/logstash/logstash-2.2.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:178:in `queue_event'", "/opt/elastic/logstash/logstash-2.2.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-2.0.4/lib/logstash/inputs/kafka.rb:148:in `run'", "/opt/elastic/logstash/logstash-2.2.2/vendor/bundle/jruby/1.9/gems/logstash-core-2.2.2-java/lib/logstash/pipeline.rb:331:in `inputworker'", "/opt/elastic/logstash/logstash-2.2.2/vendor/bundle/jruby/1.9/gems/logstash-core-2.2.2-java/lib/logstash/pipeline.rb:325:in `start_input'"], :level=>:error}

Logstash configuration

input {
     kafka {
       topic_id => "Observe"
       type => "Observe"
       reset_beginning => true
       auto_offset_reset => smallest
       codec => {
         avro => {
           schema_uri => "/home/jconlon/git/com.verticon.irouter/com.verticon.im.avro/avroSchema/observeEvent.avsc"
         }
       }
    }  
}

Payloads created with Avro 1.7.7 in Java:

Observe event = (Observe) abstractEvent;
ObserveEvent observeEvent = ObserveEvent.newBuilder().setComments(event.getComments())
        .setEntityKey(entity.getKey()).setEntityName(entity.getName()).setEntityTag(entity.getTag())
        .setEventKey(event.getKey())
        .setLocalTime(event.getLocalDateTime() != null ? event.getLocalDateTime().toString() : null)
        .setModelRepository(modelRepo).setCurrentParentKey(parent.getKey())
        .setCurrentParentName(parent.getName()).setCurrentParentTag(parent.getTag())
        .setTimeStamp(event.getUtc() != null ? event.getUtc().toEpochMilli() : null)
        .setUrl(event.getUrl() != null ? event.getUrl().toString() : null).setUser(event.getUser()).build();

logger.debug("Transformed payload {}", observeEvent);

ByteArrayOutputStream out = new ByteArrayOutputStream();
// BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
BinaryEncoder encoder = EncoderFactory.get().blockingBinaryEncoder(out, null);
DatumWriter<ObserveEvent> writer = new SpecificDatumWriter<ObserveEvent>(ObserveEvent.class);

try {
    writer.write(observeEvent, encoder);
    encoder.flush();
    out.close();
    serializedBytes = out.toByteArray();
} catch (IOException e) {
    logger.error("Failed to write byte array", e);
}

Tried both:

// BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
BinaryEncoder encoder = EncoderFactory.get().blockingBinaryEncoder(out, null);
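
For reference, the same bytes can be checked outside Logstash by decoding them straight back with the same schema and generated class. A minimal sketch of such a round-trip check (variable names are illustrative; serializedBytes is the array produced by the writer above, and the snippet needs imports for org.apache.avro.io.DecoderFactory, org.apache.avro.io.BinaryDecoder, org.apache.avro.io.DatumReader and org.apache.avro.specific.SpecificDatumReader):

try {
    // Read the serialized bytes back with the same generated ObserveEvent class,
    // to rule out a producer-side problem before suspecting the codec.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(serializedBytes, null);
    DatumReader<ObserveEvent> reader = new SpecificDatumReader<ObserveEvent>(ObserveEvent.class);
    ObserveEvent roundTripped = reader.read(null, decoder);
    logger.debug("Round-tripped payload {}", roundTripped);
} catch (IOException e) {
    logger.error("Failed to read byte array back", e);
}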

Any tips on solving this would be appreciated.

thanks, John

rmoff commented 8 years ago

Per http://stackoverflow.com/a/33211940/350613, have you tried this alternative Logstash config for the avro codec:

input {
     kafka {
       topic_id => "Observe"
       type => "Observe"
       reset_beginning => true
       auto_offset_reset => smallest
       codec => avro  {
           schema_uri => "/home/jconlon/git/com.verticon.irouter/com.verticon.im.avro/avroSchema/observeEvent.avsc"
         }

    }  
}

micahlmartin commented 8 years ago

+1 I'm getting the same error. I tried the config from that Stack Overflow post, but when I do I get this error:

fetched an invalid config {:config=>"input {
  kafka {
    topic_id => \"test\"
    zk_connect => \"zookeeper:2181\"
    codec => avro {
        schema_uri => \"/etc/avro/test.avsc\"
      }
    }
  }
  }

output {

  file {
    path => \"/data/test-%{+YYYY-MM-dd}.log\"
  }

  }

", :reason=>"Expected one of #, input, filter, output at line 10, column 1 (byte 300) after ", :level=>:error}