vicenteg / mapr-pcapstream

An example Spark Streaming application processing PCAP data.

FileStream input format error #9

Open · samchorlton opened 8 years ago

samchorlton commented 8 years ago

Hi there,

I've been trying to implement the code to see how it works, and I'm getting a compile error. Would you be able to take a look and see what's causing it? If you need to see more of the code, just let me know. Thanks.

My imports are:

```scala
import net.ripe.hadoop.pcap.io.PcapInputFormat
import net.ripe.hadoop.pcap.packet.Packet
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark._

import org.apache.hadoop.io.{LongWritable, ObjectWritable}
import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextInputFormat
```

This line:

```scala
val pcapData = ssc.fileStream[LongWritable, ObjectWritable, PcapInputFormat](...)
```

is causing the following error:

```
- wrong number of type parameters for overloaded method value fileStream with alternatives:
  [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$12: scala.reflect.ClassTag[K], implicit evidence$13: scala.reflect.ClassTag[V], implicit evidence$14: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
  [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[K], implicit evidence$10: scala.reflect.ClassTag[V], implicit evidence$11: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
  [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$6: scala.reflect.ClassTag[K], implicit evidence$7: scala.reflect.ClassTag[V], implicit evidence$8: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)]

- type arguments [org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.ObjectWritable,net.ripe.hadoop.pcap.io.PcapInputFormat] conform to the bounds of none of the overloaded alternatives of value fileStream:
  [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$12: scala.reflect.ClassTag[K], implicit evidence$13: scala.reflect.ClassTag[V], implicit evidence$14: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
  [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[K], implicit evidence$10: scala.reflect.ClassTag[V], implicit evidence$11: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
  [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$6: scala.reflect.ClassTag[K], implicit evidence$7: scala.reflect.ClassTag[V], implicit evidence$8: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
```
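The error message itself points at the likely cause: every `fileStream` alternative constrains `F <: org.apache.hadoop.mapreduce.InputFormat[K,V]`, i.e. the new MapReduce API, and the compiler says `net.ripe.hadoop.pcap.io.PcapInputFormat` does not satisfy that bound. The imports above (`org.apache.hadoop.mapred.FileInputFormat`, `org.apache.hadoop.mapred.TextInputFormat`) are all from the old `mapred` API, which suggests this `PcapInputFormat` is an old-API input format too. A minimal sketch of the distinction, using Hadoop's own input formats and a placeholder directory:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamBoundDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("fileStreamBoundDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Compiles: the new-API TextInputFormat (org.apache.hadoop.mapreduce.lib.input)
    // satisfies the bound F <: org.apache.hadoop.mapreduce.InputFormat[K, V].
    val ok = ssc.fileStream[LongWritable, Text, TextInputFormat]("/tmp/in")
    ok.print()

    // Does NOT compile: the old-API TextInputFormat implements
    // org.apache.hadoop.mapred.InputFormat, which is outside the bound.
    // That is the same mismatch the error above reports for PcapInputFormat.
    // val bad = ssc.fileStream[LongWritable, Text, org.apache.hadoop.mapred.TextInputFormat]("/tmp/in")

    ssc.start()
    ssc.awaitTermination()
  }
}
```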
vicenteg commented 8 years ago

Hi @samchorlton,

It's a bit hard for me to say exactly what the problem is. I do recall hitting something like this at one point, but I don't recall the root cause.

Do you have a repo with your complete implementation?

samchorlton commented 8 years ago

Hi @vicenteg,

I can't easily upload it, I'm afraid. I have been doing some more investigation, though, and if I use CombinePcapInputFormat instead, it doesn't produce the error.
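For reference, a sketch of what the workaround might look like, assuming `CombinePcapInputFormat` lives in the same `net.ripe.hadoop.pcap.io` package, implements the new `org.apache.hadoop.mapreduce` API (which would explain why it satisfies `fileStream`'s bound while `PcapInputFormat` does not), and yields the same `(LongWritable, ObjectWritable)` pairs with each `ObjectWritable` wrapping a `Packet`; the watch directory and batch interval are placeholders:

```scala
import net.ripe.hadoop.pcap.io.CombinePcapInputFormat
import net.ripe.hadoop.pcap.packet.Packet
import org.apache.hadoop.io.{LongWritable, ObjectWritable}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PcapStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("pcapStreamSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Watch a directory for new PCAP files; "/tmp/pcaps" is a placeholder path.
    val pcapData =
      ssc.fileStream[LongWritable, ObjectWritable, CombinePcapInputFormat]("/tmp/pcaps")

    // Assumed convention: each ObjectWritable wraps a hadoop-pcap Packet.
    val packets = pcapData.map { case (_, ow) => ow.get.asInstanceOf[Packet] }
    packets.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```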

vicenteg commented 8 years ago

Sounds like you were able to work around the problem, then?