GlobalFishingWatch / anchorages_pipeline

Python pipeline for anchorages
Apache License 2.0
6 stars 3 forks source link

Add visit-id #56

Closed bitsofbits closed 4 years ago

bitsofbits commented 4 years ago
bitsofbits commented 4 years ago

Run a year of visits based off of events in pipe_production_v20190502

    # Launch the port_visits job with the DataflowRunner for 2018-01-01 through
    # 2018-12-31, reading the pipe_production_v20190502 port_events_ tables.
    # --project is passed once; the original command repeated it with the same value.
    docker-compose run port_visits \
        --job_name portvisitssharded \
        --events_table world-fishing-827.pipe_production_v20190502.port_events_ \
        --start_date 2018-01-01 \
        --end_date 2018-12-31 \
        --start_padding 365 \
        --output_table machine_learning_dev_ttl_120d.port_visits_test_visit_id_v20191212A \
        --project world-fishing-827 \
        --max_num_workers 100 \
        --requirements_file requirements.txt \
        --staging_location gs://machine-learning-dev-ttl-30d/anchorages/portevents/output/staging \
        --temp_location gs://machine-learning-dev-ttl-30d/anchorages/temp \
        --setup_file ./setup.py \
        --runner DataflowRunner \
        --disk_size_gb 100
bitsofbits commented 4 years ago

Run a year of voyages, manually setting the table names in the voyages query:

#StandardSql
#
# Generate a voyages table, using the query generated by Nate and Tim
# For reference check:
# https://github.com/GlobalFishingWatch/GFW-Tasks/issues/1034
#

#
# Build a vessel_id -> ssvid lookup with exactly one row per vessel_id.
# ANY_VALUE keeps an arbitrary ssvid when a vessel_id is listed with several.
#
with ssvid_map as (
  select
    any_value(info.ssvid) as ssvid,
    info.vessel_id
  from `pipe_production_v20190502.vessel_info` as info
  group by info.vessel_id
),

#
# Attach an ssvid to every real port visit by joining the visits tables to
# ssvid_map on vessel_id. Visits whose vessel_id is missing from the map are
# dropped by the inner join.
#
# NOTE(review): this reads the ...v20191212B* tables, while the port_visits
# command earlier in this thread writes to ...v20191212A -- confirm the suffix.
#
all_real_port_visits_with_vessel_id as (
  select
    ids.ssvid,
    vessel_id,
    visits.start_anchorage_id,
    visits.visit_id,
    visits.start_timestamp,
    visits.end_anchorage_id,
    visits.end_timestamp
  from `machine_learning_dev_ttl_120d.port_visits_test_visit_id_v20191212B*` as visits
  inner join ssvid_map as ids
    using (vessel_id)
),

#
# One placeholder visit per vessel, dated 0001-02-03 and labelled
# 'NO_PREVIOUS_DATA'. Its end_timestamp precedes every real port entry, so
# the pairs step always has an exit to match a vessel's first real entry with.
#
dummy_initial_port_visits_with_vessel_id as (
  select
    ssvid,
    vessel_id,
    'NO_PREVIOUS_DATA' as start_anchorage_id,
    'NO_PREVIOUS_DATA' as visit_id,
    timestamp('0001-2-3') as start_timestamp,
    'NO_PREVIOUS_DATA' as end_anchorage_id,
    timestamp('0001-2-3') as end_timestamp
  # ssvid_map is grouped by vessel_id, so it already holds one row per
  # (ssvid, vessel_id); no extra de-duplication is needed here.
  from ssvid_map
),

#
# One placeholder visit per vessel, dated 9999-09-09 and labelled
# 'ACTIVE_VOYAGE'. Its start_timestamp follows every real port exit, so the
# pairs step always has an entry to match a vessel's last real exit with.
#
dummy_final_port_visits_with_vessel_id as (
  select
    ssvid,
    vessel_id,
    'ACTIVE_VOYAGE' as start_anchorage_id,
    'ACTIVE_VOYAGE' as visit_id,
    timestamp('9999-9-9') as start_timestamp,
    'ACTIVE_VOYAGE' as end_anchorage_id,
    timestamp('9999-9-10') as end_timestamp
  # ssvid_map is grouped by vessel_id, so it already holds one row per
  # (ssvid, vessel_id); no extra de-duplication is needed here.
  from ssvid_map
),

