Medium で見つけた記事をいくつか紹介しようと思ったのですが、Flowman の記事が長くなったので Docker でのデモも交えながらご紹介します。
Flowman — A Declarative ETL Framework powered by Apache Spark
Flowman という Apache ライセンス下の ETL フレームワークの紹介です。ピュアな Spark, Cloudera, AWS EMR, Azure Synapse などの分散処理システム上で動く「宣言的な」ETLフレームワークとのこと。記事自体も Flowmanの開発者 Kaya Kupferschmidt 氏によって書かれています。地味なポイントですが公式ページがオシャレなのも良いですね。ドキュメントを読みたくなる…。
Flowman - Declarative ETL Framework powered by Apache Spark
「宣言的」にETLが書けるというのをアピールポイントにしていて、具体的には YAML ファイルを定義することで ETL 処理の記述をすることができます。
# File: flow/mapping/measurements.yml mappings: # This mapping refers to the "raw" relation and reads in data from the source in S3 measurements_raw: kind: relation relation: measurements_raw partitions: year: $year # Extract multiple columns from the raw measurements data using SQL SUBSTR functions measurements_extracted: kind: select input: measurements_raw columns: usaf: "CAST(SUBSTR(raw_data,5,6) AS INT)" wban: "CAST(SUBSTR(raw_data,11,5) AS INT)" date: "TO_DATE(SUBSTR(raw_data,16,8), 'yyyyMMdd')" time: "SUBSTR(raw_data,24,4)" report_type: "SUBSTR(raw_data,42,5)" wind_direction: "CAST(SUBSTR(raw_data,61,3) AS INT)" wind_direction_qual: "SUBSTR(raw_data,64,1)" wind_observation: "SUBSTR(raw_data,65,1)" wind_speed: "CAST(CAST(SUBSTR(raw_data,66,4) AS FLOAT)/10 AS FLOAT)" wind_speed_qual: "SUBSTR(raw_data,70,1)" air_temperature: "CAST(CAST(SUBSTR(raw_data,88,5) AS FLOAT)/10 AS FLOAT)" air_temperature_qual: "SUBSTR(raw_data,93,1)"
さらには単体テストの機能もついており、擬似データを定義することで各データ処理が正しく行われるかを確認することができます。
# File: flow/test/test-measurements.yml tests: test_measurements: environment: - year=2013 overrideMappings: measurements_raw: # Mocking a data source will pick up the original data schema, and the only thing left to do is providing # mocked records kind: mock records: # Provide two records including both the raw data and the partition key - year: $year raw_data: "042599999963897201301010000I+32335-086979CRN05+004..." - year: $year raw_data: "013399999963897201301010005I+32335-086979CRN05+004..." assertions: measurements_extracted: kind: sql description: "Measurements are extracted correctly" tests: - query: "SELECT * FROM measurements_extracted" expected: - [ 999999,63897,2013-01-01,0000,CRN05,124,1,H,0.9,1,10.6,1 ] - [ 999999,63897,2013-01-01,0005,CRN05,124,1,H,1.5,1,10.6,1 ] - query: "SELECT COUNT(*) FROM measurements_extracted" expected: 2
Dev, Prod など環境ごとの差異をプロパティによって吸収することもできたり、書き込みレコード数をメトリクスとして Prometeus などのモニタリングツールに連携することもできます。
ローカルの Docker で触ってみる
起動
Flowman Quick Start Guide - Flowman documentation
Flowman にはインタラクティブにジョブを実行する flowshell
と、バッチとして設定する flowexec
の2つのコマンドがあります。Dockerで触ってみる上のサンプルでは flowshell
を利用したチュートリアルが紹介されています。
% docker run --rm -ti dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3 bash Unable to find image 'dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3' locally 1.0.0-oss-spark3.3-hadoop3.3: Pulling from dimajix/flowman e756f3fdd6a3: Pull complete bf168a674899: Pull complete e604223835cc: Pull complete 6d5c91c4cd86: Pull complete 5e20d165240e: Pull complete 1334d60df9a8: Pull complete 16c2728dcd90: Pull complete 0eb2d7ff6056: Pull complete 6dde7ad652df: Pull complete 7b48840dfae0: Pull complete 45d6f68b5a92: Pull complete cbc35c331c56: Pull complete 3a262e284a90: Pull complete Digest: sha256:2135f9cd8c0dc4ba32def28681eb1158bc674594cd01515907cdcc9eda5e0e51 Status: Downloaded newer image for dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3 flowman@df9ac595c022:~$ ls
シェルに入って、-f
でプロジェクトディレクトリを指定して CLI に繋ぎますが…
flowman@2e564406fbbe:~$ flowshell -f examples/weather/ 23/07/05 12:58:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable [INFO] Reading Flowman system settings file /opt/flowman/conf/system.yml [INFO] Reading plugin descriptor /opt/flowman/plugins/flowman-impala/plugin.yml [INFO] Loading Plugin flowman-impala [INFO] Reading namespace file /opt/flowman/conf/default-namespace.yml Killed
Kill されてしまいました。verbose オプションをつけても有益なログは出てきませんでした。Colima のメモリ割り当てを増やして再実行(ローカルマシンで余っていた6GBを割り当て)。
% colima stop % colima start --memory 6 % docker run --rm -ti dimajix/flowman:1.0.0-oss-spark3.3-hadoop3.3 bash flowman@3c594160f6f0:~$ cd /opt/flowman flowman@3c594160f6f0:/opt/flowman$ flowshell -f examples/weather/ Welcome to ______ _ | ___|| | | |_ | | ___ __ __ _ __ ___ __ _ _ __ | _| | | / _ \\ \ /\ / /| '_ ` _ \ / _` || '_ \ | | | || (_) |\ V V / | | | | | || (_| || | | | \_| |_| \___/ \_/\_/ |_| |_| |_| \__,_||_| |_| 1.0.0 Using Spark 3.3.2 and Hadoop 3.3.2 and Scala 2.12.15 (Java 11.0.15) Type in 'help' for getting help flowman:weather>
無事に起動できました。Relation の一覧から、 S3にあるデータ stations_raw
のデータを確認してみます。
flowman:weather> relation list aggregates measurements measurements_raw stations stations_raw flowman:weather> relation show stations_raw CSV file: s3a://dimajix-training/data/weather/isd-history/isd-history.csv +------+-----+----------+-------+-----+----+--------+---------+---------+----------+----------+ | usaf| wban| name|country|state|icao|latitude|longitude|elevation|date_begin| date_end| +------+-----+----------+-------+-----+----+--------+---------+---------+----------+----------+ |007018|99999|WXPOD 7018| null| null|null| 0.0| 0.0| 7018.0|2011-03-09|2013-07-30| |007026|99999|WXPOD 7026| AF| null|null| 0.0| 0.0| 7026.0|2012-07-13|2017-08-22| |007070|99999|WXPOD 7070| AF| null|null| 0.0| 0.0| 7070.0|2014-09-23|2015-09-26| |008260|99999| WXPOD8270| null| null|null| 0.0| 0.0| 0.0|2005-01-01|2010-09-20| |008268|99999| WXPOD8278| AF| null|null| 32.95| 65.567| 1156.7|2010-05-19|2012-03-23| |008307|99999|WXPOD 8318| AF| null|null| 0.0| 0.0| 8318.0|2010-04-21|2010-04-21| |008411|99999| XM20| null| null|null| null| null| null|2016-02-17|2016-02-17| |008414|99999| XM18| null| null|null| null| null| null|2016-02-16|2016-02-17| |008415|99999| XM21| null| null|null| null| null| null|2016-02-17|2016-02-17| |008418|99999| XM24| null| null|null| null| null| null|2016-02-17|2016-02-17| +------+-----+----------+-------+-----+----+--------+---------+---------+----------+----------+ only showing top 10 rows
ジョブ実行&確認
flowshell 内でジョブを実行します:
flowman:weather> job build main year=2011 Collected metrics job_runtime(name=main,project=weather,phase=BUILD,category=job,kind=job) = 151776.0 target_records(name=measurements,project=weather,phase=BUILD,category=target,kind=relation) = 4335406.0 target_records(name=stations,project=weather,phase=BUILD,category=target,kind=relation) = 29744.0 target_records(name=aggregates,project=weather,phase=BUILD,category=target,kind=relation) = 28.0 target_runtime(name=measurements,project=weather,phase=BUILD,category=target,kind=relation) = 144379.0 target_runtime(name=aggregates,project=weather,phase=BUILD,category=target,kind=relation) = 3754.0 target_runtime(name=stations,project=weather,phase=BUILD,category=target,kind=relation) = 3017.0 [INFO] Mark 'BUILD' job 'default/weather/main' as SUCCESS in history database [INFO] ------------------------------------------------------------------------------------------------------------- [INFO] Execution summary for job 'weather/main' year=2011 [INFO] [INFO] weather/measurements ................................................................... SUCCESS [ 2:24 min] [INFO] weather/stations ....................................................................... SUCCESS [ 3.014 s] [INFO] weather/aggregates ..................................................................... SUCCESS [ 3.75 s] [INFO] ------------------------------------------------------------------------------------------------------------- [INFO] SUCCESS BUILD job 'weather/main' year=2011 [INFO] ------------------------------------------------------------------------------------------------------------- [INFO] Total time: 2:31 min [INFO] Finished at: 2023-07-05T13:18:55.460834Z[Etc/UTC] [INFO] ------------------------------------------------------------------------------------------------------------- [INFO] [INFO] ============================================================================================================= [INFO] Overall lifecycle summary for job 'weather/main' year=2011 [INFO] [INFO] Phase VALIDATE ......................................................................... SUCCESS [ 13.936 s] [INFO] Phase CREATE ........................................................................... SUCCESS [ 0.4 s] [INFO] Phase BUILD ............................................................................ SUCCESS [ 2:31 min] [INFO] ============================================================================================================= [INFO] SUCCESS lifecycle for job 'weather/main' year=2011 [INFO] ============================================================================================================= [INFO] Total time: 2:46 min [INFO] Finished at: 2023-07-05T13:18:55.705432Z[Etc/UTC] [INFO] =============================================================================================================
aggregates の定義は country
ごとに集約して各気象情報の最小値/最大値/平均値を出すマッピングになっているので、この実行結果を見てみます。
集約も YAML ファイルで定義できています:
# https://github.com/dimajix/flowman/blob/main/examples/weather/mapping/aggregates.yml mappings: # Create some aggregates containing min/max/avg metrics of wind speed and temperature aggregates: kind: aggregate input: facts dimensions: - country aggregations: min_wind_speed: "MIN(wind_speed)" max_wind_speed: "MAX(wind_speed)" avg_wind_speed: "AVG(wind_speed)" min_temperature: "MIN(air_temperature)" max_temperature: "MAX(air_temperature)" avg_temperature: "AVG(air_temperature)"
ここがちょっとよく分からなかったのですが、year=2011
というジョブコンテキストを指定しながらデータを確認します:
flowman:weather> job enter main year=2011 flowman:weather/main> mapping list aggregates facts measurements measurements_extracted measurements_joined measurements_raw stations stations_raw flowman:weather/main> mapping show aggregates +-------+--------------+--------------+------------------+---------------+---------------+------------------+ |country|min_wind_speed|max_wind_speed| avg_wind_speed|min_temperature|max_temperature| avg_temperature| +-------+--------------+--------------+------------------+---------------+---------------+------------------+ | FI| 0.0| 21.0| 3.91133101326951| -34.4| 30.7|3.1282190762524187| | NL| 0.0| 31.4|4.9690808808653575| -8.2| 34.0|10.753498106716485| | CA| 0.0| 24.7| 4.170504328900583| -45.0| 34.7| 2.931447564320896| | GK| 0.0| 19.0| 5.831224191330406| 1.0| 28.0|12.094347907821643| | PO| 0.0| 15.4|3.1292321427768974| -1.0| 33.5| 15.40633601601996| | US| 0.0| 36.0| 2.95617297419709| -44.0| 46.4|11.681804003913621| | GM| 0.0| 15.4|3.9108231087221923| -10.0| 30.0| 9.442136410413799| | FR| 0.0| 19.5|2.8940356618754155| -9.0| 37.8|14.367572536833235| | DA| 0.0| 22.6| 4.283963535296792| -13.1| 27.0| 8.76277380753966| | UK| 0.0| 26.7| 4.486483891184242| -5.2| 32.0| 11.08883097570104| +-------+--------------+--------------+------------------+---------------+---------------+------------------+ only showing top 10 rows
country
ごとに集約して各気象情報の最小値/最大値/平均値が計算できています。
/tmp/
にファイルを書き出したようなので、一度 flowshell から出て確認してみます。
flowman@3c594160f6f0:/opt/flowman$ ls -al /tmp/weather/aggregates/year\=2011/ total 20 drwxr-xr-x 2 flowman flowman 4096 Jul 5 13:18 . drwxr-xr-x 3 flowman flowman 4096 Jul 5 13:18 .. -rw-r--r-- 1 flowman flowman 8 Jul 5 13:18 ._SUCCESS.crc -rw-r--r-- 1 flowman flowman 32 Jul 5 13:18 .part-00000-d2cb9b17-99c7-45d7-8cd2-d74dfbfb375e-c000.snappy.parquet.crc -rw-r--r-- 1 flowman flowman 0 Jul 5 13:18 _SUCCESS -rw-r--r-- 1 flowman flowman 2822 Jul 5 13:18 part-00000-d2cb9b17-99c7-45d7-8cd2-d74dfbfb375e-c000.snappy.parquet
現在時刻で新しい Parquet ファイルが生成されていました。実際のジョブでは Sink の設定をすることで、S3 や HDFS など外部ストレージにデータを書き出す(書き戻す)ことができます。
感想
Docker 上で少し触っただけですが、 YAML ファイルだけで ETL 処理を管理できるのは魅力的だと感じました。
一方で mapping
やジョブコンテキストの概念が少し独特で分かりづらくて GUI も開発されていないので、YAML で宣言的に ETL が書けると言っても、誰でも簡単に…という訳にはいかなそうです。この点は SQL さえかければ誰でもデータ構築ができると謳う dbt とは少しスタンスが異なります。Flowman は Spark 上で ETL 処理を行っているエンジニアの業務効率を上げて本来注力したいビジネスロジックの実装に時間を割けるようになるツールだと感じました。