pystorm / streamparse

Run Python in Apache Storm topologies. Pythonic API, CLI tooling, and a topology DSL.
http://streamparse.readthedocs.io/
Apache License 2.0

Async loop interrupted! (#67)

Closed · elastic10 closed this issue 10 years ago

elastic10 commented 10 years ago

I'm using the streamparse framework to create a Storm topology. When I run my topology in local mode, it runs for about five seconds and then shuts down. Is there any solution, please? Thanks in advance. (I'm using my topology to consume a Twitter stream.)

Log:

    topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/tmp/d47409c7-af6c-461c-a6a3-52fd9d283435", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2001, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" 2, "streamparse.log.path" "/home/bena/sahbi/logs", "topology.kryo.decorators" (), "topology.name" "wordcount", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" 5000, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1024 1025 1026), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 60, "task.refresh.poll.secs" 10, "topology.workers" 2, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "streamparse.log.level" "debug"}
    3287 [Thread-4] INFO backtype.storm.daemon.worker - Worker 102ed550-1958-4494-9042-4b553336c01f for storm wordcount-1-1411485247 on ddd46eee-f950-41ae-b4db-ed3997a22d27:1024 has finished loading
    3293 [Thread-22-acker] INFO backtype.storm.daemon.executor - Preparing bolt acker:(1)
    3293 [Thread-22-acker] INFO backtype.storm.daemon.executor - Prepared bolt acker:(1)
    3294 [Thread-23-worker-receiver-thread-0] INFO backtype.storm.messaging.loader - Starting receive-thread: [stormId: wordcount-1-1411485247, port: 1024, thread-id: 0 ]
    3606 [Thread-14-word-spout] INFO backtype.storm.spout.ShellSpout - Launched subprocess with pid 29050
    3607 [Thread-9-count-bolt] INFO backtype.storm.task.ShellBolt - Launched subprocess with pid 29041
    3607 [Thread-14-word-spout] INFO backtype.storm.daemon.executor - Opened spout word-spout:(5)
    3607 [Thread-12-count-bolt] INFO backtype.storm.task.ShellBolt - Launched subprocess with pid 29043
    3609 [Thread-9-count-bolt] INFO backtype.storm.daemon.executor - Prepared bolt count-bolt:(3)
    3610 [Thread-14-word-spout] INFO backtype.storm.daemon.executor - Activating spout word-spout:(5)
    3610 [Thread-12-count-bolt] INFO backtype.storm.daemon.executor - Prepared bolt count-bolt:(4)
    6965 [main] INFO backtype.storm.daemon.nimbus - Shutting down master
    6997 [main] INFO backtype.storm.daemon.nimbus - Shut down master
    7015 [main] INFO backtype.storm.daemon.supervisor - Shutting down ddd46eee-f950-41ae-b4db-ed3997a22d27:102ed550-1958-4494-9042-4b553336c01f
    7015 [main] INFO backtype.storm.process-simulator - Killing process b35fe761-8f07-4b7d-98ee-9e11dff1941c
    7015 [main] INFO backtype.storm.daemon.worker - Shutting down worker wordcount-1-1411485247 ddd46eee-f950-41ae-b4db-ed3997a22d27 1024
    7015 [main] INFO backtype.storm.daemon.worker - Shutting down receive thread
    7015 [main] INFO backtype.storm.messaging.loader - Shutting down receiving-thread: [wordcount-1-1411485247, 1024]
    7016 [main] INFO backtype.storm.messaging.loader - Waiting for receiving-thread:[wordcount-1-1411485247, 1024] to die
    7016 [Thread-23-worker-receiver-thread-0] INFO backtype.storm.messaging.loader - Receiving-thread:[wordcount-1-1411485247, 1024] received shutdown notice
    7017 [main] INFO backtype.storm.messaging.loader - Shutdown receiving-thread: [wordcount-1-1411485247, 1024]
    7017 [main] INFO backtype.storm.daemon.worker - Shut down receive thread
    7017 [main] INFO backtype.storm.daemon.worker - Terminating messaging context
    7017 [main] INFO backtype.storm.daemon.worker - Shutting down executors
    7017 [main] INFO backtype.storm.daemon.executor - Shutting down executor count-bolt:[3 3]
    7017 [Thread-9-count-bolt] INFO backtype.storm.util - Async loop interrupted!
    7017 [Thread-7-disruptor-executor[3 3]-send-queue] INFO backtype.storm.util - Async loop interrupted!
    7018 [main] INFO backtype.storm.daemon.executor - Shut down executor count-bolt:[3 3]
    7018 [main] INFO backtype.storm.daemon.executor - Shutting down executor word-spout:[5 5]
    7018 [Thread-13-disruptor-executor[5 5]-send-queue] INFO backtype.storm.util - Async loop interrupted!

amontalenti commented 10 years ago

Hey there --

Yes, this is determined by the "-t --time" flag available for "sparse run".

    -t --time <time>            Time (in seconds) to keep local running [default: 5]

So, to run longer:

    sparse -n <topology-name> -t 60 run

will run the topology locally for 60 seconds instead of the default 5, which is why you're seeing the five-second shutdown.

elastic10 commented 10 years ago

Thanks, but I'm looking for something that runs all the time, because I'm consuming a real-time stream. Do you have a solution for this?

kbourgoin commented 10 years ago

sparse run is designed for local testing of topologies and is intended to exit after that pre-defined amount of time. To have a topology run indefinitely, you want to use sparse submit to submit it to a remote Storm cluster that you've set up somewhere.
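For example (a minimal sketch; it assumes your project's config.json already defines the remote cluster environment, and that the same -n flag shown above also applies to submit):

    sparse submit -n <topology-name>

Once submitted, the topology keeps running on the cluster until you explicitly stop it (e.g. with sparse kill).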