yahoo / CaffeOnSpark

Distributed deep learning on Hadoop and Spark clusters.
Apache License 2.0

How to use 2-dimensional data (like a simple 2D matrix) as input? #266

Closed Yunzhi-b closed 7 years ago

Yunzhi-b commented 7 years ago

Hello,

I have successfully installed CaffeOnSpark and run all the examples. Now I want to use a 2D matrix as the input of my net, but it is not clear to me how to achieve that. I work in Python, on a standalone cluster.

In Caffe, I could use an HDF5 layer. But since CaffeOnSpark doesn't support HDF5, should I use a memory data layer? As my input is not an image, what should I put in "source_class", "source", "channels", "height" and "width" in "memory_data_param"?

Specifically, should I first transform my matrix into a Spark DataFrame and pass it when I call "caffeonspark.train(...)"? If so, what should I put in "source" of "memory_data_param" in the prototxt file?

I also use two custom layers; what do I need to do for them to work on a standalone cluster?

I'm sorry for asking so many questions here... I also posted my question in "caffeonspark-users". I really need your help.

Thank you in advance!!!!

junshi15 commented 7 years ago

You need to convert your matrix to a float array and put it in a data frame.

In your prototxt, define a CoS data layer; protobuf definition: https://github.com/yahoo/caffe/blob/fc0a02efd720bdc29b2f1d93e5354f19a2b9d559/src/caffe/proto/caffe.proto#L1409-L1452

Then you should use DataFrameSource: https://github.com/yahoo/CaffeOnSpark/blob/master/caffe-grid/src/main/scala/com/yahoo/ml/caffe/DataFrameSource.scala

Example: https://github.com/yahoo/CaffeOnSpark/blob/master/data/lenet_cos_train_test.prototxt#L2-L32
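
A minimal PySpark sketch of that flow, assuming a NumPy matrix with one sample per row (the column names, sizes and output path below are only illustrative). The schema is declared explicitly so the array elements are stored as 32-bit floats:

import numpy as np
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType

# Illustrative 2-D matrix: one row per sample, one float per feature.
matrix = np.random.rand(100, 64).astype(np.float32)

# DataFrameSource reads array columns; declare float elements explicitly
# rather than letting Spark infer the element type.
schema = StructType([
    StructField('data', ArrayType(FloatType()), False),
    StructField('label', ArrayType(FloatType()), False)])

sqlContext = SQLContext(sc)
rows = sc.parallelize(matrix.tolist()).map(lambda sample: (sample, sample))
df = sqlContext.createDataFrame(rows, schema)

# The directory written here is what goes into cos_data_param.source in the prototxt.
df.write.parquet('file:///path/to/train_data')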

Yunzhi-b commented 7 years ago

Thanks for your reply! I tried what you suggested. It successfully builds my network, but during the training phase, when loading the input data, I get the error below:

...
17/07/03 16:42:00 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 2966 records.
17/07/03 16:42:00 INFO InternalParquetRecordReader: at row 0. reading next block
17/07/03 16:42:00 INFO CodecPool: Got brand-new decompressor [.gz]
17/07/03 16:42:00 INFO InternalParquetRecordReader: block read in memory in 14 ms. row count = 2966
17/07/03 16:42:09 INFO Executor: Finished task 0.0 in stage 18.0 (TID 24). 2281 bytes result sent to driver
17/07/03 16:42:09 INFO CoarseGrainedExecutorBackend: Got assigned task 25
17/07/03 16:42:09 INFO Executor: Running task 1.0 in stage 18.0 (TID 25)
17/07/03 16:42:09 INFO ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: ParquetInputSplit{part: file:/home/yunzhi/CaffeOnSpark/data/DAE/train_movie_data/part-r-00000-56b6e423-8062-4d8a-856f-eee07393a166.gz.parquet start: 0 end: 453161 length: 453161 hosts: []}
17/07/03 16:42:09 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/07/03 16:42:09 INFO CatalystReadSupport: Going to read the following fields from the Parquet file:

Parquet form:
message spark_schema {
  optional group data (LIST) {
    repeated group list {
      optional double element;
    }
  }
  optional group label (LIST) {
    repeated group list {
      optional double element;
    }
  }
}

Catalyst form:
StructType(StructField(data,ArrayType(DoubleType,true),true), StructField(label,ArrayType(DoubleType,true),true))

17/07/03 16:42:09 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 3072 records.
17/07/03 16:42:09 INFO InternalParquetRecordReader: at row 0. reading next block
17/07/03 16:42:09 INFO CodecPool: Got brand-new decompressor [.gz]
17/07/03 16:42:09 INFO InternalParquetRecordReader: block read in memory in 5 ms. row count = 3072
17/07/03 16:42:18 INFO Executor: Finished task 1.0 in stage 18.0 (TID 25). 2281 bytes result sent to driver
17/07/03 16:42:18 INFO CoarseGrainedExecutorBackend: Got assigned task 26
17/07/03 16:42:18 INFO Executor: Running task 0.0 in stage 19.0 (TID 26)
17/07/03 16:42:18 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
17/07/03 16:42:18 INFO TorrentBroadcast: Started reading broadcast variable 24
17/07/03 16:42:18 INFO MemoryStore: Block broadcast_24_piece0 stored as bytes in memory (estimated size 2.5 KB, free 1580.4 KB)
17/07/03 16:42:18 INFO TorrentBroadcast: Reading broadcast variable 24 took 17 ms
17/07/03 16:42:18 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 4.3 KB, free 1584.7 KB)
17/07/03 16:42:18 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
17/07/03 16:42:18 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.0.2.15:33376)
17/07/03 16:42:18 INFO MapOutputTrackerWorker: Got the output locations
17/07/03 16:42:18 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
17/07/03 16:42:18 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
17/07/03 16:42:18 ERROR CaffeProcessor: Transformer thread failed
java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.Float
    at scala.runtime.BoxesRunTime.unboxToFloat(BoxesRunTime.java:114)
    at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:93)
    at scala.collection.IndexedSeqOptimized$class.copyToArray(IndexedSeqOptimized.scala:174)
    at scala.collection.mutable.WrappedArray.copyToArray(WrappedArray.scala:34)
    at scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:241)
    at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:105)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:249)
    at scala.collection.mutable.WrappedArray.toArray(WrappedArray.scala:72)
    at com.yahoo.ml.caffe.DataFrameSource$$anonfun$nextBatch$1.apply$mcVI$sp(DataFrameSource.scala:268)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at com.yahoo.ml.caffe.DataFrameSource.nextBatch(DataFrameSource.scala:234)
    at com.yahoo.ml.caffe.DataFrameSource.nextBatch(DataFrameSource.scala:30)
    at com.yahoo.ml.caffe.CaffeProcessor.com$yahoo$ml$caffe$CaffeProcessor$$doTransform(CaffeProcessor.scala:285)
    at com.yahoo.ml.caffe.CaffeProcessor$$anonfun$startThreads$1$$anonfun$apply$mcVI$sp$2$$anonfun$apply$mcZI$sp$1.apply$mcV$sp(CaffeProcessor.scala:151)
    at com.yahoo.ml.caffe.CaffeProcessor$$anonfun$startThreads$1$$anonfun$apply$mcVI$sp$2$$anonfun$apply$mcZI$sp$1.apply(CaffeProcessor.scala:151)
    at com.yahoo.ml.caffe.CaffeProcessor$$anonfun$startThreads$1$$anonfun$apply$mcVI$sp$2$$anonfun$apply$mcZI$sp$1.apply(CaffeProcessor.scala:151)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1266)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
    at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:785)
    at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:647)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
17/07/03 16:42:18 INFO CaffeProcessor: Model saving into file at the end of training:file:///tmp/lenet.model
I0703 16:42:18.919803  9792 solver.cpp:454] Snapshotting to binary proto file snaps_iter_0.caffemodel
I0703 16:42:19.110486  9792 sgd_solver.cpp:273] Snapshotting solver state to binary proto file snaps_iter_0.solverstate
17/07/03 16:42:19 INFO FSUtils$: destination file:file:///tmp/lenet.model
17/07/03 16:42:19 INFO FSUtils$: /home/yunzhi/spark-1.6.0-bin-hadoop2.6/work/app-20170703163930-0002/0/snaps_iter_0.caffemodel-->/tmp/lenet.model
17/07/03 16:42:30 INFO Executor: Finished task 0.0 in stage 19.0 (TID 26). 1263 bytes result sent to driver
17/07/03 16:42:30 INFO CoarseGrainedExecutorBackend: Got assigned task 27
17/07/03 16:42:30 INFO Executor: Running task 0.0 in stage 20.0 (TID 27)
17/07/03 16:42:30 INFO TorrentBroadcast: Started reading broadcast variable 25
17/07/03 16:42:30 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 1292.0 B, free 1585.9 KB)
17/07/03 16:42:30 INFO TorrentBroadcast: Reading broadcast variable 25 took 16 ms
17/07/03 16:42:30 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 2.0 KB, free 1588.0 KB)
17/07/03 16:42:30 INFO Executor: Finished task 0.0 in stage 20.0 (TID 27). 939 bytes result sent to driver
17/07/03 16:42:30 INFO CoarseGrainedExecutorBackend: Got assigned task 28
17/07/03 16:42:30 INFO Executor: Running task 0.0 in stage 21.0 (TID 28)
17/07/03 16:42:30 INFO TorrentBroadcast: Started reading broadcast variable 26
17/07/03 16:42:30 INFO MemoryStore: Block broadcast_26_piece0 stored as bytes in memory (estimated size 1278.0 B, free 1589.2 KB)
17/07/03 16:42:30 INFO TorrentBroadcast: Reading broadcast variable 26 took 12 ms
17/07/03 16:42:30 INFO MemoryStore: Block broadcast_26 stored as values in memory (estimated size 2016.0 B, free 1591.2 KB)
17/07/03 16:42:30 WARN CaffeProcessor: Some transformer threads haven't been terminated yet
17/07/03 16:42:30 WARN CaffeProcessor: Some transformer threads haven't been terminated yet
17/07/03 16:42:30 INFO Executor: Finished task 0.0 in stage 21.0 (TID 28). 978 bytes result sent to driver
17/07/03 16:43:05 INFO CoarseGrainedExecutorBackend: Got assigned task 29
17/07/03 16:43:05 INFO Executor: Running task 0.0 in stage 22.0 (TID 29)
17/07/03 16:43:05 INFO TorrentBroadcast: Started reading broadcast variable 27
17/07/03 16:43:05 INFO MemoryStore: Block broadcast_27_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1594.5 KB)
17/07/03 16:43:05 INFO TorrentBroadcast: Reading broadcast variable 27 took 23 ms
17/07/03 16:43:05 INFO MemoryStore: Block broadcast_27 stored as values in memory (estimated size 6.4 KB, free 1600.9 KB)
17/07/03 16:43:05 INFO CacheManager: Partition rdd_44_0 not found, computing it
17/07/03 16:43:05 INFO GenerateUnsafeProjection: Code generated in 45.61732 ms
17/07/03 16:43:05 INFO BlockManager: Found block rdd_44_0 locally
17/07/03 16:43:05 INFO GeneratePredicate: Code generated in 19.881757 ms
17/07/03 16:43:05 INFO GenerateColumnAccessor: Code generated in 70.923563 ms
17/07/03 16:43:05 INFO GenerateSafeProjection: Code generated in 12.4974 ms
17/07/03 16:43:05 INFO Executor: Finished task 0.0 in stage 22.0 (TID 29). 1749 bytes result sent to driver
17/07/03 16:43:05 INFO CoarseGrainedExecutorBackend: Got assigned task 30
17/07/03 16:43:05 INFO Executor: Running task 0.0 in stage 23.0 (TID 30)
17/07/03 16:43:05 INFO TorrentBroadcast: Started reading broadcast variable 28
17/07/03 16:43:05 INFO MemoryStore: Block broadcast_28_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1604.2 KB)
17/07/03 16:43:05 INFO TorrentBroadcast: Reading broadcast variable 28 took 25 ms
17/07/03 16:43:05 INFO MemoryStore: Block broadcast_28 stored as values in memory (estimated size 6.4 KB, free 1610.6 KB)
17/07/03 16:43:05 INFO CacheManager: Partition rdd_44_1 not found, computing it
17/07/03 16:43:05 INFO BlockManager: Found block rdd_44_1 locally
17/07/03 16:43:05 INFO Executor: Finished task 0.0 in stage 23.0 (TID 30). 1749 bytes result sent to driver

The relevant error seems to be "java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.Float".

Here is my code:

from pyspark.sql import SQLContext
from com.yahoo.ml.caffe.CaffeOnSpark import CaffeOnSpark
from com.yahoo.ml.caffe.Config import Config
from com.yahoo.ml.caffe.DataSource import DataSource

cos = CaffeOnSpark(sc)
args = {}
args['conf'] = '/home/yunzhi/CaffeOnSpark/data/DAE/solver_movielens.prototxt'
args['model'] = 'file:///tmp/lenet.model'
args['devices'] = '1'
args['clusterSize'] = '1'
cfg = Config(sc, args)

sqlCtx = SQLContext(sc)
df_train = sqlCtx.read.parquet('/home/yunzhi/CaffeOnSpark/data/DAE/train_movie_data')
dl_train_source = DataSource(sc).getSource(cfg, True)
dl_validation_source = DataSource(sc).getSource(cfg, False)
validation_result_df = cos.trainWithValidation(dl_train_source, dl_validation_source)

I also checked the type of my input with type(df_train.take(1)[0].data[0]), and it gave float.

Could you help me find where this error comes from? Thank you!

Yunzhi-b commented 7 years ago

@junshi15 To be more specific, here is my code for transforming the matrix data into the format DataFrameSource expects:

from pyspark.sql import SQLContext, Row

# train_dense is my 2-D matrix, one sample per row.
rdd = sc.parallelize(train_dense.tolist())
sqlContext = SQLContext(sc)
# In my model, the label and data are the same.
rdd1 = rdd.map(lambda l: Row(label=l, data=l))
schemaRDD = sqlContext.createDataFrame(rdd1)
schemaRDD.saveAsParquetFile('train_movie_data')

And my prototxt is:

layer {
  name: "data"
  type: "CoSData"
  top: "data"
  top: "label"
  include {
    phase: TRAIN
  }
  source_class: "com.yahoo.ml.caffe.DataFrameSource"
  cos_data_param {
    source: "/home/yunzhi/CaffeOnSpark/data/DAE/train_movie_data/"
    batch_size: 20
    shuffle: true
    top {
      name: "data"
      type: FLOAT_ARRAY
      channels: 3533
      sample_num_axes: 1 
    }
    top {
      name: "label"
      type: FLOAT_ARRAY
      channels: 3533
      sample_num_axes: 1 
    }
  }
}
layer {
  name: "data"
  type: "CoSData"
  top: "data"
  top: "label"
  include {
    phase: TEST
  }
  source_class: "com.yahoo.ml.caffe.DataFrameSource"
  cos_data_param {
    source: "/home/yunzhi/CaffeOnSpark/data/DAE/train_movie_data/"
    batch_size: 20
    top {
      name: "data"
      type: FLOAT_ARRAY
      channels: 3533
      sample_num_axes: 1 
    }
    top {
      name: "label"
      type: FLOAT_ARRAY
      channels: 3533
      sample_num_axes: 1 
    }
  }
}
layer {
  name: "noisydata"
  type: "Python"
  bottom: "data"
  top: "noisydata"
  top: "mask"
  include {
    phase: TRAIN
  }
  python_param {
    module: "noisyLayer"
    layer: "NoisyLayer"
    param_str: '{"hide_ration": 0.25}'
  }
}
layer {
  name: "noisydata"
  type: "Python"
  bottom: "data"
  top: "noisydata"
  top: "mask"
  include {
    phase: TEST
  }
  python_param {
    module: "noisyLayer"
    layer: "NoisyLayer"
    param_str: '{"hide_ration": 0.0}'
  }
}
layer {
  name: "ip1"
  type: "InnerProduct"
  bottom: "noisydata"
  top: "ip1"
  param { lr_mult: 1 decay_mult: 1 }
  param { lr_mult: 1 decay_mult: 1 }
  inner_product_param {
    num_output: 600
    weight_filler {
      type: "xavier"
    }
  }
}
layer {
  name: "relu1"
  type: "ReLU"
  bottom: "ip1"
  top: "ip1"
}
layer {
  name: "ip2"
  type: "InnerProduct"
  bottom: "ip1"
  top: "ip2"
  param { lr_mult: 1 decay_mult: 1 }
  param { lr_mult: 1 decay_mult: 1 }
  inner_product_param {
    num_output: 3533
    weight_filler {
      type: "xavier"
    }
  }
}
layer {
  name: "out"
  type: "TanH"
  bottom: "ip2"
  top: "ip22"
}
layer {
  name: "loss"
  type: "Python"
  bottom: "ip22"
  bottom: "label"
  bottom: "mask"
  top: "loss"
  python_param {
    module: "DAELossLayer"
    layer: "DAELossLayer"
    param_str: '{"coef_denoising": 1.,"coef_recons":0.5}'
  }
  loss_weight: 1
}

Thanks again

junshi15 commented 7 years ago

I see this line: StructType(StructField(data,ArrayType(DoubleType,true),true), StructField(label,ArrayType(DoubleType,true),true))

Is it possible that when you generated your data frame, you used double instead of float?
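
A quick way to check is to look at the schema Spark actually stored for the Parquet data (using the df_train read back in the code above); a Python float is a C double, so an inferred schema ends up with double elements:

# Inspect the element type of the array columns.
df_train.printSchema()
# root
#  |-- data: array (nullable = true)
#  |    |-- element: double (containsNull = true)
#  |-- label: array (nullable = true)
#  |    |-- element: double (containsNull = true)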

Yunzhi-b commented 7 years ago

Thank you! @junshi15 I have solved the problem and my model trains well. The problem was exactly the double type I used for the input. Earlier I had checked the type in Python and it was float, but a Python float is actually a double in C. So I had to explicitly change the element type to float (a C float, not just a Python float). What I used to achieve this is Spark's pyspark.sql.types package.
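
For reference, a minimal sketch of that kind of fix, assuming the double-typed DataFrame from the earlier code is cast to float arrays with pyspark.sql.types and written back out (the output path here is only illustrative):

from pyspark.sql.types import ArrayType, FloatType

# Cast both array columns from double to float elements, then rewrite the Parquet
# data so DataFrameSource can copy the values straight into its float arrays.
df_float = df_train.withColumn('data', df_train['data'].cast(ArrayType(FloatType()))) \
                   .withColumn('label', df_train['label'].cast(ArrayType(FloatType())))
df_float.write.parquet('file:///home/yunzhi/CaffeOnSpark/data/DAE/train_movie_data_float')

Alternatively, the DataFrame can be built with an explicit ArrayType(FloatType()) schema from the start, as in the sketch further up, so the Parquet file never contains doubles.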