Python が誕生してもう30年以上経ちますが、このプログラミング言語がこれほどまでに普及したのは初めてです。データサイエンスとAI の台頭により、Python は ETL パイプラインを構築するデータエンジニアをはじめ、あらゆるデータエンジニアに選ばれています。

ただ、Pythonで ETL パイプラインを構築するのは、気の弱い人には向いていません。並列処理、ロギング、ジョブスケジューリング、データベース接続などの課題に遭遇することになりますからね。でも幸いなことに、このようなプロセスを簡単にするETLツールやパッケージが色々とあるんです。

そこで本記事では、Pythonで ETL パイプラインを構築し、データ統合プロジェクトを変革する方法についてご紹介します。

詳しく読むTop 6 Python ETL Tools for 2021(2021年の Python の ETL ツール6選)

早速データを統合しましょう!Integrate.io の デモのご予約はコチラ

Python での ETLパイプライン構築について知っておくべきこと

ETL パイプラインは、データを単独または複数のソースからデータウェアハウスなどのデータベースに移動させる一連のプロセスです。ETLを実行する方法は複数ありますが、ETL の分野では、Python が圧倒的なシェアを誇っています。

Pythonは1991年に登場しました。グイド・ヴァンロッサム氏によって作られたこのプログラミング言語は、その「使いやすい構文」と「読みやすさ」から、デベロッパーの間で瞬く間に人気となり、この2つの要素により、それまでのプログラム保守に関連する一般的なコストが削減され、データサイエンス界でさらに普及しました。

Python がシンプルなプログラミング言語だと言っているわけではありません。全然違います。

Python の活用には、関連するフレームワークやライブラリの知識が必要なため、全くの初心者だと使いこなすのが難しいかもしれません。タスクの自動化やWebサイトの開発、データ分析などを行うには、何度も練習が必要な言語ですからね。

Python での ETL パイプラインの構築には、特定のスキルも必要です。データエンジニアリングチームのない小さな会社では、このプログラミング言語の深い知識がない限り、複雑なパイプラインをゼロから作成するのは大変かもしれません。ただありがたいことに、ワークフローを管理する Apache Airflow や Luigi、データの移動と処理を行う Pandas、Pygrametl のような自己完結型のツールキットなど、Python のETL パイプラインを簡単に構築できるツールが充実しています。

以下で、このようなリソースの活用法について見ていきましょう。

さらに読むAirflow vs. Luigi: Which ETL is the Best?(Airflow vs. Luigi:どちらのETLがベストか)

早速 Integrate.io のデモを予約して、ぱぱっとデータを統合しちゃいましょう。

Pygrametl

Pygrametl は、一般的な ETL プロセスのための機能を内蔵したオープンソースの Python ETL フレームワークであり、各ディメンションとファクトテーブルが「Python オブジェクト」として提示されるので、ユーザーは多くの一般的な ETL 操作を実行することができます。また、Pygrametl は、2021年5月にフレームワークの最新バージョン(Version 2.7)をリリースしました。

Pygrametl はデフォルトで PostgreSQL を搭載した CPython 上で動作しますが、Python上でも動作するように修正することができます。以下は、ソースコードの例です:

import psycopg2 import pygrametl from pygrametl.datasources import SQLSource, CSVSource from pygrametl.tables import Dimension, FactTable sales_string = "host='10.0.0.12' dbname='sale' user='user' password='pass'" sales_pgconn = psycopg2.connect(sales_string)

この Pygrametl の初心者向けガイド(2021年更新)では、データを抽出してデータウェアハウスに格納する方法について紹介されています。以下は、データベースへの接続を確立する方法を示したソースコードです:

import psycopg2 import pygrametl from pygrametl.datasources import SQLSource, CSVSource from pygrametl.tables import Dimension, FactTable sales_string = "host='10.0.0.12' dbname='sale' user='user' password='pass'" sales_pgconn = psycopg2.connect(sales_string)

