Open JGailor opened 8 years ago
Cool, it would be interesting to see if this can be sorted out.
How about this? (let's put the recursion-can-cause-a-stackoverflow issue aside)
def read(recordReader: RecordReader[Record], records: LinkedList[Record], lastId: Int): LinkedList[Record] = {
  val batch = recordReader.readBatch()
  val id = getLastId(batch)
  val recordsSoFar = records ++ batch
  if (isMore(lastId)) read(recordReader, recordsSoFar, id)
  else recordsSoFar
}
read(recordReader, LinkedList(), 0)
you could even put @tailrec on that one
Thanks @staslev for answering and providing some good ideas; the thing here is that we really don't want to memoize the records we've already seen (a Stream is a good choice if you do). In this case, the table I'm reading from has 20+ million records and is ~ 15GB, so you want to be able to process a batch and then release it to be GC'd.
The general pattern here is read a batch of records, perform a map over the records to transform them, run another function over them that is side-effecting (writing to another database or table, perhaps), and then read another batch out of the database. The tail-recursive solution seems like it's on the right track if I can pass in a function to process the records, then let the tail-rec overwrite the stack frame and release the references to the current batch of records (somewhat similar to Python or Ruby's yield).
Thoughts?
@JGailor thanks for the additional info and context. I was wondering if you had any reasons to not just remove any references to the old batch and let GC do its thing (even without using Streams).
How about something like this: (which is basically what you said but in scala-ish)
@tailrec
def doETL[U](recordReader: RecordReader[Record], transform: Record => U, lastId: Int): Int = {
  val batch = recordReader.readBatch()
  val id = getLastId(batch)
  batch.map(transform).foreach(writeToDb)
  // or perhaps writeToDb(batch.map(transform)) if your side-effect prefers batches
  if (isMore(lastId)) doETL(recordReader, transform, id)
  else id
}
doETL(recordReader, aMapFunction, 0)
Given that the recordReader does not keep references to records (or batches) it has read, we don't either, so it would seem that each batch can die peacefully once we're done with it (when the GC monster comes for it).
Also: doETLs working on the same data source (or recordReader). Though this may depend on various additional factors, such as the implementation of the record reader and how thread-safe it is. So... just thinking out loud.
P.S. @ddispaltro thanks :) @tailrec to the rescue.
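To make the batch-at-a-time idea above concrete, here is a minimal runnable sketch. Everything in it is an assumption made for illustration: Record, InMemoryReader, and the write callback are hypothetical stand-ins, since the thread never defines RecordReader, getLastId, isMore, or writeToDb. The loop stops on the first empty batch instead of calling isMore.

```scala
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer

object BatchEtlSketch {
  // Hypothetical record type; the thread never shows the real one.
  final case class Record(id: Int, payload: String)

  // Hypothetical stand-in for a database cursor. It hands out one batch at a
  // time and keeps no reference to batches it has already returned.
  final class InMemoryReader(source: Iterator[Record], batchSize: Int) {
    private val batches = source.grouped(batchSize)
    def readBatch(): List[Record] =
      if (batches.hasNext) batches.next().toList else Nil
  }

  // Reads, transforms, and writes one batch per step. The recursive call is in
  // tail position, so the previous batch is unreachable once the next step starts.
  @tailrec
  def doETL[U](reader: InMemoryReader, transform: Record => U, write: U => Unit, lastId: Int): Int = {
    val batch = reader.readBatch()
    if (batch.isEmpty) lastId
    else {
      batch.map(transform).foreach(write)
      doETL[U](reader, transform, write, batch.last.id)
    }
  }

  // Usage: 7 records in batches of 3; returns the last id seen and what was "written".
  def demo(): (Int, List[String]) = {
    val rows = Iterator.tabulate(7)(i => Record(i + 1, s"row-${i + 1}"))
    val reader = new InMemoryReader(rows, batchSize = 3)
    val written = ArrayBuffer.empty[String]
    val lastId = doETL[String](reader, _.payload.toUpperCase, s => { written += s; () }, 0)
    (lastId, written.toList)
  }
}
```

Passing the write function in as a parameter keeps the side effect out of the loop itself, which is roughly the "pass in a function to process the records" shape asked about earlier in the thread.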
I would change the if else, for a 'match-case'
Let's see, I have a similar question. I pretty much agree with using val instead of var whenever possible, but I can't find a solution here myself.
I have a post table and a category table with a many-to-many relationship through category_to_post (or similar). I want to get a list of posts with their category names, and then group them by category name.
As far as I know, a JOIN between the post and category tables gives me a list of posts with category names. After that I would like the result to be Seq[(String, Seq[Post])], not Seq[(title: String, ..., categoryName: String)], which is what the JOIN returns directly. For example:
// when we use SQL to join
SELECT post.*, category.name AS category_name ....
In Slick, I get Seq[(Post, String)] as the result. It's a table of, let's say, 100 rows, each carrying a category name. I want to transform it to Seq[(String, Seq[Post])]: one entry per distinct category name, together with the group of posts that belong to that name.
I did it this way:
def groupBy[T, B](list: Seq[(T, B)]): Seq[(B, Seq[T])] = {
  // don't use var, but its internal usage
  // might be fine here
  // this method will be quick...
  // it doesn't query db twice
  var category = Set[B]()
  val ret = for {
    t <- list
  } yield {
    !category.contains(t._2) match {
      case true => {
        category += t._2
        //println(category)
        (t._2, for {n <- list.filter(_._2 == t._2)} yield (n._1))
      }
      case _ => null
    }
  }
  ret.filter(_ != null)
}
I am thinking of removing the var as well, but I don't have a good idea how. Can we use val everywhere and avoid updating a variable in a loop?
@hellomaya, I might be missing something here, but won't this give you what you want?
def groupBy[TPost, TCategory](postWithCategoryTuples: Seq[(TPost, TCategory)]): Map[TCategory, Seq[TPost]] = {
  postWithCategoryTuples
    .groupBy({ case (post, category) => category })
    .mapValues(_.map({ case (post, category) => post }))
}
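For anyone unsure how the groupBy-then-map pipeline above works, here is a small worked sketch on made-up data. The Post case class and the sample rows are assumptions for illustration only. It uses map with a pattern match in place of mapValues, since mapValues returns a lazy view on Scala 2.13 and later; the grouping idea is the same.

```scala
object GroupBySketch {
  // Hypothetical row type standing in for the Slick Post entity.
  final case class Post(title: String)

  def groupByCategory[P, C](rows: Seq[(P, C)]): Map[C, Seq[P]] =
    rows
      // Step 1: bucket the (post, category) pairs by category: Map[C, Seq[(P, C)]]
      .groupBy { case (_, category) => category }
      // Step 2: inside each bucket keep only the post half of every pair
      .map { case (category, pairs) => category -> pairs.map(_._1) }

  // Made-up join result: each post paired with its category name.
  val rows: Seq[(Post, String)] =
    Seq(Post("a") -> "scala", Post("b") -> "slick", Post("c") -> "scala")

  val grouped: Map[String, Seq[Post]] = groupByCategory(rows)
  // grouped("scala") == Seq(Post("a"), Post("c"))
  // grouped("slick") == Seq(Post("b"))
}
```

No var is needed because groupBy builds the whole map in one pass and the second step only reshapes each value.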
@staslev brilliant!! I will try testing it. I started learning Play, Slick, and Scala just a week ago, so I don't know yet whether this works, or even understand how it works, but I will read it and then test it!! Thanks.
@staslev it works pretty well, thank you!! I learned a lot from it; it really helped me understand how Scala works.
@staslev
I have another case where I have to update a var in a loop. It also comes from using the Scala Slick library, and the setup is the same as above: there are categories, each category has many posts, and categories and posts live in different tables with a many-to-many relationship.
I want to get 10 categories and then, for each of them, a fixed number of 5 posts. Usually I would do it in SQL:
// get 10 category
SELECT * FROM category LIMIT 10
Then I would loop over the result of this SQL and query the db for 5 posts per category, which issues 10 more db queries. I want to make the SQL more efficient, and since I already have the groupBy method, I only need to query the db once with UNION ALL, like this:
SELECT post.* FROM post INNER JOIN category_post ON post.post_id = category_post.post_id WHERE category_post.category_id = 1 LIMIT 5
UNION ALL
SELECT post.* FROM post INNER JOIN category_post ON post.post_id = category_post.post_id WHERE category_post.category_id = 2 LIMIT ..
...
Then I will use groupBy to get the result sorted by category name or something else.
But in Slick, UNION ALL is written this way:
val unionQuery = list ++ list
I would need to apply ++ 10 times, so a loop seems better:
unionQuery ++= list // with unionQuery declared as a var
As you can see, this has to use a var and update it in a loop. I would like to avoid the var, and I am still working on it.
@hellomaya
You can do that with:
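val unionQuery = {
  // Seed the fold with the first query: List() as the seed would be inferred as
  // List[Nothing] and the result of ++ would not type-check against it.
  // Assumes listToAddCollection is non-empty.
  listToAddCollection.tail.foldLeft(listToAddCollection.head)(
    (unionQuery, queryToAdd) =>
      unionQuery ++ queryToAdd
  )
}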
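As a rough illustration of why the fold removes the var, here is a sketch that uses plain Seq[Int] values as stand-ins for Slick queries (an assumption; the only thing the two share for this purpose is a ++ method). The names are made up for the example.

```scala
object UnionFoldSketch {
  // Plain sequences standing in for the queries to be combined.
  val queries: List[Seq[Int]] = List(Seq(1, 2), Seq(3), Seq(4, 5))

  // Imperative version: a var reassigned on every loop iteration.
  def withVar: Seq[Int] = {
    var acc = queries.head
    for (q <- queries.tail) acc = acc ++ q
    acc
  }

  // Same result with no var: the accumulator is threaded through the fold.
  val withFold: Seq[Int] = queries.tail.foldLeft(queries.head)(_ ++ _)

  // reduceLeft is the same fold with the first element as the seed.
  val withReduce: Seq[Int] = queries.reduceLeft(_ ++ _)

  // reduceLeftOption avoids an exception when the list may be empty.
  val safe: Option[Seq[Int]] = queries.reduceLeftOption(_ ++ _)
}
```

All four produce Seq(1, 2, 3, 4, 5) here; head, tail, and reduceLeft throw on an empty list, which is why the Option variant is shown as well.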
@xavier-fernandez
Thank you, that's really quick!! I will read and test it, learn how it works, and then let you know.
@hellomaya If you're going to use Scala, you should really take some time to brush up on FP. Functional Programming in Scala is a good read.
@jawshooah thanks for your advice, I definitely need to read it. I have so many questions about using Scala, and most of my code is imperative rather than functional.
@xavier-fernandez, I have got a solution based on your advice, thank you! It works better to use a (mutable) collection, add all the Queries to it, and then use foldLeft.
@hellomaya you can do that with the result of the yield. Most of the time you can avoid using mutable collections.
I appreciate those comments a lot. ;)
@xavier-fernandez thanks for your advice, I will keep trying to find out how to avoid mutable collections as well. My solution changed to this following your advice and looks much better; it queries posts across the different topic categories:
val st = new scala.collection.mutable.Stack[Query[(Post, Rep[String], Rep[Int]), (PostEntity, String, Int), scala.Seq]]
val result = for {
  item <- topicResult
} yield {
  item.map { c =>
    item.size - item.indexOf(c) == 1 match {
      case true => st.push(postQuery.filter(_._3 === c.id).take(postCount * 2))
      case _ => st.push(postQuery.filter(_._3 === c.id).take(postCount))
    }
    //println(item.indexOf(c))
  }
  val k = st.pop
  val unionQuery = st.foldRight(k)((a, b) => a ++ b)
  val q = unionQuery.sortBy(_._3.asc).result
  //println(unionQuery.drop(page).take(count).result)
  //println(q.statements.head)
  db.run(q)
}
result
@hellomaya an immutable List can also be used as a stack if you want a manually managed one, in combination with something like foldLeft or maybe a @tailrec function. Try and transform your sample into something like that.
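One possible shape for that transformation, sketched with in-memory collections instead of Slick queries: the per-topic pieces are built with zipWithIndex and map, then combined with a reduce, so no mutable Stack is involved. Topic, Post, postsFor, and unionAll are hypothetical names for this sketch, and the "last topic gets twice as many posts" rule is copied from the sample above.

```scala
object TopicUnionSketch {
  // Hypothetical stand-ins for TopicEntity and PostEntity.
  final case class Topic(name: String, id: Int)
  final case class Post(title: String, topicId: Int)

  // In-memory stand-in for postQuery.filter(_._3 === c.id).take(n).
  def postsFor(all: Seq[Post], topicId: Int, n: Int): Seq[Post] =
    all.filter(_.topicId == topicId).take(n)

  def unionAll(topics: Seq[Topic], all: Seq[Post], postCount: Int): Seq[Post] = {
    val lastIndex = topics.size - 1
    // One "query" per topic; the last topic takes twice as many posts.
    val perTopic: Seq[Seq[Post]] = topics.zipWithIndex.map { case (topic, i) =>
      val n = if (i == lastIndex) postCount * 2 else postCount
      postsFor(all, topic.id, n)
    }
    // Combine the pieces pairwise; an empty topic list yields an empty result.
    perTopic.reduceLeftOption(_ ++ _).getOrElse(Seq.empty)
  }
}
```

The index comes from zipWithIndex rather than indexOf, which avoids rescanning the list for every element.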
Some notes:
1. you're not using result and the name itself means absolutely nothing and adds no information; just return the result of that for expression directly
2. I don't understand the type of topicResult; is it a sequence? I sure hope you're not executing one SQL query for each element in it and I don't understand the unionQuery
3. doing things like _._3, while useful sometimes, is unreadable
If you're trying to iterate over all items by doing SQL pagination, as an exercise, try doing the following:
import scala.concurrent.{ExecutionContext, Future}
import ExecutionContext.Implicits.global // or supply your own ExecutionContext

type FetchNext[+T] = () => Future[Result[T]]
case class Result[+T](value: Seq[T], nextFn: FetchNext[T])

def iterateAll(perPage: Int): Future[Result[Post]] = {
  def query(offset: Int, count: Int): Future[Seq[Post]] = {
    ???
  }
  // each step yields one page plus a function that fetches the next page
  def loop(offset: Int, count: Int): Future[Result[Post]] =
    query(offset, count).map { list =>
      Result(list, () => loop(offset + count, count))
    }
  loop(offset = 0, count = perPage)
}

def processAll(f: Post => Unit): Future[Unit] = {
  // FetchNext is already parameterized by the element type
  def loop(step: FetchNext[Post]): Future[Unit] =
    step().flatMap { case Result(list, nextFn) =>
      if (list.nonEmpty) {
        for (item <- list) f(item)
        loop(nextFn) // next batch please
      } else {
        Future.successful(())
      }
    }
  loop(() => iterateAll(perPage = 20))
}
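To try the pagination exercise without a database, here is a runnable sketch in which an in-memory Vector plays the role of the table. The names Page, table, fetch, and demo are assumptions for illustration, and a real query(offset, count) would of course run SQL instead of slicing a Vector.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PaginationSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  final case class Post(id: Int)
  // One page of results plus a function that fetches the following page.
  final case class Page(items: Seq[Post], next: () => Future[Page])

  // Hypothetical table contents: 45 rows.
  val table: Vector[Post] = Vector.tabulate(45)(i => Post(i + 1))

  // Stand-in for an OFFSET/LIMIT query.
  def query(offset: Int, count: Int): Future[Seq[Post]] =
    Future(table.slice(offset, offset + count))

  def fetch(offset: Int, count: Int): Future[Page] =
    query(offset, count).map(items => Page(items, () => fetch(offset + count, count)))

  // Applies f to every row, page by page, stopping at the first empty page.
  def processAll(perPage: Int)(f: Post => Unit): Future[Unit] = {
    def loop(step: () => Future[Page]): Future[Unit] =
      step().flatMap { page =>
        if (page.items.isEmpty) Future.successful(())
        else { page.items.foreach(f); loop(page.next) }
      }
    loop(() => fetch(0, perPage))
  }

  // Usage: counts how many rows were visited (blocking only for the demo).
  def demo(): Int = {
    val seen = new AtomicInteger(0)
    Await.result(processAll(perPage = 20)(_ => { seen.incrementAndGet(); () }), 10.seconds)
    seen.get()
  }
}
```

Each page is only referenced by the step that processes it, so earlier pages become collectable as the loop advances.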
@alexandru
Thanks for your advice, I really learned from it too. Let me explain what the UNION SQL is doing, in reply to your comment:
I don't understand the type of topicResult; is it a sequence? I sure hope you're not executing one SQL query for each element in it and I don't understand the unionQuery
topicResult is actually a Future[Seq[TopicEntity]], and TopicEntity is case class TopicEntity(name: String, id: Int). More details:
- In this case, one Topic can have many Posts, so Topic plays the same role as Category; both are DB tables.
- I want to get a list of Topics and then, for each of them, get 5 posts.
- The first idea is simple: use SQL to get a list of Topics, which returns topicResult (a Future[Seq[...]] from db.run in Slick), and then run one SQL query per item in the list to get its 5 posts. With 10 topics in the list, that queries the database server 10 more times. That is kind of wasteful, so I want to send fewer SQL queries to the database server. Then I got this idea:
- Use SQL to get a list of Topics and then combine one query per topic with UNION ALL to get all the posts for the list at once, like this:
# this is the first 5 post for topic_id 1
SELECT * FROM
post INNER JOIN topic_item
ON post.post_id=topic_item.post_id
WHERE topic_item.topic_id=1 LIMIT 5
UNION ALL
# this is the first 5 post for topic_id 2
SELECT * FROM
post INNER JOIN topic_item
ON post.post_id=topic_item.post_id
WHERE topic_item.topic_id=2 LIMIT 5
...
# until topic_id 10
So you can see that topicResult is actually a Future[Seq[TopicEntity]], and TopicEntity is case class TopicEntity(name: String, id: Int).
After I get the result from the UNION SQL, I can use groupBy to build something like Map[String, Seq[PostEntity]] (topic name to its posts), so I can show a list of topics on the page, each with a list of 5 posts. Following @staslev's advice, I learned how to do this groupBy in a functional style too.
About points 1 and 3 that you pointed out:
you're not using result and the name itself means absolutely nothing and adds no information; just return the result of that for expression directly
doing things like _._3, while useful sometimes, is unreadable
I agree with your point, and I am trying to make it better according to that.
And about the example you showed me, and this
an immutable List can also be used as a stack if you want a manually managed one, in combination with something like foldLeft or maybe a @tailrec function. Try and transform your sample into something like that.
I am still studying it to see how it works, and will let you know what I have learned from it.
@xavier-fernandez
Thanks for your advice, I will try to improve it accordingly!!
Specifically, "2.3. SHOULD NOT update a var using loops or conditions"
I think it would be good to create some examples of when this should be done other than the toy examples provided. A good example of this is a problem I'm wrestling with now, where I want to read the records from a relational database table in batches, and on each iteration I need to look at the last id returned from the previous result and check to see if there are any records and I should continue to iterate. I've asked this question all over the place and haven't gotten a suggestion that fixes the problem and is performant.
I was able to avoid the vars using Stream, but the memory utilization is just awful on large tables. I'd like to see some guidance in a best practices document for dealing with these sorts of real-world situations, or some reinforcement that this is a case where you need to observe the "SHOULD" of the "SHOULD NOT".
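One var-free option that avoids the Stream memory problem is an Iterator, which does not cache elements it has already produced. The sketch below is only an illustration under stated assumptions: Record, readBatch, and the in-memory table are hypothetical, with readBatch standing in for a query along the lines of "rows with id greater than lastId, ordered by id, limited to batchSize".

```scala
object IteratorBatchesSketch {
  // Hypothetical record type.
  final case class Record(id: Int)

  // In-memory stand-in for the keyed batch query described above.
  def readBatch(table: Vector[Record], lastId: Int, batchSize: Int): Vector[Record] =
    table.filter(_.id > lastId).take(batchSize)

  // Lazily yields batches; each new batch is derived from the last id of the
  // previous one, and iteration ends at the first empty batch.
  def batches(table: Vector[Record], batchSize: Int): Iterator[Vector[Record]] =
    Iterator
      .iterate(readBatch(table, 0, batchSize))(prev => readBatch(table, prev.last.id, batchSize))
      .takeWhile(_.nonEmpty)

  // Usage: 10 rows in batches of 4.
  val table: Vector[Record] = Vector.tabulate(10)(i => Record(i + 1))
  val sizes: List[Int] = batches(table, batchSize = 4).map(_.size).toList
}
```

In real use the batches would be consumed with foreach (transform, then write) rather than collected, so only the current batch stays reachable at any time.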