昨日も少し触れましたが、StepFunctions から Athena を呼んで ETL 処理をさせたいと思っています。AWS の方に、以下のリポジトリが参考になると紹介していただきました:
以下の図が動作概要図です:
この図、一見分かったような気になりますが少し複雑でした。具体的には「2. Trigger based on a schedule」というところです。Step Functions の状態遷移図に対しての操作が、CloudWatch からの定期実行なんですね。CloudWatch から Lambda が起動され、状態遷移図を見に行って、実行できるものがあれば実行する、といった流れです。ですので Step Functions 自体はあくまでも状態遷移図を定義するだけで、実行してステートを動かすのはこのサンプルでは CloudWatch から定期実行された Lambda となります。
Athena との連携であれば、Athena へクエリを投げたあとに待機しないといけないのですが(Lambda に実行時間制限があるので待機はできない)、これは DynamoDB を使って実現されています。発行したクエリID を DynamoDB へ保存し、CloudWatch から定期実行される(ポーリングされる)Lambda で、DynamoDB に保存されているクエリID群を取得してステータスチェックしに行きます。
サンプルの athenarunner.py
にはハンドラが1つしかないのですが、その中で「クエリの実行」「クエリのステータス確認」どちらも行っています。 start_athena_queries
と check_athena_queries
メソッドの部分ですね:
def handler(event, context): logger.debug('*** Athena Runner lambda function starting ***') try: # Get config (including a single activity ARN) from local file config = load_config() # One round of starting Athena queries start_athena_queries(config) # One round of checking on Athena queries check_athena_queries(config) logger.debug('*** Master Athena Runner terminating ***') except Exception as e: logger.critical('*** ERROR: Athena runner lambda function failed ***') logger.critical(e.message) raise
CloudWatch を使わないクエリ状態のチェック
慣れればよいのかもしれませんが、登録とチェックを一つのハンドラで毎回ポーリングのたびに呼び出されるのがやや妙に思えました。Step Functions から Lambda を直接呼び出してクエリ登録まで行い、アクティビティタスク(CloudWatch からの Lambda でのポーリング)でクエリのステータス確認を行う、というのがシンプルな気がするなぁ、ということで、もう少しドキュメントを読んでみると、こんなことが書いてありました:
重要な質問が出るかもしれません。なぜETLランナーはStep Functionsのステートマシンから独立して実行され、タスクをポーリングするのですか? 代わりにStep FunctionsステートマシンからAWS Lambdaを直接呼び出すことはできませんか? その後、その機能を開始し、完了するまでETLジョブを監視することはできませんか?
その答えは、AWS Lambdaは、1リクエストあたり最大実行時間が300秒、つまり5分であることです。(中略)CloudWatchイベントを通じてポーリングスケジュールを管理したくない場合は、ETLワークフローのステートマシンにポーリングループを実装できます。
CloudWatch でのポーリングではなく、状態遷移図側でループを書いてもよい、ということですね。クエリを実行するごとに状態遷移図が複雑になりますが、大規模な状態遷移図でなければこちらの方が楽かもしれません。