A CKAN extension for integrating the AirFlow-based AirCan Data Factory into CKAN. Specifically, this extension provides:
There are two potential cases:
git@github.com:datopian/ckanext-aircan.git
aircan_connector
to CKAN__PLUGINS
list in your.env
. Make sure to disable datapusher
and xloader
if you have them there as AirCan replaces them..env
file add CKAN__AIRFLOW__URL
according to Apache AirFlow REST API Reference. If you are running CKAN in a Docker container, make sure to specify the Airflow URL with host.docker.internal
instead of localhost
: CKAN__AIRFLOW__URL=http://host.docker.internal:8080/api/experimental/dags/ckan_api_load_multiple_steps/dag_runs
. Also notice Airflow requires, by default, the endpoint api/experimental/dags/DAG_ID
for API access.CKAN__AIRFLOW__USERNAME=airflow_amin_username
and CKAN__AIRFLOW__PASSWORD=airflow_admin_password
.env
file, specify a temporary directory for files: CKAN__AIRFLOW__STORAGE_PATH=/tmp/
and CKAN__AIRFLOW__CLOUD=local
. Assuming you already have a Google Cloud Composer properly set up, it is possible to trigger a DAG on GoogleCloud Platform following these steps:
Set up the following environment variables on your .env
file:
CKAN__AIRFLOW__CLOUD=GCP # this line activates the integration with GCP
CKAN__AIRFLOW__CLOUD__PROJECT_ID=YOUR_PROJECT_ID_ON_COMPOSER
CKAN__AIRFLOW__CLOUD__LOCATION=us-east1_OR_OTHER
CKAN__AIRFLOW__CLOUD__COMPOSER_ENVIRONMENT=NAME_OF_COMPOSER_ENVIRONMENT
CKAN__AIRFLOW__CLOUD__WEB_UI_ID=ID_FROM_AIRFLOW_UI_ON_COMPOSER
CKAN__AIRFLOW__CLOUD__GOOGLE_APPLICATION_CREDENTIALS={ YOUR SINGLE LINE CREDENTIALS JSON FILE }
Make a request to http://YOUR-CKAN:5000/api/3/action/aircan_submit?dag_name=DAG_NAME
, specifying your CKAN_API_KEY
on the header and send the following information on the body of the request, replacing the values accordingly:
{
"package_id": "YOUR_PACKAGE_ID",
"url": "http://url.for.your.resource.com",
"description": "This is the best resource ever!" ,
"schema": {
"fields": [
{
"name": "FID",
"type": "int",
"format": "default"
},
{
"name": "another-field",
"type": "float",
"format": "default"
}
]
}
}
Replace dag_name
with the DAG you want to invoke, for example, http://YOUR-CKAN:5000/api/3/action/aircan_submit?dag_name=ckan_api_load_gcp
. This will trigger the DAG ckan_api_load_gcp
.
NB: the DAG ckan_api_load_gcp
is designed for Google Cloud Composer AirFlow instance and will load a resource into the DataStore.
The endpoint http://YOUR-CKAN:5000/api/3/action/resource_create
produces the same effect of http://YOUR-CKAN:5000/api/3/action/aircan_submit?dag_name=DAG_NAME
. Make sure you set up an extra variable on your .env
file specifying the DAG you want to trigger:
# .env
# all other variables
CKAN__AIRFLOW__CLOUD__DAG_NAME=DAG_YOU_WANT_TO_TRIGGER
ckanext.aircan.load_with_postgres_copy=True
env to load with postgres copy loader. By default it loads with datastore API. ckanext.aircan.datastore_chunk_insert_rows_size=300
env variable to configure number of records to send a request to datastore. Default 250 rows.append_or_update_datastore = true
if new data schema matches with old schema append or update data, otherwise create new tableckanext.aircan.enable_datastore_upload_configuration=true
to enable the upload configuration UI option.ckanext.aircan.notification_to = author, maintainer, editor, someone@gmail.com
failure email notification sent to. ckanext.aircan.notification_from = sender@gmail.com
failure notification from email.ckanext.aircan.notification_subject
configure notification subject.The aircan_status_update
API can be use to store or update the run status for given resource. It accepts the POST request with authorized user.
{
"resource_id": "a4a520aa-c790-4b53-93aa-de61e1a2813c",
"state": "progress",
"message":"Pusing dataset records.",
"dag_run_id":"394a1f0f-d8b3-47f2-9a51-08732349b785",
"error": {
"message" : "Failed to push data records."
}
}
Use aircan_status
API to get aircan run status for given resource providing resource id.
eg.
http://YOUR-CKAN:5000/api/3/action/aircan_status
{
"resource_id": "a4a520aa-c790-4b53-93aa-de61e1a2813c"
}
Test the aircan-connector with cypress.
npm install
Opens up the cypress app and you can choose the specs to run.
npm test