Integrate.ioは、様々なソースから効率的にデータを抽出、変換、保存するための機能を提供します。Chartioは、データの探索と分析を可能にするVisual SQL機能を提供しています。さらに、チャートやメトリクスをダッシュボードに並べて共有できる機能も備えています。これらのツールは両方を使うことで相乗効果が期待できます。

この記事では、Integrate.ioでChartioでデータを使用するための設定方法を説明します。さらに次の記事では、Integrate.ioが準備したデータをChartioで可視化する方法を説明します。

なぜIntegrate.ioとChartioを使うのか?

Integrate.ioは、直感的なグラフィックインターフェースを通して、(複雑な)ETLパイプラインを作成することができます。Integrate.ioは、Rest APIや様々な統合対象からデータを引き出す機能も提供しています。例えば、Facebook、Salesforce、Google Analytics、Linkedinなどのサードパーティのプラットフォームからデータを取得し、ユースケースに基づいて変換し、Chartioがサポートするデータソースに保存するためにIntegrate.ioを使用することができます。良い点は、Integrate.ioがこれらの多くをサポートしていることで、Integrate.ioで構築されたあらゆるデータパイプラインにChartioを接続することが非常に簡単にできるということです。これは、Chartioを通して可視化できるデータの可能性を大きく広げてくれます。

 

Integrate.ioとChartioを一緒に使う

手順を理解するために、実際にタスクを設定して、この2つのプラットフォームを使って解決してみましょう。

マーチャントが商品を販売するために使用する仮想のマーケットプレイスプラットフォームを考えてみましょう。ほとんどのプラットフォームと同様に、このプラットフォームは、RESTful APIを介して適切なデータ(顧客、トラフィック、注文について)をマーチャントに提供します。このデータを分析的な観点から Chartio で可視化したいと考えています。

API は認証に Basic認証スキームを使用し、エンドポイントはこれら 2 つのクエリパラメータをサポートしています。

  • since パラメータ:この値は ISO フォーマットの日付のタイムスタンプです。このパラメータの目的は、指定した時刻以降に作成されたデータを取得することです。
  • limit パラメータ:ページングされたAPIコールごとに受信するデータオブジェクトの数を指定します。

ページネーションについては、APIはLinkヘッダ標準に従います。また、API にはスロットリング機構があり、1分間に 10回の呼び出ししか許可されておらず、1日に最大1000リクエストまでとなっています。API は Content-Type ヘッダを渡す必要があり、API が "Orders" エンドポイントに対して返す JSON レスポンスは以下のようになります。

{
"orders": [
  ...
{
      "id": 29393,
      "total_price": 100,
      "tax_lines": [
        // These objects themselves are nested and so on.
      ]
      ...
    },
  {
      ...
    },
    ... 
  ]
}

APIレスポンスの完全なサンプルはこちらでご覧いただけます。 

ソリューションの実装

概要レベルのデザイン

thumbnail image

マーケットプレイスのプラットフォームからAPIを使ってデータを取得し(上記のように)、Chartioがサポートするデータウェアハウスに格納します。データの保存先には様々なオプションがあります。今回は、分析・ビジネスインテリジェンスシステム向けに高速なパフォーマンスを提供することで知られるSingleStore(旧名:MemSQL)を採用します。SingleStoreは、MySQLクライアントソフトウェアと互換性があります。同じWireプロトコルを使用し、MySQLと同様のSQL構文をサポートしています。私たちは、Integrate.ioパイプラインのデスティネーションとChartioのソースとしてシームレスに機能するために、SingleStoreで管理されたクラスタを使用します。

APIレスポンスには多くのフィールドが含まれているため、関連するフィールドをフィルタリングする必要があります。データは配列を値として含むフィールドから構成されているので(例えばtax_linesフィールド)、それらにフラット化処理を適用して、正規化し、別のSQLテーブルに格納する必要があります。完全に公平に言うと、ChartioにもいくつかのJSON抽出機能がありますが、データを保存する前に一度データパイプライン内でJSONをパースしてからデータを保存した方がはるかに効率的です。この記事で説明したビジュアライゼーションでは、"Orders "リソースレスポンスのトップレベルフィールドと、配列に入れ子になっているtax line情報を抽出する必要があります。他のフィールドも同様のアプローチで抽出することができます。

そこで最初のマイルストーンは、マーケットプレイスAPIからデータを取得し、適切な変換を適用し、それをSingleStoreクラスタに保存することができるパイプラインをIntegrate.ioに実装することです。

Integrate.io パイプラインの構築

上記のREST APIからデータを引き出すには、認証、ページネーション、レート制限、入れ子になったJSONデータの処理を実装する必要があります。幸いなことに、Integrate.ioのREST APIソースコンポーネントはこれらの機能をすべて提供しており、数分でユーザーフレンドリーなインターフェースを介して設定することができます。REST APIソースコンポーネントとは別に、パイプラインにはさらに3種類のコンポーネントが必要になります。

 

