Medium で見つけた記事をいくつか紹介しようと思ったのですが、Flowman の記事が長くなったので Docker でのデモも交えながらご紹介します。
Flowman — A Declarative ETL Framework powered by Apache Spark
kupferk.medium.com
Flowman という Apache ライセンス下の ETL フレームワークの紹介です。ピュアな Spark, Cloudera, AWS EMR, Azure Synapse などの分散処理システム上で動く「宣言的な」ETLフレームワークとのこと。記事自体も Flowmanの開発者 Kaya Kupferschmidt 氏によって書かれています。地味なポイントですが公式ページがオシャレなのも良いですね。ドキュメントを読みたくなる…。
Flowman - Declarative ETL Framework powered by Apache Spark
「宣言的」にETLが書けるというのをアピールポイントにしていて、具体的には YAML ファイルを定義することで ETL 処理の記述をすることができます。
mappings:
measurements_raw:
kind: relation
relation: measurements_raw
partitions:
year: $year
measurements_extracted:
kind: select
input: measurements_raw
columns:
usaf: "CAST(SUBSTR(raw_data,5,6) AS INT)"
wban: "CAST(SUBSTR(raw_data,11,5) AS INT)"
date: "TO_DATE(SUBSTR(raw_data,16,8), 'yyyyMMdd')"
time: "SUBSTR(raw_data,24,4)"
report_type: "SUBSTR(raw_data,42,5)"
wind_direction: "CAST(SUBSTR(raw_data,61,3) AS INT)"
wind_direction_qual: "SUBSTR(raw_data,64,1)"
wind_observation: "SUBSTR(raw_data,65,1)"
wind_speed: "CAST(CAST(SUBSTR(raw_data,66,4) AS FLOAT)/10 AS FLOAT)"
wind_speed_qual: "SUBSTR(raw_data,70,1)"
air_temperature: "CAST(CAST(SUBSTR(raw_data,88,5) AS FLOAT)/10 AS FLOAT)"
air_temperature_qual: "SUBSTR(raw_data,93,1)"
さらには単体テストの機能もついており、擬似データを定義することで各データ処理が正しく行われるかを確認することができます。
tests:
test_measurements:
environment:
- year=2013
overrideMappings:
measurements_raw:
kind: mock
records:
- year: $year
raw_data: "042599999963897201301010000I+32335-086979CRN05+004..."
- year: $year
raw_data: "013399999963897201301010005I+32335-086979CRN05+004..."
assertions:
measurements_extracted:
kind: sql
description: "Measurements are extracted correctly"
tests:
- query: "SELECT * FROM measurements_extracted"
expected:
- [ 999999,63897,2013-01-01,0000,CRN05,124,1,H,0.9,1,10.6,1 ]
- [ 999999,63897,2013-01-01,0005,CRN05,124,1,H,1.5,1,10.6,1 ]
- query: "SELECT COUNT(*) FROM measurements_extracted"
expected: 2
Dev, Prod など環境ごとの差異をプロパティによって吸収することもできたり、書き込みレコード数をメトリクスとして Prometeus などのモニタリングツールに連携することもできます。
ローカルの Docker で触ってみる
起動
Flowman Quick Start Guide - Flowman documentation
Flowman にはインタラクティブにジョブを実行する flowshell
と、バッチとして設定する flowexec
の2つのコマンドがあります。Dockerで触ってみる上のサンプルでは flowshell
を利用したチュートリアルが紹介されています。
% docker run --rm -ti dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3 bash
Unable to find image 'dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3' locally
1.0.0-oss-spark3.3-hadoop3.3: Pulling from dimajix/flowman
e756f3fdd6a3: Pull complete
bf168a674899: Pull complete
e604223835cc: Pull complete
6d5c91c4cd86: Pull complete
5e20d165240e: Pull complete
1334d60df9a8: Pull complete
16c2728dcd90: Pull complete
0eb2d7ff6056: Pull complete
6dde7ad652df: Pull complete
7b48840dfae0: Pull complete
45d6f68b5a92: Pull complete
cbc35c331c56: Pull complete
3a262e284a90: Pull complete
Digest: sha256:2135f9cd8c0dc4ba32def28681eb1158bc674594cd01515907cdcc9eda5e0e51
Status: Downloaded newer image for dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3
flowman@df9ac595c022:~$ ls
シェルに入って、-f
でプロジェクトディレクトリを指定して CLI に繋ぎますが…
flowman@2e564406fbbe:~$ flowshell -f examples/weather/
23/07/05 12:58:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[INFO] Reading Flowman system settings file /opt/flowman/conf/system.yml
[INFO] Reading plugin descriptor /opt/flowman/plugins/flowman-impala/plugin.yml
[INFO] Loading Plugin flowman-impala
[INFO] Reading namespace file /opt/flowman/conf/default-namespace.yml
Killed
Kill されてしまいました。verbose オプションをつけても有益なログは出てきませんでした。Colima のメモリ割り当てを増やして再実行(ローカルマシンで余っていた6GBを割り当て)。
% colima stop
% colima start --memory 6
% docker run --rm -ti dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3 bash
flowman@3c594160f6f0:~$ cd /opt/flowman
flowman@3c594160f6f0:/opt/flowman$ flowshell -f examples/weather/
Welcome to
______ _
| ___|| |
| |_ | | ___ __ __ _ __ ___ __ _ _ __
| _| | | / _ \\ \ /\ / /| '_ ` _ \ / _` || '_ \
| | | || (_) |\ V V / | | | | | || (_| || | | |
\_| |_| \___/ \_/\_/ |_| |_| |_| \__,_||_| |_| 1.0.0
Using Spark 3.3.2 and Hadoop 3.3.2 and Scala 2.12.15 (Java 11.0.15)
Type in 'help' for getting help
flowman:weather>
無事に起動できました。Relation の一覧から、 S3にあるデータ stations_raw
のデータを確認してみます。
flowman:weather> relation list
aggregates
measurements
measurements_raw
stations
stations_raw
flowman:weather> relation show stations_raw
CSV file: s3a://dimajix-training/data/weather/isd-history/isd-history.csv
+------+-----+----------+-------+-----+----+--------+---------+---------+----------+----------+
| usaf| wban| name|country|state|icao|latitude|longitude|elevation|date_begin| date_end|
+------+-----+----------+-------+-----+----+--------+---------+---------+----------+----------+
|007018|99999|WXPOD 7018| null| null|null| 0.0| 0.0| 7018.0|2011-03-09|2013-07-30|
|007026|99999|WXPOD 7026| AF| null|null| 0.0| 0.0| 7026.0|2012-07-13|2017-08-22|
|007070|99999|WXPOD 7070| AF| null|null| 0.0| 0.0| 7070.0|2014-09-23|2015-09-26|
|008260|99999| WXPOD8270| null| null|null| 0.0| 0.0| 0.0|2005-01-01|2010-09-20|
|008268|99999| WXPOD8278| AF| null|null| 32.95| 65.567| 1156.7|2010-05-19|2012-03-23|
|008307|99999|WXPOD 8318| AF| null|null| 0.0| 0.0| 8318.0|2010-04-21|2010-04-21|
|008411|99999| XM20| null| null|null| null| null| null|2016-02-17|2016-02-17|
|008414|99999| XM18| null| null|null| null| null| null|2016-02-16|2016-02-17|
|008415|99999| XM21| null| null|null| null| null| null|2016-02-17|2016-02-17|
|008418|99999| XM24| null| null|null| null| null| null|2016-02-17|2016-02-17|
+------+-----+----------+-------+-----+----+--------+---------+---------+----------+----------+
only showing top 10 rows
ジョブ実行&確認
flowshell 内でジョブを実行します:
flowman:weather> job build main year=2011
Collected metrics
job_runtime(name=main,project=weather,phase=BUILD,category=job,kind=job) = 151776.0
target_records(name=measurements,project=weather,phase=BUILD,category=target,kind=relation) = 4335406.0
target_records(name=stations,project=weather,phase=BUILD,category=target,kind=relation) = 29744.0
target_records(name=aggregates,project=weather,phase=BUILD,category=target,kind=relation) = 28.0
target_runtime(name=measurements,project=weather,phase=BUILD,category=target,kind=relation) = 144379.0
target_runtime(name=aggregates,project=weather,phase=BUILD,category=target,kind=relation) = 3754.0
target_runtime(name=stations,project=weather,phase=BUILD,category=target,kind=relation) = 3017.0
[INFO] Mark 'BUILD' job 'default/weather/main' as SUCCESS in history database
[INFO] -------------------------------------------------------------------------------------------------------------
[INFO] Execution summary for job 'weather/main' year=2011
[INFO]
[INFO] weather/measurements ................................................................... SUCCESS [ 2:24 min]
[INFO] weather/stations ....................................................................... SUCCESS [ 3.014 s]
[INFO] weather/aggregates ..................................................................... SUCCESS [ 3.75 s]
[INFO] -------------------------------------------------------------------------------------------------------------
[INFO] SUCCESS BUILD job 'weather/main' year=2011
[INFO] -------------------------------------------------------------------------------------------------------------
[INFO] Total time: 2:31 min
[INFO] Finished at: 2023-07-05T13:18:55.460834Z[Etc/UTC]
[INFO] -------------------------------------------------------------------------------------------------------------
[INFO]
[INFO] =============================================================================================================
[INFO] Overall lifecycle summary for job 'weather/main' year=2011
[INFO]
[INFO] Phase VALIDATE ......................................................................... SUCCESS [ 13.936 s]
[INFO] Phase CREATE ........................................................................... SUCCESS [ 0.4 s]
[INFO] Phase BUILD ............................................................................ SUCCESS [ 2:31 min]
[INFO] =============================================================================================================
[INFO] SUCCESS lifecycle for job 'weather/main' year=2011
[INFO] =============================================================================================================
[INFO] Total time: 2:46 min
[INFO] Finished at: 2023-07-05T13:18:55.705432Z[Etc/UTC]
[INFO] =============================================================================================================
aggregates の定義は country
ごとに集約して各気象情報の最小値/最大値/平均値を出すマッピングになっているので、この実行結果を見てみます。
集約も YAML ファイルで定義できています:
mappings:
aggregates:
kind: aggregate
input: facts
dimensions:
- country
aggregations:
min_wind_speed: "MIN(wind_speed)"
max_wind_speed: "MAX(wind_speed)"
avg_wind_speed: "AVG(wind_speed)"
min_temperature: "MIN(air_temperature)"
max_temperature: "MAX(air_temperature)"
avg_temperature: "AVG(air_temperature)"
ここがちょっとよく分からなかったのですが、year=2011
というジョブコンテキストを指定しながらデータを確認します:
flowman:weather> job enter main year=2011
flowman:weather/main> mapping list
aggregates
facts
measurements
measurements_extracted
measurements_joined
measurements_raw
stations
stations_raw
flowman:weather/main> mapping show aggregates
+-------+--------------+--------------+------------------+---------------+---------------+------------------+
|country|min_wind_speed|max_wind_speed| avg_wind_speed|min_temperature|max_temperature| avg_temperature|
+-------+--------------+--------------+------------------+---------------+---------------+------------------+
| FI| 0.0| 21.0| 3.91133101326951| -34.4| 30.7|3.1282190762524187|
| NL| 0.0| 31.4|4.9690808808653575| -8.2| 34.0|10.753498106716485|
| CA| 0.0| 24.7| 4.170504328900583| -45.0| 34.7| 2.931447564320896|
| GK| 0.0| 19.0| 5.831224191330406| 1.0| 28.0|12.094347907821643|
| PO| 0.0| 15.4|3.1292321427768974| -1.0| 33.5| 15.40633601601996|
| US| 0.0| 36.0| 2.95617297419709| -44.0| 46.4|11.681804003913621|
| GM| 0.0| 15.4|3.9108231087221923| -10.0| 30.0| 9.442136410413799|
| FR| 0.0| 19.5|2.8940356618754155| -9.0| 37.8|14.367572536833235|
| DA| 0.0| 22.6| 4.283963535296792| -13.1| 27.0| 8.76277380753966|
| UK| 0.0| 26.7| 4.486483891184242| -5.2| 32.0| 11.08883097570104|
+-------+--------------+--------------+------------------+---------------+---------------+------------------+
only showing top 10 rows
country
ごとに集約して各気象情報の最小値/最大値/平均値が計算できています。
/tmp/
にファイルを書き出したようなので、一度 flowshell から出て確認してみます。
flowman@3c594160f6f0:/opt/flowman$ ls -al /tmp/weather/aggregates/year\=2011/
total 20
drwxr-xr-x 2 flowman flowman 4096 Jul 5 13:18 .
drwxr-xr-x 3 flowman flowman 4096 Jul 5 13:18 ..
-rw-r--r-- 1 flowman flowman 8 Jul 5 13:18 ._SUCCESS.crc
-rw-r--r-- 1 flowman flowman 32 Jul 5 13:18 .part-00000-d2cb9b17-99c7-45d7-8cd2-d74dfbfb375e-c000.snappy.parquet.crc
-rw-r--r-- 1 flowman flowman 0 Jul 5 13:18 _SUCCESS
-rw-r--r-- 1 flowman flowman 2822 Jul 5 13:18 part-00000-d2cb9b17-99c7-45d7-8cd2-d74dfbfb375e-c000.snappy.parquet
現在時刻で新しい Parquet ファイルが生成されていました。実際のジョブでは Sink の設定をすることで、S3 や HDFS など外部ストレージにデータを書き出す(書き戻す)ことができます。
感想
Docker 上で少し触っただけですが、 YAML ファイルだけで ETL 処理を管理できるのは魅力的だと感じました。
一方で mapping
やジョブコンテキストの概念が少し独特で分かりづらくて GUI も開発されていないので、YAML で宣言的に ETL が書けると言っても、誰でも簡単に…という訳にはいかなそうです。この点は SQL さえかければ誰でもデータ構築ができると謳う dbt とは少しスタンスが異なります。Flowman は Spark 上で ETL 処理を行っているエンジニアの業務効率を上げて本来注力したいビジネスロジックの実装に時間を割けるようになるツールだと感じました。