
Plastic Origin ETL

Proudly Powered by SURFRIDER Foundation Europe, this open-source initiative is a part of the PLASTIC ORIGINS project - a citizen science project that uses AI to map plastic pollution in European rivers and share its data publicly. Browse the project repository to learn more about its initiatives and how you can get involved. Please consider starring :star: the project's repositories to show your interest and support. We rely on YOU to make this project a success and thank you in advance for your contributions.


Welcome to Plastic Origin ETL, an ETL (Extract Transform Load) data management process that produces the data leveraged within the Plastic Origin project. This data then serves to build analytics and reports on plastic pollution in rivers.

Please note that this project is under development and that frequent changes and updates happen over time.

Getting Started

Prerequisites

Before you begin, ensure you have met the following requirements:

Technical stack

Installation

ETL local API

The ETL API can be deployed locally using the Azure Functions framework.

Prerequisites:

It is recommended that you use a Python virtual environment before installing packages with pip. You also have to set the following environment variables: CONN_STRING, PGSERVER, PGDATABASE, PGUSERNAME, PGPWD.
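As a minimal sketch, the setup could look like the following; every value shown is a placeholder that you must replace with your own Azure Storage connection string and PostgreSQL credentials:

# create and activate a virtual environment
python3 -m venv .venv
source .venv/bin/activate

# Azure Storage connection string used by the ETL (placeholder value)
export CONN_STRING="<azure-storage-connection-string>"

# PostgreSQL server credentials (placeholder values)
export PGSERVER="<server>.postgres.database.azure.com"
export PGDATABASE="<database>"
export PGUSERNAME="<user>"
export PGPWD="<password>"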

Deploy local ETL API:

cd src/batch/etlAPI/

pip install -r requirements.txt

func start etlHttpTrigger

Build Docker ETL API

The GPS extraction subprocess requires binaries such as ffmpeg, which are not natively available within the Python Azure Functions runtime. To address this requirement, the ETL Azure Function is also available as a Docker image. The Dockerfile to build the image is located in the ETL API directory (src/batch/etlAPI/). You will need to pass the appropriate credentials at run time for the ETL to correctly use Azure Storage and the PostgreSQL database server.

cd src/batch/etlAPI/

docker build -t surfrider/etl:latest .

Run Docker ETL API

cd src/batch/etlAPI/

docker run -p 8082:80 --restart always --name etl -e PGUSERNAME=${PGUSERNAME} -e PGDATABASE=${PGDATABASE} -e PGSERVER=${PGSERVER} -e PGPWD=${PGPWD} -e CONN_STRING=${CONN_STRING} surfrider/etl:latest
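Alternatively, rather than passing each credential on the command line, the same variables can be supplied through Docker's --env-file option. This is only a sketch, assuming you keep the five variables in a local file named .env (a hypothetical filename):

# .env contains CONN_STRING, PGSERVER, PGDATABASE, PGUSERNAME and PGPWD, one KEY=value per line
docker run -p 8082:80 --restart always --name etl --env-file .env surfrider/etl:latest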

Usage

Call local ETL API:

Options:

- target: csv or postgre
- prediction: json or ai

Ports:

- 7071 for the local API
- 8082 for the Docker API

No Surfnet AI
manual: curl --request GET 'http://localhost:<port>/api/etlHttpTrigger?container=manual&blob=<blobname>&prediction=json&source=manual&target=csv&logid=<logid>'
mobile: curl --request GET 'http://localhost:<port>/api/etlHttpTrigger?container=mobile&blob=<blobname>&prediction=json&source=mobile&target=csv&logid=<logid>'
gopro:  curl --request GET 'http://localhost:<port>/api/etlHttpTrigger?container=gopro&blob=<blobname>&prediction=json&source=gopro&target=csv&logid=<logid>'
With Surfnet AI
mobile: curl --request GET 'http://localhost:<port>/api/etlHttpTrigger?container=mobile&blob=<blobname>&prediction=ai&source=mobile&target=csv&aiurl=<aiurl>&logid=<logid>'
gopro:  curl  --request GET 'http://localhost:<port>/api/etlHttpTrigger?container=gopro&blob=<blobname>&prediction=ai&source=gopro&target=csv&aiurl=<aiurl>&logid=<logid>'

ETL Trigger Azure Function

The ETL Trigger Azure Function additionally defines 3 functions that automatically call the ETL API when new media to process are stored in Azure. They use the blob trigger capabilities defined within function.json. The simplest way to test is to publish directly to Azure with:

cd src/batch/etlBlobTrigger/

func azure functionapp publish <AZUREFUNCTIONApp>
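Once published, uploading a new media file to one of the watched containers fires the corresponding trigger. As an illustration only (the container name, blob name and file path below are placeholders), an upload with the Azure CLI could look like:

az storage blob upload --connection-string "$CONN_STRING" --container-name mobile --name <blobname> --file <path-to-media-file>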

ETL Deployment Architecture

The ETL is made of three parts:

Recipe for re-running ETL on data

If you want to re-run the full processing from the starting data, there are two steps:

1. trigger_batch_etl_all

The `trigger_batch_etl_all` DAG re-inserts the trash in `campaign.trash`. To rerun this process for a given campaign `campaign_id`, you need to:

- remove the rows in `campaign.trash` which correspond to the `campaign_id`
- in the `logs.etl` row with the corresponding `campaign_id`, set the column "status" to "notprocessed"

You may then run the `trigger_batch_etl_all` DAG.

2. bi processing and postprocessing

The `bi-processing` DAG recomputes the different metrics related to the trash and campaign. To rerun this process for a given campaign `campaign_id`, you need to:

- in the `campaign.campaign` row for `campaign_id`, set the column "has_been_computed" to NULL
- remove the line corresponding to `campaign_id` in `bi_temp.pipelines`

You may then run the `bi-processing` DAG, which will update the `bi` tables and run `bi-postprocessing`. A sketch of these manual steps with psql is shown below.
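The following is only a sketch, assuming you connect with the psql client using the same credentials set above. Replace <campaign_id> with the actual campaign id; the column names used in the WHERE clauses (<campaign_column>, <id_column>) are placeholders, since the exact schema is not described here:

# step 1: prepare a campaign for trigger_batch_etl_all
psql "host=$PGSERVER dbname=$PGDATABASE user=$PGUSERNAME password=$PGPWD" \
  -c "DELETE FROM campaign.trash WHERE <campaign_column> = '<campaign_id>';" \
  -c "UPDATE logs.etl SET status = 'notprocessed' WHERE campaign_id = '<campaign_id>';"

# step 2: prepare the same campaign for bi-processing
psql "host=$PGSERVER dbname=$PGDATABASE user=$PGUSERNAME password=$PGPWD" \
  -c "UPDATE campaign.campaign SET has_been_computed = NULL WHERE <id_column> = '<campaign_id>';" \
  -c "DELETE FROM bi_temp.pipelines WHERE <campaign_column> = '<campaign_id>';"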

Contributing

It's great to have you here! We welcome any help and thank you in advance for your contributions.

Maintainers

If you experience any problems, please don't hesitate to ping:

Special thanks to all our Contributors.

License

We’re using the MIT License. For more details, check the LICENSE file.