Yelp / pyleus

Pyleus is a Python framework for developing and launching Storm topologies.
Apache License 2.0

Kafka Consumer Spout Example and Offset Time type casting issue #93

Closed moandcompany closed 9 years ago

moandcompany commented 9 years ago

Hi,

I'm trying to use Pyleus to consume data from Kafka in a Storm topology.

In the example topology YAML file, there is an option called 'start_offset_time', which appears to represent the Kafka consumer offset for reading a particular topic:

        "start_offset_time: 1398971060"

Pyleus builds this topology correctly, but I get a type casting error when submitting the topology for local or Nimbus execution.

The issue I'm running into is figuring out how to explicitly specify a 'Long' in YAML, rather than having the value read as an 'int' or a 'String'.

For example, a start_offset_time of '0L' builds but is read as a String during execution:

        Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
            at com.yelp.pyleus.PyleusTopologyBuilder.handleKafkaSpout(PyleusTopologyBuilder.java:155)
            at com.yelp.pyleus.PyleusTopologyBuilder.handleSpout(PyleusTopologyBuilder.java:103)
            at com.yelp.pyleus.PyleusTopologyBuilder.buildTopology(PyleusTopologyBuilder.java:196)
            at com.yelp.pyleus.PyleusTopologyBuilder.main(PyleusTopologyBuilder.java:292)

A start_offset_time of '0' builds but is read as an Integer during execution and raises a similar exception:

        Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
            at com.yelp.pyleus.PyleusTopologyBuilder.handleKafkaSpout(PyleusTopologyBuilder.java:155)
            at com.yelp.pyleus.PyleusTopologyBuilder.handleSpout(PyleusTopologyBuilder.java:103)
            at com.yelp.pyleus.PyleusTopologyBuilder.buildTopology(PyleusTopologyBuilder.java:196)
            at com.yelp.pyleus.PyleusTopologyBuilder.main(PyleusTopologyBuilder.java:292)
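
Both traces point at the same kind of failure: a boxed value held as an Object is cast directly to Long, and Java does not convert between boxed types on a cast. The following is a minimal sketch of that behavior, assuming the option arrives as an Object out of a parsed map. The class `OffsetCast` and its helpers are hypothetical names for illustration and are not the actual PyleusTopologyBuilder code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; not the actual PyleusTopologyBuilder code.
final class OffsetCast {
    // Accept any boxed numeric type (Integer, Long, ...) and widen it to long.
    static long toLong(Object value) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        throw new IllegalArgumentException("expected a number, got: " + value);
    }

    // Returns true when a direct (Long) cast of the value throws.
    static boolean directCastFails(Object value) {
        try {
            Long ignored = (Long) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> options = new LinkedHashMap<>();
        options.put("start_offset_time", 0); // autoboxed to Integer, as in the second trace
        Object raw = options.get("start_offset_time");

        System.out.println(directCastFails(raw)); // the direct cast throws for an Integer
        System.out.println(toLong(raw));          // going through Number works
    }
}
```

Going through `Number.longValue()` accepts an Integer or a Long alike, so a reader of the option would not depend on which boxed type the YAML parser happened to choose. A String such as '0L' would still be rejected, since it is not a number at all.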

Cheers, Andrew

moandcompany commented 9 years ago

I found a temporary workaround for this (practically, only valid if your consumer is intended to start from the HEAD of the Kafka topic/message queue).

By default, the YAML reader tries to fit any integer value it reads into a (32-bit) 'Integer'. If the value does not fit, it tries a (64-bit) 'Long' next, and then a 'BigInteger'.

The largest value for a (32-bit) signed 'Integer' is '2147483647'

Setting 'start_offset_time' to '2147483648' allows the topology to be submitted and run successfully.

In my test case I am using the option 'from_start: false', so 'start_offset_time' is not actually used. This workaround would be invalid if you actually wanted your Kafka consumer spout to begin at some offset less than 2147483648.
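
The widening rule described above can be sketched as follows. This is only an imitation of the Integer, then Long, then BigInteger order, written with plain JDK calls. The class `YamlIntWidening` and the method `parseInteger` are hypothetical and are not the YAML library's API.

```java
import java.math.BigInteger;

// Hypothetical imitation of the widening order described above.
final class YamlIntWidening {
    // Try the narrowest type first and fall back to wider ones on overflow.
    static Number parseInteger(String text) {
        try {
            return Integer.valueOf(text);
        } catch (NumberFormatException tooBigForInt) {
            try {
                return Long.valueOf(text);
            } catch (NumberFormatException tooBigForLong) {
                return new BigInteger(text);
            }
        }
    }

    public static void main(String[] args) {
        String[] samples = {"0", "1398971060", "2147483647", "2147483648"};
        for (String s : samples) {
            // Print the boxed type each sample ends up as.
            System.out.println(s + " -> " + parseInteger(s).getClass().getSimpleName());
        }
    }
}
```

Under this rule the value 1398971060 from the example topology is below 2147483647, so it also ends up as an Integer, and only 2147483648 and above come back as a Long.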

mzbyszynski commented 9 years ago

I was just poking around in this code last night so it was easy for me to submit a PR to fix this. Hope that helps!

fullergalway commented 9 years ago

Thanks, this caught me also.