/home/by-natures/dev*

データ界隈で働くエンジニアとしての技術的なメモと、たまに普通の日記。

2023/07/05 Apache Flowman(YAMLでETL処理が書けるOSSプロダクト)の紹介

Medium で見つけた記事をいくつか紹介しようと思ったのですが、Flowman の記事が長くなったので Docker でのデモも交えながらご紹介します。

Flowman — A Declarative ETL Framework powered by Apache Spark

kupferk.medium.com

Flowman という Apache ライセンス下の ETL フレームワークの紹介です。ピュアな Spark, Cloudera, AWS EMR, Azure Synapse などの分散処理システム上で動く「宣言的な」ETLフレームワークとのこと。記事自体も Flowmanの開発者 Kaya Kupferschmidt 氏によって書かれています。地味なポイントですが公式ページがオシャレなのも良いですね。ドキュメントを読みたくなる…。

Flowman - Declarative ETL Framework powered by Apache Spark

「宣言的」にETLが書けるというのをアピールポイントにしていて、具体的には YAML ファイルを定義することで ETL 処理の記述をすることができます。

# File: flow/mapping/measurements.yml
mappings:
  # This mapping refers to the "raw" relation and reads in data from the source in S3
  measurements_raw:
    kind: relation
    relation: measurements_raw
    partitions:
      year: $year

  # Extract multiple columns from the raw measurements data using SQL SUBSTR functions
  measurements_extracted:
    kind: select
    input: measurements_raw
    columns:
      usaf: "CAST(SUBSTR(raw_data,5,6) AS INT)"
      wban: "CAST(SUBSTR(raw_data,11,5) AS INT)"
      date: "TO_DATE(SUBSTR(raw_data,16,8), 'yyyyMMdd')"
      time: "SUBSTR(raw_data,24,4)"
      report_type: "SUBSTR(raw_data,42,5)"
      wind_direction: "CAST(SUBSTR(raw_data,61,3) AS INT)"
      wind_direction_qual: "SUBSTR(raw_data,64,1)"
      wind_observation: "SUBSTR(raw_data,65,1)"
      wind_speed: "CAST(CAST(SUBSTR(raw_data,66,4) AS FLOAT)/10 AS FLOAT)"
      wind_speed_qual: "SUBSTR(raw_data,70,1)"
      air_temperature: "CAST(CAST(SUBSTR(raw_data,88,5) AS FLOAT)/10 AS FLOAT)"
      air_temperature_qual: "SUBSTR(raw_data,93,1)"

さらには単体テストの機能もついており、擬似データを定義することで各データ処理が正しく行われるかを確認することができます。

# File: flow/test/test-measurements.yml
tests:
  test_measurements:
    environment:
      - year=2013

    overrideMappings:
      measurements_raw:
        # Mocking a data source will pick up the original data schema, and the only thing left to do is providing
        # mocked records
        kind: mock
        records:
          # Provide two records including both the raw data and the partition key
          - year: $year
            raw_data: "042599999963897201301010000I+32335-086979CRN05+004..."
          - year: $year
            raw_data: "013399999963897201301010005I+32335-086979CRN05+004..."

    assertions:
      measurements_extracted:
        kind: sql
        description: "Measurements are extracted correctly"
        tests:
          - query: "SELECT * FROM measurements_extracted"
            expected:
              - [ 999999,63897,2013-01-01,0000,CRN05,124,1,H,0.9,1,10.6,1 ]
              - [ 999999,63897,2013-01-01,0005,CRN05,124,1,H,1.5,1,10.6,1 ]
          - query: "SELECT COUNT(*) FROM measurements_extracted"
            expected: 2

Dev, Prod など環境ごとの差異をプロパティによって吸収することもできたり、書き込みレコード数をメトリクスとして Prometeus などのモニタリングツールに連携することもできます。

ローカルの Docker で触ってみる

起動

Flowman Quick Start Guide - Flowman documentation

