src-d / gemini

Advanced similarity and duplicate source code at scale.
GNU General Public License v3.0

Run Gemini file-level duplicate detection on PGA #42

Open bzz opened 6 years ago

bzz commented 6 years ago

Document in the README the resources needed to successfully process 1k, 2k, 10k, 100k and the whole PGA of .siva files.

So a good start would be

smacker commented 6 years ago

optimize, in order to utilize that resource better (i.e. in case of throughput, have more executor JVMs running on the same machine)

how to do that? we don't control the Spark cluster.

bzz commented 6 years ago

how to do that? we don't control the Spark cluster.

let's measure, identify and document the bottleneck first, set preliminary expectations on resources for 100k, and then discuss the possible options that we might have; i.e. this can be a powerful argument for changing https://github.com/src-d/charts/tree/master/spark to Apache Spark on k8s.

We would be able to improve the performance expectation model based on more data later on.

smacker commented 6 years ago

Hit https://github.com/src-d/engine/issues/323

bzz commented 6 years ago

Thanks for keeping it updated!

BTW, super-nice issue description and example of how to reproduce 👍

bzz commented 6 years ago

The Engine issue is resolved in https://github.com/src-d/engine/releases/tag/v0.5.1

smacker commented 6 years ago

Yep. But the Engine API has changed a bit. We need to update Gemini.

smacker commented 6 years ago

Ran Gemini on the new 1k dataset with the new Engine. And it works!!!!

The bad news is the timing: 24 min. I don't really know how to profile it, but I saw that only one job takes most of the time; most probably there is one huge repo.

smacker commented 6 years ago

10k failed with https://github.com/src-d/engine/issues/332

smacker commented 6 years ago

Currently blocked by https://github.com/src-d/engine/issues/336

bzz commented 6 years ago

To move this forward, as the DR team is super busy now, can we please submit a PR to Engine that just logs RevWalkException without failing, the same way MissingObjectException is handled, and run Gemini with the custom-built version of Engine from that PR, to avoid waiting for a release?
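
Purely as an illustration of the suggested change (not the actual Engine code; `safeProcess`, `processRepo` and `repoId` are made-up names):

```scala
import org.eclipse.jgit.errors.{MissingObjectException, RevWalkException}

// Hypothetical sketch: log and skip a repository on RevWalkException,
// mirroring how MissingObjectException is already handled, instead of
// letting the exception fail the whole job.
def safeProcess(repoId: String)(processRepo: String => Unit): Unit =
  try {
    processRepo(repoId)
  } catch {
    case e: MissingObjectException =>
      println(s"WARN skipping $repoId: ${e.getMessage}")
    case e: RevWalkException =>
      println(s"WARN skipping $repoId: ${e.getMessage}")
  }
```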

bzz commented 6 years ago

@carlosms could you please check if https://github.com/src-d/engine/pull/347 solves the issue and allows us to move forward with https://github.com/src-d/gemini/issues/42 ?

If that PR is tested on real data and solves the issue, it may be worth posting this information on the PR as well.

bzz commented 6 years ago

Engine 0.5.7 was released 🎉 with many bug fixes, and discussions like https://github.com/src-d/minutes/pull/210/files#diff-a0ec2b18d53b6bebfc2a342ed864a52fR34 should raise the priority of finishing running Gemini file-level deduplication up to PGA scale.

bzz commented 6 years ago

Title and description are updated to reflect the current goal.

smacker commented 6 years ago

10k repos were processed successfully with Engine 0.5.7. Full PGA is failing with OOM with the default params; they need tuning.

bzz commented 6 years ago

The plan is:

bzz commented 6 years ago

PGA is downloading to the pipeline HDFS cluster at hdfs://hdfs-namenode/pga/siva/latest (check with hdfs dfs -ls).

WIP in the pga-alex pod with pga get -v -j 32 -o hdfs://hdfs-namenode:8020/pga 2>&1 | tee -a /go/pga-1.log

At this rate it will take ~25h to get there.

bzz commented 6 years ago

PGA download has finished 🎉 but it's a bit :suspect: as it is only 2.4Tb, not the 2.7Tb it is rumored to be. Will verify PGA integrity first with https://github.com/src-d/datasets/issues/53

bzz commented 6 years ago

Pre-conditions for running the new Gemini on the pipeline staging Apache Spark cluster:

bzz commented 6 years ago

Blocked by src-d/backlog#1266

bzz commented 6 years ago

Full PGA was downloaded to HDFS 🎉 https://github.com/src-d/datasets/issues/53#issuecomment-396528917

$ zgrep -o "[0-9a-z]*\.siva" ~/.pga/latest.csv.gz | sort | uniq | wc -l
239807

$ hdfs dfs -ls -R hdfs://hdfs-namenode/pga/siva/latest | grep -c "\.siva$"
239807

bzz commented 6 years ago

Plan

  1. WIP: run the latest Gemini Hash \w the latest Engine 0.6.3 on a single shard (~10Gb, ~1000 repos, ~1/250 of the whole) using the current staging pipeline cluster configuration
    • [x] Executor page was not available on proxied Spark Web UI https://github.com/src-d/issues-infrastructure/issues/187#issuecomment-396916618
    • [x] Run every stage of Gemini Hash
      • [x] get Files: ~30min (50min before), 2.5Gb (17Mb \wo content), Parquet: 900Mb
      • [x] extract UAST: ~1h, 2.3Gb, Parquet: 700mb https://github.com/src-d/engine/issues/402
      • [x] extract Features: ~1.1h, 230Mb, Parquet: 80Mb
      • [x] docFreq: 3sec, 110Mb, 73Mb json
      • [x] hashFeatures: ~2h now (due to excessive locking, 50min \wo lock), 170Mb, Parquet: 20Mb
      • [x] saveHashes: 16sec, hashes ~100Mb, meta ~30Mb
        • [x] Run Report
      • WIP: ./report using DB from the hash above ~6h
  2. run on whole PGA. Improvement directions:
    • will need more performance optimizations
      • share a single feClient/connection per partition, instead of creating one for every row (see the sketch after this list)
      • uast de-serialize -> serialize -> feature extractor
      • RDD -> DataFrames
    • support on-disk Parquet "cache" at every stage
    • process each shard individually?
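
A rough sketch of the per-partition client idea referenced above, assuming the spark-shell session (`spark`) used elsewhere in this thread and a hypothetical `FEClient` stand-in for the real feature-extractor gRPC client:

```scala
// Hypothetical stand-in for the real feature-extractor gRPC client.
class FEClient(host: String, port: Int) {
  def extract(uast: Array[Byte]): Seq[String] = Seq.empty // placeholder
  def close(): Unit = ()
}

// Example input: an RDD of serialized UASTs (empty here, shape only).
val uasts = spark.sparkContext.parallelize(Seq.empty[Array[Byte]])

// One client per partition: created once and reused for every row,
// instead of opening a new connection for each element.
val features = uasts.mapPartitions { rows =>
  val client = new FEClient("feature-extractor", 9001)
  rows.flatMap(client.extract)
  // Closing the client reliably would need a TaskContext completion
  // listener or a wrapping iterator; omitted to keep the sketch short.
}
```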

bzz commented 6 years ago

Blocked: all Feature Extractors deployed under https://github.com/src-d/issues-infrastructure/issues/184 are part of a new, separate Apache Spark cluster in a different k8s namespace (-n feature-extractor) that does not seem to have access to HDFS 😕

bzz commented 6 years ago

Hash has finished successfully; I'm now submitting the PRs to Gemini that enabled it.

Report is

bzz commented 6 years ago

1h for hashing ~1/250 of PGA on 3 machines of the pipeline staging cluster

Configuration

```
--conf "spark.executor.memory=16g" \
--conf "spark.local.dir=/spark-temp-data" \
--conf "spark.executor.extraJavaOptions='-Djava.io.tmpdir=/spark-temp-data -Dlog4j.configuration=log4j.properties'" \
--conf "spark.driver.memory=8g" \
--conf "spark.tech.sourced.engine.skip.read.errors=true" \
--conf "spark.files.maxPartitionBytes=12582912" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.eventLog.enabled=true" \
--conf "spark.eventLog.dir=hdfs://hdfs-namenode.default.svc.cluster.local/apps/gemini/spark-logs" \
--files src/main/resources/log4j.properties \
```

Command

time MASTER="spark://fe-spark-spark-master:7077" ./hash -v \
  -k dockergemini4 \
  -h scylladb.default.svc.cluster.local \
  hdfs://hdfs-namenode.default.svc.cluster.local/pga/siva/latest/ff | tee hash-pga-ff-4.logs

Output

Feature Extraction exceptions  
Processed: 4304060, skipped: 120
 - TimeoutException -> 109
 - StatusRuntimeException -> 11

real    61m2.058s

FE exceptions

ERROR SparkFEClient: feature extractor error: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
ERROR SparkFEClient: feature extractor error: io.grpc.StatusRuntimeException: INTERNAL: Exception deserializing request!   
FATAL vendor/golang.org/x/text/unicode/norm/tables.go: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (4323138 vs. 4194304)

UAST extraction exceptions

WARN Bblfsh: FATAL src/main/java/org/yardstickframework/BenchmarkServerStartUp.java: EOF
WARN Bblfsh: FATAL xs6/extensions/crypt/crypt_ghash.js: message is not defined; unsupported: non-object root node
WARN Bblfsh: FATAL vendor/golang.org/x/text/encoding/charmap/tables.go: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (5617479 vs. 4194304)

DB

$  kubectl exec -it scylladb-0 -- /bin/bash
$ cqlsh

use dockergemini4;
select count(1) from meta;
 127379

select count(1) from hashtables;
 426560

marnovo commented 6 years ago

Thanks a lot for the detailed results, @bzz!

Question: how are we sampling the repos for each of these tests?

bzz commented 6 years ago

Question: how are we sampling the repos for each of these tests?

Good question. We always just used a single shard of the PGA dataset: all the repos whose .siva file names start with the prefix /ff/.

Overall, performance on Apache Spark depends on data distribution A LOT, so attaching the .siva file size distribution histogram in ~100Mb buckets:

hdfs dfs -du hdfs://hdfs-namenode/pga/siva/latest/ff/ | grep "\.siva$" | awk -v "size=100048576" -f hist.awk
0 100048576 912
100048576 200097152 17
200097152 300145728 2
300145728 400194304 1
400194304 500242880 1
500242880 600291456
600291456 700340032
700340032 800388608
800388608 900437184
900437184 1000485760 1
1000485760 1100534336 1

bzz commented 6 years ago

Local: 1Mb, 30k features
Cluster: 170Mb, 5.5mil features

DataFrame

local: 8sec, cluster: 4sec

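// Document frequency: for each feature, the number of distinct docs it appears in.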
val freqDf = features.withColumnRenamed("_1", "feature").withColumnRenamed("_2", "doc")
  .select("feature", "doc")
  .distinct
  .groupBy("feature")
  .agg(count("*").alias("cnt"))
  .map(row => (row.getAs[String]("feature"), row.getAs[Long]("cnt")))
  .collect().toMap

RDD

local: 4sec, cluster: 5sec

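// The same document-frequency computation, expressed with the RDD API.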
val freq = features.rdd
  .map { case (feature, doc, _) => (feature, doc) }
  .distinct
  .map { case (token, _) => (token, 1) }
  .reduceByKey(_ + _)
  .collectAsMap()

The DataFrame API does not seem to change performance much, but it still has the nice benefit of a uniform API.

bzz commented 6 years ago

There are 141 .siva files bigger than 1Gb, with the remaining 260k+ being smaller. Those outliers can be moved aside to get a shorter tail of task execution times on average.

```
hdfs dfs -ls -R hdfs://hdfs-namenode/pga/siva/latest/ | grep "\.siva$" | awk '{if ($5 > 1073741824) print $8}' | wc -l
141

hdfs dfs -ls -R hdfs://hdfs-namenode/pga/siva/latest/ \
  | grep "\.siva$" | awk '{if ($5 > 1073741824) print $8}' \
  | cut -d '/' -f 7- \
  | xargs -I{} sh -c 'hdfs dfs -mkdir -p $(dirname hdfs://hdfs-namenode/pga/siva/biggest/{}); hdfs dfs -mv hdfs://hdfs-namenode/pga/siva/latest/{} hdfs://hdfs-namenode/pga/siva/biggest/{}'
```

bzz commented 6 years ago

After moving the biggest files, jobs fail with

org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 32343809.

After setting spark.kryoserializer.buffer.max=1g, jobs fail with

tech.sourced.siva.SivaException: Exception at file 022c7272f0c1333a536cb319beadc4171cc8ff6a.siva: At Index footer, index size: Java implementation of siva doesn't support values greater than 9223372036854775807

which at this point might indicate .siva files broken during pga get

smacker commented 6 years ago

@bzz here is your issue: https://github.com/src-d/engine/issues/414 (different file but the same error).

bzz commented 6 years ago

Simple processing of the full PGA from /pga2 \w Engine finished in 59.1h, using 16 cores/8Gb RAM on 9 machines of the staging pipeline cluster 🎉

Removing the outliers, ~140 .siva files (of ~270k) that are >1Gb each, would speed it up 2-3x.

```
export SPARK_HOME="/opt/spark-2.2.0-bin-hadoop2.7"
$SPARK_HOME/bin/spark-shell --master "spark://fe-spark-spark-master:7077" \
  --name "Spark shell \w Gemini - PGA2, count file sizes" \
  --conf "spark.executor.memory=8g" \
  --conf "spark.cores.max=144" \
  --conf "spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=metastore_db2;create=true" \
  --conf "spark.local.dir=/spark-temp-data" \
  --conf "spark.executor.extraJavaOptions=-Djava.io.tmpdir=/spark-temp-data -Dlog4j.configuration=log4j.properties -XX:+HeapDumpOnOutOfMemoryError -XX:MaxDirectMemorySize=20G" \
  --conf "spark.driver.memory=8g" \
  --conf "spark.tech.sourced.engine.skip.read.errors=true" \
  --conf "spark.files.maxPartitionBytes=12582912" \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.kryoserializer.buffer.max=1g" \
  --conf "spark.eventLog.enabled=true" \
  --conf "spark.eventLog.dir=hdfs://hdfs-namenode.default.svc.cluster.local/apps/spark" \
  --files src/main/resources/log4j.properties \
  --jars target/gemini-deps.jar,target/gemini-uber.jar
```

```scala
val files = repos.getHEAD
  .getCommits
  .getTreeEntries
  .getBlobs
  .filter('is_binary === false)

files
  .sort("repository_id")
  .coalesce(1000)
  .write.parquet("hdfs://hdfs-namenode.default.svc.cluster.local/pga2/parquet/files")
```


Caching all files on-disk in Parquet fails though, \w

Job aborted due to stage failure: Total size of serialized results of XX tasks (1025 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

This happens because the DataFrame API keeps in memory, for a String column, the longest string, which here is a full file's content of more than 1Gb.
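
One possible knob, shown only as a sketch (the next comment describes the fix actually used): raise spark.driver.maxResultSize above its 1g default.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: lift the driver result-size limit. In spark-shell the session
// already exists, so this would instead be passed at launch time
// (--conf "spark.driver.maxResultSize=4g"); app name is illustrative.
// It only hides the symptom while per-task results stay this large.
val spark = SparkSession.builder()
  .appName("gemini-parquet-cache")
  .config("spark.driver.maxResultSize", "4g")
  .getOrCreate()
```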

bzz commented 6 years ago

The on-disk Parquet cache was failing because the number of tasks (~40k) was too high for our cluster configuration. That was fixed by reducing the number, and it can now proceed over the full PGA (~50h) 🎉 but is failing at the end now 😖 with

Job aborted due to stage failure: Task 49 in stage 4.1 failed 4 times, most recent failure:
Lost task 49.3 in stage 4.1 (TID 27912, 10.2.56.116, executor 1):
java.io.FileNotFoundException: /spark-temp-data/spark-fb7fbf1e-033e-4122-9464-16acdc52fe34/executor-801bb555-6e93-4c4e-b3f8-46bc33ca9639/blockmgr-309ecee9-419e-45ab-a595-03dc3157b641/26/temp_shuffle_2eddf3bb-b5c0-488c-b397-47e4e4921a32 
(No such file or directory)
Simple example to reproduce:

```scala
val path = "hdfs://hdfs-namenode.default.svc.cluster.local/pga2/siva/latest/"
val engine = Engine(spark, path, "siva")
val repos = engine.getRepositories

val files = repos.getHEAD
  .getCommits
  .getTreeEntries
  .getBlobs
  .filter('is_binary === false)

files
  .coalesce(1000)
  .sort("repository_id")
  .write.parquet("hdfs://hdfs-namenode.default.svc.cluster.local/pga2/parquet/files")
```