twitter-archive / kestrel

simple, distributed message queue system (inactive)
http://twitter.github.io/kestrel
Other
2.78k stars 313 forks source link

Kestrel 2.9.2 doesn't initialize properly in Java #126

Open rajeevgoel opened 11 years ago

rajeevgoel commented 11 years ago

Hi All,

I am using Kestrel (net.lag:kestrel_2.9.2:jar:2.4.1) as a standalone queue in Java. It fails in queue.setup() method (code below). There are no files inside ./kestreltmp directory. There is no need to replay journals, but it does and fails over there.

Any pointers to fix this would be great.

Code: QueueConfig config = new QueueBuilder().apply(); System.out.println("config: " + config.toString()); java.util.concurrent.ScheduledExecutorService service = java.util.concurrent.Executors.newSingleThreadScheduledExecutor();

PersistentQueue queue = new PersistentQueue("test", "./kestreltmp", config, new JavaTimer(), service); queue.setup(); // fails here

Output

config: maxItems=2147483647 maxSize=9223372036854775807.bytes maxItemSize=9223372036854775807.bytes maxAge=None defaultJournalSize=16777216.bytes maxMemorySize=1001024.bytes maxJournalSize=1073741824.bytes discardOldWhenFull=false keepJournal=true syncJournal=1.seconds expireToQueue=None maxExpireSweep=2147483647 fanoutOnly=false maxQueueAge=None

max memory size is : 1001024.bytes

Jul 18, 2013 12:50:16 PM com.twitter.logging.Logger log INFO: Replaying transaction journal for '%s' Exception in thread "main" java.lang.NullPointerException at scala.collection.mutable.ArrayOps$ofRef.newBuilder(ArrayOps.scala:113) at scala.collection.mutable.ArrayOps$ofRef.newBuilder(ArrayOps.scala:108) at scala.collection.TraversableLike$class.filter(TraversableLike.scala:250) at scala.collection.mutable.ArrayOps.filter(ArrayOps.scala:38) at net.lag.kestrel.Journal.replay(Journal.scala:301) at net.lag.kestrel.PersistentQueue.replayJournal(PersistentQueue.scala:509) at net.lag.kestrel.PersistentQueue.setup(PersistentQueue.scala:441) at com.thed.teststep.PersistentQueueServiceTest.testPush(PersistentQueueServiceTest.java:185) at com.thed.teststep.PersistentQueueServiceTest.main(PersistentQueueServiceTest.java:89)

Second question is related to accessing Scala classes from Java. How to provide override values in QueueConfig ? Am unable to override values:

QueueBuilder qb = new QueueBuilder();

qb.maxItems(). // is not editable, how to override it ?

Next attempted to create QueueConfig directly with 14 args constructor, don't know how to provide values for: Option maxAge .
How to instantiate Option correctly ?

Thanks and regards, Rajeev

eric commented 11 years ago

You can use Option.apply(the_thing) to instantiate an Option.

eric commented 11 years ago

Also, if you want to fix the problem of seeing %s in the logs, this can help:

https://gist.github.com/eric/13f12b1f044fe8fd9b3a

rajeevgoel commented 11 years ago

Eric, Thanks a ton for the tip of Option.apply(), it worked. Provided all 14 arguments and could start the queue.

I don't understand why %s shows up. Is it due to bad logger configuration? BTW found that jul-over-slf4j has horrible performance, it is 60 times slower. Please see here http://www.slf4j.org/legacy.html

Thanks again !

Posting java code which worked for me:

    Duration oneSecond = new Duration(1000000000);  // 1sec
    Duration forever = Duration.forever() ;
    QueueConfig qc = new QueueConfig(50000, new StorageUnit(1024*1024*50), new StorageUnit(1024*1024*10), 
            Option.apply(forever), /* defaultJournalSize*/ new StorageUnit(1024*1024*50), /* maxMemorySize */new StorageUnit(1024*1024), 
            /* maxJournalSize */ new StorageUnit(1024*1024*50), false, true, 
            oneSecond, Option.apply("None"), /* doesn't matter as expireQueue is false */ 1, false, Option.apply(forever));
    System.out.println("config: " + qc.toString());
    // creating timer and service
    java.util.concurrent.ScheduledExecutorService service = java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
    Timer timer = new JavaTimer();
    PersistentQueue queue = new PersistentQueue("test", "./kestreltmp", config, timer, service);
    queue.setup();

    // finally
    queue.close();
    timer.stop(); // else JVM will wait on this thread.
eric commented 11 years ago

For the expireToQueue setting, you may want to use Option.empty() instead of Option.apply("None") to represent nothing.