#
# Stack the two placeholder visit sets and the real visits into one relation.
# UNION ALL matches columns by position, so every branch lists its columns
# explicitly instead of relying on `select *` and the inputs' column order.
#
all_port_visits_with_vessel_id as (
  select
    ssvid,
    vessel_id,
    start_anchorage_id,
    visit_id,
    start_timestamp,
    end_anchorage_id,
    end_timestamp
  from dummy_initial_port_visits_with_vessel_id
  union all
  select
    ssvid,
    vessel_id,
    start_anchorage_id,
    visit_id,
    start_timestamp,
    end_anchorage_id,
    end_timestamp
  from dummy_final_port_visits_with_vessel_id
  union all
  select
    ssvid,
    vessel_id,
    start_anchorage_id,
    visit_id,
    start_timestamp,
    end_anchorage_id,
    end_timestamp
  from all_real_port_visits_with_vessel_id
),

#
# Get all port entries: one row per distinct (ssvid, start anchorage,
# visit_id, start timestamp), collecting the vessel_ids that share it.
# Grouping is by column name rather than position so that reordering the
# select list cannot silently change the grouping keys.
#
entries as (
  select
    ssvid,
    array_agg(vessel_id) as vessel_ids,
    start_anchorage_id as anchorage_id,
    visit_id,
    start_timestamp as timestamp
  from all_port_visits_with_vessel_id
  group by
    ssvid,
    start_anchorage_id,
    visit_id,
    start_timestamp
),

#
# Get all port exits: one row per distinct (ssvid, end anchorage, visit_id,
# end timestamp). The group by only de-duplicates; nothing is aggregated.
# Grouping is by column name rather than position so that reordering the
# select list cannot silently change the grouping keys.
#
exits as (
  select
    ssvid,
    end_anchorage_id as anchorage_id,
    visit_id,
    end_timestamp as timestamp
  from all_port_visits_with_vessel_id
  group by
    ssvid,
    end_anchorage_id,
    visit_id,
    end_timestamp
),

#
# Pair every port exit with every later port entry of the same ssvid.
# The only combination discarded is the placeholder exit
# ('NO_PREVIOUS_DATA') matched with the placeholder entry ('ACTIVE_VOYAGE'),
# i.e. a pairing that involves no real port visit at all.
#
pairs as (
  select
    ssvid,
    arrivals.vessel_ids,
    departures.timestamp as exit,
    arrivals.timestamp as entry,
    departures.anchorage_id as exit_id,
    arrivals.anchorage_id as entry_id,
    departures.visit_id as exit_visit_id,
    arrivals.visit_id as entry_visit_id
  from exits as departures
  inner join entries as arrivals
    using (ssvid)
  where departures.timestamp < arrivals.timestamp
    # De Morgan form of: not (exit is placeholder and entry is placeholder)
    and (
      departures.anchorage_id != 'NO_PREVIOUS_DATA'
      or arrivals.anchorage_id != 'ACTIVE_VOYAGE'
    )
),

#
# Number the candidate entries of each (ssvid, exit) from earliest to latest
# so the final select can keep the first one (rn = 1).
# The visit and anchorage ids are tie-breakers: ordering by entry alone
# leaves the rn = 1 row undefined when two entries share a timestamp, or when
# several exits share the same (ssvid, exit) partition.
#
ranked as (
  select
    ssvid,
    vessel_ids,
    exit,
    entry,
    exit_id,
    entry_id,
    exit_visit_id,
    entry_visit_id,
    row_number() over (
      partition by ssvid, exit
      order by
        entry asc,
        entry_visit_id asc,
        entry_id asc,
        exit_visit_id asc,
        exit_id asc
    ) as rn
  from pairs
)

#
# Emit one voyage per exit: the exit paired with its earliest following
# entry (rn = 1). trip_id is "<ssvid>-<hex>", where <hex> is the exit time
# in milliseconds since 1970-01-01 formatted with '%012x'.
# NOTE(review): the placeholder exit (0001-02-03) predates 1970, so its
# millisecond offset is negative -- confirm the resulting trip_id is acceptable.
#
select
  voyage.ssvid,
  voyage.vessel_ids,
  voyage.exit as trip_start,
  voyage.entry as trip_end,
  voyage.exit_id as trip_start_anchorage_id,
  voyage.entry_id as trip_end_anchorage_id,
  voyage.exit_visit_id as trip_start_visit_id,
  voyage.entry_visit_id as trip_end_visit_id,
  voyage.ssvid
    || '-'
    || format('%012x', timestamp_diff(voyage.exit, timestamp('1970-01-01'), MILLISECOND))
    as trip_id
from ranked as voyage
where voyage.rn = 1