scalanlp / breeze

Breeze is a numerical processing library for Scala.
www.scalanlp.org
Apache License 2.0

Question: What about Spark-like DataFrames? #470

Open frgomes opened 8 years ago

frgomes commented 8 years ago

As far as I understand, Breeze was designed in order to take advantage of BLAS/LAPACK and even specialized implementations for GPUs. So, chances are that my question below is simply out of context, or even nonsense. Anyway, here you go:

What about supporting Pandas-style data frames, in a similar way Saddle does? What would be the pros and cons? What would be the effort needed?

My question arises from the fact that it's actually possible to define and populate a DenseMatrix[String] as the code below shows, despite that (I admit!) it does not make any sense from the math point of view.

import breeze.io.CSVReader
import breeze.linalg.DenseMatrix

def csvRead(file: java.io.File,
            separator: Char = ',',
            quote: Char = '"',
            escape: Char = '\\',
            skipLines: Int = 0): DenseMatrix[String] = {
  // mat is never reassigned, so a val is enough
  val mat = CSVReader.read(new java.io.FileReader(file), separator, quote, escape, skipLines)
  if (mat.length == 0) {
    DenseMatrix.zeros[String](0, 0)
  } else {
    // assumes every row has as many cells as the first one
    DenseMatrix.tabulate(mat.length, mat.head.length)((i, j) => mat(i)(j))
  }
}

dlwh commented 8 years ago

(matrices of strings make sense: there's string concat and an identity string!)

So, I would love to have something like data frames, but I don't really know how to do them in a Breeze-y way yet.

Saddle's data frames aren't really very Scalastic, and definitely not very Breeze-y. They deal with heterogeneity by just erasing to Any, so they don't get any type safety at all. Instead, I'd want something like a record type parameter a la shapeless.

Maybe even something like:

case class MyRecord(str: String, count: Int)

val frame: DataFrame[MyRecord] = DataFrame.loadCSV[MyRecord](path)

frame.str // Vector[String]
frame.count // Vector[Int]

The API sketched above can be done with macros fairly easily given a FromCSV[T] type class or something similar.
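As a rough illustration of the type class idea, here is a minimal sketch in plain Scala with no Breeze dependency. FromCSV, TypedFrame, column and the hand-written instance are hypothetical names made up for this example; a macro or shapeless derivation would presumably replace the hand-written instance, and field access here goes through an explicit function rather than frame.str.

```scala
// Hypothetical sketch: none of these names exist in Breeze.
object TypedFrameSketch {
  // Type class: how to decode one CSV row (already split into cells) into a T.
  trait FromCSV[T] {
    def fromRow(cells: IndexedSeq[String]): T
  }

  final case class MyRecord(str: String, count: Int)

  object MyRecord {
    // Hand-written instance; a macro could generate this from the case class.
    implicit val fromCSV: FromCSV[MyRecord] = new FromCSV[MyRecord] {
      def fromRow(cells: IndexedSeq[String]): MyRecord =
        MyRecord(cells(0), cells(1).trim.toInt)
    }
  }

  // A frame of typed records; column extracts one field from every row.
  final case class TypedFrame[T](rows: Vector[T]) {
    def column[A](f: T => A): Vector[A] = rows.map(f)
  }

  object TypedFrame {
    // Naive splitting on commas (no quoting or escaping), for illustration only.
    def loadCSV[T](lines: Seq[String])(implicit dec: FromCSV[T]): TypedFrame[T] =
      TypedFrame(lines.map(l => dec.fromRow(l.split(',').toIndexedSeq)).toVector)
  }

  val frame: TypedFrame[MyRecord] = TypedFrame.loadCSV[MyRecord](Seq("a,1", "b,2"))
  val strs: Vector[String] = frame.column(_.str)
  val counts: Vector[Int] = frame.column(_.count)
}
```

The column types are known statically here (Vector[String] and Vector[Int]), which is the property the typed proposal is after.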

Of course, data frames are also for the case where you really have no static knowledge of the dataset you're loading, so you can't insist on knowing the entries of the matrix at compile time. Still, it would be nice to be able to say "this data source conforms to this struct" or "I don't know what data source this conforms to".

I think that's what I would like to see.

If you're excited about it and are looking for a challenge, I'd definitely be up for helping. Otherwise, this will go on my "someday" pile.

Note that we have Counters and Counter2s, which do a lot of what you would do with Saddle's Frames, albeit not as efficiently and without as much sugar. If efficiency or ease of use becomes an issue, we can do something more like what Saddle does.

frgomes commented 8 years ago

Hi David, thanks a lot for your prompt answer.

The use case I have at the moment is something like: discover where the data I'm interested in is, and create a DenseVector[Double] from it. I created implicit classes which let the code look like this:

val currency = "USD"
val term = "30days"
// load CSV
val database : DenseMatrix[String] = csvRead(new java.io.File(path))
// discover column for USD from first row
val posCurrency : Int = database(0, ::).t.indexOf(currency).toTreeSet.head
// discover column for 30days from second row, taking the first one after the currency
val posTerm     : Int = database(1, ::).t.indexOf(term).toTreeSet.from(posCurrency).head
// obtain specialized DenseVectors
// (row0 is not defined in this snippet; presumably it is the index of the first data row)
val dates  : DenseVector[String] = database(::, 0).slice(row0, database.rows)
val prices : DenseVector[Double] = database(::, posTerm).slice(row0, database.rows).asDouble

I have a question: How could we allow NA cells? Two ideas:

  1. data frames are based on sparse data structures. Possible drawback: performance.
  2. data frames are based on dense data structures. Possible drawback: additional bit arrays for active/inactive control (i.e.: NA).

Maybe one way of contributing would be making my implicit classes public. Maybe some more people would get interested and contribute their ideas. Over time we would come to understand what makes sense and what does not. Then the code could be refactored/merged into Breeze.

Thanks

dlwh commented 8 years ago

indexOf seems like a good method for vectors.

NAs should probably be dealt with out-of-band, which probably means a separate bitmask. You could imagine something like

case class Frame[T](rows: Index[String], columns: Index[String], data: Matrix[T], nas: BitVector) 

or something similar
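To make the out-of-band idea concrete, here is a minimal sketch of a single dense column with a separate NA mask. It uses java.util.BitSet and a plain Array rather than Breeze types, and MaskedColumn and fromOptions are hypothetical names invented for this example.

```scala
import java.util.BitSet

// Hypothetical sketch: a dense column plus an out-of-band NA mask.
object MaskedColumnSketch {
  // Bit i set in nas means cell i is NA; data(i) then holds a meaningless placeholder.
  final class MaskedColumn(val data: Array[Double], val nas: BitSet) {
    def get(i: Int): Option[Double] =
      if (nas.get(i)) None else Some(data(i))

    // Mean over the present cells only; None when every cell is NA.
    def mean: Option[Double] = {
      var sum = 0.0
      var n = 0
      var i = 0
      while (i < data.length) {
        if (!nas.get(i)) {
          sum += data(i)
          n += 1
        }
        i += 1
      }
      if (n == 0) None else Some(sum / n)
    }
  }

  object MaskedColumn {
    // Build from optional cells; NA slots keep the default 0.0 in the dense array.
    def fromOptions(cells: Seq[Option[Double]]): MaskedColumn = {
      val nas = new BitSet(cells.length)
      val data = new Array[Double](cells.length)
      cells.zipWithIndex.foreach {
        case (Some(x), i) => data(i) = x
        case (None, i)    => nas.set(i)
      }
      new MaskedColumn(data, nas)
    }
  }

  val col: MaskedColumn = MaskedColumn.fromOptions(Seq(Some(1.0), None, Some(3.0)))
}
```

The dense array stays a plain Array[Double], so it could still be handed to numeric code as is; only the operations that care about NAs consult the mask.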

frgomes commented 8 years ago

Thanks for your pointers on case class Frame. I will study the classes involved and will continue later this week.

Meanwhile, in case you are curious about indexOf and other things, all I did is below:


def csvRead(file: java.io.File,
            separator: Char=',',
            quote: Char='"',
            escape: Char='\\',
            skipLines: Int = 0): DenseMatrix[String] = {
  val mat = CSVReader.read(new java.io.FileReader(file), separator, quote, escape, skipLines)
  if(mat.length == 0) {
    DenseMatrix.zeros[String](0, 0)
  } else {
    DenseMatrix.tabulate(mat.length, mat.head.length)((i,j) => mat(i)(j))
  }
}

import breeze.linalg.DenseVector
import scala.collection.immutable.TreeSet

implicit class RichSeqInt(seq: Seq[Int]) {
  // sorted, de-duplicated positions
  def toTreeSet: TreeSet[Int] = TreeSet.empty[Int] ++ seq
}

implicit class RichVector(v: DenseVector[String]) {
  // positions of every cell equal to value
  def indexOf(value: String): Seq[Int] =
    (0 until v.length).filter(i => v(i) == value)
  // Parse every cell. A String is never an instance of Double, so filtering on
  // isInstanceOf would drop every cell. These conversions throw
  // NumberFormatException when a cell does not parse.
  def asDouble : DenseVector[Double] = v.map(_.toDouble)
  def asInt : DenseVector[Int] = v.map(_.toInt)
  def asFloat : DenseVector[Float] = v.map(_.toFloat)
  def asLong : DenseVector[Long] = v.map(_.toLong)
  //import scala.{specialized=>spec}
  //def as[@spec(Double, Int, Float, Long) V](implicit man: ClassTag[V]): DenseVector[V] =
  //  (for((index, data) <- v.pairs if(data.isInstanceOf[V])) yield({ man.unapply(data) }))
}
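
If cells that fail to parse should become NAs instead of raising an exception, the conversion could look roughly like the sketch below. It works on plain Scala collections rather than DenseVector, and parseDoubles is a hypothetical helper invented for this example.

```scala
import scala.util.Try

// Hypothetical sketch: parse a column of strings, remembering which cells failed.
object ParseColumnSketch {
  // Returns the parsed cells (None where parsing failed) and the failing positions.
  def parseDoubles(cells: Seq[String]): (Vector[Option[Double]], Vector[Int]) = {
    val parsed = cells.map(c => Try(c.trim.toDouble).toOption).toVector
    val naPositions = parsed.zipWithIndex.collect { case (None, i) => i }
    (parsed, naPositions)
  }

  val (values, nas) = parseDoubles(Seq("1.5", "n/a", " 2 ", ""))
}
```

The positions returned alongside the values are exactly what an NA bit array would need to record.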

Thanks

darrenjw commented 8 years ago

In case you are interested, I recently wrote a post on data frames in Scala:

https://darrenjw.wordpress.com/2015/08/21/data-frames-and-tables-in-scala/

The bottom line is that there are multiple existing implementations, but none are perfect. That said, "framian" is quite promising as a starting point, and better integration between framian and breeze is something that I would be very interested in.

dlwh commented 8 years ago

thanks! Framian is definitely the most interesting. It's interesting they decided to go with untyped values. I agree it's important to support that functionality (perhaps even as a primary case), but it also seems desirable to say "this frame has these column types with these rows"

dlwh commented 8 years ago

also, what's the point of hlist integration if you're going to erase the types once they're in the frames?

frgomes commented 8 years ago

Hello Darren,

Thanks a lot for your message. Yeah... I was reading your articles last weekend and they helped me a lot.

I will spend more time watching the presentation about Framian. I've only partially watched it, in a hurry. I had a very busy day :-(

From your description (and from the very little I watched and read about Framian), it seems to be the way to go from the perspective of completeness and closeness to Pandas. But its design would hardly fit the way Breeze data structures work.

I think the first step would be proper planning, proper design and specification of the functionality and algorithms involved, before writing a single line of code. Since data frames would be added to Breeze late rather than designed in from the start, maybe spending more time designing would translate into fewer mistakes/restrictions during implementation.

Maybe we could study snippets of code written in Pandas, Framian, Saddle or possibly others and list features first? Then we could "draw" what the API should look like and how it would integrate with existing data structures?

Thoughts?

Thanks a lot, -- Richard

dlwh commented 8 years ago

Yeah, my thought process when working on Breeze is usually to decide what I want client code to look like, then go from there. I do not use data frames enough to know what it "should" look like in Scala/Breeze. My impulse is what I sketched earlier, possibly with scala.Dynamic support for when you don't know what the column names are.

-- David
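
For the case where column names are not known ahead of time, a run-time variant of scala.Dynamic could look roughly like the sketch below. DynFrame is a hypothetical class invented for this example, and columns are erased to Vector[Any], so nothing is checked at compile time.

```scala
import scala.language.dynamics

// Hypothetical sketch: an untyped frame whose columns are looked up by name at run time.
object DynamicFrameSketch {
  final class DynFrame(columns: Map[String, Vector[Any]]) extends Dynamic {
    // The compiler rewrites frame.price to frame.selectDynamic("price").
    def selectDynamic(name: String): Vector[Any] =
      columns.getOrElse(name, throw new NoSuchElementException(s"no column named $name"))
  }

  val frame = new DynFrame(Map("str" -> Vector("a", "b"), "count" -> Vector(1, 2)))
  val strs: Vector[Any] = frame.str
  val counts: Vector[Any] = frame.count
}
```

A misspelled column name only fails when the line runs, which is the trade-off against the typed record approach.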

frgomes commented 8 years ago

Some random ideas here, for lack of a better place to discuss:

Reading CSV files...

  1. without any special instructions: results in a Frame[(String*)] (conceptually)
  2. passing Record (i.e.: a case class): results in a Frame[Record] (conceptually)

When I say that it returns something "conceptually", I mean that you could use an extractor resembling the type outlined. Internally, columns would be a dynamic data structure.

Flexibility: The basic idea is that a Frame can be extended, i.e. columns and/or rows can be added, removed, moved and reordered as needed.

Performance: Another requirement would be backwards "compatibility" with Tensors. It would be desirable to obtain Tensors from Frames. By "compatibility" I mean that all existing performance benefits must be kept. From this perspective, it would not make sense to extract a DenseVector[String] from a Frame and send it to a GPU. Columns sent to a GPU must be AnyVal and Indexes must be Ordered[T].

Convenience: Frames are made of column headers, row headers and data. If you transpose a Frame, the resulting data structure will swap column headers with row headers. A header may have several levels, so that you can dynamically ask which column number (i.e.: an entry on a primary Index) corresponds to Seq("Sales","Americas","Canada","winter")

Strategy: I would rather not use scala.Dynamic, at least during inception and the first iteration. Why? Because we would easily be "distracted" by the number and variety of tricks that can be made, and we would lose focus on a clean API which is strongly typed and which could even be used from Java applications.

If in the future people decide that scala.Dynamic is desirable, I suppose the decision will come from experience collected during the first iteration. And I suppose that enough care will be taken in order to produce a clean API.

Inception, examples, use cases: https://github.com/frgomes/breeze/tree/frames-inception/frames

Thoughts?

dlwh commented 8 years ago

On Wed, Dec 2, 2015 at 2:59 PM, Richard Gomes notifications@github.com wrote:

> Some random ideas here, for lack of a better place to discuss:
>
> Reading CSV files...
>
>   1. without any special instructions: results in a Frame[(String*)]
>   2. passing Record (i.e.: a case class): results in a Frame[Record]
>
> When I say that it returns something "conceptually", I mean that you could use an extractor resembling the type outlined. Internally, columns would be a dynamic data structure.

This seems right.

> Flexibility: The basic idea is that a Frame can be extended, i.e. columns and/or rows can be added, removed, moved and reordered as needed.

Probably this shouldn't be in-place, but otherwise I agree.

> Performance: Another requirement would be backwards "compatibility" with Tensors. It would be desirable to obtain Tensors from Frames. By "compatibility" I mean that all existing performance benefits must be kept. From this perspective, it would not make sense to extract a DenseVector[String] from a Frame and send it to a GPU. Columns sent to a GPU must be AnyVal and Indexes must be Ordered[T].

Not sure why Ordered[T] is useful for an index. My inclination would be for a Frame to be a wrapper around a Matrix[LUB[Types]] with a breeze.util.Index[RowIndex] and breeze.util.Index[ColIndex] as well. Maybe also an N/A bitmask.

> Convenience: Frames are made of column headers, row headers and data. If you transpose a Frame, the resulting data structure will swap column headers with row headers. A header may have several levels, so that you can dynamically ask which column number (i.e.: an entry on a primary Index) corresponds to Seq("Sales","Americas","Canada","winter")

What does a header having several levels mean?

> Strategy: I would rather not use scala.Dynamic, at least during inception and the first iteration. Why? Because we would easily be "distracted" by the number and variety of tricks that can be made, and we would lose focus on a clean API which is strongly typed and which could even be used from Java applications.

Strong typing + Java compatibility is going to be much too onerous or lead to a terrible (i.e. Java-ish) API, I am positive of that. Almost nothing in Breeze is directly usable from Java, and I think nearly all of Breeze's API would just not be possible if Java compatibility were a concern.

> If in the future people decide that scala.Dynamic is desirable, I suppose the decision will come from experience collected during the first iteration. And I suppose that enough care will be taken in order to produce a clean API.

Dynamic is just for the Frame[Record] use case, which will allow you to say frame.fieldName and get back a Vector of that column's type. It can be enforced at compile time with macros. (That is, there's nothing "dynamic" about it. Dynamic is just there to make macro magic work.)

frgomes commented 8 years ago

Headers

> What does a header having several levels mean?

For example, you can have a CSV file which contains:

  row 1: "Sales",,,,,,,,
  row 2: "Americas",,, "Europe",,,
  row 3: "USA",, "Canada", "UK",, "France"
  row 4: "winter","summer","winter","summer","winter","summer","winter","summer",

Then you would like to find the value in the primary Index[Int] that corresponds to Sales/Americas/Canada/summer. The code would look more or less like this:

val col : Int = frame.header.indexOf("Sales", "Americas", "Canada", "summer").head

Why .head at the end? Because there's no guarantee that the CSV file is well structured. There's a chance that we may find several columns matching the query, a situation that needs to be handled by the user's code.
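
A path lookup over a multi-level header could be sketched as follows. This is plain Scala, not the Frame.Header class from the inception branch; fillForward and indexOf are hypothetical helpers, and the sketch assumes one possible convention, namely that a blank header cell repeats the nearest label to its left. The header rows below are an idealized, evenly spaced version of the CSV example.

```scala
// Hypothetical sketch of a path lookup over a multi-level header.
object HeaderSketch {
  // Assumed convention: an empty cell inherits the nearest non-empty cell to its left.
  def fillForward(row: Seq[String]): Vector[String] =
    row.scanLeft("")((prev, cell) => if (cell.isEmpty) prev else cell).tail.toVector

  // All column positions whose labels match the path, level by level.
  def indexOf(levels: Seq[Seq[String]], path: String*): Seq[Int] = {
    require(path.length <= levels.length, "path is deeper than the header")
    val filled = levels.map(fillForward)
    val width = if (filled.isEmpty) 0 else filled.map(_.length).min
    (0 until width).filter { c =>
      path.zipWithIndex.forall { case (label, level) => filled(level)(c) == label }
    }
  }

  val header: Seq[Seq[String]] = Seq(
    Seq("Sales", "", "", "", "", "", "", ""),
    Seq("Americas", "", "", "", "Europe", "", "", ""),
    Seq("USA", "", "Canada", "", "UK", "", "France", ""),
    Seq("winter", "summer", "winter", "summer", "winter", "summer", "winter", "summer")
  )

  val cols: Seq[Int] = indexOf(header, "Sales", "Americas", "Canada", "summer")
  val partial: Seq[Int] = indexOf(header, "Sales", "Europe")
}
```

Returning every match, rather than a single position, leaves the caller free to decide what to do when the file is badly structured and several columns match.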

Ordered[T]: If I'm not mistaken, Tensor[K,V] allows K and V to be anything but, in practice, a DenseVector[V] implies Tensor[Int, V] whilst a DenseMatrix[V] implies Tensor[(Int, Int), V].

From this point of view, at the moment, indexes are already Ordered[K] implicitly because K is necessarily Int. It could be Long in order to better support Big Data... but this is another subject... just thinking out loud...

From the perspective of a Frame (and from the user's point of view!), it may be useful to allow Tensor[LocalDate, Double], for example. If we would like to keep the current implementation, current invariants and current performance of Breeze, we would have to guarantee that Indexes are necessarily Ordered[K].

I'm still uncertain about how Tensor[LocalDate, Double] would be handled on a GPU. Yeah!... I've just scratched the surface and there's lots of stuff to be understood before I start to think about a proposal for an implementation.

Inception, examples, use cases: As I said before, I think that the first step would be exploring the API from the user's perspective and convenience. This work has barely started at: https://github.com/frgomes/breeze/tree/frames-inception/frames

Thanks

dlwh commented 8 years ago

On Thu, Dec 3, 2015 at 2:39 AM, Richard Gomes notifications@github.com wrote:

> Headers
>
>> What does a header having several levels mean?
>
> For example, you can have a CSV file which contains:
>
>   row 1: "Sales",,,,,,,,
>   row 2: "Americas",,, "Europe",,,
>   row 3: "USA",, "Canada", "UK",, "France"
>   row 4: "winter","summer","winter","summer","winter","summer","winter","summer",
>
> Then you would like to find the value in the primary Index[Int] that corresponds to Sales/Americas/Canada/summer. The code would look more or less like this:
>
> val col : Int = frame.header.indexOf("Sales", "Americas", "Canada", "summer").head
>
> Why .head at the end? Because there's no guarantee that the CSV file is well structured. There's a chance that we may find several columns matching the query, a situation that needs to be handled by the user's code.

I see. How does the system know the implied alignment? Is it just a convention that empty columns "copy" the previous column?

> Ordered[T]: If I'm not mistaken, Tensor[K,V] allows K and V to be anything but, in practice, a DenseVector[V] implies Tensor[Int, V] whilst a DenseMatrix[V] implies Tensor[(Int, Int), V].
>
> From this point of view, at the moment, indexes are already Ordered[K] implicitly because K is necessarily Int. It could be Long in order to better support Big Data... but this is another subject... just thinking out loud...

Everyone forgets about Counters!

> From the perspective of a Frame (and from the user's point of view!), it may be useful to allow Tensor[LocalDate, Double], for example. If we would like to keep the current implementation, current invariants and current performance of Breeze, we would have to guarantee that Indexes are necessarily Ordered[K].

You don't really. You just need to maintain a consistent ordering for that particular object: a breeze.util.Index should be sufficient.
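
The point about a consistent ordering can be illustrated with a small stand-in for the idea behind breeze.util.Index. LabelIndex below is a hypothetical class written for this example, not the Breeze one; it maps each label to the position at which it was first supplied, so the key type needs equality and hashing but no Ordering.

```scala
import java.time.LocalDate

// Hypothetical sketch: a label-to-position index that needs no Ordering on the key type.
object LabelIndexSketch {
  final class LabelIndex[K](labels: Vector[K]) {
    require(labels.distinct.size == labels.size, "labels must be unique")
    private val positions: Map[K, Int] = labels.zipWithIndex.toMap

    // Position of a label, if present.
    def apply(label: K): Option[Int] = positions.get(label)
    // Label stored at a position.
    def get(pos: Int): K = labels(pos)
    def size: Int = labels.size
  }

  // Labels are deliberately not in chronological order.
  val dates: Vector[LocalDate] =
    Vector("2017-01-09", "2017-01-02").map(s => LocalDate.parse(s))
  val index = new LabelIndex(dates)

  // A dense column addressed through the index.
  val prices: Array[Double] = Array(230.0, 230.4)
  def priceOn(date: String): Option[Double] =
    index(LocalDate.parse(date)).map(i => prices(i))
}
```

The dense data stays addressed by Int, and only the lookup layer knows about LocalDate.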

frgomes commented 8 years ago

@dlwh :: If I understood your question, you are asking how the CSV reader, or maybe the Frame, knows that "Sales,,,,,,," actually means "Sales" 8 times? Well, the CSV reader doesn't. The Frame does not need to worry about filling in the blanks either, because when you pass a certain path ["Sales", "Europe", "France", "winter"], it may return any number of matches, and it's the responsibility of the calling code to determine which piece(s) of the result are useful or needed. Below is an example of a search on a header made of 2 layers, still done by hand in the application layer rather than inside the still incipient class Frame.Header:

// discover column for USD from first row
val posCurrency : Int = database(0, ::).t.indexOf(currency).toTreeSet.head
// discover column for 30days from second row, taking the first one after the currency
val posTerm     : Int = database(1, ::).t.indexOf(term).toTreeSet.from(posCurrency).head

frgomes commented 8 years ago

I found that Spark has a DataFrame API inspired by R and Pandas. Spark 1.6.0 is going to provide a Dataset API as well, which is conceptually the DataFrame API plus strong types plus a bit of convoluted syntax.

I hope this information happens to be useful for people landing on this thread.

EvanOman commented 8 years ago

Related to what @frgomes said, would there be any way to port the Spark DataFrame/DataSet capabilities into Breeze (without all of the RDD backend stuff)? I am guessing this is a very naive suggestion, but I was just wondering if anyone has looked into it.

DonBeo commented 8 years ago

Personally I have used DF quite extensively both in R and Python. I think these are the main features:

  1. Easy read and write to csv or other file system.
  2. Call elements by row and column name: DF[row_name, col_name]
  3. Easily add or remove lines. For example, add a new column or remove rows with missing values.
  4. Quick statistics. For example, mean and variance of each column.
  5. Groupby + statistics. For example, given 2 columns Age and Nationality, what is the mean Age for each Nationality?
  6. Reshape the DF from wide to long format and vice versa.
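
The group-by example in that list (mean Age for each Nationality) can be sketched with the standard Scala collections, without any data frame library. Person and meanAgeBy are hypothetical names invented for this example.

```scala
// Hypothetical sketch: group-by plus a statistic using only standard collections.
object GroupBySketch {
  final case class Person(age: Int, nationality: String)

  // Mean age per nationality.
  def meanAgeBy(rows: Seq[Person]): Map[String, Double] =
    rows.groupBy(_.nationality).map { case (nationality, people) =>
      nationality -> people.map(_.age).sum.toDouble / people.size
    }

  val people: Seq[Person] = Seq(Person(30, "BR"), Person(40, "BR"), Person(25, "UK"))
  val means: Map[String, Double] = meanAgeBy(people)
}
```

This works row by row on records; a data frame would presumably do the same over columns, with the grouping key and the aggregated column picked by name.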

Pandas has much more complex operations, such as pivot tables and more. Personally I have never used them. I would be happy to contribute on this topic but I do not know if my Scala knowledge is good enough.

lJoublanc commented 7 years ago

Hi. I'm also a regular user of pandas, and I thought I would add some information about the primitives that are used there, and how these could be transliterated to Scala. That's not necessarily the correct approach, but I just want to shed some light on how a 1:1 implementation might look.

val idx : Index[java.time.LocalDate] = Seq("2017-01-02", "2017-01-03", "2017-01-09") map (LocalDate.parse)
val df : DataFrame[String] = DataFrame(
  idx,
  "price" -> Seq(230.4, 231.2, 230.0),
  "sales" -> Seq(26, 30, 2) 
) // note the shared index. Data must be of same dimension.

val df2 : DataFrame[String] = DataFrame(
  "price" -> Series[LocalDate,Double](LocalDate.parse("2017-01-02") -> 230.4, LocalDate.parse("2017-01-09") -> 230.0),
  "sales" -> Series[LocalDate,Int](LocalDate.parse("2017-01-02") -> 26, LocalDate.parse("2017-01-03") -> 2)
)

// The above automatically aligns indices, filling n/a values.
//+----------+--------+--------+
//|          |price   |sales   |
//+----------+--------+--------+
//|2017-01-02|230.4   |26      |
//|2017-01-03|n/a     |2       |
//|2017-01-09|230.0   |n/a     |
//+----------+--------+--------+

df2("price") // df.apply(column : C) : Series[I,_]
//+----------+--------+
//|          |price   |
//+----------+--------+
//|2017-01-02|230.4   |
//|2017-01-03|n/a     |
//|2017-01-09|230.0   |
//+----------+--------+

df2("price")("2017-01-09") // return 230.0 : Double. with an implicit conversion String => LocalDate

df2("price")("2017-01-03") // return n/a : Double.

df2("price")("2017-01-01") // throw index out of bounds exception.

df2(LocalDate.parse("2017-01-01") :: LocalDate.parse("2017-01-05")) // df.apply(r : IndexRange) : df.type
//+----------+--------+--------+
//|          |price   |sales   |
//+----------+--------+--------+
//|2017-01-02|230.4   |26      |
//|2017-01-03|n/a     |2       |
//+----------+--------+--------+

In a data frame the apply or indexing operator is first by column, then by row; this is contrary to matrix indexing. In Scala it would make sense to have these curried, the reason being that when dealing with multi-indices, you may have arity > 1, e.g. series(team = "TeamA", driver = 1)("LapTime"). There is also an overload for accessing ranges (the last example). This is the exception: indexing in this case is first by row (range) and then by column. Confusing, I know! Again, this is quite counter-intuitive if you come from an engineering/maths background and work with matrices. However, this is the convention in Pandas, and I think in R as well.

In terms of implementation, the key is that data should be stored column-wise, allowing fast array operations, e.g. adding the values of one Series (or df column) to another. UFuncs/algebraic operator overloads in this case have the extra job of aligning indices, and filling n/a values where these are missing, before performing calculations.
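
The alignment step can be sketched as follows, using a sorted map per series instead of a real column store. Series, series and add are hypothetical names invented for this example, and None stands in for n/a.

```scala
import java.time.LocalDate
import scala.collection.immutable.TreeMap

// Hypothetical sketch: element-wise addition of two series with index alignment.
object AlignSketch {
  // LocalDate has no implicit Ordering in the standard library, so one is defined here.
  implicit val dateOrdering: Ordering[LocalDate] =
    Ordering.by[LocalDate, Long](_.toEpochDay)

  type Series = TreeMap[LocalDate, Double]

  def series(cells: (String, Double)*): Series =
    TreeMap.empty[LocalDate, Double] ++ cells.map { case (d, v) => LocalDate.parse(d) -> v }

  // Sum over the union of both indices; a date missing on either side gives None (n/a).
  def add(a: Series, b: Series): TreeMap[LocalDate, Option[Double]] = {
    val keys = (a.keySet ++ b.keySet).toSeq
    TreeMap.empty[LocalDate, Option[Double]] ++
      keys.map(k => k -> (for (x <- a.get(k); y <- b.get(k)) yield x + y))
  }

  val price: Series = series("2017-01-02" -> 230.0, "2017-01-09" -> 231.0)
  val bonus: Series = series("2017-01-02" -> 1.0, "2017-01-03" -> 2.0)
  val sum: TreeMap[LocalDate, Option[Double]] = add(price, bonus)
}
```

A column-oriented implementation would presumably compute the aligned index once and then run the addition over plain arrays plus an n/a mask; the map-based version here only shows the semantics.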

I think, as you've mentioned in the posts above, one of the key problems becomes tracking the column types when they are heterogeneous, i.e. you can have a DataFrame[String,[Double,Int,_ <: Enum]] (not real Scala syntax) if you have three columns of Double, Int and enum.

frgomes commented 6 years ago

@EvanOman @dlwh :: Two years later, I've written some code which mimics part of the Dataset API provided by Spark. One important aspect of my implementation is that it is Stream oriented, since we need our "DataFrame-like" things to be composable in a pipeline.

However, reading a CSV and producing a Breeze matrix is far from anything we could label as DataFrame.

Just letting you guys know and sharing my experience.

frgomes commented 6 years ago

Related to #441, even though the scope here is far more audacious.