spotify / hdfs2cass

Hadoop mapreduce job to bulk load data into Cassandra
Apache License 2.0

add CQL collection support #7

Closed mattnworb closed 9 years ago

mattnworb commented 9 years ago

This change adds support for CQL collections (Sets, Lists, Maps) for jobs that return CQLRecords.

The support is implemented by re-using the TypeSerializer implementations from within Cassandra itself.

In order to serialize a collection, the serializer for the element type needs to be known as well. It is a bit hacky, but the current solution involves peeking at the first element in the collection to get its runtime type, then retrieving a TypeSerializer from a Map within CassandraRecordUtils.
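A minimal sketch of the "peek at the first element" idea, outside of hdfs2cass. `SERIALIZERS` and `serializerFor` are illustrative names standing in for the TypeSerializer map in CassandraRecordUtils, not the actual API:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class PeekSerializer {
    // Map from runtime element class to a serializer, standing in for the
    // TypeSerializer lookup described above.
    static final Map<Class<?>, Function<Object, ByteBuffer>> SERIALIZERS = new HashMap<>();
    static {
        SERIALIZERS.put(String.class,
            o -> ByteBuffer.wrap(((String) o).getBytes(StandardCharsets.UTF_8)));
        SERIALIZERS.put(Integer.class,
            o -> (ByteBuffer) ByteBuffer.allocate(4).putInt((Integer) o).flip());
    }

    static Function<Object, ByteBuffer> serializerFor(Collection<?> coll) {
        if (coll.isEmpty()) {
            // The weakness of this approach: an empty collection carries no
            // runtime type information at all.
            throw new IllegalArgumentException("cannot infer element type of empty collection");
        }
        // Peek at the first element's runtime class to pick a serializer.
        Class<?> elementType = coll.iterator().next().getClass();
        Function<Object, ByteBuffer> s = SERIALIZERS.get(elementType);
        if (s == null) {
            throw new IllegalArgumentException("no serializer for " + elementType);
        }
        return s;
    }
}
```

The empty-collection case makes the hackiness concrete: with no element to peek at, the runtime type is simply unavailable, which is why deriving the serializer from the schema (as discussed below) is attractive.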

In implementing this I noticed that the serialization of Collections has changed between V2 and V3 of the CQL protocol. V3 is only supported by Cassandra v2.1 so far, while V2 is understood by v2.0 (Cassandra v2.1 should support protocol V2 also).

Since this implementation reuses the TypeSerializers from within cassandra-all:2.0.11:jar, it is implicitly using V2 of the protocol format. This should be fine as long as the target Cassandra server is using v2.0. I have not tested using these serializers when exporting to Cassandra v1.x or v2.1+.
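The format difference itself is small but incompatible: protocol V2 encodes the collection size and each element's length as unsigned 16-bit shorts, while V3 widens both to 32-bit ints. An illustrative sketch (not hdfs2cass or Cassandra code) of the two layouts:

```java
import java.nio.ByteBuffer;
import java.util.List;

public class CollectionFormats {
    // V2 layout: [short count][short length, bytes]* per element.
    static ByteBuffer packV2(List<ByteBuffer> elems) {
        int size = 2;
        for (ByteBuffer e : elems) size += 2 + e.remaining();
        ByteBuffer out = ByteBuffer.allocate(size);
        out.putShort((short) elems.size());
        for (ByteBuffer e : elems) {
            out.putShort((short) e.remaining());
            out.put(e.duplicate());
        }
        out.flip();
        return out;
    }

    // V3 layout: [int count][int length, bytes]* per element.
    static ByteBuffer packV3(List<ByteBuffer> elems) {
        int size = 4;
        for (ByteBuffer e : elems) size += 4 + e.remaining();
        ByteBuffer out = ByteBuffer.allocate(size);
        out.putInt(elems.size());
        for (ByteBuffer e : elems) {
            out.putInt(e.remaining());
            out.put(e.duplicate());
        }
        out.flip();
        return out;
    }
}
```

So a V2-serialized collection read by a V3-only reader (or vice versa) is garbage, which is why the serializers baked into cassandra-all:2.0.11 tie this change to servers speaking V2.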

rzvoncek commented 9 years ago

Hey. I found out it's possible to figure out the type of a column without peeking at the first item. It's something along the lines of:

    CFMetaData cfmd = CFMetaData.compile(schema, keyspace);
    ColumnDefinition column = cfmd.getColumnDefinitionFromColumnName(colName);
    column.getValidator().getSerializer();

We know both the schema and the keyspace. CassandraClusterInfo gets them, and they are used in CQLTarget here.

It looks like it could be possible to figure stuff out from the CQL schema we already have. Do you think trying to figure this out is worth the effort?

mattnworb commented 9 years ago

@rzvoncek one possible complication with that approach is that the cluster metadata isn't known until we get to CQLResourceIO.write(DataResource, PCollection<CQLRecord>). It seems like we would have to refactor CQLRecord a bit so that it could store the Maps/Sets/Lists internally in some form, and delay translating those collections to ByteBuffers until the writing to sstables actually occurs in the reduce phase.
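A hypothetical sketch of that refactor, to make the shape of the change concrete. `LazyRecord` and `resolve` are illustrative names, not proposed hdfs2cass API: the record keeps collection values as plain Java objects and only produces ByteBuffers once a per-column serializer can be resolved from the cluster schema at write time:

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class LazyRecord {
    // Column values kept as plain objects (including Maps/Sets/Lists)
    // instead of being serialized eagerly at record-construction time.
    final Map<String, Object> values = new LinkedHashMap<>();

    void set(String column, Object value) {
        values.put(column, value);
    }

    // Serialize late, in the reduce phase, once a serializer per column can
    // be looked up from the actual cluster metadata. The BiFunction stands
    // in for that schema-driven TypeSerializer lookup.
    Map<String, ByteBuffer> resolve(BiFunction<String, Object, ByteBuffer> serializer) {
        Map<String, ByteBuffer> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : values.entrySet()) {
            out.put(e.getKey(), serializer.apply(e.getKey(), e.getValue()));
        }
        return out;
    }
}
```

The cost is that CQLRecord would carry unserialized objects through the shuffle, which is the added complexity being weighed here.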

rzvoncek commented 9 years ago

@mattnworb you're right. Let's not over-complicate. :)