@timrobertson100 has a separate, uncommitted Oozie workflow for this, but the recent MOL / Avro work makes me think that building this out as another download format will be more maintainable.
@timrobertson100 I guess the challenge here is where and how would a user inject their Spark code like that at https://github.com/gbif/gbif-bloodhound/blob/master/oozie-export/src/main/scala/org/gbif/bloodhound/Process.scala? You'd want some assurances that it's tested & won't cripple your cluster when executed. And, you wouldn't want to be the one to store and maintain these, write help docs, and offer user support, right?
Thanks @dshorthouse
I think you might misunderstand the proposal. This is about porting your work into a download format that we can evolve over time, not about you submitting arbitrary Spark code - we're not equipped to support that for the reasons you say, but we can try and simplify things for you.
Wouldn't be the first time :) I thought you were aiming for something generalized and available to all. What you proposed here is just fine and much appreciated. When/if my Spark code does change (unlikely for now), how would we coordinate this so you don't have to maintain it?
Thanks @dshorthouse - we may at some point support arbitrary SQL downloads but here I'm looking to merge the work from the gbif-bloodhound repository to ease the DOI assignments.
It will actually just be ported to SQL (Hive HQL to be precise), which will run on MapReduce or Spark, so it will be very easy to adjust the format. Open a PR or simply an issue for us and it can be addressed and deployed very quickly. It will sit in its own sub-workflow, so we can enhance it if needed with more smarts (e.g. native Spark code).
The current queries give tables like this:

| agent name | recordedBy_gbifIDs | identifiedBy_gbifIDs |
| --- | --- | --- |
| Ångström, J. | 2578910099;2578910728 | ␀ |
| Ångström, Johann | 1019761413;1019761377;436874154;436815242 | ␀ |
| Åsa Dahlkild | 849587770;849588386;849587991;849588442;849587793;849588368;849587995;849587973;849588127;849588502;849588766;849588241;849588954;849588019;849588910;849587781;849589402;849587883;849589284;849588194;849588372;849588075;849588230;849588669;849588922;849587834;849588806;849587916 | ␀ |
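For orientation, here is a rough Spark (Scala) sketch of the kind of grouping that yields a table like this. It is illustrative only, not the gbif-bloodhound Process.scala query; the `occ` DataFrame, the pipe-delimited recordedBy handling, and the column names are all assumptions.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// `occ` is an assumed DataFrame of occurrence records with at least
// gbifID and recordedBy columns; recordedBy is assumed pipe-delimited.
def agentTable(occ: DataFrame): DataFrame =
  occ
    .withColumn("agent", explode(split(col("recordedBy"), "\\|")))
    .groupBy(trim(col("agent")).as("agent"))
    // every matching gbifID lands in a single semicolon-joined cell, which is
    // why one prolific agent string can produce an enormous field value
    .agg(concat_ws(";", collect_set(col("gbifID").cast("string"))).as("recordedBy_gbifIDs"))
```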
The list of gbifIDs can be extremely long, for example "Die Mitarbeiterinnen und Mitarbeiter der Floristischen Kartierung Deutschlands" has recorded about 8 million occurrences. That's a very long field, about 100MB.
For the family table it's even more -- 68M occurrences, giving a 750MB field value. This is going to cause problems in Hive -- even if it could be made to work now, I'm concerned that changes in data could exceed available memory.
Instead, I propose that the export format be a more conventional (i.e. easier to manage with text-based tools) pair of tables, which can be joined and grouped on David's side:
| agent name | recordedBy_gbifIDs |
| --- | --- |
| Ångström, J. | 2578910099 |
| Ångström, J. | 2578910728 |
| Ångström, Johann | 1019761413 |
| Ångström, Johann | 1019761377 |
| Ångström, Johann | 436874154 |
| Ångström, Johann | 436815242 |
Similarly for the families.
Thanks, Matt. First, I do not include observational data. I expect I've been somewhat fortunate in that massive content in one cell has not (yet) happened. Your proposed export format would be a bit of a regression for me because the point of this odd structure is to make unique agent strings and then dump these rows into a processing queue while retaining gbifID membership. I would either have to redesign how that processing works, which might not be such a bad thing, or I would have to rewrite portions of my Scala script to repeat the grouping and, in effect, repeat the original goal. I'd have to toy with your structures and see if they would afford any benefit.
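For what it's worth, repeating that grouping over the proposed export is fairly compact in Spark. A minimal sketch follows, where `agents` stands for the two-column export and the column names are assumptions.

```scala
import org.apache.spark.sql.functions.{col, collect_list}

// One row per gbifID in the export; regroup to one row per unique agent
// string, retaining gbifID membership, before queueing.
val perAgent = agents
  .groupBy(col("agent"))
  .agg(collect_list(col("gbifID")).as("gbifIDs"))
```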
> First, I do not include observational data.
That is likely to make a huge difference; I will retry and see what the performance is. Which basisOfRecord values did you include? Or if it's more complicated, can you give the previous GBIF download you used? (Are specimens of long-dead collectors also excluded?)
Here's the latest one https://doi.org/10.15468/dl.9upnjj
https://www.gbif-uat.org/occurrence/download/0000034-200427174931356
This is a small test download (from the UAT system), just to review the format.
```
Archive:  /home/matt/0000034-200427174931356.zip
   Length      Date    Time    Name
---------  ---------- -----   ----
169850860  2020-04-28 12:14   0000034-200427174931356.tsv
  9741428  2020-04-28 12:14   0000034-200427174931356_agents.tsv
  6405568  2020-04-28 12:14   0000034-200427174931356_families.tsv
---------                     -------
185997856                     3 files
```
This one is a bit larger (2.5GB): https://www.gbif-uat.org/occurrence/download/0000036-200427174931356
There's a problem generating full datasets which I'm looking into — hopefully just a case of putting some configuration in the right place.
The format is as you originally requested, except I have added `license` to the first table, since it's used to generate the citation and ought to be kept with each record, and `total_` fields to the agent and family tables, which I think could be useful when reading the data (knowing how much is to follow).
Could you have a look and see if this fits your processing?
@MattBlissett thanks for all this work! I had a look at the smaller test file and, although you have some "v_*" prefixes on the columns, I can probably make this work. I was originally hoping for either parquet or avro files in place of the massive occurrence file & also directories of csv/tsv files instead of single files for agents & families files, but what you have is probably best to avoid platform/java version compatibility nightmares.
But...
I've just now begun to do something with your new recordedByID and identifiedByID terms. Can these be added to the large occurrence tsv much as you've done with the license term?
Thanks @dshorthouse
Suggest Avro as the better format for portability reasons.
Can you please confirm that by directories you mean a directory and then multiple file parts to aid parallel processing on your end?
@timrobertson100 re: directories – That's correct, w/ the file parts being csv files. And yes, the reason is for parallel processing. However, this is merely to parallelize the loading onto a queue at my end. So, if produced as large tsv files as @MattBlissett has here, it's not the end of the world because processing jobs is the bottleneck for me, not the loading onto a queue.
Just to confirm: part of the file is Avro, and part TSV? Apologies if I'm overlooking something obvious, but I had understood there would be a file 0000034-200427174931356.zip containing three sets of files:
```
occurrence/part-0000*.avro
agents/part-0000*.avro
families/part-0000*.avro
```
Sorry for the confusion. Was more like this:
```
occurrence/part-0000*.avro
agents/part-0000*.csv
families/part-0000*.csv
```
Is the query included in a repo anywhere? I have a suspicion that this https://github.com/gbif/gbif-bloodhound/blob/master/oozie-export/src/main/scala/org/gbif/bloodhound/Process.scala is not being used here. I made a few commits there, assuming that was the place to do so.
https://www.gbif-uat.org/occurrence/download/0000042-200427174931356
There's some small improvements to the format still to do (name/namespace in the Avro schema for example), but this has recordedByID and identifiedByID, Avro + TSV, and everything split into files -- just the default as our UAT has produced.
The main part of the process is a Hive script here: https://github.com/gbif/occurrence/blob/bloodhound/occurrence-download/src/main/resources/download-workflow/bloodhound/hive-scripts/execute-bloodhound-query.q -- but it might (or might not) end up being converted back to a Scala script.
Great! I saw `CAST(toLocalISO8601(dateidentified) AS BIGINT) AS dateIdentified` and `CAST(toLocalISO8601(eventdate) AS BIGINT) AS eventDate`, but I'm not sure what to make of these as you already have `v_identifiedBy` and `v_eventDate`, which I assume are (equally?) processed. I hoped to have verbatim dates as well as your `v_*` dates, because the former, as you know, can be disasters though interpretable (to a degree) by humans.
That looks like a mistake to me, https://www.gbif-uat.org/occurrence/download/0000044-200427174931356 has it fixed.
All the `v_`-prefixed fields are verbatim fields; the other fields are processed.
Here's a download with everything we have on UAT (53 million specimens): https://www.gbif-uat.org/occurrence/download/0000054-200429162749254
These are the fields in the occurrence table:
```sql
gbifID,
datasetKey,
license,
countryCode,
toLocalISO8601(dateidentified) AS dateIdentified,
toLocalISO8601(eventdate) AS eventDate,
array_contains(mediaType, 'StillImage') AS hasImage,
```
These are all after going through GBIF interpretation. The dates will be in the form "2020-05-04T16:25:00"
These are all unprocessed, verbatim fields, including `v_dateIdentified` and `v_eventDate`.
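If it helps on the consumer side, the processed date strings in the form above can be parsed back into timestamps in Spark. This is only a sketch; `occurrences` is an assumed DataFrame read from the download's occurrence table.

```scala
import org.apache.spark.sql.functions.{col, to_timestamp}

// `occurrences` is an assumed DataFrame read from the download's occurrence table.
val withDates = occurrences
  .withColumn("eventDateTs", to_timestamp(col("eventDate"), "yyyy-MM-dd'T'HH:mm:ss"))
  .withColumn("dateIdentifiedTs", to_timestamp(col("dateIdentified"), "yyyy-MM-dd'T'HH:mm:ss"))
```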
Issues with the performance of the process have been fixed, so once we've settled on a data format we can run this in production.
The zip file contains three directories: occurrence.avro, agents.tsv and families.tsv. For the TSV directories, the first file `000000` is just the header row. At first I thought this was annoying, but it's actually convenient -- it can be easily ignored, or prepended to any other block:
```
     82  2020-05-04 13:34   agents.tsv/000000
5301130  2020-05-04 13:34   agents.tsv/000001
5041933  2020-05-04 13:34   agents.tsv/000002
```
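For anyone reading these TSV directories with Spark, one possible approach is sketched below. It assumes the paths in the listing above and that no data row repeats the header text in its first column.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()

// Column names come from the header-only part file.
val header = spark.read.option("sep", "\t").csv("agents.tsv/000000")
  .first().toSeq.map(_.toString)

// Read every part without a header, apply the names, then drop the header row itself.
val agents = spark.read.option("sep", "\t").csv("agents.tsv")
  .toDF(header: _*)
  .filter(col(header.head) =!= header.head)
```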
@MattBlissett This is wonderful! Am downloading this now and will let you know how I make out.
@MattBlissett Finally had a chance to toy with the outputs you've generated. I have no issues using Spark in a Scala script to read the occurrence.avro. However, I am struggling to properly read the *.tsv outputs back into Spark with that header file, 000000. It turns out I need the header in each of the files because that simplifies the way I'm loading the rows into a processing queue.
So...
Do you think we can forgo the *.tsv and just stick with an Avro output for agents and families, as was done for occurrence.avro? As the latter seems to work, am hoping this will result in more consistency in data products with less need for support.
I will generate another DwC-A download later this week; does that give enough time to adjust the BLOODHOUND export by stripping out the .tsv & replacing it with .avro, such that the three are occurrence.avro, agents.avro and families.avro? And then executing?
Hi -- I'm getting back to this.
I've replaced the TSV files with Avro, giving occurrence.avro, agents.avro and families.avro.
Here's a small test: https://www.gbif-uat.org/ru/occurrence/download/0000214-200812085136737
And here might be a larger one (all specimens on GBIF-UAT.org), unless it fails: https://www.gbif-uat.org/ru/occurrence/download/0000216-200812085136737
Was there anything else? I think the size of the Avro chunks could use some tuning, but it perhaps doesn't really matter.
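For reference, reading the three Avro directories back into Spark could look roughly like this; it assumes the matching spark-avro module is on the classpath and that the paths are the unzipped directory names.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Each *.avro entry in the zip is a directory of Avro part files.
val occurrences = spark.read.format("avro").load("occurrence.avro")
val agents      = spark.read.format("avro").load("agents.avro")
val families    = spark.read.format("avro").load("families.avro")
```

With Spark 2.4+ the built-in `avro` format works as above; older setups may instead need the Databricks spark-avro package and its `com.databricks.spark.avro` format name.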
This is great @MattBlissett! Since you've done this work, I monkeyed with what terms to pull and from where. It's stabilized now and can be seen at https://github.com/bionomia/bionomia/blob/master/spark.md. What you'll see here affects how the occurrence.avro is constructed.
In particular, the content drawn from verbatim.txt, generated from a DwC-A at your end, includes:
```scala
val verbatimTerms = List(
  "gbifID",
  "occurrenceID",
  "dateIdentified",
  "decimalLatitude",
  "decimalLongitude",
  "country",
  "eventDate",
  "year",
  "identifiedBy",
  "institutionCode",
  "collectionCode",
  "catalogNumber",
  "recordedBy",
  "scientificName",
  "typeStatus",
  "recordedByID",
  "identifiedByID"
)
```
...and those from the occurrence.txt include:
```scala
val processedTerms = List(
  "gbifID",
  "datasetKey",
  "countryCode",
  "dateIdentified",
  "eventDate",
  "mediaType",
  "family"
)
```
And finally, which columns get renamed or filtered, and how these two sources are joined with a leftouter join, are undoubtedly different from what you have.
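To illustrate the shape of that join (this is not the exact code in bionomia's spark.md; it reuses the `verbatimTerms` and `processedTerms` lists above, and the renames and file paths are assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()

val verbatim = spark.read.option("sep", "\t").option("header", "true")
  .csv("verbatim.txt")
  .select(verbatimTerms.map(col): _*)

val processed = spark.read.option("sep", "\t").option("header", "true")
  .csv("occurrence.txt")
  .select(processedTerms.map(col): _*)
  // rename overlapping processed columns so they don't collide after the join
  .withColumnRenamed("dateIdentified", "dateIdentified_processed")
  .withColumnRenamed("eventDate", "eventDate_processed")

val joined = verbatim.join(processed, Seq("gbifID"), "leftouter")
```

Joining on `Seq("gbifID")` keeps a single gbifID column in the joined result.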
I think we're almost there. I've changed the verbatim and processed fields to reflect your changes (the main one is using the processed family rather than the verbatim family). You can see the chosen columns in the Hive query script here. Columns prefixed with `v_` are verbatim, the others are processed — I'm keeping this consistent with the rest of the GBIF codebase, it would be too confusing otherwise.
I've made a download from the UAT system (which doesn't have all data), and I added additional download predicate filters which should make some of the filters in your Spark job redundant: https://www.gbif-uat.org/occurrence/download/0000274-200812085136737 (the excluded dataset is just due to some corrupt data on UAT).
How does it look to you?
In principle, it looks good. Thanks for reworking this! I'll have to fire up another Digital Ocean droplet so I can send it across the pond, so gimme a few days.
@MattBlissett I downloaded your zipped avros & minimally adjusted my scripts to accommodate the minor differences in naming conventions. These are perfect and will accelerate processing at my end at least 10-fold. Amazing! What are the next steps? I was hoping to execute another download end of next week Oct. 15 or 16 before TDWG gets underway. Is it too ambitious for this to be in production by then? If not, how would I request this download? I'd be sure to make mention of this in my presentation & manuscript that I'm working on.
That timing would also help, as I'd like to announce it in the governing board update presentation too (same week) as part of work done to support others.
Thanks all, this work is complete -- if further changes are needed, please open new issues.
To help enable @dshorthouse to focus more on expansion and less on data processing, GBIF will provide pre-wrangled data.
The first iteration of this has been run using this process which can be summarised as:
This can be drastically improved by creating a `BLOODHOUND_PROCESSING` download format (we can find a better name, perhaps reusable in other contexts) which would:

This would mean that at any point Bloodhound can send a query (changing their query over time) and get a DOI for the exact data export used. We can then evolve this format over time.
@dshorthouse - can you please review and suggest edits?