これらのコンポーネントを適切に組み合わせると、最終的なパイプラインは以下のようになります。

thumbnail image

このパイプラインは、データを2つのテーブル(orderとorder_tax_lines)に書き込みます。以下では、パイプラインで使用する各コンポーネントの構成を示します。

thumbnail image

1) Rest APIソースコンポーネント

API認証用にユーザー名とパスワードの詳細が提供されます。 

thumbnail image

APIのURLとcontent-typeヘッダーの値を追加しました。APIではページネーションにLink ヘッダを使用しているため、ページネーションのスキームは「Link Headers」に設定しています。レート制限については、ページネーションされたリクエストのスリープ間隔および上限を指定しています。基本レベルのJSONパス式を追加して、Baseレスポンスから "order "配列の項目のみをキャプチャするようにしました。次に、"Response Schema "セクションのフィールドは、そのフィールドのデータ型と共にIntegrate.ioによって自動的に検知されます。

注意深く観察すると、URLは3つのパッケージ変数を利用しています。api_urlはAPIのベースURL、limit_sizeは1つのページが返すレコード数、最後にlast_updatedはパッケージを頻繁に実行しているときにデータをインクリメンタルに読み込むために使用します。次のスクリーンショットにこれらの変数の値を示します。

thumbnail image

last_updated変数の値は以下のように設定されています。

ToString(ToDate(CASE WHEN (COALESCE($_PACKAGE_LAST_SUCCESSFUL_JOB_SUBMISSION_TIMESTAMP,'')=='' OR $full_load==1) THEN '1900-01-01T00:00:00Z' ELSE $_PACKAGE_LAST_SUCCESSFUL_JOB_SUBMISSION_TIMESTAMP END),'yyyy-MM-dd\\'T\\'HH:mm:ss')

最初は複雑すぎると思われるかもしれませんが、ドキュメントにメソッドの詳細な説明があります。この変数には、最後にロードに成功した日付が格納され、APIからのデータのインクリメンタルフェッチに使用されます。 

2) Selectコンポーネント

パイプラインのparse_ordersとparse_taxという2つのコンポーネントは、データオブジェクトから目的のフィールドを選択し、そのデータ型を指定するために使用されます。 

thumbnail image
thumbnail image

コンポーネントのflatten_taxとmap_order_taxは、その名が示すように、tax_linesフィールドの配列から項目を抽出し、それらにorder_idを関連付けるために使用され、後でデータベース内で注文レコードと税レコードを互いにマッピングできるようにします。

thumbnail image

flatten 関数は、値の配列を受け取り、ネスト化されたレベルを取り除き、配列内の各項目がそれぞれのレコードに表示されるようにします。

thumbnail image

JsonStringToMap関数は文字列オブジェクト(ここではtax_line)を受け取り、それをキーと値のペアのマップに変換し、map#'fieldname'構文を使ってフィールドを抽出する関数です。 

3) データベース(送信先)

データベースの保存先については、Integrate.ioでSingleStoreデータベース接続を設定する必要があります。SingleStoreデータベースを設定するには、セルフマネージドバージョンをマシンにインストールするか、クラウド上で完全に管理されたインスタンスを起動するかのいずれかの方法があります。このブログで紹介する方法としては、後者の方が早いので、後者のアプローチを採用しています。管理されたSingleStoreクラスタの場合、接続の詳細(ホスト名、ユーザー名など)はSingleStoreポータルで確認できます。

thumbnail image

前述したように、SingleStoreはMySQLと同じプロトコルを使用しているので、MySQLコネクターでクラスタの詳細を設定することができます。 

thumbnail image

接続を設定したら、データベースの宛先コンポーネントを設定する必要があります。

thumbnail image

thumbnail image

パイプラインでデータベースにデータをインクリメンタルに更新したいので、テーブル名を追加し、操作タイプを「Merge with existing data using the delete and insert(DeleteとInsertを使用して既存のデータとマージ)」を指定しました。  データベースに書き込むカラムは、オートフィルオプションを使用して自動入力することができます。あとはデータ更新時にデータベース・テーブル内のエントリを一意に識別するのに使用する主キーまたは複合キー(複数のフィールドを選択)を構成するカラムを選択するだけです。

パイプラインのテスト

実装されたら、ジョブを作成してパッケージを検証し、実行することができます。ジョブの状態はIntegrate.ioのダッシュボードから監視することができ、ジョブが終了したら、SingleStore studioのコンソールでSQLクエリを実行することで、データが適切に保存されているかどうかを検証することができます。

thumbnail image

thumbnail image

これで、ChartioのためのIntegrate.ioの設定が完了しました。次のステップは、「Part 2: Chartioでのデータの可視化」を参照してください。