/home/by-natures/dev*

データ界隈で働くエンジニアとしての技術的なメモと、たまに普通の日記。

2019/02/26 StepFunctions + Athena 動作の読み解き

昨日も少し触れましたが、StepFunctions から Athena を呼んで ETL 処理をさせたいと思っています。AWS の方に、以下のリポジトリが参考になると紹介していただきました:

github.com

以下の図が動作概要図です:

f:id:bynatures:20190226181513p:plain

この図、一見分かったような気になりますが少し複雑でした。具体的には「2. Trigger based on a schedule」というところです。Step Functions の状態遷移図に対しての操作が、CloudWatch からの定期実行なんですね。CloudWatch から Lambda が起動され、状態遷移図を見に行って、実行できるものがあれば実行する、といった流れです。ですので Step Functions 自体はあくまでも状態遷移図を定義するだけで、実行してステートを動かすのはこのサンプルでは CloudWatch から定期実行された Lambda となります。

Athena との連携であれば、Athena へクエリを投げたあとに待機しないといけないのですが(Lambda に実行時間制限があるので待機はできない)、これは DynamoDB を使って実現されています。発行したクエリID を DynamoDB へ保存し、CloudWatch から定期実行される(ポーリングされる)Lambda で、DynamoDB に保存されているクエリID群を取得してステータスチェックしに行きます。

サンプルの athenarunner.py にはハンドラが1つしかないのですが、その中で「クエリの実行」「クエリのステータス確認」どちらも行っています。 start_athena_queriescheck_athena_queries メソッドの部分ですね:

def handler(event, context):

    logger.debug('*** Athena Runner lambda function starting ***')

    try:

        # Get config (including a single activity ARN) from local file
        config = load_config()

        # One round of starting Athena queries
        start_athena_queries(config)

        # One round of checking on Athena queries
        check_athena_queries(config)

        logger.debug('*** Master Athena Runner terminating ***')

    except Exception as e:
        logger.critical('*** ERROR: Athena runner lambda function failed ***')
        logger.critical(e.message)
        raise
CloudWatch を使わないクエリ状態のチェック

慣れればよいのかもしれませんが、登録とチェックを一つのハンドラで毎回ポーリングのたびに呼び出されるのがやや妙に思えました。Step Functions から Lambda を直接呼び出してクエリ登録まで行い、アクティビティタスク(CloudWatch からの Lambda でのポーリング)でクエリのステータス確認を行う、というのがシンプルな気がするなぁ、ということで、もう少しドキュメントを読んでみると、こんなことが書いてありました:

aws.amazon.com

重要な質問が出るかもしれません。なぜETLランナーはStep Functionsのステートマシンから独立して実行され、タスクをポーリングするのですか? 代わりにStep FunctionsステートマシンからAWS Lambdaを直接呼び出すことはできませんか? その後、その機能を開始し、完了するまでETLジョブを監視することはできませんか?

その答えは、AWS Lambdaは、1リクエストあたり最大実行時間が300秒、つまり5分であることです。(中略)CloudWatchイベントを通じてポーリングスケジュールを管理したくない場合は、ETLワークフローのステートマシンにポーリングループを実装できます。

CloudWatch でのポーリングではなく、状態遷移図側でループを書いてもよい、ということですね。クエリを実行するごとに状態遷移図が複雑になりますが、大規模な状態遷移図でなければこちらの方が楽かもしれません。

2019/02/25 AWS サーバレスアーキテクチャ諸々

先週から調べていたことの簡単なまとめです。

AWS SAM

サーバレスアーキテクチャに適した形で CloudFormation を使えるようにする拡張です。SAM = Serverless Application Model。サンプルを動かしていたのですが、IAM の設定がどうしてもうまくいかずにひとまず調査終了。Lambda を deploy する時に、Lambda の中で利用する S3 ファイルへの権限チェックが走るのですが、ここで弾かれてしまいました。CloudFormation での deploy が失敗するとロールバックされて IAM も消えてしまうので調査も難しく、S3 バケットへの権限を追加したりしましたがうまくいきませんでした。

CloudFormation の拡張なので、CloudFormation と並行して使えそうです。Lambda や StepFunctions などのコンポーネントが中心になる場合は利用すると良さそうです。

docs.aws.amazon.com

SQS から Lambda を呼ぶ

StepFunctions 間の連携を考えていて、SQS はどうかということで調べていました。去年から、SQS が発火させるイベントとして Lambda が使えるのでこれが便利そう。ただ、イベントドリブンにしようとするのは難しいようです。例えば「イベントA が完了したあとにイベントB, C を動かしたい」という一対多のグラフが考えられますが、SQS はあくまでもキューなので、B か C どちらかがキューから取り出すとそのイベントは消えてしまいます。イベント B, C どちらも呼び出すような Lambda を間に挟むとか、ちょっと工夫が必要です。

