/home/by-natures/dev*

データ界隈で働くエンジニアとしての技術的なメモと、たまに普通の日記。

2019/02/15 Glue Job の同時実行数について

Glue Job から Spark ジョブを呼ぶことで、大規模データに対する ETL 処理をしたいなと思っているのですが、Glue Job に同時実行数の制限があることに気がつきました:

docs.aws.amazon.com

Glue Job にはテンプレート部分を記述して、実際に動かす SparkSQL はパラメータとして Glue Job に与えようと思ったのですが、同時実行数3だと苦しいです。追加料金で増やしてもらうことはできそうですが、数十・数百にまで上げてもらえるものなんでしょうか。

同時実行数は大幅に上げられなくとも、キューイングしてジョブ待機させておければ良いので、そのような解決方法が取れるか AWS Forum で質問中です。ニッチな課題かもしれないので、回答があるか不安ですが。。:

https://forums.aws.amazon.com/message.jspa?messageID=889894#889894

Glue Job の状態を見て必要に応じて sleep するという処理をしている方がいるようですが、多数のジョブが sleep だけでジョブ実行を待機するのは場当たり的すぎますし。。SQS にキューイングさせて、そこから Glue Job へ登録する何かがいて、ジョブ完了時は CloudWatch イベントを発火させて・・・というとずいぶん複雑になってしまいます。来週 AWS の方とお話する機会があるので聞いてみようと思います。

それでは良い週末を。

2019/02/12 CloudFormation

先週末、都内でキャンプをしまして、インドアな自分としてはとてもアクティブな体験でした。たき火を見ながらぼーっと話をするのもよいものです。翌朝起きたら、キャンプ場一体が雪で真っ白だったことにも驚きました。新鮮な体験でした。

AWS での運用について考えています。運用、というと前職で使っていた CloudFormation を思い出したので、これを改めて調べました:

qiita.com

codezine.jp

AWSで避けるべき5つの間違い | POSTD

postd.cc

CloudFormation でデプロイした環境には手動運用で変更を施すと、それ以降は CloudFormation で更新が掛けられなくなってしまいます。セットアップだけ使うと割り切るか、徹底して CloudFormation だけ使うか・・・でもそうすると上の記事で gumi の方がおっしゃっている

もちろん一方では、どうしても自動化できない、または自動化が難しい部分もあった。その代表的なものが「緊急対応・障害対応」だ。こちらはスピードが第一条件なので、どうしても手作業で対応せざるを得ない。その場合は、手作業で行った修正をコードに反映してインフラをリビルドする。

というのが気になります。リビルドと言っているので、やはり CloudFormation 等で環境を作り直すということなんでしょうか。私が今触っているコンポーネントは AWS Lambda や AWS Glue、StepFunction あたりが多く、インフラというよりソースコードの管理ができればよさそうなので、CloudFormation では扱いづらいかもしれません。

2019/02/06 DynamicFrame の出力スキーマを parquet-tools で確認

AWS Glue で書いた Parquet ファイルが Glue のデータカタログのスキーマと異なることが度々発生しているので、直接 Parquet ファイルのスキーマを確認できないか調べたところ、parquet-tools コマンドで確認できることがわかりました。

github.com

mvn install コマンドでインストールしたり、Maven リポジトリから jar をダウンロードして実行するなど色々方法はありますが、Mac OS 環境だと brew install できます。README には

cd parquet-tools && mvn clean package -Plocal 

でインストールするとありますが、Parquet の他のサブモジュールの成果物も必要なので、 git clone したあとに親モジュール内で mvn clean package -Plocal したほうが良さそう。そのあとも thrift コマンドがないとエラーが出たりしたので brew でインストールしましたが、Parquet が必要なバージョンと合わなかったりしたのでひとまず断念。

Maven リポジトリから jar をダウンロードできるので、少し使うだけならこれが一番よいなと思っていたのですが、エラーも吐かずにエラーコード1を返してきます。Fat jar のようなので java -jar で叩くだけだと思うんですが、、

最後に brew で入れてみると:

$ brew install parquet-tools

特にエラーもなく簡単にインストールできました。インストール後は parquet-tools コマンドとして利用できます:

$ parquet-tools schema file.snappy.parquet
message spark_schema {
  optional binary title (UTF8);
  optional binary word (UTF8);
  optional int64 num;
}

余談: Athena はある程度スキーマ差を吸収している?