psycopg2 は PostgreSQL データベースへ接続をしやすくする Python モジュールであり、ソースに接続する前に、データベース名、ユーザー名、パスワードを含む文字列を psycopg2.connect() にフィードしないといけません。また、対象のデータウェアハウスに接続する際も、この関数を利用することができます:

dw_string = "host='10.0.0.13' dbname='dw' user='dwuser' password='dwpass'" dw_pgconn = psycopg2.connect(dw_string) dw_conn_wrapper = pygrametl.ConnectionWrapper(connection=dw_pgconn)

上記の例では、ユーザーは "sales" という名前のデータベースに接続します。以下は、データベースから特定の属性を抽出するためのコードです:

name_mapping= 'book', 'genre', 'city', 'timestamp', 'sale' sales_source = SQLSource(connection=sales_pgconn, \ query="SELECT * FROM sales", names=name_mapping)

ソースデータベースからデータを抽出した後、ETLの変換ステージに入ることができます。このコード例では、ユーザーは単純な変換を実行する関数を確定しており、その関数は、データベースから1行を入力として受け取り、タイムスタンプ文字列を3つの構成部分(年、月、日)に分割します:

def split_timestamp(row): timestamp = row['timestamp'] timestamp_split = timestamp.split('/') row['year'] = timestamp_split[0] row['month'] = timestamp_split[1] row['day'] = timestamp_split[2]

前述のとおり、Pygrametl はディメンションとファクト・テーブルをすべて個別の Python オブジェクトとして扱います。以下では、ユーザーは、「book」および「time」ディメンションの 3 次元オブジェクトと、これら 2 つのディメンションを格納する Fact Table オブジェクトを作成しています:

book_dimension = Dimension(name='book', key='bookid', attributes= ['book', 'genre']) time_dimension = Dimension(name='time', key='timeid', attributes=['day', 'month', 'year']) fact_table = FactTable(name='facttable', keyrefs=['bookid', 'timeid'], measures=['sale'])

次に、ソースとなる売上データベースの各行を繰り返し処理し、関連する情報を各ディメンジョン・オブジェクトに格納します。”secure "関数は、ディメンション内に行が既に存在するかどうかをチェックし、存在しない場合はそれを挿入します。

for row in sales_source: split_timestamp(row) row['bookid'] = book_dimension.ensure(row) row['timeid'] = time_dimension.ensure(row) fact_table.insert(row)

最後に、このデータをデータウェアハウスにコミットして、接続を終了します:

dw_conn_wrapper.commit() dw_conn_wrapper.close()

Pygrametl は、通常の Python のパワーと表現力と組み合わせて、内蔵された多くの関数を備えた強力な ETL ツールキットを提供します。

Airflow

Pygrametl が本格的な Python ETL フレームワークであるのに対し、Airflow は「ワークフローの自動化によるデータパイプラインの実行」のみを目的としています。Airflow は Airbnb によって開発され、現在では Apache Software Foundation によって管理されているオープンソースのプロジェクトです。Airflow の基本単位は DAG(有向非巡回グラフ)で、実行したい ETL タスク間の関係や依存関係を確定します。

Airflow の開発者は、このツールの機能を示す簡単なチュートリアルを提供しています。(このチュートリアルは、2020年12月に登場した最新版である2.1.3までの Airflow の全バージョンがカバーされています)。まずユーザーは、必要なライブラリをインポートし、DAG内の各タスクのデフォルト引数を確定する必要があります:

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(2), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), }

この引数の意味するところは以下のようになります:

  • Owner:タスクの所有者(オペレーティングシステムでは所有者のユーザー名であることが多い)。
  • Depends_on_past:trueの場合、この引数は、前回の試行で成功しなかった場合はタスクの発生を停止させる。
  • Start_date:タスクの実行開始の日時を指定する。
  • Email:タスク所有者の連絡先メールアドレス。
  • email_on_failure, email_on_retry:タスクが失敗またはリタイアしたときに、タスクのオーナーがメール通知を受け取るかどうかのコントロールをする。
  • Retries:タスクが失敗した後に再試行する回数。
  • retry_delay::再試行するまでの時間。