aws.amazon.com

https://forums.aws.amazon.com/message.jspa?messageID=857349

NoSQL まとめ

今まで見てきた資料で一番詳しくまとまっていました。Amazon DynamoDB がなんなのか調べていました。

www.slideshare.net

StepFunctions のアクティビティステート

StepFunctions はタスクを実行するものではなくて、あくまでステートの管理をするだけ、とどこかで読みましたが、StepFunctions から直接 Lambda をキックすることができるので忘れていました。しかし「アクティビティステート」という機能を経由して、StepFunctions に記述されているタスクをワーカーに処理させることができます。

docs.aws.amazon.com

例えば Athena のクエリを発行して結果を利用する場合、Lambda 自体に実行時間制限があるので実行待機することはできません。これをアクティビティステートを経由して定期的にワーカーを動かしてAthena クエリの実行結果をチェックすることができます。

Lambda を使う場合はアクティビティステートではなく、状態遷移図側でループを書いて仕舞えばよいのですが、Athena クエリを発行するたびに状態遷移図にループが挟まるのが残念ですね。。まだ実際に書いていないので、実際に使ってみます。

2019/02/19 コンウェイの法則

2月頭に箱根に行って、「ガラスの森美術館」に行こうとしたら閉館しており、隣に建っている「星の王子さまミュージアム」に行きました。そこで買ってきた星の王子様を週末読み終えたのですが、意外なバッドエンド?に驚きました。王子様が自殺する経緯について解釈が色々あるようで、そこが名作たる所以のようです。意外な結末についてぼんやり数日考えてやっと納得できました。文体が親しみやすいのに一回読んだだけだと理解できない不思議な本だなと感じます。

バラと王子様の関係はモラハラにあるという指摘も多いようです。それは正しいかなと思いつつも、バラと王子様の関係がたとえ歪だとしても唯一無二で、お互いにとって大切な存在だった、だからこそ王子様は、か弱いバラを残してきてしまい、そしておそらく亡くなっているだろうことを嘆いて天国、星空で再開したんだというのが僕の理解です。悲しいお話ですけど、悲しさばかりを伝えたい本や挿絵ではない気がしますのでそんな解釈をしてみました。

コンウェイの法則

medium.com

有名な法則ですが、今いる環境の問題がこの法則によるものかもしれないと思って興味を持ち始めました。ただ、まだ腹落ちしていません。コンウェイの法則自体が問題を引き起こしかねないのか、ただ単に現象を説明しているだけなのか・・・分野的にはソフトウェア工学なんでしょうか。開発・運用を分けることによる問題を DevOps で解決しようとしたり、横断組織を作ってみたり。何かを語るにはまだ知識が足らないですが、最近部署移動してきた身としては部署と部署の関係がソフトウェアに関係するというのはとても興味深いし、もっと知りたいです。

AWS SAM

StepFunctions はワークフローの可視化ができるのが大きなメリットで、それに対して CloudFormation を適用すると手動運用ができなくなり、都合が悪いと感じていました。しかしサーバレスアーキテクチャ用に CloudFormation を拡張した AWS SAM というものがあると知りました。(打ち合わせでサムサム言われて何のことだと思ってその場で調べました。。)

Lambda などのコンポーネントが YAML で書きやすくなっているように見受けられます、明日もう少し調べてみます。

dev.classmethod.jp

dekotech.dekokun.info

2019/02/15 Glue Job の同時実行数について

Glue Job から Spark ジョブを呼ぶことで、大規模データに対する ETL 処理をしたいなと思っているのですが、Glue Job に同時実行数の制限があることに気がつきました:

docs.aws.amazon.com

Glue Job にはテンプレート部分を記述して、実際に動かす SparkSQL はパラメータとして Glue Job に与えようと思ったのですが、同時実行数3だと苦しいです。追加料金で増やしてもらうことはできそうですが、数十・数百にまで上げてもらえるものなんでしょうか。

同時実行数は大幅に上げられなくとも、キューイングしてジョブ待機させておければ良いので、そのような解決方法が取れるか AWS Forum で質問中です。ニッチな課題かもしれないので、回答があるか不安ですが。。:

https://forums.aws.amazon.com/message.jspa?messageID=889894#889894

Glue Job の状態を見て必要に応じて sleep するという処理をしている方がいるようですが、多数のジョブが sleep だけでジョブ実行を待機するのは場当たり的すぎますし。。SQS にキューイングさせて、そこから Glue Job へ登録する何かがいて、ジョブ完了時は CloudWatch イベントを発火させて・・・というとずいぶん複雑になってしまいます。来週 AWS の方とお話する機会があるので聞いてみようと思います。

それでは良い週末を。

2019/02/12 CloudFormation