Flowman にはインタラクティブにジョブを実行する flowshell と、バッチとして設定する flowexec の2つのコマンドがあります。Dockerで触ってみる上のサンプルでは flowshell を利用したチュートリアルが紹介されています。

% docker run --rm -ti dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3 bash
Unable to find image 'dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3' locally
1.0.0-oss-spark3.3-hadoop3.3: Pulling from dimajix/flowman
e756f3fdd6a3: Pull complete 
bf168a674899: Pull complete 
e604223835cc: Pull complete 
6d5c91c4cd86: Pull complete 
5e20d165240e: Pull complete 
1334d60df9a8: Pull complete 
16c2728dcd90: Pull complete 
0eb2d7ff6056: Pull complete 
6dde7ad652df: Pull complete 
7b48840dfae0: Pull complete 
45d6f68b5a92: Pull complete 
cbc35c331c56: Pull complete 
3a262e284a90: Pull complete 
Digest: sha256:2135f9cd8c0dc4ba32def28681eb1158bc674594cd01515907cdcc9eda5e0e51
Status: Downloaded newer image for dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3
flowman@df9ac595c022:~$ ls

シェルに入って、-f でプロジェクトディレクトリを指定して CLI に繋ぎますが…

flowman@2e564406fbbe:~$ flowshell -f examples/weather/
23/07/05 12:58:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[INFO] Reading Flowman system settings file /opt/flowman/conf/system.yml
[INFO] Reading plugin descriptor /opt/flowman/plugins/flowman-impala/plugin.yml
[INFO] Loading Plugin flowman-impala
[INFO] Reading namespace file /opt/flowman/conf/default-namespace.yml
Killed

Kill されてしまいました。verbose オプションをつけても有益なログは出てきませんでした。Colima のメモリ割り当てを増やして再実行(ローカルマシンで余っていた6GBを割り当て)。

