ptgoetz / storm-hdfs

Storm components for interacting with HDFS file systems
Apache License 2.0

Unable to set directory or filename dynamically based on the information in a tuple? #26

Open huxihx opened 9 years ago

huxihx commented 9 years ago

Hi there, thanks a lot for this contribution. We are designing a log-aggregation system that collects logs from many sources and puts every log line into a single Kafka topic. With the help of storm-kafka we can consume each log line, but we ran into a problem when writing the lines out to HDFS files.

It looks like storm-hdfs only lets us specify the directory and file name once, up front, so we cannot route log lines from different sources to different HDFS files.

By the way, we wrote a whole framework similar to what you offer in order to work around this problem, but we ran into a performance issue from frequently appending to and closing HDFS files, which made us give up.

Is there any plan for storm-hdfs to support this scenario in the future? Thanks!
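For context on why this is not currently possible: the HdfsBolt resolves its destination through the FileNameFormat interface, and neither of its methods receives the tuple, so the path and file name can only depend on the rotation count and time. A sketch of that interface (paraphrased from this repo; the comments are editorial):

```java
import java.io.Serializable;
import java.util.Map;

import backtype.storm.task.TopologyContext;

// org.apache.storm.hdfs.bolt.format.FileNameFormat: the only inputs to
// the file name are the rotation count and a timestamp, so there is no
// hook for routing output by tuple content.
public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);

    String getName(long rotation, long timeStamp); // no Tuple parameter
    String getPath();                              // fixed base directory
}
```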

rs-01 commented 9 years ago

This would also cover our use case, where we want to partition the data in HDFS by the time an event was generated. Events can enter the ingestion pipeline several hours, and in some cases a few days, after they are generated. Flume already has this feature; its absence is blocking us from using Storm for the time being.
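To make the request concrete, what is being asked for is a per-tuple hook along these lines. The class below is an illustration of the desired API, not something this repo provides, and the event_ts field name is an assumption (for the record, later Apache Storm versions of storm-hdfs added a Partitioner hook in this spirit):

```java
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

import backtype.storm.tuple.Tuple;

// Hypothetical per-tuple partitioner: derive the HDFS subdirectory from
// the event-time field carried in the tuple, so events that arrive hours
// or days late still land in the partition for when they were generated.
public class EventTimePartitioner implements Serializable {

    public String getPartitionPath(Tuple tuple) {
        long eventTimeMillis = tuple.getLongByField("event_ts"); // assumed field name
        return new SimpleDateFormat("yyyy/MM/dd/HH").format(new Date(eventTimeMillis));
    }
}
```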

250690392 commented 6 years ago

Hello, thanks for your contribution. I hit an error when running SequenceFileTopology; the console prints:

java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
    at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151) ~[hadoop-hdfs-2.6.0.jar:na]
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2038) ~[hadoop-hdfs-2.6.0.jar:na]
    at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:1968) ~[hadoop-hdfs-2.6.0.jar:na]
    at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:1951) ~[hadoop-hdfs-2.6.0.jar:na]
    at org.apache.hadoop.fs.FSDataOutputStream.hsync(FSDataOutputStream.java:139) ~[hadoop-common-2.6.0.jar:na]
    at org.apache.hadoop.io.SequenceFile$Writer.hsync(SequenceFile.java:1250) ~[hadoop-common-2.6.0.jar:na]
    at org.apache.storm.hdfs.bolt.SequenceFileBolt.execute(SequenceFileBolt.java:114) ~[classes/:na]
    at backtype.storm.daemon.executor$eval5170$fn__5171$tuple_action_fn__5173.invoke(executor.clj:630) [na:0.9.1-incubating]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5091.invoke(executor.clj:398) [na:0.9.1-incubating]
    at backtype.storm.disruptor$clojure_handler$reify__1894.onEvent(disruptor.clj:58) [na:0.9.1-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104) [storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) [storm-core-0.9.1-incubating.jar:0.9.1-incubating]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) [na:0.9.1-incubating]
    at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183$fn__5230.invoke(executor.clj:745) [na:0.9.1-incubating]
    at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433) [na:0.9.1-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

Could you help me figure out the cause?
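Reading the trace rather than speaking for the maintainers: SequenceFileBolt.execute calls SequenceFile.Writer.hsync() when its sync policy fires, and hsync() blocks until every datanode in the HDFS write pipeline has acknowledged the data. If the executor thread is interrupted while blocked there (typically because the worker is shutting down, being rebalanced, or killed after some earlier failure), HDFS rethrows the interrupt as java.io.InterruptedIOException, so the first error in the worker log is usually the real cause. A minimal illustration of that behavior (the class and method names here are ours):

```java
import java.io.IOException;
import java.io.InterruptedIOException;

import org.apache.hadoop.fs.FSDataOutputStream;

// Illustration only: hsync() blocks until the datanode pipeline acks the
// bytes, and an interrupt delivered to the blocked thread surfaces as
// InterruptedIOException rather than InterruptedException.
public class HsyncInterruptNote {

    public static void flushDurably(FSDataOutputStream out) throws IOException {
        try {
            out.hsync(); // the blocking call at the top of the trace above
        } catch (InterruptedIOException e) {
            // Usually means the worker/executor thread was interrupted
            // mid-sync (shutdown, rebalance, kill), not a data problem.
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw e;
        }
    }
}
```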
