AirCan is an open source cloud-based system powered by AirFlow for creating, manipulating and running data workflows (aka data pipelines). It is focused on integration with CKAN and the DataHub but can also be used standalone.
It can be used to create any kind of data pipeline and out of the box it provides functionality such as:
In design, it is a lightweight set of patterns and services built on top of widely adopted open source products such as AirFlow and Frictionless.
For CKAN, this is an evolution of DataPusher and Xloader (some core code is similar, but it is much improved, with the runner now being AirFlow).
AirCan consists of 2 parts:

- A set of AirFlow DAGs, stored in the `aircan` directory. They get loaded into your AirFlow instance.

In addition, if you want to use it with CKAN you will want `ckanext-aircan` on your CKAN instance; see the installation instructions at https://github.com/datopian/ckanext-aircan.

How it works in e.g. CKAN:

- `aircan_submit(dag_id, run_id, config)`
- `aircan_status`: which calls AirFlow for "is it running" plus Stackdriver for logs

Install and set up Airflow (https://airflow.apache.org/docs/stable/installation.html):
```
export AIRFLOW_HOME=~/airflow
pip install apache-airflow
airflow initdb
```
Note: On recent versions of Python (3.7+), you may face the following error when executing `airflow initdb`:

```
ModuleNotFoundError: No module named 'typing_extensions'
```

This can be solved with `pip install typing_extensions`.
Then, start the server and visit your Airflow admin UI:

```
airflow webserver -p 8080
```

By default, the server will be accessible at http://localhost:8080/, as shown in the output of the terminal where you ran the previous command.
Open your `airflow.cfg` file (usually located at `~/airflow/airflow.cfg`) and point your DAG folder to AirCan:

```
dags_folder = /your/path/to/aircan
auth_backend = airflow.api.auth.backend.basic_auth
dag_run_conf_overrides_params = True
...other configs
```

Note: do not point `dags_folder` to `/your/path/to/aircan/aircan/dags`. It must point to the outer `aircan` folder.
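If you want to double-check the setting programmatically, `airflow.cfg` is a standard INI file that Python's `configparser` can read. The fragment below is a sketch; the section placement (`[core]` for `dags_folder`, `[api]` for `auth_backend`) follows typical Airflow 1.x layouts, and the inline config text stands in for your real `~/airflow/airflow.cfg`:

```python
# Sketch: verify dags_folder points at the outer aircan checkout,
# not at aircan/aircan/dags. The cfg text below is a stand-in for
# your real ~/airflow/airflow.cfg.
import configparser

cfg_text = """
[core]
dags_folder = /your/path/to/aircan
dag_run_conf_overrides_params = True

[api]
auth_backend = airflow.api.auth.backend.basic_auth
"""

config = configparser.ConfigParser()
config.read_string(cfg_text)

dags_folder = config.get("core", "dags_folder")
# The folder must be the outer "aircan" directory:
assert not dags_folder.endswith("aircan/dags")
print(dags_folder)
```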
Verify that Airflow finds the DAGs of AirCan by running `airflow list_dags`. The output should list:

```
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
ckan_api_load_multiple_steps
...other DAGs...
```
Make sure you have these environment variables properly set up:

```
export LC_ALL=en_US.UTF-8
export LANG=en_US.UTF-8
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
```
Run the Airflow webserver (in case you skipped the previous step): `airflow webserver`

Run the Airflow scheduler: `airflow scheduler`. Make sure the environment variables from (3) are set up.

Access the Airflow UI (http://localhost:8080/). You should see the DAG `ckan_api_load_single_step` listed.

Activate the DAG by clicking its Off toggle in the interface (the toggle will switch to On).
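Because the setup above enables the `basic_auth` API backend, a client (such as `ckanext-aircan`'s `aircan_submit`) can trigger DAG runs over HTTP. The sketch below builds such a request against Airflow 1.10's experimental REST API without sending it; the host, port, and `admin:admin` credentials are assumptions for a local setup:

```python
# Sketch: build (but do not send) a DAG-run submission against Airflow's
# experimental REST API. Host, port and credentials are assumptions.
import base64
import json
import urllib.request

dag_id = "ckan_api_load_single_step"
url = f"http://localhost:8080/api/experimental/dags/{dag_id}/dag_runs"
body = json.dumps({"conf": {"resource": {"path": "path/to/my.csv"}}}).encode()

req = urllib.request.Request(url, data=body, method="POST")
req.add_header("Content-Type", "application/json")
req.add_header("Authorization",
               "Basic " + base64.b64encode(b"admin:admin").decode())

# urllib.request.urlopen(req) would actually submit the run
print(req.get_method(), req.full_url)
```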
We recommend Google Cloud Composer but there are plenty of other options. Documentation here is for Google Cloud Composer.
Sign up for an account at https://cloud.google.com/composer. Create or select an existing project at Google Cloud Platform. For this example, we use one called `aircan-test-project`.

Create an environment at Google Cloud Composer, either by command line or by UI. Make sure you select Python 3 when creating the project. Here, we create an environment named `aircan-airflow`.
After creating your environment, it should appear in your environment list:
Override the configuration for `dag_run_conf_overrides_params`:
Access the designated DAGs folder (which will be a bucket). Upload the contents of `local/path/to/aircan/aircan` to the bucket:

The contents of the subfolder `aircan` must be:

Enter the subdirectory `dags` and delete the `__init__.py` file in this folder; it conflicts with Google Cloud Composer configurations.
[For CKAN-related DAGs] Similarly to what we did in Example 2, access your Airflow instance (created by Google Cloud Composer) and add `CKAN_SITE_URL` and `CKAN_SYSADMIN_API_KEY` as Variables. Now the DAGs should appear on the UI.

Note: If you are using GCP, make sure to enable the following services for your Composer project:

Also, make sure your service account key (which you can create by accessing the IAM panel -> Service accounts) has permissions to read logs and objects from buckets. Enable the following options for the service account (assuming you'll have a setup with Stackdriver and Google Cloud Composer):
In this example we'll run an AirCan example to convert a CSV to JSON. The example assumes AirFlow running locally.
Add the DAG to the default directory for Airflow to recognize it:

```
mkdir ~/airflow/dags/
cp examples/aircan-example-1.csv ~/airflow/
cp examples/csv_to_json.py ~/airflow/dags/
```
To see this DAG appear in the Airflow admin UI, you may need to restart the server or launch the scheduler to update the list of DAGs (this may take a minute or two; then refresh the page in the Airflow admin UI):

```
airflow scheduler
```
Run this DAG:
Enable the dag in the admin UI with this toggle to make it run with the scheduler:
"Trigger" the DAG with this button:
After a moment, check the output. You should see a successful run for this DAG:
Locate the output on disk at `~/airflow/aircan-example-1.json`.
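The core transformation this example performs can be sketched as a standalone function: read a CSV and write its rows out as a JSON array of objects. This is a hedged sketch of the idea, not the actual code in `examples/csv_to_json.py`, which wraps the logic in an Airflow DAG and may differ in detail:

```python
# Standalone sketch of the CSV -> JSON conversion the example DAG performs.
# The actual examples/csv_to_json.py wraps this in an Airflow DAG.
import csv
import json
import os
import tempfile

def csv_to_json(csv_path, json_path):
    """Read csv_path and write its rows to json_path as a JSON array of objects."""
    with open(csv_path, newline="") as f:
        rows = list(csv.DictReader(f))
    with open(json_path, "w") as f:
        json.dump(rows, f, indent=2)
    return rows

if __name__ == "__main__":
    # Tiny inline fixture standing in for aircan-example-1.csv:
    workdir = tempfile.mkdtemp()
    src = os.path.join(workdir, "aircan-example-1.csv")
    dst = os.path.join(workdir, "aircan-example-1.json")
    with open(src, "w") as f:
        f.write("field1,field2\n1,hello\n2,world\n")
    print(csv_to_json(src, dst))
```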
This assumes you have a dataset (e.g. `my-dataset`). Now we can manually run the DAG. On your terminal, run:
```
airflow trigger_dag ckan_api_load_multiple_steps \
 --conf='{ "resource": {
            "path": "path/to/my.csv",
            "format": "CSV",
            "ckan_resource_id": "res-id-123",
            "schema": {
                "fields": [
                    {
                        "name": "Field_Name",
                        "type": "number",
                        "format": "default"
                    }
                ]
            }
        },
        "ckan_config": {
            "api_key": "API_KEY",
            "site_url": "URL"
        }
    }'
```
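Quoting a JSON blob inline in the shell is error-prone (for instance, a stray trailing comma makes it invalid JSON and the DAG run fails). One way to avoid that is to build and validate the `--conf` payload in a small script first; this is a sketch using the same placeholder values as above:

```python
# Sketch: build the --conf payload programmatically so it is guaranteed
# to be valid JSON. All values are placeholders from the example above.
import json

conf = {
    "resource": {
        "path": "path/to/my.csv",
        "format": "CSV",
        "ckan_resource_id": "res-id-123",
        "schema": {
            "fields": [
                {"name": "Field_Name", "type": "number", "format": "default"}
            ]
        },
    },
    "ckan_config": {
        "api_key": "API_KEY",
        "site_url": "URL",
    },
}

payload = json.dumps(conf)
# Round-trips cleanly, so the shell command won't choke on bad JSON:
assert json.loads(payload) == conf
print(f"airflow trigger_dag ckan_api_load_multiple_steps --conf='{payload}'")
```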
Replace the necessary parameters accordingly.
Let's assume you have a resource on https://demo.ckan.org/ with `my-res-id-123` as its resource_id. We also assume you have, in the root of your DAG bucket on Google Cloud Platform, two files: one CSV file with the resource you want to upload, named `r3.csv`, with two columns, `field1` and `field2`; the other file you must have in the root of your bucket is `r4.json`, an empty JSON file.
Since our DAGs expect parameters, you'll have to trigger them via the CLI. For example, to trigger `api_ckan_load_single_node`, run (from your terminal):

```
gcloud composer environments run aircan-airflow \
     --location us-east1 \
     trigger_dag -- ckan_api_load_single_step \
        --conf='{ "resource": {
            "path": "path/to/my.csv",
            "format": "CSV",
            "ckan_resource_id": "res-id-123",
            "schema": {
                "fields": [
                    {
                        "name": "Field_Name",
                        "type": "number",
                        "format": "default"
                    }
                ]
            }
        },
        "ckan_config": {
            "api_key": "API_KEY",
            "site_url": "URL"
        }
    }'
```
Check Google Cloud logs (tip: filter them by your DAG ID, for example, `ckan_api_load_single_step`). It should upload the data of your `.csv` file to demo.ckan.org successfully.
While running `test_ckan_import_to_bq`, you have to provide appropriate cloud credentials for operation with BigQuery. You can get your credentials here: https://cloud.google.com/docs/authentication/getting-started. Please save your credentials as `google.json` and place the file in the `/tests` directory.
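Google client libraries typically discover credentials through the `GOOGLE_APPLICATION_CREDENTIALS` environment variable. A minimal sketch of pointing it at the saved key before running the tests (the `tests/google.json` path follows the instructions above; nothing here validates the key itself):

```python
# Sketch: point Google client libraries at the saved service-account key.
# The tests/google.json location follows the instructions above.
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.abspath("tests/google.json")
print(os.environ["GOOGLE_APPLICATION_CREDENTIALS"])
```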
Configure CKAN to automatically load via `ckanext-aircan` (`ckan_datastore_loader`).

To run all the tests, do:

```
make test
```

You can specify the path to a single test file with:

```
make test TESTS_DIR=tests/test_file.py
```

e.g. `make test TESTS_DIR=tests/test_hybrid_load.py`