airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

New Integration Request: URL Tables #3052

Open jim-barlow opened 3 years ago

jim-barlow commented 3 years ago

Tell us about the new integration you’d like to have

OK, this is a slightly strange one, but I often need to snapshot table data from public URLs and stream the content as JSON into BigQuery.

Describe the context around this new integration

Typically I need to take daily snapshots to keep a historical record of data that is not available via any API.

Describe the alternative you are considering or using

What are you considering doing if you don’t have this integration through Airbyte?

I have tested this code in a notebook and it works fine. I could spin it up into a scheduled Cloud Function (triggered by Pub/Sub with the config in the payload; see the sketch after the helper function below), but I am keen to use Airbyte as my single point of data ingestion, so I thought this might make an interesting lightweight connector that would let Airbyte be used as a (limited) very simple web table scraper:

import ssl
import os
import json
import pandas as pd
from datetime import datetime
from google.cloud import bigquery

# specific to this URL due to SSL verification issues
ssl._create_default_https_context = ssl._create_unverified_context

destination_ref = 'beepbeeptechnology.urltable.granada_covid_stats'
source_url = "https://www.juntadeandalucia.es/institutodeestadisticaycartografia/salud/static/resultadosProvincialesCovid_18.html5?prov=18&CodOper=b3_2314&codConsulta=38667"

# parse every HTML table on the page into a list of DataFrames
response = pd.read_html(source_url, encoding='UTF-8')

# stream each table into BigQuery as a single JSON row
for response_table in response:
  response_table_json = json.dumps(response_table.to_dict(orient='records'), ensure_ascii=False)
  stream_json_into_bq_with_id(source_url, response_table_json, destination_ref)
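
For context, the destination table referenced above is just a three-column, all-STRING table. Here is a minimal sketch of creating it with the BigQuery client library (the column names are my own placeholders; the helper below only relies on the column order of identifier, payload, timestamp):

from google.cloud import bigquery

client = bigquery.Client(project='beepbeeptechnology')

# three STRING columns: row identifier, raw JSON payload, insertion timestamp
schema = [
    bigquery.SchemaField('id', 'STRING'),
    bigquery.SchemaField('payload', 'STRING'),
    bigquery.SchemaField('inserted_at', 'STRING'),
]

table = bigquery.Table('beepbeeptechnology.urltable.granada_covid_stats', schema=schema)
client.create_table(table, exists_ok=True)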

Note that this depends on the following function:

def stream_json_into_bq_with_id(id_string: str, json_string: str, destination_ref: str) -> dict:
    """
    Streams a JSON string as text into a single row of a three-column (all string) BigQuery table
    for subsequent decoding. The columns are, in order: row identifier, JSON payload and
    insertion timestamp.

    Args:
        id_string (string): row identifier
        json_string (string): data payload
        destination_ref (string): BigQuery table destination reference (project_id.dataset_id.table_name)

    Returns:
        A dict containing:
        status (string): "success" for a successful stream, "error" for a failed stream or "fail" for any other failure.
        message (string): exception or error message if applicable
    """
    try:
        current_time = datetime.now()
        current_timestamp = current_time.strftime("%Y-%m-%d %H:%M:%S.%f")

        # stream the row into the destination table via the BigQuery streaming insert API
        project_id = os.environ.get('GCP_PROJECT')
        BQ = bigquery.Client(project=project_id)
        table = BQ.get_table(destination_ref)

        # values map positionally onto the (identifier, payload, timestamp) columns
        rows_to_insert = [(id_string, json_string, current_timestamp)]

        errors = BQ.insert_rows(
            table, rows_to_insert, row_ids=[None] * len(rows_to_insert)
        )

        stream_log: dict 
        if len(errors) == 0:
            stream_log = {"status": "success", "message": "no errors"}
        else:
            error_string = json.dumps(errors)
            stream_log = {"status": "error", "message": error_string}

    except Exception as e:
        # cast the exception to a string so the returned dict stays JSON-serialisable
        stream_log = {"status": "fail", "message": str(e)}

    return stream_log
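
For completeness, the scheduled Cloud Function alternative mentioned above would just be a thin Pub/Sub-triggered wrapper around the same steps, reusing the helper above. A rough sketch, assuming the Pub/Sub message data is a JSON payload carrying the source URL and destination reference (the entry point name and payload keys are placeholders):

import base64
import json
import pandas as pd

def ingest_url_table(event, context):
    """Pub/Sub-triggered entry point; expects a base64-encoded JSON payload
    with 'source_url' and 'destination_ref' keys (placeholder field names)."""
    config = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    source_url = config['source_url']
    destination_ref = config['destination_ref']

    # same logic as the notebook snippet above
    for response_table in pd.read_html(source_url, encoding='UTF-8'):
        response_table_json = json.dumps(response_table.to_dict(orient='records'), ensure_ascii=False)
        stream_json_into_bq_with_id(source_url, response_table_json, destination_ref)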


marcosmarxm commented 3 years ago

@jimbeepbeep amazing! Are you willing to turn your code into a connector? I can help you get set up and walk you through how to do that if you like.

jim-barlow commented 3 years ago

Sure, I have some time reserved this week for learning how to build connectors (I'm on the CDK beta). Do you have any other docs you could point me towards, or is that a good place to start?
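
For reference, a very rough sketch of what the stream side of such a connector might look like with the Python CDK (class names, the config key and the permissive JSON schema are all placeholders, not a tested connector):

from typing import Any, Iterable, List, Mapping, Tuple

import pandas as pd
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream


class UrlTables(Stream):
    """Emits one record per row of every HTML table found at the configured URL."""

    primary_key = None

    def __init__(self, url: str):
        self.url = url

    def get_json_schema(self) -> Mapping[str, Any]:
        # columns differ per URL, so accept arbitrary properties
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "additionalProperties": True,
        }

    def read_records(self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None) -> Iterable[Mapping[str, Any]]:
        # one record per table row, keyed by the table's column headers
        for table in pd.read_html(self.url, encoding="UTF-8"):
            yield from table.to_dict(orient="records")


class SourceUrlTables(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        try:
            pd.read_html(config["url"])
            return True, None
        except Exception as e:
            return False, str(e)

    def streams(self, config) -> List[Stream]:
        return [UrlTables(url=config["url"])]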