Apache Spark™ is a fast and general engine for large-scale data processing.
This gem lets you use Spark functionality from Ruby.
Word count in Spark's Ruby API
file = spark.text_file("hdfs://...")
file.flat_map(:split)
    .map(lambda{|word| [word, 1]})
    .reduce_by_key(lambda{|a, b| a+b})
Add this line to your application's Gemfile:
gem 'ruby-spark'
And then execute:
$ bundle
Or install it yourself as:
$ gem install ruby-spark
Run rake compile if you are using the gem from a local filesystem.
The following command will download Spark and build extensions for this gem (SBT is used for compiling). For more information, check the wiki. Jars will be stored in your HOME directory.
$ ruby-spark build
You can use Ruby Spark via the interactive shell (Pry is used):
$ ruby-spark shell
Or use it in an existing project. If you want to configure Spark, do so before starting it; see the configuration section for more details.
require 'ruby-spark'
# Configuration
Spark.config do
  set_app_name "RubySpark"
  set 'spark.ruby.serializer', 'oj'
  set 'spark.ruby.serializer.batch_size', 100
end
# Start Apache Spark
Spark.start
# Context reference
Spark.sc
Finally, stop the cluster. In the shell, Spark is stopped automatically when the environment exits.
Spark.stop
After the first use, a global configuration file is created at ~/.ruby-spark.conf. Properties for both Spark and RubySpark can be specified there.
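A hypothetical sketch of such a file, assuming it follows Spark's usual whitespace-separated properties format (the property names are taken from the configuration example above; check the generated file for the exact syntax):
# ~/.ruby-spark.conf -- illustrative values only
spark.app.name                     RubySpark
spark.ruby.serializer              oj
spark.ruby.serializer.batch_size   100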
Single text file:
rdd = sc.text_file(FILE, workers_num, serializer=nil)
All files in a directory:
rdd = sc.whole_text_files(DIRECTORY, workers_num, serializer=nil)
Uploading data structures directly from Ruby:
rdd = sc.parallelize([1,2,3,4,5], workers_num, serializer=nil)
rdd = sc.parallelize(1..5, workers_num, serializer=nil)
If you do not specify a serializer, the default one is used (as defined by the spark.ruby.serializer.* options). Check the serializer documentation if you want to create a custom one.
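For illustration, a minimal sketch of passing an explicit serializer, using Spark::Serializer.build (also shown in the examples below); the Marshal serializer and batch size of 10 are arbitrary choices:
# Assumes Spark.start has already been called, as shown above
ser = Spark::Serializer.build('batched(marshal, 10)')
# Pass the serializer as the third argument instead of relying on the default
rdd = Spark.sc.parallelize(1..5, 2, ser)
rdd.collect
# => [1, 2, 3, 4, 5]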
All operations can be divided into 2 groups: transformations, which lazily build a new RDD, and actions, which trigger the computation and return a result. For more information you can also check the official Spark documentation; first make sure that the method is implemented in this gem. A short sketch illustrating the difference follows the lists below.
Transformations:
rdd.map(function)
rdd.flat_map(function)
rdd.map_partitions(function)
rdd.filter(function)
rdd.cartesian(other)
rdd.intersection(other)
rdd.sample(with_replacement, fraction, seed)
rdd.group_by_key(num_partitions)
...many more...
Actions:
rdd.take(count)
rdd.reduce(function)
rdd.aggregate(zero_value, seq_op, comb_op)
rdd.histogram(buckets)
rdd.collect
...many more...
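As a minimal sketch of the difference, using only methods listed above: a transformation such as map merely returns a new RDD, while an action such as collect triggers the actual computation and returns the result to the driver.
# Assumes Spark.start has been called, as shown above
rdd = Spark.sc.parallelize(1..5, 2)
# Transformation: a new RDD is returned, nothing is computed yet
doubled = rdd.map(lambda{|x| x * 2})
# Action: the computation runs and the result is returned
doubled.collect
# => [2, 4, 6, 8, 10]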
# Every batch will be serialized by Marshal and will have size 10
ser = Spark::Serializer.build('batched(marshal, 10)')
# Range 0..100, 2 workers, custom serializer
rdd = Spark.sc.parallelize(0..100, 2, ser)
# Take first 5 items
rdd.take(5)
# => [0, 1, 2, 3, 4]
# Summing the numbers
rdd.reduce(lambda{|sum, x| sum+x})
rdd.reduce(:+)
rdd.sum
# => 5050
# Aggregating with a zero value
seq = lambda{|x,y| x+y}
com = lambda{|x,y| x*y}
rdd.aggregate(1, seq, com)
# 1. Every worker adds up the numbers in its partition (plus the zero value)
# => [1226, 3826]
# 2. The partial results are multiplied together
# => 4690676
# Statistics
rdd.stats
# => StatCounter: (count, mean, max, min, variance,
# sample_variance, stdev, sample_stdev)
# Compute a histogram using the provided buckets.
rdd.histogram(2)
# => [[0.0, 50.0, 100], [50, 51]]
# Mapping
rdd.map(lambda {|x| x*2}).collect
# => [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, ...]
rdd.map(:to_f).collect
# => [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, ...]
# Mapping the whole collection
rdd.map_partitions(lambda{|part| part.reduce(:+)}).collect
# => [1225, 3825]
# Selecting
rdd.filter(lambda{|x| x.even?}).collect
# => [0, 2, 4, 6, 8, 10, 12, 14, 16, ...]
# Sampling
rdd.sample(false, 0.1).collect
# => [3, 36, 40, 54, 58, 82, 86, 95, 98]
# Sampling a fixed number of items
rdd.take_sample(true, 10)
# => [53, 87, 71, 74, 18, 75, 55, 94, 46, 32]
# Using an external process
rdd.pipe('cat', "awk '{print $1*10}'")
# => ["0", "10", "20", "30", "40", "50", ...]
# Word count
# Content of the file at PATH:
# "first line"
# "second line"
rdd = sc.text_file(PATH)
# ["first", "line", "second", "line"]
rdd = rdd.flat_map(lambda{|line| line.split})
# [["first", 1], ["line", 1], ["second", 1], ["line", 1]]
rdd = rdd.map(lambda{|word| [word, 1]})
# [["first", 1], ["line", 2], ["second", 1]]
rdd = rdd.reduce_by_key(lambda{|a, b| a+b})
# {"first"=>1, "line"=>2, "second"=>1}
rdd.collect_as_hash
# Estimating Pi with a Monte-Carlo method
slices = 3
n = 100000 * slices

def map(_)
  x = rand * 2 - 1
  y = rand * 2 - 1

  if x**2 + y**2 < 1
    return 1
  else
    return 0
  end
end

rdd = Spark.context.parallelize(1..n, slices, serializer: 'oj')
rdd = rdd.map(method(:map))

puts 'Pi is roughly %f' % (4.0 * rdd.sum / n)
# Computing Pi digits with BigDecimal (bigdecimal/math is loaded on the workers)
rdd = sc.parallelize([10_000], 1)
rdd = rdd.add_library('bigdecimal/math')
rdd = rdd.map(lambda{|x| BigMath.PI(x)})
rdd.collect # => [#<BigDecimal, '0.31415926...'>]
Mllib functions use Spark's Machine Learning Library (MLlib). Ruby objects are serialized and deserialized in Java, so you cannot use custom classes; only primitive types such as strings or integers are supported.
The full list of supported methods and models is available in the gem's documentation.
# Import Mllib classes into Object
# Otherwise they are accessible via Spark::Mllib::LinearRegressionWithSGD
Spark::Mllib.import(Object)
# Linear regression
# Training data
data = [
LabeledPoint.new(0.0, [0.0]),
LabeledPoint.new(1.0, [1.0]),
LabeledPoint.new(3.0, [2.0]),
LabeledPoint.new(2.0, [3.0])
]
# Train a model
lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initial_weights: [1.0])
lrm.predict([0.0])
# K-Means clustering
Spark::Mllib.import
# Dense vectors
data = [
DenseVector.new([0.0,0.0]),
DenseVector.new([1.0,1.0]),
DenseVector.new([9.0,8.0]),
DenseVector.new([8.0,9.0])
]
model = KMeans.train(sc.parallelize(data), 2)
model.predict([0.0, 0.0]) == model.predict([1.0, 1.0])
# => true
model.predict([8.0, 9.0]) == model.predict([9.0, 8.0])
# => true