gwu-libraries / TweetSets

Service for creating Twitter datasets for research and archiving.
MIT License

T128 refactor loader #128, #122 #138

Closed dolsysmith closed 3 years ago

dolsysmith commented 3 years ago

Summary

This branch contains the following changes:

  1. Upgrades Spark to v. 3.1.2
  2. Upgrades Java to v. 11.11+9
  3. Upgrades Python to 3.8 (and other dependencies as required)
  4. Upgrades Pyspark to 3.1.2
  5. The spark-loader command now uses the Spark DataFrame API to (1) load tweets into Elasticsearch and (2) create extracts.

Setup

You will need an NFS mount shared between your primary and cluster nodes/VMs in order for this branch to work properly. See the instructions in the original issue.

Make sure that loader.docker-compose.yml is configured to build the Docker image from Dockerfile-loader. Likewise for the spark-master and spark-worker containers in docker-compose.yml on both primary and cluster nodes.

You'll also need to update your .env file to point the DATASET_PATH variable to the shared NFS mount. (On my VMs, this is /storage/dataset_loading.)

Currently, the data extracts are written to the same directory used for loading datasets. So they will not appear in the TweetSets UI, which is still looking for them elsewhere. But I didn't want to touch tweetset_server.py in this branch, given the work Laura has been doing with the Python 3.8 upgrade.

Testing

It will be useful to load the same dataset with the regular loader and then with the Spark loader, in order to compare results in the UI.

  1. Bring up TweetSets (making sure it builds the new images from Dockerfile-spark).
  2. Create a dataset.json file in the directory with the JSON of the tweets to load.
  3. Bring up the loader container.
  4. Load a sample dataset using the regular (non-Spark) loader.
  5. Edit the dataset.json with a name to distinguish it from the previous load.
  6. Load the dataset again with the following command:
spark-submit \
 --jars elasticsearch-hadoop.jar \
 --master spark://$SPARK_MASTER_HOST:7101 \
 --py-files dist/TweetSets-2.1.0-py3.8.egg,dependencies.zip \
 --conf spark.driver.bindAddress=0.0.0.0 \
 --conf spark.driver.host=$SPARK_DRIVER_HOST \
 tweetset_loader.py spark-create /dataset/sample/json

This presumes that your sample dataset is in the /storage/dataset_loading/sample directory (or whatever NFS mount is mapped to /dataset in the .env file). In my testing, I put the tweet JSON files and dataset.json in a json subdirectory, but that's not strictly necessary.

  7. Unfortunately, the UI will not work on this branch, due to the changes required for Python 3.8. So to test it in the UI, you should:
     a. Bring down TweetSets on both nodes.
     b. Check out the master branch or Laura's Py38 branch (t126-python-38).
     c. Remove the server image (on the primary node) to force a rebuild: docker image rm ts_server-flaskrun:latest
     d. Bring TweetSets back up.

Expected Results

Elasticsearch indexing

Let me know if you see any inconsistencies in the indexing that aren't explained by the changes above.

Dataset extracts

Each extract should contain one or more zipped files. I've tested the CSV files against those created by twarc.json2csv (1.12.1) and documented some minor differences in the data dictionary. Feel free to test them against full extracts created in the UI, but do keep in mind that the current version of TweetSets is using an older version of json2csv.

In my testing, Spark created far too many files for the mention extracts. I assume that is a setting that can be configured, but I haven't looked into that yet.
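
For reference, Spark writes one output file per partition, so the file count can be capped by coalescing before the write. A minimal sketch, assuming a mentions_df DataFrame and placeholder paths (neither is the branch's actual code):

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
mentions_df = spark.read.json("/dataset/sample/json")  # stand-in for the mention extract

# coalesce() merges partitions without a full shuffle; 8 is an arbitrary
# example value, yielding at most 8 part files.
(mentions_df
    .coalesce(8)
    .write
    .csv("/dataset/sample/extracts/mentions", header=True, compression="gzip"))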

Performance

I am curious to hear how you experience it. I would expect this code to be faster at least for large datasets -- it was so in testing on my own laptop -- but I haven't had a chance to test a large dataset on the VM's. There may be Spark settings we can tweak to improve performance, but my impression is that for some of these, what's optimal depends on the environment, and our dev VM's are not terribly good proxies for production. We might have more success testing this aspect on tweetsets-dev.

dolsysmith commented 3 years ago

Updated spark_utils.py so that the original JSON (unparsed) is stored in the tweet field. This method avoids discrepancies arising from the difference between Python's and Spark's handling of null fields in the original tweet JSON. The short version is that some fields in Twitter's JSON schema are present with null values, and others can be absent entirely. Spark, being schema-based, will treat absent fields and fields present but with a null value the same. Python, on the other hand, will look for the presence of a key, and according to the logic in json2csv, sometimes it will behave differently depending on whether a key is present (but empty) or absent entirely. This change should obviate this problem, as well as preserve the original structure of the Twitter API JSON.
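
A minimal illustration of the discrepancy, using coordinates as an example of a field that Twitter serializes as null (any nullable field would do):

import json

present_null = json.loads('{"coordinates": null, "text": "hi"}')
absent = json.loads('{"text": "hi"}')

# Python code (like json2csv) can branch on the *presence* of a key:
print("coordinates" in present_null)  # True  -- key present, value None
print("coordinates" in absent)        # False -- key absent entirely

# A schema-based parse like Spark's yields null in both cases, so the
# distinction is lost; keeping the raw JSON string preserves it.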

dolsysmith commented 3 years ago

Updated the JSON schema file so that the full_text and retweeted_status.full_text fields are read (if present) by the loader.
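
For context, reading with an explicit schema looks roughly like the sketch below; apart from full_text and retweeted_status.full_text, the fields and path are illustrative, not the branch's actual schema file:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

tweet_schema = StructType([
    StructField("id_str", StringType()),
    StructField("full_text", StringType()),  # extended (280-character) tweet text
    StructField("retweeted_status", StructType([
        StructField("full_text", StringType()),
    ])),
])

parsed = spark.read.json("/dataset/sample/json", schema=tweet_schema)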

dolsysmith commented 3 years ago

Updated branch as follows:

  1. Writing to ES (tweetset_loader.py, line 354) now uses only the subset of fields we want to index.
  2. elasticsearch-hadoop configuration updated to exclude the tweet_id field from indexing (see the sketch after this list).
  3. The JSON object of the tweet is now joined to the parsed tweet by unique tweet id. (I was joining on a row number previously, but such numbering is evidently not deterministic when loading from multiple files. In other words, loading the same dataset twice can produce different row numberings if the data are distributed across multiple files.)
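
A hypothetical sketch of the elasticsearch-hadoop write configuration for points 1 and 2; the index name, DataFrame, and field values are placeholders rather than the branch's actual code, and the connector jar must be on the classpath (as in the spark-submit command above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Placeholder DataFrame: just the id plus the raw JSON string.
indexed_df = spark.createDataFrame([("1278", '{"id_str": "1278"}')], ["tweet_id", "tweet"])

(indexed_df.write
    .format("org.elasticsearch.spark.sql")
    .option("es.resource", "tweets")           # target index (placeholder name)
    .option("es.mapping.id", "tweet_id")       # use the tweet id as the document _id
    .option("es.mapping.exclude", "tweet_id")  # ...without indexing it as a field
    .mode("append")
    .save())
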
dolsysmith commented 3 years ago

New approach: we load the JSON-L as an RDD, then convert that to a DataFrame, allowing us to preserve the original string representation of the JSON as a separate column and obviating the need for a join (which creates problems on smaller datasets). This approach seems stable, but performance has taken a hit (relative to previous implementations) that is evident on larger datasets. Loading 20 GB with the new implementation (not counting the time to create extracts) took ~35 min, vs. 20 min using the implementation currently in production.
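
Roughly, that approach looks like the sketch below (the path and the parsed field are illustrative):

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the JSON-L as raw text so each original line survives verbatim,
# pair it with whatever parsed fields are needed, then promote the RDD
# to a DataFrame -- no join required.
raw = spark.sparkContext.textFile("/dataset/sample/json")
rows = raw.map(lambda line: (line, json.loads(line).get("id_str")))
df = rows.toDF(["tweet", "tweet_id"])  # raw JSON preserved in the tweet column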

lwrubel commented 3 years ago

Is that comparison against the previous implementations of Spark 3? Or compared to TweetSets 2.1?

lwrubel commented 3 years ago

Oh never mind, I see you said compared to production. Sorry!

dolsysmith commented 3 years ago

This version uses the RDD API for loading to TweetSets (in order to preserve the original JSON as is) and the DataFrame API to create the extracts. Performance is comparable to what's in production for loading and significantly improved for creating extracts.

The Spark SQL code includes fields that we use for indexing in Elasticsearch; these are dropped when creating the CSV. I will leave them there for now (I don't think their presence really impacts performance) with an eye toward a future release where we no longer need to load the full JSON into Elasticsearch. At that point, we can use the DataFrame API for everything (which should improve performance further).
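
For illustration, dropping the indexing-only columns at extract time might look like this (the column names are assumptions standing in for the fields carried along for Elasticsearch):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
tweets_df = spark.read.json("/dataset/sample/json")  # placeholder source

# Drop the Elasticsearch-only columns just before the CSV write; drop()
# ignores names that aren't present, so the same code works either way.
(tweets_df.drop("tweet", "tweet_id")
    .write
    .csv("/dataset/sample/extracts/tweets", header=True, compression="gzip"))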