Table of Contents 
  • What Is Apache Airflow?
  • Scheduling Complex Workflows: Why Use Apache Airflow?
  • Automate ETL Workflows with Apache Airflow
  • Integrate.io Integration
  • Going the Extra Mile…

What Is Apache Airflow?

Written in Python, Apache Airflow is an open-source workflow manager used to develop, schedule, and monitor workflows. Created at Airbnb, Apache Airflow has since been widely adopted by many large companies, including Google and Slack.

As a workflow management framework, Apache Airflow differs from other frameworks in that it does not require you to hard-code exact parent-child relationships. Instead, you only need to define the dependencies between tasks, and Airflow automatically organizes them into a DAG (directed acyclic graph). This collection of tasks directly reflects each task's relationships and dependencies, describing how you plan to carry out your workflow.
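For illustration, here is a minimal sketch of what that looks like in code, assuming Airflow 1.10.x (the version used later in this walkthrough) and purely hypothetical task names. Declaring the dependencies is all Airflow needs to assemble the DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG('example_dag', start_date=datetime(2019, 12, 1), schedule_interval='@daily') as dag:
    # Placeholder tasks - in a real workflow these would be real operators
    start = DummyOperator(task_id='start')
    branch_a = DummyOperator(task_id='branch_a')
    branch_b = DummyOperator(task_id='branch_b')
    join = DummyOperator(task_id='join')

    # Only the dependencies are declared; Airflow builds the DAG from them
    start >> branch_a >> join
    start >> branch_b >> join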

Apache Airflow is:

  • Dynamic 
  • Scalable 
  • Extensible 
  • Elegant 

Scheduling Complex Workflows: Why Use Apache Airflow?

Automate ETL Workflows with Apache Airflow

ETL pipelines are one of the most commonly used process workflows within companies today, allowing them to take advantage of deeper analytics and overall business intelligence. By adopting Apache Airflow, companies are able to build, scale, and maintain ETL pipelines more efficiently. Best of all, this workflow management platform gives companies the ability to manage all of their jobs in one place, review job statuses, and optimize available resources.
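As a rough sketch of what such a pipeline can look like - again assuming Airflow 1.10.x, and not the exact pipeline discussed in this article - an ETL job is just a few Python callables wired into a DAG; the extract/transform/load functions below are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def extract(**context):
    # Placeholder: pull raw records from a source system
    return [{'id': 1, 'amount': '12.5'}]

def transform(**context):
    # Placeholder: clean and convert the extracted records (pulled via XCom)
    rows = context['ti'].xcom_pull(task_ids='extract')
    return [{'id': r['id'], 'amount': float(r['amount'])} for r in rows]

def load(**context):
    # Placeholder: write the transformed records to the target store
    rows = context['ti'].xcom_pull(task_ids='transform')
    print('Loading %d row(s)' % len(rows))

with DAG('etl_example', start_date=datetime(2019, 12, 1), schedule_interval='@daily', catchup=False) as dag:
    t_extract = PythonOperator(task_id='extract', python_callable=extract, provide_context=True)
    t_transform = PythonOperator(task_id='transform', python_callable=transform, provide_context=True)
    t_load = PythonOperator(task_id='load', python_callable=load, provide_context=True)

    t_extract >> t_transform >> t_load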

Integrate.io Integration

Now, with the whole setup ready, one might wonder how hard it would be to actually make it production-ready and scale it for the whole enterprise. Taking into account all the required infrastructure, server configuration, maintenance, availability, and software installation, there's a lot you need to get right in order for the scheduler to be reliable. What if someone could take all these worries away and let you focus just on scheduling your jobs?

So, let us now take Integrate.io further with Astronomer.io! Let's check if things are as easy as they claim:

Starting with the guide available on their page, I set up a trial account and created my first Workspace.

It’s now possible to configure the New Deployment and choose an appropriate executor:

Let me quote the description from Astronomer.io here:

“Airflow supports multiple executor plugins. These plugins determine how and where tasks are executed. We support the Local Executor for light or test workloads, and the Celery and Kubernetes Executors for larger, production workloads. The Celery Executor uses a distributed task queue and a scalable worker pool, whereas the Kubernetes Executor launches every task in a separate Kubernetes pod.”

Once saved, the page redirects to the overview and encourages you to open Apache Airflow:

As you may have figured out, the server is created behind the scenes - you may notice being redirected to a generated web address, which in my case is:

The whole environment is started in the background, and it may take a moment. Once it is up (refresh the browser window to verify that), the Airflow main screen pops up:

But there are no DAGs! It’s completely empty - apart from the scheduler. And there is no UI option to upload your DAGs. In order to do that, we need to follow the CLI quickstart instructions.

Setting Up DAGs on Astronomer.io

Running WSL on Windows

As long as you’re running a Windows version with Hyper-V enabled, you should be able to accomplish the steps using WSL.

Following the instructions, let’s install the CLI using:

curl -sSL https://install.astronomer.io | sudo bash

This should do all the setup, which can be verified by running the astro command to see if the help is shown:

Let’s create a directory for the project and set it as the current path:

mkdir integrate.io && cd integrate.io

Initializing the project with astro dev init should return a confirmation message:

Now it should be possible to connect to Astronomer Cloud using:

astro auth login gcp0001.us-east4.astronomer.io

Follow the on-screen instructions to log in - either with OAuth or using a username/password.

Once done, a confirmation message should be visible:

Successfully authenticated to registry.gcp0001.us-east4.astronomer.io

Make sure to put the Integrate.io API key into .env. While in the project directory, you should now be able to copy your DAGs over to the project - /mnt/c/Astronomer/integrate.io/dag in my case. Next, it should be possible to run astro deploy.

This command should first give you a choice of deployments and workspaces. In the discussed example, there’s just one on the list. As a result, the whole setup should get published to Astronomer.io:

Select which airflow deployment you want to deploy to:

#    LABEL          DEPLOYMENT NAME          WORKSPACE          DEPLOYMENT ID

1    Integrate.io   quasarian-antenna-4223   Trial Workspace    ck3xao7sm39240a38qi5s4y74

> 1

Deploying: quasarian-antenna-4223

quasarian-antenna-4223/airflow

Building image...

Sending build context to Docker daemon 26.62kB

Step 1/1 : FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild

# Executing 5 build triggers

---> Using cache

---> Using cache

---> Using cache

---> Using cache

---> a5866d1769c4

Successfully built a5866d1769c4

Successfully tagged quasarian-antenna-4223/airflow:latest

Pushing image to Astronomer registry

The push refers to repository [registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow]

d2a571c73db1: Pushed

01de691c8f7c: Layer already exists

6dca0d392e56: Layer already exists

097cec956145: Layer already exists

dd314892853b: Layer already exists

4285fcfc2381: Layer already exists

3f4cdd9563bd: Layer already exists

15797b66dbf6: Layer already exists

0f65bcec71fa: Layer already exists

299fd49bdb72: Layer already exists

da37bee05289: Layer already exists

132a2e1367b6: Layer already exists

03901b4a2ea8: Layer already exists

cli-3: digest: sha256:b48933029f2c76e7f4f0c2433c7fcc853771acb5d60c176b357d28f6a9b6ef4b size: 3023

Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow:cli-3

Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow@sha256:b48933029f2c76e7f4f0c2433c7fcc853771acb5d60c176b357d28f6a9b6ef4b

Deploy succeeded!

root@270c02e5d9d5:/home/astronomer/integrate.io#

So, your Astronomer workspace should now have the new DAG available:

Running on Docker

And while all of the above should have happened, none of it actually did - I wasn’t able to deploy, and running astro deploy from WSL failed as follows:

vic@DESKTOP-I5D2O6C:/c/Astronomer/integrate.io$ astro deploy

Authenticated to gcp0001.us-east4.astronomer.io

Select which airflow deployment you want to deploy to:

#    LABEL          DEPLOYMENT NAME          WORKSPACE          DEPLOYMENT ID

1    Integrate.io   quasarian-antenna-4223   Trial Workspace    ck3xao7sm39240a38qi5s4y74

> 1

Deploying: quasarian-antenna-4223

quasarian-antenna-4223/airflow

Building image...

Cannot connect to the Docker daemon at tcp://localhost:2375. Is the docker daemon running?

Error: command 'docker build -t quasarian-antenna-4223/airflow:latest failed: failed to execute cmd: exit status 1

vic@DESKTOP-I5D2O6C:/c/Astronomer/integrate.io$

The Docker daemon will not start on my Windows machine due to the lack of Hyper-V. If you face the same issue - don’t worry! I will not leave you there.

First, pull an Ubuntu Docker image:

docker pull ubuntu

Next, we’re going to install the Astronomer CLI within the container - just as we did above. Start the container in interactive mode with:

docker run -it ubuntu sh -c "bash"

Install the CLI using:

curl -sSL https://install.astronomer.io | sudo bash

Create the directory for the project and set it as the current path:

mkdir integrate.io && cd integrate.io

Initialize the project with astro dev init and check the confirmation message:

root@270c02e5d9d5:/home/astronomer/integrate.io# astro dev init

Initialized empty astronomer project in /home/astronomer/integrate.io

Now it should be possible to connect to Astronomer Cloud using:

astro auth login gcp0001.us-east4.astronomer.io

Follow the on-screen instructions to log in - either with OAuth or using a username/password.

root@270c02e5d9d5:/home/astronomer/integrate.io# astro auth login gcp0001.us-east4.astronomer.io

CLUSTER              WORKSPACE

gcp0001.us-east4.astronomer.io   ck3xaemty38yx0a383cmooskp

Switched cluster

Username (leave blank for oAuth):

Please visit the following URL, authenticate and paste token in next prompt

https://app.gcp0001.us-east4.astronomer.io/login?source=cli

oAuth Token:

Obtain and paste the token - it works great - or use a username and password.

Once done, a confirmation message should be visible:

Successfully authenticated to registry.gcp0001.us-east4.astronomer.io

Having completed that step, it’s a good idea to save the state of the Docker container to a new image. Just check the container ID with docker ps.

In my case it’s 270c02e5d9d5

CONTAINER ID   IMAGE    COMMAND        CREATED          STATUS          PORTS   NAMES

270c02e5d9d5   ubuntu   "sh -c bash"   48 minutes ago   Up 48 minutes           charming_galileo

So I’ve used the following command to create an image with the Astronomer CLI installed:

docker commit 270c02e5d9d5 ubuntu:astro

So now there’s a new image, which can be seen by running the docker images command:

$ docker images

REPOSITORY   TAG      IMAGE ID       CREATED       SIZE

ubuntu       astro    6f7e5bf1b01c   2 hours ago   139MB

ubuntu       latest   775349758637   5 weeks ago   64.2MB

Finally, I’ve rerun the container, mounting the volume with the DAGs that I intend to copy into the integrate.io project created inside the container. In addition, I’ve mounted docker.sock to allow astro to reach Docker from within the container:

docker run -it -v /airflow/dags/:/usr/local/Astronomer/dags/ -v /var/run/docker.sock:/var/run/docker.sock --env-file=env ubuntu:astro sh -c "bash"

Now, one last thing to add before deployment is the API key. I recommend setting it as an environment variable in your Dockerfile, like this:

root@270c02e5d9d5:/home/astronomer/integrate.io# ll

total 60

drwxr-xr-x 1 root root 4096 Dec 9 14:08 ./

drwxr-xr-x 1 root root 4096 Dec 9 12:23 ../

drwxr-x--- 2 root root 4096 Dec 9 10:07 .astro/

-rw-r--r-- 1 root root 38 Dec 9 10:07 .dockerignore

-rw-r--r-- 1 root root 45 Dec 9 12:03 .env

-rw-r--r-- 1 root root 31 Dec 9 10:07 .gitignore

-rw-r--r-- 1 root root 101 Dec 9 14:00 Dockerfile

-rw-r--r-- 1 root root 556 Dec 9 10:07 airflow_settings.yaml

drwxr-xr-x 1 root root 4096 Dec 9 14:07 dags/

drwxr-xr-x 2 root root 4096 Dec 9 10:07 include/

-rw------- 1 root root 62 Dec 9 10:52 nohup.out

-rw-r--r-- 1 root root  0 Dec 9 10:07 packages.txt

drwxr-xr-x 2 root root 4096 Dec 9 10:07 plugins/

-rw-r--r-- 1 root root  0 Dec 9 10:07 requirements.txt

root@270c02e5d9d5:/home/astronomer/integrate.io# more Dockerfile

FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild

ENV xpl_api_key=<your-API-key-here>

Finally, everything is set for deployment! Just run astro deploy:

root@270c02e5d9d5:/home/astronomer/integrate.io# astro deploy

Authenticated to gcp0001.us-east4.astronomer.io

Select which airflow deployment you want to deploy to:

#    LABEL          DEPLOYMENT NAME          WORKSPACE          DEPLOYMENT ID

1    Integrate.io   quasarian-antenna-4223   Trial Workspace    ck3xao7sm39240a38qi5s4y74

> 1

Deploying: quasarian-antenna-4223

quasarian-antenna-4223/airflow

Building image...

Sending build context to Docker daemon 26.62kB

Step 1/2 : FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild

# Executing 5 build triggers

---> Using cache

---> Using cache

---> Using cache

---> Using cache

---> b4f4c9e5cb16

Step 2/2 : ENV xpl_api_key=Vf9ykgM3UCiBsDMUQpkpUyTYsp7uPQd2

---> Running in 0ec9edff34a5

Removing intermediate container 0ec9edff34a5

---> 24232535523f

Successfully built 24232535523f

Successfully tagged quasarian-antenna-4223/airflow:latest

Pushing image to Astronomer registry

The push refers to repository [registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow]

caafa5dbf9af: Pushed

01de691c8f7c: Layer already exists

6dca0d392e56: Layer already exists

097cec956145: Layer already exists

dd314892853b: Layer already exists

4285fcfc2381: Layer already exists

3f4cdd9563bd: Layer already exists

15797b66dbf6: Layer already exists

0f65bcec71fa: Layer already exists

299fd49bdb72: Layer already exists

da37bee05289: Layer already exists

132a2e1367b6: Layer already exists

03901b4a2ea8: Layer already exists

cli-11: digest: sha256:b7d5f8b5b1ba49fb70549c473a52a7587c5c6a22be8141353458cb8899f4159a size: 3023

Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow:cli-11

Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow@sha256:b7d5f8b5b1ba49fb70549c473a52a7587c5c6a22be8141353458cb8899f4159a

Deploy succeeded!

As you may have noticed, some layers already existed in my case. This nicely shows that parts of the image are reused on subsequent deployments. I needed a few attempts before I was all set, which shows you can perform these steps multiple times in case of issues - so don’t be afraid to experiment!

Once that is done, you can go to the Astronomer workspace and execute the DAG:

(Yes - it took quite a few attempts for me to finally have the setup completed! :) )

You should notice a cluster being created by the Integrate.io wrapper described earlier:

and a job started:

After a while you should be able to see all jobs completed both in Integrate.io:

and reflected in Astronomer graph view:

Or the tree view:

Voilà!

Going the Extra Mile…

If you’re curious, you’ve probably noticed that along the way I got some failures. It’s possible to get some details just by hovering the mouse over a particular run:

OK, the task’s “State” says it has failed - quite obviously. With just a click of a button, we can open a menu that lets us check the logs:

Well, while it’s really easy to check the logs, in this case they won’t tell us much, as the wrapper here is not very expressive:

So, let’s dig a bit deeper and try to investigate in Integrate.io. It’s as easy as finding the failed job and choosing “View details”:

This opens up a panel where we can review the variables and errors:

Now it’s pretty obvious why the job has failed:

Input(s):

Failed to read data from "integrate.io://XPLENTY_USER_S3_CONNECTION_9669@integrate.io.dev/mako/breakfast.csv"

The source file was not available. Now you can see how Integrate.io makes failure investigation and root cause analysis super easy!

Integrating Apache Airflow with Integrate.io

Combining Airflow with Integrate.io enables enterprise-wide workflows that seamlessly schedule and monitor ETL jobs. Integrate.io is a cloud-based, code-free ETL software that provides simple, visualized data pipelines for automated data flows across a wide range of sources and destinations. Schedule a demo here to integrate Integrate.io with Apache Airflow!