次に、ETL ワークフローの様々なタスクを格納するDAGオブジェクトを作成します:

dag = DAG( 'tutorial', default_args=default_args, description='A simple tutorial DAG', schedule_interval=timedelta(days=1), )

schedule_interval パラメータは、DAG ワークフローの実行間隔をコントロールします。ここでは1日に設定されており、これは事実上、Airflow が毎日ターゲットデータウェアハウスにデータを格納するということになります。

最後に、ユーザーは簡単なタスクをいくつか定め、それをDAGに追加します:

t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag,) t2 = BashOperator( task_id='sleep', depends_on_past=False, bash_command='sleep 5', retries=3, dag=dag, )

ここでは、タスク「t1」が bashコマンド「date(現在の日付と時刻をコマンドラインに表示)」を実行し、「t2」が bashコマンド「sleep 5(現在のプログラムの実行を5秒間停止させる)」を実行します。

Airflow で、コマンドラインの ETL ジョブをスケジュールしやすくなり、それによって、確実にパイプラインが一貫して必要なデータを抽出、変換、および格納するようにできます。Airflow は他の ETL ツールや Integrate.io などのプラットフォームと統合しやすく、クラウドデータ統合のための自動パイプラインを作成し、スケジュールすることができるのがいいですね。

Airflow の使用は、長い ETL ジョブを実行する場合や、プロジェクトが複数のステップを含む場合が一番便利です。ETLプロセスのどの時点からでも再開することができますが、Airflow はライブラリではなく、デプロイする必要があるため、Airflow は小規模な ETL ジョブには最適な選択ではありません。

さらに読むApache Airflow: Explained (Apache Airflow の説明)

Pandas

Pandas データ分析のための Python ライブラリであり、ETL ツールキットに追加するのに最適なライブラリです。ちなみに最新版の1.3.2は、2021年8月に登場しました。
Pandas ライブラリには、以下のような多くのファイル形式を読み書きする機能があります:

  • Text files
  • CSV files
  • JSON files
  • XML/HTML files
  • Excel (.xlsb) files
  • HDF5 files
  • Parquet files
  • SQL queries
  • Google BigQuery

以下のコードから、JSON ファイルからのデータの取り込みやすさがわかります:

import pandas as pd pd.read_json('test.json')

Pandas の基本単位は DataFrame で、表形式のデータを行と列で格納する2次元データ構造です。DataFrame にデータを読み込むと、Pandas では様々な変換を行うことができます。例えば、pandasで広く使われている「merge」関数は、2つの DataFrame の間で結合操作を行います:

pd.merge(left, right, how='inner', on=None, left_on=None, right_on=None, 

left_index=False, right_index=False, sort=True)

これらの引数の意味するところは以下のとおりです:

  • left, right:結合する必要のある2つの DataFrame
  • How:結合操作のタイプ('inner'、'outer'、'left'、'right')
  • on, left_on, right_on:結合キーとして使用するカラムまたはインデックスレベル(左または右のDataFramesから可能)。
  • left_index, right_index:Trueの場合、左または右のDataFrameのインデックス(行ラベル)を結合キーとして使用。
  • sort:Trueの場合、結果のDataFrameを結合キーで整理。

データの抽出、クリーニング、変換、CSVファイルやExcel、SQLデータベースへの書き込みを行う際には Pandas を使いましょう。

Luigi

Luigi は、複雑なパイプラインの構築ができるオープンソースのツールです。アプリは多数ありますが、Spotify のために作られたものなので、独自のニーズにはあまり適していないかもしれません。ただ Deliveroo などの一部の企業は、長年にわたってこれを採用しています。

Luigiは以下を扱っています:

  • ワークフロー管理
  • 可視化
  • 依存性の解消 
  • コマンドライン統合

