quinngroup / dr1dl-pyspark

Dictionary Learning in PySpark
Apache License 2.0

P1: Code stubs for parallel matrix-vector multiplication #20

Closed XiangLi-Shaun closed 8 years ago

XiangLi-Shaun commented 8 years ago

We need code for parallel matrix-vector multiplication, along with the corresponding test cases, to see whether it gives us a performance boost. This would support development of the parallel versions of r1DL and sccDL.

magsol commented 8 years ago

This is 1 of the 4 initial Spark primitives we need for dictionary learning: matrix-vector multiplication.

It can be accomplished in four steps:

  1. Broadcast the vector v to all worker nodes, e.g. sc.broadcast(v).
  2. Call map over the RDD of matrix rows.
  3. Each mapper multiplies its row of the matrix with the broadcast vector and returns the inner product, along with the row's index.
  4. The driver collects the return values and assembles them into a vector, ordered by row index.
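
The four steps above can be sketched as follows. This is a minimal sketch, not the project's implementation: the function names are hypothetical, `rows` is assumed to be an RDD holding one 1-D NumPy array per matrix row, and `sc` is assumed to be an existing SparkContext.

```python
import numpy as np

def row_inner_product(indexed_row, v):
    """Mapper body: turn (row, index) into (index, row . v)."""
    row, index = indexed_row  # zipWithIndex yields (element, index) pairs
    return index, float(np.dot(row, v))

def assemble(pairs, n_rows):
    """Driver side: place each inner product at its row index."""
    result = np.zeros(n_rows)
    for index, value in pairs:
        result[index] = value
    return result

def matrix_vector_product(sc, rows, v):
    """Assumes rows is an RDD of 1-D arrays and v is a 1-D array."""
    v_bc = sc.broadcast(v)                                  # step 1
    pairs = (rows.zipWithIndex()                            # attach row numbers
                 .map(lambda ir: row_inner_product(ir, v_bc.value))  # steps 2-3
                 .collect())                                # step 4
    return assemble(pairs, len(pairs))
```

Keeping the mapper body and the assembly step as plain functions means both can be unit-tested locally without a cluster.
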
MOJTABAFA commented 8 years ago

@magsol Where in our program do we need matrix multiplications? In np.dot(s,u_old) and np.dot(S[:, idxs_n], v[idxs_n])? If so, for the first one we should broadcast the u_old vector, and for the second one v[idxs_n]. Am I right? The first one seems easy, but what about the second?

magsol commented 8 years ago

The second one is easy too; we can broadcast the vector v and the list of indices.
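
A minimal sketch of that second case, np.dot(S[:, idxs_n], v[idxs_n]), computed one row at a time. The function names are hypothetical, and `rows` is assumed to be an RDD with one 1-D NumPy array per row of S.

```python
import numpy as np

def restricted_inner_product(row, v, idxs):
    """One row's share of np.dot(S[:, idxs], v[idxs])."""
    return float(np.dot(row[idxs], v[idxs]))

def restricted_matvec(sc, rows, v, idxs):
    """Broadcast both the vector and the index list, then map over rows."""
    v_bc = sc.broadcast(v)
    idxs_bc = sc.broadcast(np.asarray(idxs))
    return (rows.zipWithIndex()
                .map(lambda ri: (ri[1], restricted_inner_product(
                    ri[0], v_bc.value, idxs_bc.value)))
                .collect())  # list of (row index, inner product) pairs
```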


MOJTABAFA commented 8 years ago

@magsol Today I tried to tackle the first Spark stage together with Milad, but unfortunately we got stuck at the first step. As I discussed with you yesterday, the main problem is that we have a text file, and we need to convert it to a float matrix before we can do any computation on it. I tried the following code:

import numpy as np
lines = sc.textFile("/home/targol/Desktop/s.txt")
dataArrays = lines.map(lambda s:s.split("\t"))
data = raw_data.flatMap(lambda s: s.split("\t")).collect()

This removes the \t separators from the text file, but the problem is that the file becomes a list of strings. That is, each row is a list of strings, and we cannot convert those string lists to floats. We also tested the following:

dataArrays = data.map(lambda s : np.asarray(s))
dataArrays.collect()
dataArrays.first()

In this case we can convert the lists to arrays, so we can access individual array elements. For example:

>>> dataArrays[2]
'0.728215'

But we cannot convert all the elements, even with the following code:

def parseVector(line):
    return np.array([float(x) for x in line.split(',')])
a = parseVector(dataArrays.first())
a
def parse(data):
a = parseVector(dataArrays.first())
floatData = dataArrays.map(lambda s : parseVector(s))
.
.
.
float = dataArrays.map(lambda x : x.foreach(lambda s: float(s)))
float
float.collect

The error is always as follows:

    "It appears that you are attempting to broadcast an RDD or reference an RDD from an "
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

We even tried to reproduce exactly the code on page 219 of the Spark book you gave me: img_2839

But even this code produced the same error.

So I got very confused again today. Please help us and let us know how we should continue.

magsol commented 8 years ago

Ok, lots of issues here to clear up. I'm glad you're working through this; keep it coming!

Branching

First of all, I think it'd be very useful if you were able to commit the code you're using. However, we don't want to "pollute" the repository with experimental code that you're just tinkering with. Fortunately, git has a solution: branching. Effectively, you create a "branch" in the revision tree, leaving the "master" branch (default branch you've been working on this whole time) separate from your "experimental" branch, where you can go wild with whatever edits you want. That way, I can still track your progress and pull down your code, but it won't nuke our "master" branch that has our stable code.

I'd highly recommend getting an experimental branch set up that we can tinker with. Once we have a solution, you can either implement it directly in the master branch, or "merge" the changes from one branch into another.

Datatype conversion

You're almost there, just missing a crucial step that is in the book.

dataArrays = lines.map(lambda s:s.split("\t"))

That does indeed split each line up on the tab \t character. But you're right: everything is still a string. All that's changed is that, instead of each line being a single string object, each line is now a list of string objects. We need to convert the list of strings into a list of floats.

And that's where my next big point comes in: RDD APIs.

RDDs

The error you're getting is related to how distributed computing works. Whenever you invoke a method on an RDD, e.g. rdd.map or rdd.flatMap, think of it as a Hadoop Mapper or Reducer. That is, inside that method, you are now running on some individual machine in your cluster. As such, you can't call another RDD method! That would be akin to being inside a Hadoop Mapper and trying to call another Hadoop Mapper; it violates the paradigm. Also, recursive distributed programming sounds awful.
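
To make the rule concrete, here is a minimal sketch modeled on the example in the error message. The function name is hypothetical; `rdd1` and `rdd2` are assumed to be existing RDDs of numbers.

```python
def scale_by_count(rdd1, rdd2):
    """Multiply every element of rdd1 by the number of elements in rdd2."""
    # Invalid: rdd1.map(lambda x: rdd2.count() * x)
    # That references rdd2 inside a transformation of rdd1 (SPARK-5063).
    n = rdd2.count()                  # action runs on the driver, returns an int
    return rdd1.map(lambda x: n * x)  # the closure captures only the int
```

The pattern is to run the inner RDD operation on the driver first, then hand its plain Python result to the mapper, either through the closure as here or through sc.broadcast for larger values.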

The code in the book is correct; I just think you're not matching up your braces correctly. Here's what the Python code would look like:

raw_data = sc.textFile('/path/to/csv/data')  # creates an RDD of strings
# drop comment lines (those starting with "#"), then convert each remaining line to floats
data = raw_data.filter(lambda x: not x.startswith("#")).map(lambda x: map(float, x.split(",")))

Another point of confusion here may be the two map calls: they are entirely different functions. The first one, .map(lambda x:, is the rdd.map() function. The second is a built-in, local map function: Scala's collection map in the book's code, and Python's built-in map in the version above. It is not distributed, nor is it even part of Spark; it belongs to the core language. That's why this works, even though you're in a "mapper" at this point.

What that second map does: remember, the data x is a single string that we want to convert to a list of floats (really, an array of floats). The map function takes as input a list of items, and outputs a new list of items that has had the specified function (in this case, float) applied to it. So the input object, x.split(","), is a list of strings (the result of a single string being split up on the "," character into a list of strings), the function is float, and each element in the input list is run through the float function, converting the string to a float. The list of floats is then returned, and voila!

(in our case, we also want to take one extra step and convert the list to a NumPy array)
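
A minimal sketch of the whole conversion, including the NumPy step. The names parse_line and load_matrix are hypothetical, and skipping empty fields (such as one left by a trailing separator) is an assumption about the input file. It uses a list comprehension in place of the built-in map so that it behaves the same on Python 2 and Python 3.

```python
import numpy as np

def parse_line(line, sep=","):
    """Turn one delimited text line into a 1-D NumPy array of floats."""
    # Skip empty fields, e.g. the one left behind by a trailing separator.
    return np.array([float(field) for field in line.split(sep) if field.strip()])

def load_matrix(sc, path, sep=","):
    """Return an RDD with one NumPy array per non-comment line of the file."""
    return (sc.textFile(path)
              .filter(lambda line: not line.startswith("#"))
              .map(lambda line: parse_line(line, sep)))
```

For a tab-separated file, pass sep="\t". The parsing happens inside a single rdd.map call, so no second pass over the data is needed.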

milad181 commented 8 years ago

I ran into a few challenges while installing the thunder package on the temporary Spark server. I hit several errors, mostly related to math dependencies, even after installing the Anaconda package.

We tried both Python 2 and Python 3, and each produced errors we did not recognize. Please find attached the log file showing the errors we hit on Python 2 while installing thunder.

Any help would be highly appreciated. pip.txt

MOJTABAFA commented 8 years ago

@magsol We worked on the issue, but the work is more complex than I thought.

  1. To convert the text matrix to a list of floats and then build NumPy arrays from the lists, I finally used the following code. The main problem was the last element of each row, which is an empty string (''), so we have to drop it from the end of the list with a function, as follows:
import numpy as np
def floater(x):
    for i in range(len(x)):
        if(x[i]!= ''):
            x[i]= float(x[i])

        else:
             del x[i]

    return (x)

#importing the data 

lines = sc.textFile("/home/targol/Desktop/s.txt")

# floating the all the matrix data
dataArrays = lines.map(lambda s:floater(s.split("\t")))

#changing the list of float data to np array

dataArrays = lines.map(lambda s:np.asarray(floater(s.split("\t"))))

So now our matrix is a float matrix which is ready to be processed.

2. Thunder: I worked with Milad on thunder. It was a real challenge, even for Milad, who is experienced with the shell and systems administration, and unfortunately we ran into a lot of problems. So I have a question: for whitening, why can't we do it as before in Python, as follows?

### whitening process
S = dataArrays.first() -dataArrays.first().mean()
S = S / sla.norm(S, axis = 0) 

3. Matrix multiplication: this is different from what we imagined. We thought we could broadcast our vector, send each row to a different worker, and do the dot product there. The problem is that our vector's length equals the length of the matrix columns. For example, in our test file the matrix has dimensions (100,5) and u_old has dimensions (1,100), so we are dealing with columns! We would have to send the columns to different workers, but in Spark an RDD can only be distributed by rows. Alternatively, we could transpose the S matrix, send its rows to the workers, and do the dot operation there, but we cannot transpose a matrix stored in an RDD. The only solution I could think of was indexed = dataArrays.zipWithIndex(), which assigns an index to each row. We could then multiply each element of a matrix row by the corresponding element of the vector and save the result in a new matrix using the row index. But there is still the problem of referring to the index: I don't know how to do that.

I am totally confused. Please help us.

By the way, our program is still not very large, but I'll put it in a new branch very soon.

magsol commented 8 years ago

More thorough answers later, but a few quick points--

@magsol We worked on the issue, but the work is more complex than I thought.

To convert the text matrix to a list of floats and then build NumPy arrays from the lists, I finally used the following code. The main problem was the last element of each row, which is an empty string (''), so we have to drop it from the end of the list with a function, as follows:

No, this code is extremely inefficient. You have an entire extraneous RDD.map just to convert to numpy arrays when you could do that at the end of the previous map. Also, the explicit for loop is going to slow down the program drastically.

The implementation in the book was fine and will do what you need it to do. Why not use that?

I worked with Milad on thunder. It was a real challenge, even for Milad, who is experienced with the shell and systems administration, and unfortunately we ran into a lot of problems. So I have a question: for whitening, why can't we do it as before in Python, as follows?

### whitening process
S = dataArrays.first() -dataArrays.first().mean()
S = S / sla.norm(S, axis = 0)

This won't work because S is a row-distributed RDD. RDD.first returns the first element of the RDD, in this case row 0. So you're subtracting off the mean of row 0 from row 0, and that's it. Everything else is unchanged.

Use the thunder library and the RowMatrix data structure. I'll help with that.
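
For comparison, here is a minimal sketch of whitening with plain RDD operations. This is not thunder's API. It assumes the intent is to subtract each column's mean and then divide each column by its norm; the thread does not state the axis of the mean, so that is an assumption. All function names are hypothetical, and `rows` is assumed to be an RDD of 1-D NumPy arrays.

```python
import numpy as np

def column_stats(row):
    """One row's contribution: (count, per-column sum, per-column sum of squares)."""
    return 1, row, row ** 2

def merge_stats(a, b):
    """Combine two partial (count, sum, sum of squares) triples."""
    return a[0] + b[0], a[1] + b[1], a[2] + b[2]

def whitening_params(stats):
    """Column means and the column norms of the centered matrix."""
    n, total, total_sq = stats
    mean = total / n
    # sum((x - mean)^2) == sum(x^2) - n * mean^2; clip tiny negative rounding errors
    norm = np.sqrt(np.maximum(total_sq - n * mean ** 2, 0.0))
    return mean, norm

def whiten(sc, rows):
    """Center and scale every row; assumes no column has zero norm."""
    mean, norm = whitening_params(rows.map(column_stats).reduce(merge_stats))
    mean_bc, norm_bc = sc.broadcast(mean), sc.broadcast(norm)
    return rows.map(lambda row: (row - mean_bc.value) / norm_bc.value)
```

Unlike the RDD.first approach, the statistics here come from a reduce over every row, and the second map applies them to every row.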

Matrix multiplication: this is different from what we imagined. We thought we could broadcast our vector, send each row to a different worker, and do the dot product there. The problem is that our vector's length equals the length of the matrix columns. For example, in our test file the matrix has dimensions (100,5) and u_old has dimensions (1,100), so we are dealing with columns! We would have to send the columns to different workers, but in Spark an RDD can only be distributed by rows. Alternatively, we could transpose the S matrix, send its rows to the workers, and do the dot operation there, but we cannot transpose a matrix stored in an RDD. The only solution I could think of was indexed = dataArrays.zipWithIndex(), which assigns an index to each row. We could then multiply each element of a matrix row by the corresponding element of the vector and save the result in a new matrix using the row index, but there is still a problem.

My answer to this is the same as the previous: use thunder, then we don't have to worry about indexing ourselves (thunder will handle it). I'll help you get that up and running.
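
Separately from thunder, the shape mismatch described above does not strictly require a transpose. The product of u_old (length 100) with S (100 rows, 5 columns) is the sum over rows of u_old[i] times row i, so each mapper can weight its own row and a reduce can add the results. The sketch below is one way to write that under stated assumptions: the function names are hypothetical, `rows` is an RDD of 1-D NumPy arrays, and `u` has been flattened to a 1-D array.

```python
import numpy as np

def weighted_row(indexed_row, u):
    """Scale one matrix row by the vector entry that shares its row index."""
    row, index = indexed_row  # zipWithIndex yields (element, index) pairs
    return u[index] * row

def vector_matrix_product(sc, rows, u):
    """Compute u.dot(S) for row-distributed S without transposing it."""
    u_bc = sc.broadcast(u)
    return (rows.zipWithIndex()
                .map(lambda ir: weighted_row(ir, u_bc.value))
                .reduce(lambda a, b: a + b))  # sum of the weighted rows
```

The result has one entry per column of S, so only a short vector travels back to the driver.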

magsol commented 8 years ago

I'm not at my computer so I can't access the pip.txt file you posted, but I do know thunder requires Python 2, so that may be one source of errors.


magsol commented 8 years ago

I've created a new branch quinn-branch that contains functional code for reading the raw data off the filesystem and converting it to a thunder RowMatrix. Have a look and let me know if you have any questions.

In the meantime, check out the thunder documentation for installation and see if that addresses your problems.

magsol commented 8 years ago

@milad181 Yes, the issue is that you're missing a Fortran compiler, which SciPy needs in order to build the underlying LAPACK / ARPACK libraries it relies on. I'd personally recommend downloading Anaconda and using that instead. Check out the Dockerfile I committed in quinn-branch for the steps needed.