先週末、都内でキャンプをしまして、インドアな自分としてはとてもアクティブな体験でした。たき火を見ながらぼーっと話をするのもよいものです。翌朝起きたら、キャンプ場一体が雪で真っ白だったことにも驚きました。新鮮な体験でした。

AWS での運用について考えています。運用、というと前職で使っていた CloudFormation を思い出したので、これを改めて調べました:

qiita.com

codezine.jp

AWSで避けるべき5つの間違い | POSTD

postd.cc

CloudFormation でデプロイした環境には手動運用で変更を施すと、それ以降は CloudFormation で更新が掛けられなくなってしまいます。セットアップだけ使うと割り切るか、徹底して CloudFormation だけ使うか・・・でもそうすると上の記事で gumi の方がおっしゃっている

もちろん一方では、どうしても自動化できない、または自動化が難しい部分もあった。その代表的なものが「緊急対応・障害対応」だ。こちらはスピードが第一条件なので、どうしても手作業で対応せざるを得ない。その場合は、手作業で行った修正をコードに反映してインフラをリビルドする。

というのが気になります。リビルドと言っているので、やはり CloudFormation 等で環境を作り直すということなんでしょうか。私が今触っているコンポーネントは AWS Lambda や AWS Glue、StepFunction あたりが多く、インフラというよりソースコードの管理ができればよさそうなので、CloudFormation では扱いづらいかもしれません。

2019/02/06 DynamicFrame の出力スキーマを parquet-tools で確認

AWS Glue で書いた Parquet ファイルが Glue のデータカタログのスキーマと異なることが度々発生しているので、直接 Parquet ファイルのスキーマを確認できないか調べたところ、parquet-tools コマンドで確認できることがわかりました。

github.com

mvn install コマンドでインストールしたり、Maven リポジトリから jar をダウンロードして実行するなど色々方法はありますが、Mac OS 環境だと brew install できます。README には

cd parquet-tools && mvn clean package -Plocal 

でインストールするとありますが、Parquet の他のサブモジュールの成果物も必要なので、 git clone したあとに親モジュール内で mvn clean package -Plocal したほうが良さそう。そのあとも thrift コマンドがないとエラーが出たりしたので brew でインストールしましたが、Parquet が必要なバージョンと合わなかったりしたのでひとまず断念。

Maven リポジトリから jar をダウンロードできるので、少し使うだけならこれが一番よいなと思っていたのですが、エラーも吐かずにエラーコード1を返してきます。Fat jar のようなので java -jar で叩くだけだと思うんですが、、

最後に brew で入れてみると:

$ brew install parquet-tools

特にエラーもなく簡単にインストールできました。インストール後は parquet-tools コマンドとして利用できます:

$ parquet-tools schema file.snappy.parquet
message spark_schema {
  optional binary title (UTF8);
  optional binary word (UTF8);
  optional int64 num;
}

余談: Athena はある程度スキーマ差を吸収している?

例えば上のスキーマに対して Glue Catalog で numint として定義していると、 intbigint(int64) で型が異なるので、Redshift から読むとエラーになります。ただ Athena で同じようなクエリを投げてもエラーにはなりません。

structmap, struct に対して string のようなスキーマ差は Athena もエラーとなるのですが、ビット長が違う程度のスキーマ差は Athena は吸収してくれるようです。そのため、最終的に利用する方法が Redshift なのであれば、Athena ではなく Redshift でデータが読めるか確認しないといけません。

2019/02/05 AWS サービスにインストールされている boto3 のバージョン

AWS CodeCommit を利用して、Glue Job から CodeCommit からファイルを取得して集計処理をしたいなと思ったのですが、 get_file というメソッドが存在しないとエラーが起きてしまいました。

boto3.amazonaws.com

GitHub の boto3 リポジトリを見ると get_file など CodeCommit のメソッド群が追加されているのが 2018年末だったので、関連サービスで使われている boto3 のバージョンを確認しました:

  • Glue Job: 1.7.47
  • Sagemaker Sparkmagic(PySpark): 1.7.47
  • Sagemaker Sparkmagic(PySpark3): (boto3 が入っていない)
  • Lambda Python 2.7: 1.7.74
  • Lambda Python 3.6: 1.7.74
  • Lambda Python 3.7: 1.9.42

この中だと、Lambda Python 3.7 環境がもっとも新しく、CodeCommit の get_file メソッドが呼べました。Glue はそもそも Python2.7 固定なので逃げ道もないです。

Glue で実行する Python スクリプトはファイルを指定することもできるので、最新の boto3 と一緒に S3 上に配置して読み込ませることもできるかもしれません。ただ Glue Job のエディタで閲覧・編集できたほうが今の所便利なので、Lambda Python 3.7 で CodeCommit にアクセスしてファイル取得後、パラメータで Glue Job に渡そうかと思います。