Luigi を使うとき、"ノード "こと 「タスク」と、 "エッジ "こと 「ターゲット 」が存在します。タスクはターゲットを消費し、連鎖反応を起こします。

Luigi を活用するには、タスクは基本的なビルディングブロックであるので、きちんと習得しましょう。タスクの作成には、以下のうちの1つまたは全部を含むクラスを作成します:

  • run()
  • requires()
  • output()
  • targets 

多くのパイプラインシステムとは異なり、Luigi は次のノードに情報を転送するプロセスを逆転させます。プログラムは最後のタスクから始まり、そのタスクが実行可能かどうかをチェックします。

このオプションは、ログのような単純な ETL 処理に最適です。Luigi の構造はかなり厳密なので、より複雑なタスクは制限されますが企業向けソリューションを構築するのであれば、Luigi はいい選択かもしれません。

Python で ETLパイプラインを構築する際にIntegrate.io ができること

Python で ETL パイプライン構築のために貴重な時間と労力を割く代わりに、Integrate.io のようなノーコード ETL データ統合プラットフォームを選ぶ企業が増えています。Integrate.io は、信じられないほど幅広い事前構築済みの統合と、簡単なドラッグ & ドロップのビジュアル インターフェースを備えており、データ ウェアハウスへのシンプルかつ強力な ETL パイプラインをこれまで以上に簡単に構築できます。

Integrate.io と Python のどちらかを選ばないといけないわけではないのはよかったですね。例えば Integrate.io の Python ラッパーを使えば、Python プログラム内で Integrate.io の REST API にアクセスすることができ、両方を利用することができます。

Integrate.io の Python Wrapper は簡単に始められます。Integrate.io パッケージをインポートし、アカウント ID と API キーを提供するだけです:

from Integrate.io import Integrate.ioClient account_id ="MyAccountID" api_key = "V4eyfgNqYcSasXGhzNxS" client = Integrate.ioClient(account_id,api_key)

次に、ETL ジョブ用に割り当てたマシンのグループであるクラスタのインスタンス化が必要です:

cluster_type = "production" nodes = 2 name ="New Cluster #199999" description ="New Cluster's Description" terminate_on_idle = False time_to_idle = 3600 cluster = client.create_cluster(cluster_type, nodes, name, description, terminate_on_idle, time_to_idle)

これらの引数の意味するところは以下のとおりです:

  • Cluster_type:ユースケースに応じて「プロダクション」または「サンドボックス」のいずれかを選択。
  • Nodes:クラスタのノード数(2~アカウントで許可された最大値の間)
  • Terminate_on_idle:trueの場合、クラスタがアイドル状態になったときに終了
  • time_to_idle:クラスタがアイドル状態になるまでの時間(秒)

Integrate.io のクラスタにはジョブが含まれます。以下のコードは、Integrate.io のジョブを新規に作成し、実行する方法を示しています:

cluster_id = 83 package_id = 782 variables = {} variables['OUTPUTPATH']="test/job_vars.csv" variables['Date']="09-10-2020"    job = client.add_job(cluster_id, package_id, variables)

Python で ETL パイプラインを構築する際に 、ETL プロジェクトの開始前にデータをクレンジングしてソースデータのサイズを小さくしたり、ニーズに最適な時間にジョブをスケジュールしたり、わかりやすく使いやすいダッシュボードでジョブを監視したり、Integrate.io がお手伝いできる方法は他にもあります。また、Integrate.io は GDPR(EU一般データ保護規則)、CCPA(カリフォルニア州消費者プライバシー法)、HIPAA(医療保険の相互運用性と責任に関する法律)などのデータプライバシー規制にも準拠しています。

さらに、膨大な種類のデータ統合機能があらかじめ用意されていることや、使うコネクタの数に応じて課金されるシンプルな価格設定、世界最高水準のカスタマーサービスなどのメリットがあります。

今年は Pythonで ETL パイプラインを構築しませんか?Integrate.ioの強力なデータ統合プラットフォームがお手伝いします。デモのご予約はコチラ