Closed ChetanBhasin closed 9 years ago
Hi @ChetanBhasin,
I seriously doubt the problem is Phantom-specific, given that we have long-winded tests for such scenarios which greatly exceed the capacity I think you are mentioning:
ContactPoint definition. For the ContactPoint definition, you can simply use:
    ContactPoints(Seq(seedHost))
      .withClusterBuilder(_.withSocketOptions(new SocketOptions().setReadTimeoutMillis(500)))
      .keySpace("test")
Hi @alexflav23,
Thanks for the reply. I'll try to describe the problem as well as I understand it myself. It's not about how Phantom behaves in a production cluster.
Increasing the timeout helps when we have a timeout problem. In this case, however, I believe the cause is that Cassandra is running out of memory in the JVM. When the data being queried from Cassandra (all the rows which match the query) is big (say, half the size of the RAM on the node), Cassandra has to handle a lot of data at once.
The CQL client provided with the Cassandra distribution solves this problem by implementing paging. The Datastax Java driver allows you to set a page size to make sure that Cassandra is not overwhelmed by one big request.
While using the Enumerator for fetching data, I thought Phantom was automatically using paging. What I'm trying to find out is whether there is a way to set a page size in Phantom without using the Java driver underneath.
I may be wrong in my assumption that paging is not implemented in Enumerators, but I thought it would be worth asking since I couldn't find paging in the code.
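To make the idea of paging concrete, here is a minimal, self-contained sketch in plain Scala. It does not use Phantom or the Datastax driver; `PagedSource`, `fetchPage` and `pagesFetched` are hypothetical names invented for illustration. It only shows the general technique: the consumer sees one continuous iterator, while the data is requested one fixed-size page at a time.

```scala
// Hypothetical stand-in for a server-side table; not part of Phantom or the driver.
final class PagedSource[A](rows: Vector[A], pageSize: Int) {
  require(pageSize > 0, "pageSize must be positive")

  // Counts simulated round trips, to show that pages are fetched lazily.
  var pagesFetched: Int = 0

  // Simulates one request: returns at most `pageSize` rows starting at `offset`.
  private def fetchPage(offset: Int): Vector[A] = {
    pagesFetched += 1
    rows.slice(offset, offset + pageSize)
  }

  // Iterator that keeps only the current page and asks for the next
  // page once the current one has been fully consumed.
  def iterator: Iterator[A] = new Iterator[A] {
    private var offset = 0
    private var page: Iterator[A] = Iterator.empty

    def hasNext: Boolean = {
      if (!page.hasNext && offset < rows.length) {
        val next = fetchPage(offset)
        offset += next.length
        page = next.iterator
      }
      page.hasNext
    }

    def next(): A = {
      if (!hasNext) throw new NoSuchElementException("no more rows")
      page.next()
    }
  }
}

object PagingDemo {
  def main(args: Array[String]): Unit = {
    val source = new PagedSource((1 to 12).toVector, pageSize = 5)
    // Consuming all 12 rows with a page size of 5 takes three requests.
    println(source.iterator.toList)
    println(source.pagesFetched)
  }
}
```

With a smaller page size, each simulated request carries less data at the cost of more round trips, which is the trade-off a fetch size controls.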
Hi @ChetanBhasin,
Phantom also uses the native paging of the Datastax Java driver underneath; our async iterators are just a nicer way to use the underlying Java driver. To reduce the page size, you should use setFetchSize(). However, it sounds like the size of your requirements would mandate something like Spark running the show, and Datastax offer a connector for that, which distributes table reads across Spark nodes to reduce memory pressure.
Phantom is designed for application logic, and while we plan to offer phantom-spark in the future as part of phantom-enterprise, currently you should investigate using a mix of the two technologies to address the data challenges.
Regards.
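For readers following along, this is a rough sketch of setting a per-query fetch size directly on the Datastax Java driver, assuming the 2.x/3.x driver (package `com.datastax.driver.core`) is on the classpath and a node is reachable. The contact point `127.0.0.1`, the table `test.events` and the value 500 are placeholders, not taken from this thread.

```scala
import com.datastax.driver.core.{Cluster, Row, SimpleStatement}
import scala.collection.JavaConverters._

object FetchSizeSketch {
  def main(args: Array[String]): Unit = {
    // Assumed contact point; replace with your own seed host.
    val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
    try {
      val session = cluster.connect()

      // Assumed keyspace and table names, for illustration only.
      val statement = new SimpleStatement("SELECT * FROM test.events")
      // Ask Cassandra for at most 500 rows per page for this statement.
      statement.setFetchSize(500)

      // Iterating the ResultSet lets the driver fetch further pages on demand.
      val rows: Iterator[Row] = session.execute(statement).iterator().asScala
      rows.foreach(row => println(row))
    } finally {
      cluster.close()
    }
  }
}
```

A smaller fetch size means more round trips but less data held per request, which is the trade-off relevant to wide rows.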
Thanks @alexflav23,
My application is supposed to stream the data in Cassandra, so I doubt that Spark is needed in this case, which is why I was looking for paging.
Nonetheless, is there a way in Phantom to directly change the fetch size without working with the Java driver it is built on?
Hi @ChetanBhasin,
Have a look here for some ideas of how we've done that in the past.
Hope some of that will help.
Regards.
Hi @ChetanBhasin,
Just to reiterate what @alexflav23 is saying, there is no way that phantom is the problem here. The code for generating the enumerator is found here. It is not complicated. We simply loop through the result set as supplied by datastax. The default fetch size for a result set is 5000. So, we never require more than 5k results in heap on the client or the server at a time.
Possible problems:
1) Buggy Cassandra holding more in memory than necessary on the server side. Unlikely.
2) You are storing massive blobs such that the default fetch size is wrong for you. More likely.
If you want to change the default fetch size, just change the config in the cluster. If your columns are too big, break them up. Don't have giant maps. Your problems point to your modelling in Cassandra, not the Datastax client, and not Phantom.
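As one possible reading of "change the config in the cluster", the default fetch size can be set through the driver's `QueryOptions` when the cluster is built. This is a sketch assuming the Datastax Java driver 2.x/3.x; `ClusterFetchSize`, `buildCluster` and the value 500 are illustrative choices, not something prescribed in this thread.

```scala
import com.datastax.driver.core.{Cluster, QueryOptions}

object ClusterFetchSize {
  // Assumed value; choose a page size that suits the width of your rows.
  val queryOptions: QueryOptions = new QueryOptions().setFetchSize(500)

  // Builds a Cluster whose statements use the fetch size above
  // unless a statement overrides it with its own setFetchSize call.
  def buildCluster(seedHost: String): Cluster =
    Cluster.builder()
      .addContactPoint(seedHost)
      .withQueryOptions(queryOptions)
      .build()
}
```

Since the withClusterBuilder call quoted earlier in this thread receives the driver's cluster builder, passing `_.withQueryOptions(new QueryOptions().setFetchSize(500))` to it should presumably have the same effect from Phantom, though that combination is an assumption and is not confirmed here.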
@benjumanji Thanks for the reply.
I did figure out the problem. You're right: bad configuration and big maps in Cassandra are causing this problem. I'll close this issue.
Thanks for the help, everyone. And I'm sorry for dragging this out longer than it should have been in the first place.
When the fetch sizes are large while using Play framework's async Iteratees, it crashes Cassandra.
I asked a question on SO, with logs, only to find this out.
There is another question that mentions this problem.
I wonder if changing the fetch size on the Datastax Java driver underneath could solve this problem, but I haven't found an easy way to do that from Scala.