例えば上のスキーマに対して Glue Catalog で numint として定義していると、 intbigint(int64) で型が異なるので、Redshift から読むとエラーになります。ただ Athena で同じようなクエリを投げてもエラーにはなりません。

structmap, struct に対して string のようなスキーマ差は Athena もエラーとなるのですが、ビット長が違う程度のスキーマ差は Athena は吸収してくれるようです。そのため、最終的に利用する方法が Redshift なのであれば、Athena ではなく Redshift でデータが読めるか確認しないといけません。

2019/02/05 AWS サービスにインストールされている boto3 のバージョン

AWS CodeCommit を利用して、Glue Job から CodeCommit からファイルを取得して集計処理をしたいなと思ったのですが、 get_file というメソッドが存在しないとエラーが起きてしまいました。

boto3.amazonaws.com

GitHub の boto3 リポジトリを見ると get_file など CodeCommit のメソッド群が追加されているのが 2018年末だったので、関連サービスで使われている boto3 のバージョンを確認しました:

  • Glue Job: 1.7.47
  • Sagemaker Sparkmagic(PySpark): 1.7.47
  • Sagemaker Sparkmagic(PySpark3): (boto3 が入っていない)
  • Lambda Python 2.7: 1.7.74
  • Lambda Python 3.6: 1.7.74
  • Lambda Python 3.7: 1.9.42

この中だと、Lambda Python 3.7 環境がもっとも新しく、CodeCommit の get_file メソッドが呼べました。Glue はそもそも Python2.7 固定なので逃げ道もないです。

Glue で実行する Python スクリプトはファイルを指定することもできるので、最新の boto3 と一緒に S3 上に配置して読み込ませることもできるかもしれません。ただ Glue Job のエディタで閲覧・編集できたほうが今の所便利なので、Lambda Python 3.7 で CodeCommit にアクセスしてファイル取得後、パラメータで Glue Job に渡そうかと思います。

2019/02/01 Parquet ファイルと Glue DataCatalog のスキーマ差異の問題など

AWS Glue と戯れる日々なのですが、SparkSQL の扱い方がわかったところでまたいくつか問題が。

Parquet とテーブルスキーマのフォーマット差異

S3 に書き出したファイルを Glue の DataCatalog を経由して Athena や Redshift から読み込もうとすると、Athena からは読み込めるけれど Redshift からは読み込めない問題が。

https://forums.aws.amazon.com/thread.jspa?threadID=257680

Parquet のスキーマと DataCatalog で定義したテーブルスキーマとの間で整合性が取れない場合に、こういったエラーがでるのですが、今回はAthena からは読み込めるけれど Redshift からは読み込めないという中途半端な状態でした。テーブル定義では bigint を指定していたのですが、Parquet で書き出すときに int 型になっていて、その差異を Athena は吸収してくれたのですが Redshift では型が違うということでエラーになっていたのが原因でした。

Glue で書き出すと、テーブルへの INSERT ではなく Parquet ファイルの書き出しとなるので、今回のように型を間違うことは頻発しそうです。SparkSQL を一段下げて

SELECT
  CAST(col1 AS int) col1,
  CAST(col1 AS bigint) col2
FROM ...

のように、明治的にキャストしてあげると間違いも減りそうです。

空テーブルを DynamicFrame にできない

Glue の DataCatalog からデータを読み込み、DynamicFrame にするところで、対象のテーブルやパーティションが空だと Unable to infer schema for Parquet. It must be specified manually. というエラーが発生してしまう問題にも出くわしました。

stackoverflow.com

空テーブルをわざわざ DynamicFrame にする必要はないのですが、読み込むテーブルなどをパラメタ化したいと思っていて調査していましたが、テーブルのデータを読み込むところでエラーになってしまいます。これは回避方法がなさそうなので、 isEmpty メソッドなので例外を明示的に処理するなどしか対処法はなさそうです。

2019/01/22-23 AWS Glue Crawler が struct をカラムに持つテーブルに使いづらい

AWS Glue を色々と触っているのですが、どうにも正しい使い方がよく分からなくなってきました。

Glue Job で Parquet フォーマットで書き出して Athena から読み込みたいのですが、パーティションによってキー数が大きく異なる JSON 形式のカラムがあるためにうまくいきません。統一的に扱うなら string にして保存してしまえば良いんですけど、そうすると SparkSQL で上手く扱えない(SparkSQL で string を JSON として扱う方法があれば教えてください ありました、get_json_object 関数を使うことで string 形式の JSON からバリューを抜き出すことができました。)。

