NYCPlanning / db-safegraph

SafeGraph ETL
4 stars 1 forks source link

code snippet for athena #12

Closed SPTKL closed 3 years ago

SPTKL commented 3 years ago

Will need to wrap this in a function to make it easier to use.

import boto3
import time

# Run an Athena query and copy its CSV result to a stable S3 key, then remove
# Athena's temporary output files.
# Credentials below are placeholders; replace them before running.
session = boto3.Session(
    region_name='XXXXXX',
    aws_access_key_id='XXXXXX',
    aws_secret_access_key='XXXXXX',
    aws_session_token='XXXXXX',
)
# Build both clients from one session so they share credentials and region
# (the original S3 session silently dropped the session token).
client = session.client('athena')
# Low-level S3 client: copy_object / delete_object are client methods; the
# s3 ServiceResource returned by session.resource('s3') has no delete_object.
s3 = session.client('s3')

BUCKET = 'recovery-data-partnership'
TMP_PREFIX = 'tmp/'
OUTPUT_KEY = 'output_monthly_patterns/for_te.csv'

QUERY = '''
    SELECT   
        a.safegraph_place_id,
        b.naics_code,
        a.poi_cbg,
        SUBSTR(a.poi_cbg,1,5) AS countyfips,
        a.visits_by_day,
        DATE(from_iso8601_timestamp(a.date_range_start)) AS date_range_start,
        DATE(from_iso8601_timestamp(a.date_range_end)) AS date_range_end
    FROM "safegraph"."monthly_patterns" a
    LEFT JOIN (
      SELECT distinct safegraph_place_id, naics_code
      FROM "safegraph"."core_poi"
      WHERE region = 'NY'
    ) b  
    ON a.safegraph_place_id=b.safegraph_place_id
    WHERE SUBSTR(a.poi_cbg,1,5) IN ('36085','36081','36061','36047','36005')
    '''


