MI-DPLA / combine

Combine /kämˌbīn/ - Metadata Aggregator Platform
MIT License

S3 export #371

Closed ghukill closed 5 years ago

ghukill commented 5 years ago

Suggestion for S3 export of Records. It was pointed out that this provides a built-in air gap between Combine and published records, helping users decide what constitutes a publish.

With the ability to chunk exports, some S3 communication already taking place, and the existence of Spark/S3 connectors, this seems like a relatively easy feature to add.

ghukill commented 5 years ago

POC confirmed.

Will require setting AWS credentials somewhere Spark can access them. One option is /opt/spark/conf/spark-defaults.conf (requires a Spark session restart, but not a cluster restart):

# s3
spark.hadoop.fs.s3a.access.key=ACCESS_KEY_HERE
spark.hadoop.fs.s3a.secret.key=SECRET_KEY_HERE
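
Depending on how Spark was built/installed, the s3a filesystem may also need the hadoop-aws connector on the classpath; one way to provide it is in the same file (the version here is an assumption and should match the installed Hadoop version):

# s3a connector jars -- version is an assumption, match the installed Hadoop version
spark.jars.packages=org.apache.hadoop:hadoop-aws:2.7.3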

Then it's as straightforward as writing the RDD to S3 instead of the local filesystem:

# from pyspark shell
from core.spark.console import *

# get rdd to write -- job 1567 -- selecting only documents
rdd_to_write = get_job_as_df(spark, 1567).select('document').rdd

# save as text file to s3
rdd_to_write.saveAsTextFile('s3a://goobertronic/j1567_xml_documents')

Results in the S3 console look like:

[screenshot: S3 console listing of the exported part files, 2019-01-07]

As the part files show, results are written based on the RDD's partitions, which are already user configurable from the GUI. This lets users write to S3 with the partitioning that works best for them.
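
As a sketch of controlling that partitioning explicitly at export time (the partition count and target path here are arbitrary, for illustration only):

# repartition before writing to control the number of S3 part files
rdd_to_write.repartition(10).saveAsTextFile('s3a://goobertronic/j1567_xml_documents_10parts')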

It will be relatively easy to add S3 as an option for exporting documents, where users will provide the following:

Writing mapped fields -- JSON or CSV -- is a natural follow-up. And/or writing stateio exports to S3? And allowing stateio imports from S3?

ghukill commented 5 years ago

Confirmed these credentials can also be set dynamically from within a Spark session with the following:

spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", settings.AWS_ACCESS_KEY_ID)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", settings.AWS_SECRET_ACCESS_KEY)
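
A small helper along these lines (the function name is hypothetical, not part of Combine) would keep those two calls together wherever credentials need to be applied before an S3 call:

# hypothetical convenience wrapper -- not part of Combine
def set_spark_s3_credentials(spark, access_key, secret_key):
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", access_key)
    hadoop_conf.set("fs.s3a.secret.key", secret_key)

set_spark_s3_credentials(spark, settings.AWS_ACCESS_KEY_ID, settings.AWS_SECRET_ACCESS_KEY)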

ghukill commented 5 years ago

Questions remain about where and how Records should be published or exported to S3. The lowest-hanging fruit is probably to just extend exporting to S3, with various options:

ghukill commented 5 years ago

Exporting documents to S3 is coming along nicely. There is now the ability to export a Spark RDD (including all columns from the Record model) or the archive file that would have been written to the local filesystem. The RDD opens up the possibility of being loaded directly into another Spark instance, and could support S3 harvesting.
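
As a sketch of that load-back path, assuming the plain text-file export shown earlier and a second Spark instance with the same S3 credentials configured:

# from another Spark instance, read the exported RDD back as a text file
loaded_rdd = spark.sparkContext.textFile('s3a://goobertronic/j1567_xml_documents')
loaded_rdd.count()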

Next up, exporting mapped fields and tabular data to S3.

S3 reads/writes are done natively from Spark, or with boto3 for non-Spark contexts. For Spark, credentials are set dynamically for each call with settings.AWS_ACCESS_KEY_ID and settings.AWS_SECRET_ACCESS_KEY. For boto3 calls -- e.g. in tasks.py -- credentials stored in ~/.aws/credentials are used, and are set when Combine is started. Credentials need to be rationalized across methods, with an eye towards how to handle multiple credentials.
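
A minimal sketch of the boto3 path (the archive path and key are placeholders), which picks up credentials from ~/.aws/credentials by default:

import boto3

# boto3 resolves credentials from ~/.aws/credentials automatically
s3 = boto3.client('s3')
s3.upload_file('/tmp/j1567_export.zip', 'goobertronic', 'exports/j1567_export.zip')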

ghukill commented 5 years ago

A considerable wrinkle, with adjustments:

It was pointed out that the RDD written to S3 was essentially a line-delimited text file of RDD rows as strings. While these could be parsed, it would not be straightforward, and would likely come at a considerable efficiency cost.

Instead, it likely makes more sense to write to S3 in ways that mirror how, and why, the data will eventually be read. This becomes a matrix of:

types of exports: [records, mapped fields, tabular data] * spark read/write options: [jsonl, parquet, avro, etc.]

Instead of supporting all possible combinations, it probably makes the most sense to focus on a few known to be handy.

Parquet has worked extremely well for record documents, but that is for a dataframe with a limited number of columns (1:1 with the Record table). However, parquet falls apart for mapped fields and tabular data, as the field names contain characters that violate parquet's column-name rules: ,;{}()\n\t= (plus the blank space at the beginning). Specifically, = is used for both mapped fields and tabular output, and () show up when sibling ids are included.
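
For reference, a sketch of what that parquet write could look like for record documents (the bucket and path are placeholders):

# record documents have a fixed, parquet-safe set of columns (1:1 with the Record table)
records_df = get_job_as_df(spark, 1567)
records_df.write.parquet('s3a://BUCKET/j1567_records.parquet')

# and read back
records_df = spark.read.parquet('s3a://BUCKET/j1567_records.parquet')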

Tabular data emerges from XML2kvp as JSONL, so that's an option. Mapped fields were not previously written to S3 at all, as they were handled by other libraries, but we can select them from ES with get_job_es(spark, job_num). This gives a dataframe es_df which we can write to and read from S3 like so:

es_df.write.json('s3a://BUCKET/es_mapped_jsonl')
es_df = spark.read.json('s3a://BUCKET/es_mapped_jsonl')

ghukill commented 5 years ago

Had considered that even record documents could be stored as JSONL as a way to have the same method for all, but parquet's compression is not insignificant:

ghukill commented 5 years ago

Went with a mixed approach for the various forms of exporting:

Preparing to merge and closing this issue. Bugs will likely need to be addressed, but those will be broken out into smaller issues.