Glue Job で string ではなく struct で書き出すと、Glue Crawler がパーティションごとに異なるスキーマを生成してしまい、テーブル全体としてスキーマが一致しない(incompatible) ので Athena で読めない。その場合のエラーは HIVE_PARTITION_SCHEMA_MISMATCH なので、回避方法はあります:

https://forums.aws.amazon.com/thread.jspa?messageID=805011

Athena と AWS Glue を併用する際のベストプラクティス - Amazon Athena

stackoverflow.com

ただこれを回避すると、別の HIVE_CANNOT_OPEN_SPLIT というエラーが。私の環境では Schema mismatch, metastore schema for row column contents has 185 fields but parquet schema has 149 fields と出ます。色々 Glue Crawler 上で試しましたが回避方法は見つかっていません。

adtech-blog.united.jp

結局、struct を使うのが問題の根源なので、 string か、せめて map<string,string> のようなもう少し汎用的な形で保存できればよいなと思っています。これなら Crawler ではなく MSCK REPAIR TABLE コマンドでパーティション追加するだけでいけるはず。ただこれを Glue Job(PySpark) で実現する方法がまだ分かりません。

Glue は便利そうだなと思っていましたが、Crawler のようなデータ管理をよしなにやってくれるサービスが上手くいかないと逆に検証に時間を取られて徒労に終わってしまうなぁと思った今日この頃。。結局 Spark でのデータ変換をきちんと勉強して地道に変換した方が早い気がしています。

追記

上で記載しましたが、 get_json_object という関数を利用することで string 形式の JSON からバリューを抜き出すことができました。とりあえずこれで、 Glue Job で string 形式で保存した上で、あとから SparkSQL でアクセスすることができます。

API ドキュメントはこちら。先週結構探したつもりだったんですが、今日さがしたらすぐに見つかりました。。

spark.apache.org

2019/01/15 Glue における SQL 中心アーキテクチャ ETL、他

昨日居酒屋で飲んでいたら(かぶら屋 美味しいです、おすすめ)、隣の席の人が UberEATS の使いすぎで他の方から怒られていました。僕の友人でも UberEATS や LINE デリマを良く使う人はいるので、別にいいんじゃないかなと思ったら、どうやら人はタクシーやデリバリーなどの細かい出費の積み重ねで借金をしてしまった?ようです。

借金とは言わないまでも、昔はなかった通信費は今やもはや固定費になっているし、便利さと引き換えに可処分所得は減っているという記事をどこかで見たような気がしたなぁと思った週末でした。

「AWS Glue と SQLのみで、サクッとETL(Extract、Transform、Load)するJobを作成する」

dev.classmethod.jp

Glue のジョブはGlue が提供する Transform を利用しないといけないと思っていましたが、 入出力の雛形ジョブだけ Glue に用意して、SparkSQL を外部から設定できるようにすれば「SQL中心アーキテクチャ」による ETL が簡単に実現できるといった内容です。

外部(S3 上に置いておく想定)の設定ファイルは以下のようなものです:

{
    "source_database":"default", 
    "source_table":"elb_parquet_20150101", 
    "target_s3_url":"s3://<mybucket>/tmp/", 
    "target_format":"parquet", 
    "sql":"SELECT elb_name, count(*) as request_count FROM stagingtable group by elb_name order by elb_name"
}

これを S3 から読み込んで JSON パースして Glue 上で SparkSQL を実行します。これだと1テーブルしか扱えないので複数テーブル与えられるようにフォーマットを変更したいですが、これなら SQL だけ書ければ ETL 処理が実現できるので扱える人が増えそうです。

ただ、ETL ジョブの依存関係を解決しようとすると Glue だけでは難しそうです。Glue にも「トリガー」があって、他のジョブへ依存するジョブを作成できるのですが、パラメータレベルでは対応していないので、上のように SQL をパラメータ経由で渡すような運用だと「トリガー」では対応できません。

「Glueの使い方的な⑦(Step Functionsでジョブフロー)」

こちらも分かりやすい記事でした、Glue + Step Functions の組み合わせです。Glue の呼び出しには Lambda を使っています。

qiita.com

ワークフローが巨大だと GUI でも JSON でも扱いづらそうなので、ある程度意味のあるまとまりでまとめておく程度にした方がよさそうでしょうか。。CloudWatch を使えばワークフロー間の連携も実現できそうなので引き続き調べます。