bio-guoda / guoda-services

Services provided by GUODA, currently a container for tickets and wikis.

generate index column #76

Closed by ialzuru 4 months ago

ialzuru commented 5 years ago

When working with the datasets and trying to find duplicates, it is convenient to generate an index column (with unique values) so that specific rows can be referenced. The following code creates a column holding a sequential integer:

from pyspark.sql import Window
from pyspark.sql.functions import lit, row_number

w = Window().orderBy(lit('A'))
df = df.withColumn("row_num", row_number().over(w))

The problem with this trick is that it runs an "order by" over the entire dataset. It therefore works for small datasets, but not for the big GBIF-iDigBio dataset: after running for about an hour, the namenodes of the cluster run out of space. What would be a fast and feasible way to add an index column? Is a hash of the entire row content guaranteed to be unique? Thanks
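For concreteness, the kind of full-row hash being asked about could be sketched in PySpark like this (the "||" separator and 256-bit digest are arbitrary choices for the sketch; identical rows produce identical hashes, so the values are only unique if the rows themselves are distinct):

from pyspark.sql.functions import sha2, concat_ws, col

# Hash the concatenation of every column into one value per row.
# concat_ws silently drops nulls, so rows differing only in null vs. empty
# values can collide; column names containing dots (e.g. the DwC term URIs)
# would additionally need backtick quoting.
df = df.withColumn("row_hash", sha2(concat_ws("||", *[col(c) for c in df.columns]), 256))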

jhpoelen commented 5 years ago

@ialzuru do you have some error logs? What is the configuration you used to run the job in the cluster?

jhpoelen commented 5 years ago

orderBy is a pretty heavy operation, especially when done across >4 billion records. If you do end up with a giant orderBy (or sort), I suggest first reducing the data frame as much as possible (dropping unused columns) and only then doing the expensive operations.
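A minimal PySpark sketch of that column-pruning step, reusing the parquet path from the example further down (which columns to keep depends on the analysis; only the scientific name term is shown here):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the full dataset, then keep only the columns the dedup analysis needs
# before any wide operation such as a global orderBy.
df = spark.read.parquet("hdfs:///guoda/data/source=preston.acis.ufl.edu/dwca/core.parquet")
slim = df.select("`http://rs.tdwg.org/dwc/terms/scientificName`")  # add any other needed columns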

Perhaps more efficient for your use case is Spark's zipWithIndex or zipWithUniqueId. zipWithUniqueId looks like the most efficient option because the docs claim that it doesn't trigger a Spark job. For more info, see https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@zipWithUniqueId():org.apache.spark.rdd.RDD[(T,Long)] .
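On the PySpark side, a rough equivalent of the zipWithUniqueId approach could look like this (assuming df is the already-reduced data frame; the generated ids are unique but not consecutive):

# Zip each row with a unique id without a shuffle or global sort, then
# rebuild a DataFrame with the id as the first column.
pairs = df.rdd.zipWithUniqueId()                    # RDD of (Row, id) pairs
rows = pairs.map(lambda p: (p[1],) + tuple(p[0]))   # flatten to (id, col1, col2, ...)
indexed = rows.toDF(["row_id"] + df.columns)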

jhpoelen commented 5 years ago

Here's an example of a successful export of id, name and dataset hashes:

scala> val allthedata = spark.read.parquet("hdfs:///guoda/data/source=preston.acis.ufl.edu/dwca/core.parquet")
scala> import spark.implicits._
scala> val nameAndSource = allthedata.select("`http://rs.tdwg.org/dwc/terms/scientificNa
scala> nameAndSource.rdd.zipWithUniqueId.map( x => (x._2, x._1._1, x._1._2)).toDS.write.parquet("/user/jhpoelen/idNameSource.parquet")
scala> val names = spark.read.parquet("/user/jhpoelen/idNameSource.parquet")
names: org.apache.spark.sql.DataFrame = [_1: bigint, _2: string ... 1 more field]

scala> names.show(5)                                                                    
+-----+--------------------+--------------------+                                       
|   _1|                  _2|                  _3|                                       
+-----+--------------------+--------------------+                                       
| 1195|Microchrysa sp. B...|hash://sha256/fe4...|                                       
| 5352|Mindarus sp. BIOU...|hash://sha256/fe4...|                                       
| 9509|Muscaphis sp. BIO...|hash://sha256/fe4...|                                       
|13666|Myzocallis sp. BA...|hash://sha256/fe4...|                                       
|17823|Myzocallis sp. BI...|hash://sha256/fe4...|                                       
+-----+--------------------+--------------------+                                       
only showing top 5 rows