telefonicaid / fiware-cygnus

A connector in charge of persisting context data sources into other third-party databases and storage systems, creating a historical view of the context
https://fiware-cygnus.rtfd.io/
GNU Affero General Public License v3.0

Support multiple file format in HDFS (CSV, Parquet... in addition to JSON) #303

Closed: frbattid closed this issue 9 years ago

frbattid commented 9 years ago

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language. (http://parquet.incubator.apache.org/)

The current cygnusagent.sinks.hdfs-sink.attr_persistence parameter could be extended to support parquet in addition to the existing row and column modes.

In addition, the row and column modes could be renamed to json-row and json-column respectively, and the cygnusagent.sinks.hdfs-sink.attr_persistence parameter could be renamed to cygnusagent.sinks.hdfs-sink.file_format.

These changes would leave room for additional file formats in the future.
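
As a rough illustration of the proposed renaming, the sketch below maps a configuration that uses the current parameter to one that uses the proposed name and values. The parameter names and the json-row / json-column values come from this issue; the `migrate_sink_config` helper and the dictionary representation of the agent configuration are assumptions made only for this example, not part of Cygnus.

```python
# Parameter names as written in this issue.
OLD_KEY = "cygnusagent.sinks.hdfs-sink.attr_persistence"
NEW_KEY = "cygnusagent.sinks.hdfs-sink.file_format"

# Proposed renaming of the existing modes.
RENAMED_VALUES = {"row": "json-row", "column": "json-column"}


def migrate_sink_config(config):
    """Return a copy of config that uses the proposed key and value names.

    Hypothetical helper: the configuration is modelled as a plain dict.
    """
    migrated = dict(config)
    if OLD_KEY in migrated:
        old_value = migrated.pop(OLD_KEY)
        # Values without a proposed new name are passed through unchanged.
        migrated[NEW_KEY] = RENAMED_VALUES.get(old_value, old_value)
    return migrated


print(migrate_sink_config({OLD_KEY: "row"}))
```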

frbattid commented 9 years ago

As discussed offline with @elenatid, another file format could be CSV, both row and column.

frbattid commented 9 years ago

Regarding CSV persistence, a problem arises when dealing with the attribute's metadata.

Metadata is a field that cannot be easily serialized as CSV because its value may be an array of objects. As a result, the CSV records may end up with different lengths, which may be a problem for Hive tables.
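
To make the problem concrete, here is a minimal sketch of the naive approach, where every metadata object is appended inline to the CSV record. It assumes each metadata object carries name, type and value keys; the `naive_csv_record` function is hypothetical and only illustrates why the record length varies.

```python
def naive_csv_record(fields, metadata):
    """Append every metadata object inline (the problematic approach)."""
    record = list(fields)
    for md in metadata:
        record.extend([md["name"], md["type"], md["value"]])
    return ",".join(str(v) for v in record)


base = ["1230923875", "room1", "room", "temperature", "float", "23.5"]
two_md = [
    {"name": "timestamp_meassure", "type": "long", "value": 1230923874},
    {"name": "timestamp_sent", "type": "long", "value": 1230923875},
]
three_md = two_md + [{"name": "verified", "type": "boolean", "value": "true"}]

# The column count depends on how many metadata objects each update carries,
# which a Hive table with a fixed set of columns cannot absorb.
print(len(naive_csv_record(base, two_md).split(",")))    # 12
print(len(naive_csv_record(base, three_md).split(",")))  # 15
```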

The following solution was discussed with @elenatid: the metadata field links to a new HDFS file containing one record for each object within the array. The link is simply the name of that file, for instance:

# content of /user/frb/def_serv/def_serv_path/room1_room/room1_room.txt:
23-05-2015T06:37:41,1230923875,room1,room,temperature,float,23.5,/user/frb/def_serv/def_serv_path/room1_room_temperature_md/room1_room_temperature_md.txt

# content of /user/frb/def_serv/def_serv_path/room1_room_temperature_md/room1_room_temperature_md.txt
1230923875,timestamp_meassure,long,1230923874
1230923875,timestamp_sent,long,1230923875
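
The split shown above could be produced along these lines. This is only a sketch under stated assumptions: the `to_csv_records` function, its parameter names, and the name/type/value keys of each metadata object are illustrative and are not taken from the Cygnus code base. The field order follows the example records.

```python
def to_csv_records(recv_time, recv_time_ts, entity_id, entity_type,
                   attr_name, attr_type, attr_value, metadata, md_file_path):
    """Build one fixed-length data record plus one record per metadata object."""
    # The data record always has eight fields; the last one is the link
    # (file name) to the metadata file.
    data_record = ",".join(str(v) for v in [
        recv_time, recv_time_ts, entity_id, entity_type,
        attr_name, attr_type, attr_value, md_file_path])
    # Each metadata record is prefixed with recvTimeTs so it can be matched
    # back to its data record.
    md_records = [
        ",".join(str(v) for v in [recv_time_ts, md["name"], md["type"], md["value"]])
        for md in metadata]
    return data_record, md_records


md_path = ("/user/frb/def_serv/def_serv_path/"
           "room1_room_temperature_md/room1_room_temperature_md.txt")
data, md = to_csv_records(
    "23-05-2015T06:37:41", 1230923875, "room1", "room",
    "temperature", "float", 23.5,
    [{"name": "timestamp_meassure", "type": "long", "value": 1230923874},
     {"name": "timestamp_sent", "type": "long", "value": 1230923875}],
    md_path)
print(data)
print("\n".join(md))
```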

The recvTimeTs value within /user/frb/def_serv/def_serv_path/room1_room/room1_room.txt is used as the key for finding the metadata associated with each record. For example, if a second update is notified, we will have:

# content of /user/frb/def_serv/def_serv_path/room1_room/room1_room.txt:
23-05-2015T06:37:41,1230923875,room1,room,temperature,float,23.9,/user/frb/def_serv/def_serv_path/room1_room_temperature_md/room1_room_temperature_md.txt
23-05-2015T06:37:51,1230933870,room1,room,temperature,float,24.1,/user/frb/def_serv/def_serv_path/room1_room_temperature_md/room1_room_temperature_md.txt

# content of /user/frb/def_serv/def_serv_path/room1_room_temperature_md/room1_room_temperature_md.txt
1230923875,timestamp_meassure,long,1230923874
1230923875,timestamp_sent,long,1230923875
1230933870,timestamp_meassure,long,1230933869
1230933870,timestamp_sent,long,1230933870
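
Reading the data back, the metadata for a given record can be recovered by grouping the metadata file on its first field. The sketch below does this in memory with the standard csv module; the `index_metadata` function is hypothetical, and in practice the same lookup might instead be expressed as a join between two Hive tables.

```python
import csv
import io
from collections import defaultdict


def index_metadata(md_csv_text):
    """Group metadata records by their recvTimeTs key (first field)."""
    by_ts = defaultdict(list)
    for row in csv.reader(io.StringIO(md_csv_text)):
        if not row:
            continue  # skip blank lines
        recv_time_ts, name, md_type, value = row
        by_ts[recv_time_ts].append((name, md_type, value))
    return by_ts


md_text = """1230923875,timestamp_meassure,long,1230923874
1230923875,timestamp_sent,long,1230923875
1230933870,timestamp_meassure,long,1230933869
1230933870,timestamp_sent,long,1230933870
"""
index = index_metadata(md_text)

# Metadata belonging to the second update, looked up by its recvTimeTs.
print(index["1230933870"])
```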

This also works if the number of metadata objects changes between updates:

# content of /user/frb/def_serv/def_serv_path/room1_room/room1_room.txt:
23-05-2015T06:37:41,1230923875,room1,room,temperature,float,23.9,/user/frb/def_serv/def_serv_path/room1_room_temperature_md/room1_room_temperature_md.txt
23-05-2015T06:37:51,1230933870,room1,room,temperature,float,24.1,/user/frb/def_serv/def_serv_path/room1_room_temperature_md/room1_room_temperature_md.txt
23-05-2015T06:38:01,1230943871,room1,room,temperature,float,24.2,/user/frb/def_serv/def_serv_path/room1_room_temperature_md/room1_room_temperature_md.txt

# content of /user/frb/def_serv/def_serv_path/room1_room_temperature_md/room1_room_temperature_md.txt
1230923875,timestamp_meassure,long,1230923874
1230923875,timestamp_sent,long,1230923875
1230933870,timestamp_meassure,long,1230933869
1230933870,timestamp_sent,long,1230933870
1230943871,timestamp_meassure,long,1230943870
1230943871,timestamp_sent,long,1230943871
1230943871,verified,boolean,true

For any other attribute, we would have /user/frb/def_serv/def_serv_path/room1_room_<attr_name>_md/room1_room_<attr_name>_md.txt.
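
The naming pattern can be written as a small helper. The `metadata_file_path` function and its parameter names are assumptions for illustration; only the resulting path layout is taken from the examples in this comment.

```python
def metadata_file_path(user, service, service_path,
                       entity_id, entity_type, attr_name):
    """Build the per-attribute metadata file path following the pattern above."""
    name = f"{entity_id}_{entity_type}_{attr_name}_md"
    # The directory and the file share the same base name.
    return f"/user/{user}/{service}/{service_path}/{name}/{name}.txt"


print(metadata_file_path("frb", "def_serv", "def_serv_path",
                         "room1", "room", "temperature"))
```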

The mechanism is valid for both row and column modes.

frbattid commented 9 years ago

Implemented in PR https://github.com/telefonicaid/fiware-cygnus/pull/492