def wait_for_query(athena_client, query_id, poll_seconds=3, timeout_seconds=900):
    """Block until an Athena query reaches a terminal state.

    Args:
        athena_client: boto3 Athena client.
        query_id: QueryExecutionId returned by start_query_execution.
        poll_seconds: delay between status checks.
        timeout_seconds: give up after this many seconds.

    Raises:
        RuntimeError: the query ended in FAILED or CANCELLED.
        TimeoutError: the query did not finish within timeout_seconds.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = athena_client.get_query_execution(
            QueryExecutionId=query_id)['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            return
        if state in ('FAILED', 'CANCELLED'):
            raise RuntimeError('Athena query {} {}: {}'.format(
                query_id, state, status.get('StateChangeReason', '')))
        if time.monotonic() >= deadline:
            raise TimeoutError('Athena query {} still {} after {}s'.format(
                query_id, state, timeout_seconds))
        time.sleep(poll_seconds)


def run_query_to_s3(query, output_key, database='safegraph',
                    bucket=BUCKET, tmp_prefix=TMP_PREFIX):
    """Run `query` in Athena and copy the result CSV to s3://bucket/output_key.

    Athena's temporary result file and its .metadata companion under
    s3://bucket/tmp_prefix are deleted afterwards.
    """
    started = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={
            # query result output location configured for Athena
            'OutputLocation': 's3://{}/{}'.format(bucket, tmp_prefix)
        },
    )
    query_id = started['QueryExecutionId']
    # Poll instead of a fixed sleep: the CSV only exists once the query SUCCEEDED.
    wait_for_query(client, query_id)

    # Athena names its result <OutputLocation><QueryExecutionId>.csv.
    result_key = tmp_prefix + query_id + '.csv'
    s3.copy_object(
        Bucket=bucket,
        Key=output_key,
        CopySource={'Bucket': bucket, 'Key': result_key},
    )
    # Delete Athena's csv and its metadata file; the keys must carry the
    # tmp/ prefix to match where Athena actually wrote them.
    s3.delete_object(Bucket=bucket, Key=result_key)
    s3.delete_object(Bucket=bucket, Key=result_key + '.metadata')
    print('{} csv generated'.format(output_key))


run_query_to_s3(QUERY, OUTPUT_KEY)
SPTKL commented 3 years ago

Unnesting JSON for social distancing

-- Parse a JSON object of {census block group: count} into a MAP and return
-- its keys and its values as two parallel arrays.
WITH parsed AS (
    SELECT
        CAST(
            json_parse('{"060379203364":2,"060376039003":1,"060377030013":1,"150030084082":2,"060371275202":1,"060373015022":2,"060371113013":1,"060373101003":2,"061110079043":1,"061110049021":1,"060373107022":2,"060133522012":1,"060371245002":1,"060371397014":1,"060379200401":1,"060371279201":2,"060371275201":1,"150030097031":1,"060371310102":2,"060373001004":1,"060373012062":1,"060371197002":2,"060371133032":3,"060290032021":1,"060590992412":1,"060373025063":1,"060371066411":1,"060650438132":2,"060371393012":1,"060374818002":1,"060371239013":2,"060371112011":2,"060371197003":3,"060371153012":1,"060371066452":91,"060377004003":1,"060371021051":2,"060373012053":1,"060371211023":2,"490439644011":4,"060373021025":2,"060710099041":1,"060371201052":1,"150030097013":2,"060371133012":2,"060371272202":1,"060372643013":1,"060372911201":1,"060371234201":1,"060372674032":1,"060371219002":1,"060830016043":1,"490439643082":1,"060371082012":2,"060371066423":1,"490211101004":2,"060375321011":2,"060371234101":1,"060371905101":1,"060371203003":3,"060371066462":1,"060376205013":1,"060371279102":1,"060371092003":1,"060379201041":1,"060371081021":1,"060371093001":1,"060590994132":1,"061110001001":1,"490439644021":1,"060379012091":1,"060373025061":1,"060371091002":2,"490439644022":1,"060371393031":2,"320030067001":2,"060590219234":4,"060379100023":1,"060373017011":1,"060830030041":1,"060371392003":2,"060371916101":1,"060376038012":1,"060371111002":1,"060373023012":1,"060379201152":1,"060371151042":1,"060371066461":3,"061110031001":1,"060371153021":1,"060290033062":1,"060373007022":1,"060374816061":1,"060371066032":1,"060371916201":1,"060190064026":1,"060710113002":3,"040130927212":1,"150030086091":2,"060371220002":4,"060379203291":1}')
            AS map<varchar, varchar>
        ) AS a
)
SELECT
    map_keys(a),
    map_values(a)
FROM parsed

Unnesting arrays: https://docs.aws.amazon.com/athena/latest/ug/flattening-arrays.html

-- Flatten an array of maps: one output row per person, with the scalar
-- department value repeated on every row (example from the Athena docs).
WITH dataset AS (
    SELECT
        'engineering' AS department,
        ARRAY[
            MAP(ARRAY['first', 'last', 'age'], ARRAY['Bob', 'Smith', '40']),
            MAP(ARRAY['first', 'last', 'age'], ARRAY['Jane', 'Doe', '30']),
            MAP(ARRAY['first', 'last', 'age'], ARRAY['Billy', 'Smith', '8'])
        ] AS people
)
SELECT
    person['first'] AS first_name,
    person['last'] AS last_name,
    department
FROM dataset
CROSS JOIN UNNEST(people) AS u (person)
SPTKL commented 3 years ago
-- Explode a JSON object of {destination census block group: count} into one
-- row per block group. UNNEST over a MAP yields (key, value) pairs directly,
-- so no separate map_keys() / subscript lookup is needed.
WITH parsed AS (
    SELECT
        CAST(
            json_parse('{"060379203364":2,"060376039003":1,"060377030013":1,"150030084082":2,"060371275202":1,"060373015022":2,"060371113013":1,"060373101003":2,"061110079043":1,"061110049021":1,"060373107022":2,"060133522012":1,"060371245002":1,"060371397014":1,"060379200401":1,"060371279201":2,"060371275201":1,"150030097031":1,"060371310102":2,"060373001004":1,"060373012062":1,"060371197002":2,"060371133032":3,"060290032021":1,"060590992412":1,"060373025063":1,"060371066411":1,"060650438132":2,"060371393012":1,"060374818002":1,"060371239013":2,"060371112011":2,"060371197003":3,"060371153012":1,"060371066452":91,"060377004003":1,"060371021051":2,"060373012053":1,"060371211023":2,"490439644011":4,"060373021025":2,"060710099041":1,"060371201052":1,"150030097013":2,"060371133012":2,"060371272202":1,"060372643013":1,"060372911201":1,"060371234201":1,"060372674032":1,"060371219002":1,"060830016043":1,"490439643082":1,"060371082012":2,"060371066423":1,"490211101004":2,"060375321011":2,"060371234101":1,"060371905101":1,"060371203003":3,"060371066462":1,"060376205013":1,"060371279102":1,"060371092003":1,"060379201041":1,"060371081021":1,"060371093001":1,"060590994132":1,"061110001001":1,"490439644021":1,"060379012091":1,"060373025061":1,"060371091002":2,"490439644022":1,"060371393031":2,"320030067001":2,"060590219234":4,"060379100023":1,"060373017011":1,"060830030041":1,"060371392003":2,"060371916101":1,"060376038012":1,"060371111002":1,"060373023012":1,"060379201152":1,"060371151042":1,"060371066461":3,"061110031001":1,"060371153021":1,"060290033062":1,"060373007022":1,"060374816061":1,"060371066032":1,"060371916201":1,"060190064026":1,"060710113002":3,"040130927212":1,"150030086091":2,"060371220002":4,"060379203291":1}')
            AS map<varchar, varchar>
        ) AS cbg_counts
)
SELECT
    desti_cbg,
    counts
FROM parsed
CROSS JOIN UNNEST(cbg_counts) AS t (desti_cbg, counts)
SPTKL commented 3 years ago
-- Expand a visits-by-day JSON array into one row per calendar day.
-- WITH ORDINALITY numbers elements 1..n in array order, so element i is the
-- count for date_start + (i - 1) days. The previous row_number() over() form
-- mapped the first element to date_start + 1 (off by one), and its numbering
-- was not guaranteed to follow array order.
SELECT
    date_add('day', day_index - 1, date_start) AS date_current,
    visits
FROM (
    SELECT
        CAST(SUBSTR('2020-04-01T00:00:00-04:00', 1, 10) AS DATE) AS date_start,
        CAST(SUBSTR('2020-05-01T00:00:00-04:00', 1, 10) AS DATE) AS date_end,
        CAST(json_parse('[0,1,0,0,1,0,2,0,2,6,1,0,0,0,4,0,0,4,2,2,1,5,2,0,0,5,2,0,1,0]') AS array<varchar>) AS a
) b
CROSS JOIN UNNEST(a) WITH ORDINALITY AS t (visits, day_index)

Athena: unnest an array and get the corresponding dates

-- One row per POI per day from monthly_patterns.visits_by_day.
-- WITH ORDINALITY restarts at 1 for every input row, so each POI's first
-- element maps to its own date_range_start. The previous global
-- row_number() over() was off by one, and it kept counting across POIs,
-- which shifted the dates of every POI after the first.
SELECT
    location_name,
    date_add('day', day_index - 1, date_start) AS date_current,
    visits
FROM (
    SELECT
        location_name,
        CAST(SUBSTR(date_range_start, 1, 10) AS DATE) AS date_start,
        CAST(SUBSTR(date_range_end, 1, 10) AS DATE) AS date_end,
        CAST(json_parse(visits_by_day) AS array<varchar>) AS a
    FROM safegraph.monthly_patterns
    -- five county FIPS prefixes (presumably the NYC boroughs)
    WHERE SUBSTR(poi_cbg, 1, 5) IN ('36085', '36081', '36061', '36047', '36005')
    LIMIT 3
) b
CROSS JOIN UNNEST(a) WITH ORDINALITY AS t (visits, day_index)
mgraber commented 3 years ago

Example creating weekday, weekend, and total weekly visit aggregations from monthly_patterns data

-- Weekday / weekend / total visits per POI per ISO week, built from the
-- daily counts in monthly_patterns.visits_by_day.
WITH daily_visits AS (
    -- One row per POI per day. WITH ORDINALITY restarts at 1 per input row,
    -- so element i is the count for date_start + (i - 1) days. A global
    -- row_number() over() is off by one and bleeds across POIs.
    SELECT
        b.safegraph_place_id,
        b.location_name,
        date_add('day', t.day_index - 1, b.date_start) AS date_current,
        -- INTEGER rather than SMALLINT: SMALLINT overflows above 32767.
        CAST(t.visits AS INTEGER) AS visits
    FROM (
        SELECT
            safegraph_place_id,
            location_name,
            CAST(SUBSTR(date_range_start, 1, 10) AS DATE) AS date_start,
            CAST(SUBSTR(date_range_end, 1, 10) AS DATE) AS date_end,
            CAST(json_parse(visits_by_day) AS array<varchar>) AS a
        FROM safegraph.monthly_patterns
        -- five county FIPS prefixes (presumably the NYC boroughs)
        WHERE SUBSTR(poi_cbg, 1, 5) IN ('36085', '36081', '36061', '36047', '36005')
        LIMIT 3
    ) b
    CROSS JOIN UNNEST(b.a) WITH ORDINALITY AS t (visits, day_index)
)

SELECT
    -- Group by the POI id: location_name alone merges distinct POIs that
    -- share a name.
    safegraph_place_id,
    location_name,
    -- year_of_week pairs with the ISO week number; calendar YEAR mislabels
    -- weeks that straddle Jan 1.
    year_of_week(date_current) AS year,
    week(date_current) AS week,
    -- day_of_week is ISO: Monday = 1 ... Saturday = 6, Sunday = 7.
    SUM(CASE WHEN day_of_week(date_current) NOT IN (6, 7) THEN visits END) AS weekday_visits,
    SUM(CASE WHEN day_of_week(date_current) IN (6, 7) THEN visits END) AS weekend_visits,
    SUM(visits) AS total_visits
FROM daily_visits
GROUP BY
    safegraph_place_id,
    location_name,
    year_of_week(date_current),
    week(date_current)
SPTKL commented 3 years ago

closing