Tasks
In Airflow, pipelines are represented by directed acyclic graphs (DAGs): Python functions decorated with the `@dag` decorator. DAGs include calls to tasks,
implemented as instances of the `Operator` class. Operators can perform various
tasks: poll for some precondition, perform extract-transform-load (ETL), or
trigger external systems like Cube.
Integration between Cube and Airflow is enabled by the
airflow-provider-cube package that provides
the following operators.
CubeQueryOperator
CubeQueryOperator is used to query Cube via the
/v1/load endpoint of the REST API.
It supports the following options:
| Option | Type | Default | Description |
|---|---|---|---|
| `cube_conn_id` | string | `cube_default` | Airflow connection name. |
| `headers` | dict | | HTTP headers to be added to the request. |
| `query` | dict | | Cube query object. |
| `timeout` | int | 30 | Response wait timeout in seconds. |
| `wait` | int | 10 | Interval between API calls in seconds. |
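For illustration, the `query` option takes a Cube REST API query object like the sketch below. The `orders` cube and its `count` measure, `status` dimension, and `created_at` time dimension are hypothetical names; use members from your own data model.

```python
# A hypothetical Cube query object, as passed to CubeQueryOperator's `query`
# option. Cube, measure, and dimension names depend on your data model.
query = {
    "measures": ["orders.count"],
    "dimensions": ["orders.status"],
    "timeDimensions": [
        {
            "dimension": "orders.created_at",
            "granularity": "month",
            "dateRange": "last 12 months",
        }
    ],
    "limit": 1000,
}
```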
CubeBuildOperator
CubeBuildOperator is used to trigger pre-aggregation builds and check their
status via the /v1/pre-aggregations/jobs endpoint of
the Orchestration API.
It supports the following options:
| Option | Type | Default | Description |
|---|---|---|---|
| `cube_conn_id` | string | `cube_default` | Airflow connection name. |
| `headers` | dict | | HTTP headers to be added to the request. |
| `selector` | dict | | `/v1/pre-aggregations/jobs` selector. |
| `complete` | bool | False | Whether the task should wait for builds to complete or not. |
| `wait` | int | 10 | Interval between API calls in seconds. |
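For illustration, a `selector` option could look like the sketch below. The field values (security contexts, time zones, data sources) are placeholders; consult the Orchestration API reference for the exact selector shape supported by your Cube version.

```python
# A sketch of a pre-aggregation build selector, as passed to
# CubeBuildOperator's `selector` option. Values are examples only.
selector = {
    "contexts": [
        {"securityContext": {}},  # one entry per security context to build for
    ],
    "timezones": ["UTC"],
    "dataSources": ["default"],
}
```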
Installation
Make sure you have the Astro CLI installed. Create a new directory, initialize a new Astro project in it, and add the `airflow-provider-cube` package to `requirements.txt`.
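Assuming a fresh directory (the directory name is arbitrary), these steps could look like:

```shell
# Create a project directory and initialize an Astro project in it
mkdir cube-airflow && cd cube-airflow
astro dev init

# Add the Cube provider package to the project's dependencies
echo "airflow-provider-cube" >> requirements.txt
```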
Configuration
Connection
Create an Airflow connection via the web console or in the `airflow_settings.yaml` file.
- By default, Cube operators use `cube_default` as the Airflow connection name.
- The connection should be of the `generic` type.
- `conn_host` should be set to the URL of your Cube deployment.
- `conn_password` should be set to the value of the `CUBEJS_API_SECRET` environment variable.
- `conn_extra` should contain a security context (as `security_context`) that will be sent with API requests.
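A connection definition in `airflow_settings.yaml` might look like this sketch. The host URL and password are placeholders, and the exact `conn_extra` format may vary between Astro CLI versions:

```yaml
# Hypothetical example; replace conn_host and conn_password
# with your deployment's values
airflow:
  connections:
    - conn_id: cube_default
      conn_type: generic
      conn_host: https://example.cubecloud.dev
      conn_password: SECRET
      conn_extra:
        security_context: {}
```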
DAGs
Create a new DAG named `cube_query.py` in the `dags` subdirectory. The `CubeQueryOperator` accepts a Cube query
via the `query` option.
Then, create another DAG named `cube_build.py` in the `dags` subdirectory. The `CubeBuildOperator` accepts a
pre-aggregation selector via the `selector` option. Note the
`complete` option: when it's set to `True`, the operator
will wait for pre-aggregation builds to complete before allowing downstream
tasks to run.
Running workflows
Now, you can run these DAGs. Start the Astro project, then navigate to `localhost:8080` in your browser (use `admin`/`admin` to authenticate).
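With the Astro CLI, starting the local environment looks like:

```shell
# Start the local Airflow environment (web server, scheduler, etc.)
astro dev start

# Then open http://localhost:8080 and log in with admin/admin
```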