mozilla / bigquery-etl

Bigquery ETL
https://mozilla.github.io/bigquery-etl
Mozilla Public License 2.0

add in sleep timers in case of no data, refactor app_conversions_v1 script to deal with null dates #5834

Closed: Marlene-M-Hirose closed this pull request 3 months ago

Marlene-M-Hirose commented 3 months ago


Sleep timers were added because the Microsoft Store API rejects requests that arrive too quickly. If there is no data for an app and the script immediately moves on to the next app, the Microsoft Store throws an error and asks the caller to wait a few seconds before pinging again.
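A minimal sketch of that pattern, with `apps` and `fetch_app_data` as hypothetical stand-ins for the script's app list and its Microsoft Store download call:

```python
from time import sleep


def process_apps(apps, fetch_app_data):
    """Fetch store data for each app, pausing between requests.

    `apps` and `fetch_app_data` are hypothetical stand-ins for the
    actual app list and download call used by the query scripts.
    """
    data = []
    for app in apps:
        query_export = fetch_app_data(app["app_id"])
        if query_export and query_export.get("Value"):
            data.extend(query_export["Value"])
        else:
            print("no data for today")
        # Pause before the next app so the Microsoft Store API does not
        # reject the follow-up request for arriving too quickly.
        sleep(5)
    return data
```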

For some reason, the appchannelconversions endpoint returns null in the date field, as well as in some other fields. The downstream Python processing cannot handle those nulls, so they need to be turned into strings before processing can occur.
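The workaround used here (visible in the diff below) rewrites the literal `null` tokens in the raw response text before parsing. A minimal sketch of that helper:

```python
import re


def change_null_to_string(json_string):
    """Replace bare JSON null tokens with the string "null".

    Mirrors the helper added in this PR. Note this is a pragmatic fix for
    this particular API response: a blanket substitution would also rewrite
    "null" wherever it appears inside other string values.
    """
    return re.sub("null", '"null"', json_string)


# Example: '{"date": null}' becomes '{"date": "null"}'
print(change_null_to_string('{"date": null}'))
```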

Checklist for reviewer:

For modifications to schemas in restricted namespaces (see CODEOWNERS):

Issue is synchronized with this Jira Task

dataops-ci-bot commented 3 months ago

Integration report for "Merge branch 'main' into deng2631_add_sleep_timer_refactor_conversions_script_to_handle_nulls"

sql.diff

Click to expand!

```diff
diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_acquisitions_v1/query.py /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_acquisitions_v1/query.py
--- /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_acquisitions_v1/query.py  2024-06-25 15:07:48.000000000 +0000
+++ /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_acquisitions_v1/query.py  2024-06-25 15:07:46.000000000 +0000
@@ -5,6 +5,7 @@
 import os
 import tempfile
 from argparse import ArgumentParser
+from time import sleep
 
 import requests
 from google.cloud import bigquery
@@ -114,6 +115,7 @@
     }
     print(url)
     response = get_response(url, headers, url_params)
+    print(response)
     return response
 
 
@@ -233,12 +235,14 @@
         # Ping the microsoft_store URL and get a response
         json_file = download_microsoft_store_data(date, app["app_id"], bearer_token)
         query_export = check_json(json_file.text)
+        print(query_export)
         if query_export is not None:
             # This section writes the tmp json data into a temp CSV file which will then be put into a BigQuery table
             microsoft_store_data = clean_json(query_export, date)
             data.extend(microsoft_store_data)
         else:
             print("no data for today")
+        sleep(5)
 
     upload_to_bigquery(data, project, dataset, table_name, date)
diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_conversions_v1/query.py /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_conversions_v1/query.py
--- /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_conversions_v1/query.py  2024-06-25 15:07:48.000000000 +0000
+++ /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_conversions_v1/query.py  2024-06-25 15:07:46.000000000 +0000
@@ -3,8 +3,10 @@
 import csv
 import json
 import os
+import re
 import tempfile
 from argparse import ArgumentParser
+from time import sleep
 
 import requests
 from google.cloud import bigquery
@@ -57,10 +59,17 @@
 def read_json(filename: str) -> dict:
     """Read JSON file."""
     with open(filename, "r") as f:
+        print(f.read())
         data = json.loads(f.read())
     return data
 
 
+def change_null_to_string(json_string):
+    """Change null values in downloaded json string to string."""
+    none_string = re.sub("null", '"null"', json_string)
+    return none_string
+
+
 def write_dict_to_csv(json_data, filename):
     """Write a dictionary to a csv."""
     with open(filename, "w") as out_file:
@@ -91,48 +100,26 @@
     end_date = date
     token = bearer_token
     app_id = application_id
-    groupBy = [
-        "date",
-        "applicationId",
-        "applicationName",
-        "customCampaignId",
-        "channelType",
-        "storeClient",
-        "deviceType",
-        "market",
-    ]
 
     # getting overview metrics for different kpis / Deliverables
     url = f"https://manage.devcenter.microsoft.com/v1.0/my/analytics/appchannelconversions?applicationId={app_id}"
-    url_params = f"aggregationLevel=day&startDate={start_date}&endDate={end_date}&skip=0&groupBy={','.join(groupBy)}&orderby=date"
+    url_params = (
+        f"aggregationLevel=day&startDate={start_date}&endDate={end_date}&skip=0"
+    )
 
     headers = {
         "Content-Type": "application/x-www-form-urlencoded",
         "Authorization": f"Bearer {token}",
     }
-    print(url)
+    print(url + "&" + url_params)
     response = get_response(url, headers, url_params)
     return response
 
 
-def check_json(microsoft_store_response_text):
-    """Script will return an empty dictionary for apps on days when there is no data. Check for that here."""
-    with tempfile.NamedTemporaryFile() as tmp_json:
-        with open(tmp_json.name, "w") as f_json:
-            f_json.write(microsoft_store_response_text)
-        try:
-            query_export = read_json(f_json.name)
-        except (
-            ValueError
-        ):  # ex. json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
-            return None
-    return query_export
-
-
 def clean_json(query_export, date):
     """Turn the json file into a list to be input into a CSV for bq upload."""
     fields_list = []
     for val in query_export["Value"]:
         field_dict = {
-            "date": val["date"],
+            "date": date,
             "application_id": val["applicationId"],
             "application_name": val["applicationName"],
             "custom_campaign_id": val["customCampaignId"],
@@ -176,12 +163,11 @@
             bigquery.SchemaField("store_client", "STRING"),
             bigquery.SchemaField("device_type", "STRING"),
             bigquery.SchemaField("market", "STRING"),
-            bigquery.SchemaField("click_count", "STRING"),
-            bigquery.SchemaField("conversion_count", "STRING"),
+            bigquery.SchemaField("click_count", "INT64"),
+            bigquery.SchemaField("conversion_count", "INT64"),
         ],
     )
     destination = f"{project}.{dataset}.{table_name}${partition}"
-    destination = f"{project}.analysis.mhirose_{table_name}${partition}"
 
     job = client.load_table_from_file(f_csv, destination, job_config=job_config)
     print(
@@ -223,14 +209,18 @@
         print(f'This is data for {app["app_name"]} - {app["app_id"]} ')
         # Ping the microsoft_store URL and get a response
         json_file = download_microsoft_store_data(date, app["app_id"], bearer_token)
-        query_export = check_json(json_file.text)
-        if query_export is not None:
+        # For some reason, the date returned from https://manage.devcenter.microsoft.com/v1.0/my/analytics/appchannelconversions? returns the date as null.
+        # This needs to be changed from a null to a string null.
+        json_none_string = change_null_to_string(json_file.text)
+        # Convert the string to a dictionary
+        query_export = eval(json_none_string)
+        if query_export["Value"]:
             # This section writes the tmp json data into a temp CSV file which will then be put into a BigQuery table
             microsoft_store_data = clean_json(query_export, date)
             data.extend(microsoft_store_data)
         else:
             print("no data for today")
-
+        sleep(5)
 
     upload_to_bigquery(data, project, dataset, table_name, date)
diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_installs_v1/query.py /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_installs_v1/query.py
--- /tmp/workspace/main-generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_installs_v1/query.py  2024-06-25 15:07:48.000000000 +0000
+++ /tmp/workspace/generated-sql/sql/moz-fx-data-shared-prod/microsoft_derived/app_installs_v1/query.py  2024-06-25 15:07:46.000000000 +0000
@@ -5,6 +5,7 @@
 import os
 import tempfile
 from argparse import ArgumentParser
+from time import sleep
 
 import requests
 from google.cloud import bigquery
@@ -216,6 +217,7 @@
             data.extend(microsoft_store_data)
         else:
             print("no data for today")
+        sleep(5)
 
     upload_to_bigquery(data, project, dataset, table_name, date)
```

Link to full diff
