Table of Contents
- What Is Apache Airflow?
- Scheduling Complex Workflows: Why Use Apache Airflow?
- Automate ETL Workflows with Apache Airflow
- Integrate.io Integration
- Going the Extra Mile…
What Is Apache Airflow?
Written in Python, Apache Airflow is an open-source workflow manager used to develop, schedule, and monitor workflows. Created by Airbnb, Apache Airflow is now being widely adopted by many large companies, including Google and Slack.
Being a workflow management framework, Apache Airflow differs from other frameworks in that it does not require you to configure an entire workflow graph up front. Instead, you only define the dependencies (parents) between tasks, and Airflow automatically organizes them into a DAG (directed acyclic graph). This collection of tasks directly reflects their relationships and dependencies, describing how you plan to carry out your workflow.
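For example, declaring two tasks and a single dependency in Python is all Airflow needs to build the graph. A minimal sketch (the DAG and task names here are made up for illustration):

# dags/example_pipeline.py - illustrative only
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(dag_id="example_pipeline",
         start_date=datetime(2019, 12, 1),
         schedule_interval="@daily") as dag:
    extract = BashOperator(task_id="extract", bash_command="echo 'extracting'")
    load = BashOperator(task_id="load", bash_command="echo 'loading'")
    # "extract" is the parent of "load"; Airflow derives the DAG from this declaration
    extract >> load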
Apache Airflow is:
- Dynamic
- Scalable
- Extensible
- Elegant
Scheduling Complex Workflows: Why Use Apache Airflow?
Automate ETL Workflows with Apache Airflow
ETL pipelines are one of the most commonly used process workflows within companies today, allowing them to take advantage of deeper analytics and overall business intelligence. By adopting Apache Airflow, companies are able to more efficiently build, scale, and maintain ETL pipelines. Best of all, this workflow management platform gives companies the ability to manage all of their jobs in one place, review job statuses, and optimize available resources.
Integrate.io Integration
Now, with all the setup ready, one might wonder how hard it would be to actually make it production-ready and scale it for the whole enterprise. Taking into account all the required infrastructure, server configuration, maintenance and availability, and software installation - there’s a lot you need to ensure in order for the scheduler to be reliable. What if someone could take away all these worries and let you focus just on scheduling your jobs?
So, let us now take Integrate.io further with Astronomer.io! Let’s check if things are as easy as they claim:
Starting with the guide available on their page, I’ve set up a trial account and created my first Workspace.
It’s now possible to configure a New Deployment and choose an appropriate executor:
Let me quote the description from Astronomer.io here:
“Airflow supports multiple executor plugins. These plugins determine how and where tasks are executed. We support the Local Executor for light or test workloads, and the Celery and Kubernetes Executors for larger, production workloads. The Celery Executor uses a distributed task queue and a scalable worker pool, whereas the Kubernetes Executor launches every task in a separate Kubernetes pod.”
Once saved, the page redirects to the overview and encourages you to open Apache Airflow:
As you may figure out, behind the scenes a server is created - you may notice being redirected to a generated web address, which in my case is:
The whole environment is started in the background, and it may take a moment. Once it’s up (refresh the browser window to verify), the Airflow main screen pops up:
But there are no DAGs! It’s completely empty - apart from the scheduler. And there is no UI option to upload your DAGs. To do that, we need to follow the CLI quickstart instructions.
Setting DAGs on Astronomer.io
Running WSL on Windows
As long as you’re running a Windows version with Hyper-V enabled, you should be able to accomplish the steps using WSL.
Following the instructions, let’s install the CLI using:
curl -sSL https://install.astronomer.io | sudo bash
This should do all the setup, which can be verified by running the astro command to see if the help text shows up:
Let’s create a directory for the project and set it as the current path:
mkdir integrate.io && cd integrate.io
Initializing the project with astro dev init should return a confirmation message:
Now it should be possible to connect to Astronomer Cloud using:
astro auth login gcp0001.us-east4.astronomer.io
Follow the on-screen instructions to log in - either with OAuth or using a username and password.
Once done, a confirmation message should be visible:
Successfully authenticated to registry.gcp0001.us-east4.astronomer.io
Make sure to put the Integrate.io API key into .env. While in the project directory, you should now be able to copy your DAGs over to the project - /mnt/c/Astronomer/integrate.io/dag in my case. Next, it should be possible to run astro deploy.
This command should first give you a choice of deployments and workspaces. In the discussed example there’s just one on the list. As a result, the whole setup should get published to Astronomer.io:
Select which airflow deployment you want to deploy to:
# LABEL DEPLOYMENT NAME WORKSPACE DEPLOYMENT ID
1 Integrate.io quasarian-antenna-4223 Trial Workspace ck3xao7sm39240a38qi5s4y74
> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Sending build context to Docker daemon 26.62kB
Step 1/1 : FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
# Executing 5 build triggers
---> Using cache
---> Using cache
---> Using cache
---> Using cache
---> a5866d1769c4
Successfully built a5866d1769c4
Successfully tagged quasarian-antenna-4223/airflow:latest
Pushing image to Astronomer registry
The push refers to repository [registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow]
d2a571c73db1: Pushed
01de691c8f7c: Layer already exists
6dca0d392e56: Layer already exists
097cec956145: Layer already exists
dd314892853b: Layer already exists
4285fcfc2381: Layer already exists
3f4cdd9563bd: Layer already exists
15797b66dbf6: Layer already exists
0f65bcec71fa: Layer already exists
299fd49bdb72: Layer already exists
da37bee05289: Layer already exists
132a2e1367b6: Layer already exists
03901b4a2ea8: Layer already exists
cli-3: digest: sha256:b48933029f2c76e7f4f0c2433c7fcc853771acb5d60c176b357d28f6a9b6ef4b size: 3023
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow:cli-3
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow@sha256:b48933029f2c76e7f4f0c2433c7fcc853771acb5d60c176b357d28f6a9b6ef4b
Deploy succeeded!
root@270c02e5d9d5:/home/astronomer/integrate.io#
So, your Astronomer workspace should now have the new DAG available:
Running on Docker
And while all of the above should happen, none of it actually did in my case - I wasn’t able to deploy, and running astro deploy from WSL failed as follows:
vic@DESKTOP-I5D2O6C:/c/Astronomer/integrate.io$ astro deploy
Authenticated to gcp0001.us-east4.astronomer.io
Select which airflow deployment you want to deploy to:
# LABEL DEPLOYMENT NAME WORKSPACE DEPLOYMENT ID
1 Integrate.io quasarian-antenna-4223 Trial Workspace ck3xao7sm39240a38qi5s4y74
> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Cannot connect to the Docker daemon at tcp://localhost:2375. Is the docker daemon running?
Error: command 'docker build -t quasarian-antenna-4223/airflow:latest failed: failed to execute cmd: exit status 1
vic@DESKTOP-I5D2O6C:/c/Astronomer/integrate.io$
The Docker daemon will not start on my Windows machine due to the lack of Hyper-V. If you face the same issue - don’t worry! I will not leave you there.
First, pull an Ubuntu Docker image:
docker pull ubuntu
Next, we’re going to install the Astronomer CLI within the container - just as we did above. Start the container in interactive mode with:
docker run -it ubuntu sh -c "bash"
Install the CLI using:
curl -sSL https://install.astronomer.io | sudo bash
Create a directory for the project and set it as the current path:
mkdir integrate.io && cd integrate.io
Initialize the project with astro dev init and check for the confirmation message:
root@270c02e5d9d5:/home/astronomer/integrate.io# astro dev init
Initialized empty astronomer project in /home/astronomer/integrate.io
Now it should be possible to connect to Astronomer Cloud using:
astro auth login gcp0001.us-east4.astronomer.io
Follow the on-screen instructions to log in - either with OAuth or using a username and password.
root@270c02e5d9d5:/home/astronomer/integrate.io# astro auth login gcp0001.us-east4.astronomer.io
CLUSTER WORKSPACE
gcp0001.us-east4.astronomer.io ck3xaemty38yx0a383cmooskp
Switched cluster
Username (leave blank for oAuth):
Please visit the following URL, authenticate and paste token in next prompt
https://app.gcp0001.us-east4.astronomer.io/login?source=cli
oAuth Token:
Obtain and paste the token - it works great - or use username and password.
Once done, a confirmation message should be visible:
Successfully authenticated to registry.gcp0001.us-east4.astronomer.io
Having completed that step, it’s a good idea to save the state of the Docker container to a new image. Just check the container ID with docker ps:
In my case it’s 270c02e5d9d5
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
270c02e5d9d5 ubuntu "sh -c bash" 48 minutes ago Up 48 minutes charming_galileo
So I’ve used the following command to create an image with the Astronomer CLI installed:
docker commit 270c02e5d9d5 ubuntu:astro
Now there’s a new image, which can be seen by running the docker images command:
$ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
ubuntu astro 6f7e5bf1b01c 2 hours ago 139MB
ubuntu latest 775349758637 5 weeks ago 64.2MB
Finally, I’ve rerun the container, mounting the DAGs volume that I intend to copy into my integrate.io project created inside the container. In addition, I’ve mounted docker.sock to allow astro to reach Docker from within the container:
docker run -it -v /airflow/dags/:/usr/local/Astronomer/dags/ -v /var/run/docker.sock:/var/run/docker.sock --env-file=env ubuntu:astro sh -c "bash"
Now, one last thing to add before deployment is the API key. I recommend setting it as an environment variable in your Dockerfile, like this:
root@270c02e5d9d5:/home/astronomer/integrate.io# ll
total 60
drwxr-xr-x 1 root root 4096 Dec 9 14:08 ./
drwxr-xr-x 1 root root 4096 Dec 9 12:23 ../
drwxr-x--- 2 root root 4096 Dec 9 10:07 .astro/
-rw-r--r-- 1 root root 38 Dec 9 10:07 .dockerignore
-rw-r--r-- 1 root root 45 Dec 9 12:03 .env
-rw-r--r-- 1 root root 31 Dec 9 10:07 .gitignore
-rw-r--r-- 1 root root 101 Dec 9 14:00 Dockerfile
-rw-r--r-- 1 root root 556 Dec 9 10:07 airflow_settings.yaml
drwxr-xr-x 1 root root 4096 Dec 9 14:07 dags/
drwxr-xr-x 2 root root 4096 Dec 9 10:07 include/
-rw------- 1 root root 62 Dec 9 10:52 nohup.out
-rw-r--r-- 1 root root 0 Dec 9 10:07 packages.txt
drwxr-xr-x 2 root root 4096 Dec 9 10:07 plugins/
-rw-r--r-- 1 root root 0 Dec 9 10:07 requirements.txt
root@270c02e5d9d5:/home/astronomer/integrate.io# more Dockerfile
FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
ENV xpl_api_key=<your-API-key-here>
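With the key baked into the image, any DAG or task can read it from the environment at runtime - that’s how the value set in the Dockerfile reaches your code. A minimal sketch (the helper name is just illustrative):

import os

def get_integrateio_api_key():
    # "xpl_api_key" matches the ENV line in the Dockerfile above
    return os.environ["xpl_api_key"]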
Finally, everything is set for deployment! Just run astro deploy:
root@270c02e5d9d5:/home/astronomer/integrate.io# astro deploy
Authenticated to gcp0001.us-east4.astronomer.io
Select which airflow deployment you want to deploy to:
# LABEL DEPLOYMENT NAME WORKSPACE DEPLOYMENT ID
1 Integrate.io quasarian-antenna-4223 Trial Workspace ck3xao7sm39240a38qi5s4y74
> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Sending build context to Docker daemon 26.62kB
Step 1/2 : FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
# Executing 5 build triggers
---> Using cache
---> Using cache
---> Using cache
---> Using cache
---> b4f4c9e5cb16
Step 2/2 : ENV xpl_api_key=Vf9ykgM3UCiBsDMUQpkpUyTYsp7uPQd2
---> Running in 0ec9edff34a5
Removing intermediate container 0ec9edff34a5
---> 24232535523f
Successfully built 24232535523f
Successfully tagged quasarian-antenna-4223/airflow:latest
Pushing image to Astronomer registry
The push refers to repository [registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow]
caafa5dbf9af: Pushed
01de691c8f7c: Layer already exists
6dca0d392e56: Layer already exists
097cec956145: Layer already exists
dd314892853b: Layer already exists
4285fcfc2381: Layer already exists
3f4cdd9563bd: Layer already exists
15797b66dbf6: Layer already exists
0f65bcec71fa: Layer already exists
299fd49bdb72: Layer already exists
da37bee05289: Layer already exists
132a2e1367b6: Layer already exists
03901b4a2ea8: Layer already exists
cli-11: digest: sha256:b7d5f8b5b1ba49fb70549c473a52a7587c5c6a22be8141353458cb8899f4159a size: 3023
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow:cli-11
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow@sha256:b7d5f8b5b1ba49fb70549c473a52a7587c5c6a22be8141353458cb8899f4159a
Deploy succeeded!
As you may have noticed, some layers already existed in my case, which shows nicely that parts of the image are reused on subsequent deployments. I needed a few attempts before I was all set, so these steps can safely be repeated in case of issues - don’t be afraid to experiment!
Once that is done, you can go to the Astronomer workspace and execute the DAG:
(Yes - it took quite a few attempts for me to finally have the setup completed! :) )
You should notice a cluster being created by the Integrate.io wrapper described earlier:
and a job started:
After a while, you should see all jobs completed, both in Integrate.io:
and reflected in the Astronomer graph view:
Or the tree view:
Voilà!
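For reference, the wrapper task that kicks off Integrate.io could be as simple as a PythonOperator calling the Integrate.io (Xplenty) REST API. The sketch below is my own rough reconstruction, not the actual wrapper described earlier - the account placeholder, cluster/package IDs, and endpoint details are assumptions to verify against the current API docs:

import os
import requests
from airflow.operators.python_operator import PythonOperator

def run_integrateio_job(**context):
    api_key = os.environ["xpl_api_key"]
    base_url = "https://api.xplenty.com/<account-id>/api"  # placeholder account ID
    headers = {"Accept": "application/vnd.xplenty+json; version=2"}  # assumed header
    # start a job on an existing cluster - cluster_id and package_id are placeholders
    resp = requests.post(base_url + "/jobs",
                         auth=(api_key, ""),
                         headers=headers,
                         json={"cluster_id": 1234, "package_id": 5678})
    resp.raise_for_status()
    return resp.json()["id"]

run_job = PythonOperator(task_id="run_integrateio_job",
                         python_callable=run_integrateio_job,
                         provide_context=True,
                         dag=dag)  # assumes a DAG object named "dag" is in scope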
Going the Extra Mile…
If you’re curious, you’ve probably noticed that along the way I got some failures. It’s possible to get some details just by hovering the mouse over a particular run:
OK, the task’s “State” says it has failed - quite obviously. With just a click of a button we can get a menu that lets us check the logs:
Well, while it’s really easy to check the logs, in this case they won’t tell us much, as the wrapper here is not very expressive:
So, let’s dig a bit deeper and investigate in Integrate.io. It’s as easy as finding the failed job and choosing “View details”:
This opens up a panel where we can review the variables and errors:
Now it’s pretty obvious why the job has failed:
Input(s):
Failed to read data from "integrate.io://XPLENTY_USER_S3_CONNECTION_9669@integrate.io.dev/mako/breakfast.csv"
The source file was not available. Now you can see how Integrate.io makes failure investigation and root cause analysis super easy!
Integrating Apache Airflow with Integrate.io
Airflow with Integrate.io enables enterprise-wide workflows that seamlessly schedule and monitor ETL jobs. Integrate.io is a cloud-based, code-free ETL platform that provides simple, visualized data pipelines for automated data flows across a wide range of sources and destinations. Schedule a demo here to integrate Integrate.io with Apache Airflow!