% colima stop
% colima start --memory 6
% docker run --rm -ti dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3 bash
flowman@3c594160f6f0:~$ cd /opt/flowman
flowman@3c594160f6f0:/opt/flowman$ flowshell -f examples/weather/
Welcome to
______  _
|  ___|| |
| |_   | |  ___ __      __ _ __ ___    __ _  _ __
|  _|  | | / _ \\ \ /\ / /| '_ ` _ \  / _` || '_ \
| |    | || (_) |\ V  V / | | | | | || (_| || | | |
\_|    |_| \___/  \_/\_/  |_| |_| |_| \__,_||_| |_|    1.0.0

Using Spark 3.3.2 and Hadoop 3.3.2 and Scala 2.12.15 (Java 11.0.15)

Type in 'help' for getting help
flowman:weather> 

無事に起動できました。Relation の一覧から、 S3にあるデータ stations_raw のデータを確認してみます。

flowman:weather> relation list
aggregates
measurements
measurements_raw
stations
stations_raw

flowman:weather> relation show stations_raw
CSV file: s3a://dimajix-training/data/weather/isd-history/isd-history.csv
+------+-----+----------+-------+-----+----+--------+---------+---------+----------+----------+
|  usaf| wban|      name|country|state|icao|latitude|longitude|elevation|date_begin|  date_end|
+------+-----+----------+-------+-----+----+--------+---------+---------+----------+----------+
|007018|99999|WXPOD 7018|   null| null|null|     0.0|      0.0|   7018.0|2011-03-09|2013-07-30|
|007026|99999|WXPOD 7026|     AF| null|null|     0.0|      0.0|   7026.0|2012-07-13|2017-08-22|
|007070|99999|WXPOD 7070|     AF| null|null|     0.0|      0.0|   7070.0|2014-09-23|2015-09-26|
|008260|99999| WXPOD8270|   null| null|null|     0.0|      0.0|      0.0|2005-01-01|2010-09-20|
|008268|99999| WXPOD8278|     AF| null|null|   32.95|   65.567|   1156.7|2010-05-19|2012-03-23|
|008307|99999|WXPOD 8318|     AF| null|null|     0.0|      0.0|   8318.0|2010-04-21|2010-04-21|
|008411|99999|      XM20|   null| null|null|    null|     null|     null|2016-02-17|2016-02-17|
|008414|99999|      XM18|   null| null|null|    null|     null|     null|2016-02-16|2016-02-17|
|008415|99999|      XM21|   null| null|null|    null|     null|     null|2016-02-17|2016-02-17|
|008418|99999|      XM24|   null| null|null|    null|     null|     null|2016-02-17|2016-02-17|
+------+-----+----------+-------+-----+----+--------+---------+---------+----------+----------+
only showing top 10 rows
ジョブ実行&確認

flowshell 内でジョブを実行します:

flowman:weather> job build main year=2011

Collected metrics
  job_runtime(name=main,project=weather,phase=BUILD,category=job,kind=job) = 151776.0
  target_records(name=measurements,project=weather,phase=BUILD,category=target,kind=relation) = 4335406.0
  target_records(name=stations,project=weather,phase=BUILD,category=target,kind=relation) = 29744.0
  target_records(name=aggregates,project=weather,phase=BUILD,category=target,kind=relation) = 28.0
  target_runtime(name=measurements,project=weather,phase=BUILD,category=target,kind=relation) = 144379.0
  target_runtime(name=aggregates,project=weather,phase=BUILD,category=target,kind=relation) = 3754.0
  target_runtime(name=stations,project=weather,phase=BUILD,category=target,kind=relation) = 3017.0
[INFO] Mark 'BUILD' job 'default/weather/main' as SUCCESS in history database
[INFO] -------------------------------------------------------------------------------------------------------------
[INFO] Execution summary for job 'weather/main' year=2011
[INFO] 
[INFO] weather/measurements ................................................................... SUCCESS [  2:24 min]
[INFO] weather/stations ....................................................................... SUCCESS [   3.014 s]
[INFO] weather/aggregates ..................................................................... SUCCESS [    3.75 s]
[INFO] -------------------------------------------------------------------------------------------------------------
[INFO] SUCCESS BUILD job 'weather/main' year=2011
[INFO] -------------------------------------------------------------------------------------------------------------
[INFO] Total time: 2:31 min
[INFO] Finished at: 2023-07-05T13:18:55.460834Z[Etc/UTC]
[INFO] -------------------------------------------------------------------------------------------------------------
[INFO] 
[INFO] =============================================================================================================
[INFO] Overall lifecycle summary for job 'weather/main' year=2011
[INFO] 
[INFO] Phase VALIDATE ......................................................................... SUCCESS [  13.936 s]
[INFO] Phase CREATE ........................................................................... SUCCESS [     0.4 s]
[INFO] Phase BUILD ............................................................................ SUCCESS [  2:31 min]
[INFO] =============================================================================================================
[INFO] SUCCESS lifecycle for job 'weather/main' year=2011
[INFO] =============================================================================================================
[INFO] Total time: 2:46 min
[INFO] Finished at: 2023-07-05T13:18:55.705432Z[Etc/UTC]
[INFO] =============================================================================================================

aggregates の定義は country ごとに集約して各気象情報の最小値/最大値/平均値を出すマッピングになっているので、この実行結果を見てみます。

集約も YAML ファイルで定義できています:

# https://github.com/dimajix/flowman/blob/main/examples/weather/mapping/aggregates.yml
mappings:
  # Create some aggregates containing min/max/avg metrics of wind speed and temperature
  aggregates:
    kind: aggregate
    input: facts
    dimensions:
      - country
    aggregations:
      min_wind_speed: "MIN(wind_speed)"
      max_wind_speed: "MAX(wind_speed)"
      avg_wind_speed: "AVG(wind_speed)"
      min_temperature: "MIN(air_temperature)"
      max_temperature: "MAX(air_temperature)"
      avg_temperature: "AVG(air_temperature)"

ここがちょっとよく分からなかったのですが、year=2011 というジョブコンテキストを指定しながらデータを確認します:

flowman:weather> job enter main year=2011

flowman:weather/main> mapping list                                                                                                                                   
aggregates                                                                                                                                                           
facts                                                                                                                                                                
measurements                                                                                                                                                         
measurements_extracted                                                                                                                                               
measurements_joined                                                                                                                                                  
measurements_raw                                                                                                                                                     
stations                                                                                                                                                             
stations_raw                                                                                                                                                         

flowman:weather/main> mapping show aggregates 
+-------+--------------+--------------+------------------+---------------+---------------+------------------+
|country|min_wind_speed|max_wind_speed|    avg_wind_speed|min_temperature|max_temperature|   avg_temperature|
+-------+--------------+--------------+------------------+---------------+---------------+------------------+
|     FI|           0.0|          21.0|  3.91133101326951|          -34.4|           30.7|3.1282190762524187|
|     NL|           0.0|          31.4|4.9690808808653575|           -8.2|           34.0|10.753498106716485|
|     CA|           0.0|          24.7| 4.170504328900583|          -45.0|           34.7| 2.931447564320896|
|     GK|           0.0|          19.0| 5.831224191330406|            1.0|           28.0|12.094347907821643|
|     PO|           0.0|          15.4|3.1292321427768974|           -1.0|           33.5| 15.40633601601996|
|     US|           0.0|          36.0|  2.95617297419709|          -44.0|           46.4|11.681804003913621|
|     GM|           0.0|          15.4|3.9108231087221923|          -10.0|           30.0| 9.442136410413799|
|     FR|           0.0|          19.5|2.8940356618754155|           -9.0|           37.8|14.367572536833235|
|     DA|           0.0|          22.6| 4.283963535296792|          -13.1|           27.0|  8.76277380753966|
|     UK|           0.0|          26.7| 4.486483891184242|           -5.2|           32.0| 11.08883097570104|
+-------+--------------+--------------+------------------+---------------+---------------+------------------+
only showing top 10 rows

country ごとに集約して各気象情報の最小値/最大値/平均値が計算できています。

/tmp/ にファイルを書き出したようなので、一度 flowshell から出て確認してみます。

flowman@3c594160f6f0:/opt/flowman$ ls -al /tmp/weather/aggregates/year\=2011/                                                                                        
total 20                                                                                                                                                             
drwxr-xr-x 2 flowman flowman 4096 Jul  5 13:18 .                                                                                                                     
drwxr-xr-x 3 flowman flowman 4096 Jul  5 13:18 ..                                                                                                                    
-rw-r--r-- 1 flowman flowman    8 Jul  5 13:18 ._SUCCESS.crc                                                                                                         
-rw-r--r-- 1 flowman flowman   32 Jul  5 13:18 .part-00000-d2cb9b17-99c7-45d7-8cd2-d74dfbfb375e-c000.snappy.parquet.crc                                              
-rw-r--r-- 1 flowman flowman    0 Jul  5 13:18 _SUCCESS                                                                                                              
-rw-r--r-- 1 flowman flowman 2822 Jul  5 13:18 part-00000-d2cb9b17-99c7-45d7-8cd2-d74dfbfb375e-c000.snappy.parquet

現在時刻で新しい Parquet ファイルが生成されていました。実際のジョブでは Sink の設定をすることで、S3 や HDFS など外部ストレージにデータを書き出す(書き戻す)ことができます。

感想

Docker 上で少し触っただけですが、 YAML ファイルだけで ETL 処理を管理できるのは魅力的だと感じました。

一方で mapping やジョブコンテキストの概念が少し独特で分かりづらくて GUI も開発されていないので、YAML で宣言的に ETL が書けると言っても、誰でも簡単に…という訳にはいかなそうです。この点は SQL さえかければ誰でもデータ構築ができると謳う dbt とは少しスタンスが異なります。Flowman は Spark 上で ETL 処理を行っているエンジニアの業務効率を上げて本来注力したいビジネスロジックの実装に時間を割けるようになるツールだと感じました。