/home/by-natures/dev*

データ界隈で働くエンジニアとしての技術的なメモと、たまに普通の日記。

2019/02/26 StepFunctions + Athena 動作の読み解き

昨日も少し触れましたが、StepFunctions から Athena を呼んで ETL 処理をさせたいと思っています。AWS の方に、以下のリポジトリが参考になると紹介していただきました:

github.com

以下の図が動作概要図です:

f:id:bynatures:20190226181513p:plain

この図、一見分かったような気になりますが少し複雑でした。具体的には「2. Trigger based on a schedule」というところです。Step Functions の状態遷移図に対しての操作が、CloudWatch からの定期実行なんですね。CloudWatch から Lambda が起動され、状態遷移図を見に行って、実行できるものがあれば実行する、といった流れです。ですので Step Functions 自体はあくまでも状態遷移図を定義するだけで、実行してステートを動かすのはこのサンプルでは CloudWatch から定期実行された Lambda となります。

Athena との連携であれば、Athena へクエリを投げたあとに待機しないといけないのですが(Lambda に実行時間制限があるので待機はできない)、これは DynamoDB を使って実現されています。発行したクエリID を DynamoDB へ保存し、CloudWatch から定期実行される(ポーリングされる)Lambda で、DynamoDB に保存されているクエリID群を取得してステータスチェックしに行きます。

サンプルの athenarunner.py にはハンドラが1つしかないのですが、その中で「クエリの実行」「クエリのステータス確認」どちらも行っています。 start_athena_queriescheck_athena_queries メソッドの部分ですね:

def handler(event, context):

    logger.debug('*** Athena Runner lambda function starting ***')

    try:

        # Get config (including a single activity ARN) from local file
        config = load_config()

        # One round of starting Athena queries
        start_athena_queries(config)

        # One round of checking on Athena queries
        check_athena_queries(config)

        logger.debug('*** Master Athena Runner terminating ***')

    except Exception as e:
        logger.critical('*** ERROR: Athena runner lambda function failed ***')
        logger.critical(e.message)
        raise
CloudWatch を使わないクエリ状態のチェック

慣れればよいのかもしれませんが、登録とチェックを一つのハンドラで毎回ポーリングのたびに呼び出されるのがやや妙に思えました。Step Functions から Lambda を直接呼び出してクエリ登録まで行い、アクティビティタスク(CloudWatch からの Lambda でのポーリング)でクエリのステータス確認を行う、というのがシンプルな気がするなぁ、ということで、もう少しドキュメントを読んでみると、こんなことが書いてありました:

aws.amazon.com

重要な質問が出るかもしれません。なぜETLランナーはStep Functionsのステートマシンから独立して実行され、タスクをポーリングするのですか? 代わりにStep FunctionsステートマシンからAWS Lambdaを直接呼び出すことはできませんか? その後、その機能を開始し、完了するまでETLジョブを監視することはできませんか?

その答えは、AWS Lambdaは、1リクエストあたり最大実行時間が300秒、つまり5分であることです。(中略)CloudWatchイベントを通じてポーリングスケジュールを管理したくない場合は、ETLワークフローのステートマシンにポーリングループを実装できます。

CloudWatch でのポーリングではなく、状態遷移図側でループを書いてもよい、ということですね。クエリを実行するごとに状態遷移図が複雑になりますが、大規模な状態遷移図でなければこちらの方が楽かもしれません。