Open · mmarovic opened this issue 3 years ago
I did some more testing and searching. It looks like the `CqlInputFormat` issues are going to be fixed in Cassandra 4.0 (see https://issues.apache.org/jira/browse/CASSANDRA-15637). Some other issues were fixed there as well, so upgrading to newer Cassandra libraries could also be the way to go.

From what I've seen in the code, the fixes (at least for the issues from my previous post) are similar to my attempts. However, I noticed that they always unwrap the wrapping token range. This might cause issues when using ScyllaDB because of the difference in its `system.size_estimates` data. I haven't tested this to be sure.

Since the release date for Cassandra 4.0 still seems to be unknown, I made a quick-and-dirty commit that shows what worked for me as a fix, just in case anyone needs this: https://github.com/mmarovic/janusgraph/commit/8cad2d3be123aff922d3cbc10d6f2de253f6915a
## Summary

When reading JanusGraph data into Apache Spark, `org.janusgraph.hadoop.formats.cql.CqlInputFormat` and `org.janusgraph.hadoop.formats.cql.CqlBinaryInputFormat` do not properly manage partition sizes when loading data from Cassandra. This causes memory issues when an entire token range cannot fit into the Spark executor memory. Partition sizes should be controllable via `cassandra.input.split.size` and `cassandra.input.split.size_mb`, so e.g. `cassandra.input.split.size=1` should produce as many input partitions as there are partitions in the Cassandra token ranges. In practice, only the partitions with data from the node configured via `storage.hostname` are sub-split; the partitions with data from other nodes are not sub-split and they correspond to entire token ranges from those nodes.

## Long description
### Introduction
I recently implemented a few Spark jobs to better manage some growing graphs. Initial tests in a smaller environment showed that everything worked OK, but when I ran the jobs in pre-production on a much larger graph, various memory issues started to appear. I tried to analyze the cause, and my results and conclusions are as follows.
According to https://docs.janusgraph.org/advanced-topics/hadoop/, users should use `org.janusgraph.hadoop.formats.cql.CqlInputFormat` as the `graphReader` class to read data from Cassandra into Spark. This class internally uses `org.apache.cassandra.hadoop.cql3.CqlInputFormat`. Analyzing its code shows that Spark partitions initially correspond to Cassandra token ranges. However, we should be able to split these further into smaller parts using one of two parameters:

- `cassandra.input.split.size` - the maximum number of Cassandra partitions to be loaded into a single Spark partition (default 64 * 1024)
- `cassandra.input.split.size_mb` - the maximum size in MB to be loaded into a single Spark partition

To calculate these smaller ranges, the input format uses the Cassandra system table `system.size_estimates`, which contains the average partition size and the number of partitions in each token range. For each token range, the input format fetches the corresponding row from that table, calculates new splits, and uses them to further split the initial range. If no data is found, the input format cannot estimate anything, so it returns the original split: the entire Cassandra token range.
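As I read the code, the sub-split count is derived roughly like this. The sketch below is my own simplified illustration, not the actual Cassandra implementation; the method and parameter names are mine:

```java
public class SplitEstimate {
    // Rough sketch of how one token range is sub-split using a row from
    // system.size_estimates. Simplified illustration only; names are mine.
    static long subSplitCount(long partitionsCount, long meanPartitionSizeBytes,
                              long splitSize, long splitSizeMb) {
        if (splitSizeMb > 0) {
            // Size-based limit: cap each Spark partition at splitSizeMb MB.
            long totalBytes = partitionsCount * meanPartitionSizeBytes;
            return Math.max(1, totalBytes / (splitSizeMb * 1024 * 1024));
        }
        // Count-based limit: cap each Spark partition at splitSize
        // Cassandra partitions.
        return Math.max(1, partitionsCount / splitSize);
    }

    public static void main(String[] args) {
        // A token range estimated at 1,000,000 partitions of ~2 KB each (~2 GB):
        System.out.println(subSplitCount(1_000_000L, 2048L, 64 * 1024L, 64L)); // prints 30
        System.out.println(subSplitCount(1_000_000L, 2048L, 64 * 1024L, 0L));  // prints 15
    }
}
```

Note how everything here depends on `partitionsCount` and `meanPartitionSizeBytes` coming back from `system.size_estimates`: if the lookup returns nothing, no sub-splitting happens at all.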
### Issue 1 - `LocalStrategy` replication

The `system` keyspace is configured to use `LocalStrategy` replication, so each node only contains data about its own token ranges. When the input format queries `system.size_estimates`, it does so by submitting a CQL query to a Cassandra session, so the query is sent to a single node (chosen in round-robin fashion) for execution. However, the node that receives a query about a token range might not be the one that actually owns that range, in which case the query returns nothing. As a result, no further splits happen for that token range.

Also, during initial setup, `org.janusgraph.hadoop.formats.cql.CqlBinaryInputFormat` sets only a single node address in `cassandra.input.thrift.address`. That node is then used for every `system.size_estimates` query. As a result, all token ranges owned by that node are sub-split normally, but token ranges from other nodes are not sub-split at all. At least, this was my experience while testing.
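Why round-robin routing interacts badly with `LocalStrategy` can be shown with a toy model. This is entirely my own illustration (hypothetical node indices and range names), not Cassandra code:

```java
import java.util.*;

public class LocalStrategyDemo {
    // Toy model of Issue 1: each node stores size estimates only for its
    // own token ranges (LocalStrategy), but the size-estimate queries are
    // routed round-robin, so most land on a node with no matching row.
    static int answeredQueries(List<String> ranges, Map<Integer, Set<String>> owned) {
        int answered = 0;
        for (int i = 0; i < ranges.size(); i++) {
            int coordinator = i % owned.size(); // round-robin routing
            // The lookup returns a row only when the coordinator
            // happens to own the queried range.
            if (owned.get(coordinator).contains(ranges.get(i))) answered++;
        }
        return answered;
    }

    public static void main(String[] args) {
        // Hypothetical 3-node cluster, two token ranges per node.
        Map<Integer, Set<String>> owned = Map.of(
                0, Set.of("r0", "r1"),
                1, Set.of("r2", "r3"),
                2, Set.of("r4", "r5"));
        List<String> ranges = List.of("r0", "r1", "r2", "r3", "r4", "r5");
        System.out.println(answeredQueries(ranges, owned) + " of " + ranges.size());
        // prints "2 of 6" - only the lookups that happen to hit the owning
        // node succeed; every other range keeps its full-token-range split.
    }
}
```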
### Issue 2 - wrapping token ranges

In a multi-node Cassandra cluster, each node has a number of token ranges equal to its `num_tokens` configuration parameter, which defaults to 256. Token ranges in Cassandra are treated as parts of a ring containing values from -2^63 to 2^63 - 1. One token range always "wraps around": it contains both the smallest and the largest tokens. For some reason, in `system.size_estimates` that token range is split into two rows at the ring boundary. For example, the token range `(9086698877042261913, -9100093929734381511)` is split into `(9086698877042261913, -9223372036854775808)` and `(-9223372036854775808, -9100093929734381511)`. This is also visible in the number of rows in the table: each node should contain `num_tokens` rows, which is true for all but one node, which contains an extra row.
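The split of the wrapping range can be reproduced with a small sketch. The `Range` record below is my own stand-in, not Cassandra's `TokenRange` class:

```java
public class TokenRangeUnwrap {
    // Minimal stand-in for a token range (start, end] on the Murmur3 ring;
    // not Cassandra's actual TokenRange class.
    record Range(long start, long end) {}

    // A range "wraps" when it crosses the ring boundary. Splitting it at the
    // minimum token (-2^63) yields the two rows seen in system.size_estimates.
    static Range[] unwrap(Range r) {
        boolean wraps = r.start >= r.end;
        if (!wraps) return new Range[] { r };
        return new Range[] {
            new Range(r.start, Long.MIN_VALUE),
            new Range(Long.MIN_VALUE, r.end)
        };
    }

    public static void main(String[] args) {
        Range wrapping = new Range(9086698877042261913L, -9100093929734381511L);
        for (Range part : unwrap(wrapping)) {
            System.out.println("(" + part.start() + ", " + part.end() + ")");
        }
        // prints:
        // (9086698877042261913, -9223372036854775808)
        // (-9223372036854775808, -9100093929734381511)
    }
}
```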
The query used to fetch data from `system.size_estimates` filters on an exact `range_start`/`range_end` pair. Running it for the example range `(9086698877042261913, -9100093929734381511)` will never return any results, because no such row exists in the table: either `range_start` or `range_end` will differ because of the additional split. Therefore, the estimates for that token range are never fetched, and it is never sub-split.

In addition, I've lately been doing some testing on ScyllaDB, and it seems this is not the case there. The number of rows in ScyllaDB's `system.size_estimates` always seems to equal `num_tokens`, and I see the correct entry for the wrapping token range on the owner node. If true, this could cause compatibility issues between the two backends, or the fix would have to "know" when to look for this additional split and when not to.

### Final notes
The problems I listed here are actually caused by the original Cassandra Hadoop input format implementation, `org.apache.cassandra.hadoop.cql3.CqlInputFormat`. I initially asked about this on their forums (https://community.datastax.com/questions/10153/how-to-control-partition-size-when-reading-data-wi.html), but the answer strongly suggested porting to the `spark-cassandra-connector`, since this is legacy code (it may never be fixed here).

I managed to fix the issues in my Spark jobs by reimplementing the Cassandra Hadoop input format and then switching the JanusGraph input formats to use it. I'll provide a PR here later, just in case anyone's interested. However, the "right" solution would be to migrate to the `spark-cassandra-connector`.