/home/by-natures/dev*

データ界隈で働くエンジニアとしての技術的なメモと、たまに普通の日記。

2019/03/12 DynamicFrame の書き出し

Glue の書き出しは結局 "from_options" で

Glue Job による DynamicFrame のデータ書き出し方法には書き出し方法がいくつかあって、 from_options を使っていたのですが、ふとドキュメントを見ていると from_catalog というメソッドが。

Glue Job を使う以上、Glue DataCatalog に寄せたい気持ちがあります。現状は from_options で Parquet ファイルを書き出してそれを DataCatalog 経由で読む…ということをしていますが、DataCatalog に存在するテーブルに追記し、DataCatalog からデータを読み込む、となると運用がシンプルで分かりやすいです。

しかし少し触ってみたところ、パーティション指定がある場合(from_catalog を使いたいタイミングだと、大抵パーティションはあると思うんですが)は from_options をお勧めします。というのも

  • from_catalog を使って書き込んでも、テーブルのパーティション情報は無視され、S3 の保存場所のルートに直接出力されてしまう
  • additional_options というパラメータがあり、そこに pathpartitionKeys を指定することは可能。ただしその場合はテーブルの保存場所は無視される
  • データ保存に際し、テーブルとの型チェックが行われることはない様子(int, bigint で確認したのみですが)

という挙動をして、変に from_catalog を使うよりは from_options で細かく制御できた方がよいなと思った次第です。

正しい使い方はこうだ、というのがあればご指摘ください。

メモ

調べてたリンクです、メモのみ

stackoverflow.com

createOrReplaceTempView した瞬間に RDD としてキャッシュされるのかと思っていましたが、明示的に cache 指定しないとそうはならないようです。しかも lazy とあるので、どちらにせよ createOrReplaceTempView には何も起こらず、処理が行われた場合にキャッシュするかどうか、ということのようです。

blog.codeship.com

Beanstalk VS CodeDeploy, みたいな記事が多いですが、CodeDeploy を使って Beanstalk をリリースできる、といった内容です。

2019/03/06 AWS での ETL 方法 (Glue Job / Athena)、Glue での ETL 基盤構築事例

AWS などのクラウドサービスは多くのサービスが提供されているので、それを組み合わせるだけで目的のシステムが構築できるかというと、似たようなサービスがあってどちらを使うべきか判断に迷う場面も多く、調査に時間を要すると感じます。今は ETL 処理に何を使ったら良いか調査していますが、AWS の方にも尋ねたりして、AWS では Athena(Presto), Glue Job(Spark), Redshift, あとは EMR などで Hadoop クラスタを構築してその上でジョブを走らせる、など様々な方法があることが分かりました。

大規模データが扱えるマネージドサービスに絞ると Athena か Glue Job となりますが、Athena はあくまでも分析をインタラクティブに行えるサービスで、中小規模の ETL には耐えますが大規模となると厳しいようです。パフォーマンスはともかく、1アカウントあたり同時実行数が20に限定されているので立ち行かなくなる画面も出てくるかと思います。

一方 Glue は「完全マネージド型 ETL」と謳っていることから、大規模データに対する ETL も実用的です。色々なリソース制限はありますが必要に応じて緩和が可能なようです。この比較は裏の中心となるアプリケーションが Presto か Spark か、というところでも納得できます。

ということで先日は Athena について色々調べていましたが、今後は Glue を中心に調査する予定です。Athena はすぐにクエリが書けるし、分析用の環境と ETL でデータ生成する環境が同じになると運用上便利かなと思っていたのですが、用途に適したサービスとして Glue 選び、運用を Glue に寄せた方がスケーラビリティ・コスト面で利が大きそうです。

ただ私の探し方が悪かったのか、Glue の活用事例があまり見つからなかったので AWS の方に教えていただきました。ETL 祭り(すごい名前)というイベントが2018年にあったようで、そちらの資料から。

AWS ETL祭り - AWS Glue活用事例@primeNumber

speakerdeck.com

Glue を使って、現 Hadoop 環境(on EC2)のリプレイスをされています。Lambda Architecture に沿って Glue を実装しているのが簡潔で分かりやすい説明でした。コスト面も Hadoop クラスタを EC2 上に構築している状態からは75%カットと、かなり削減できたとのことです。

以前は Hadoop 専属エンジニアがいて運用・管理していたようなのですが、Glue を使ったアーキテクチャではこの「問題」が解決されたと紹介されています。「Hadoop エンジニアに頼らない開発ができるように」とまとめられています。資料からはスケールアウト性において Hadoop エンジニアの負荷が高そうだったり、リアルタイムデータの活用に着手できていないといった課題が挙げられていて、Hadoop エンジニアは数も少ないでしょうし開発リソースがボトルネックになっていたのかなと見受けられます。それを Glue などのマネージドサービスを利用することで回避したのかと思いました。

Architecting a data lake with Amazon S3, Amazon Kinesis, AWS Glue and Amazon Athena

こちらは Atlassian 社の事例です。Atlassian 社は自社のデータレイクのことを Socrates と呼んでいて、そのアーキテクチャの紹介がされています。ただプレゼンテーション用の資料なので、資料だけだとよく分かりません。Youtube に発表動画も上がっていたので合わせて見ると分かりやすいです:

スライド

www.slideshare.net

動画

www.youtube.com

雑記

動画が1時間もあるので、私なりに要点をまとめました。飛ばし飛ばしだったり、興味あるところは戻ってじっくりみている箇所もあったりして、包括的ではないのでご了承ください。

発表は2人の登壇者が行なっていて、前半の一人はデータレイクやETL処理の大枠について、後半の一人は Atlassian 社の実際の具体例について説明しています。

前半は DataCatalog についてわりと時間を割いて説明しています。DataCatalog は Hive Metastore の拡張として提供されているメタデータ管理システムです。S3 に溜めたデータをさまさまなサービス(Athena, EMR, Redshift, etc)から利用できますが、中央集権的に管理されている DataCatalog を参照することでデータを扱いやすくしているとのことです。S3, DataCatalog の2つはデータレイク構築に必須で、他のデータ処理サービスは必要に応じて使い分けると説明されています(22ページ目、発表だと13分過ぎ)

後半は具体例です。Atlassian 社が2017年末時点で約2年間運用してきた社内向けのデータレイク Socrates についての紹介で、エンジニアは9名おり、ウィークリーアクティブユーザーで1000人(ユーザ自体は2000人前後)いるということで、かなり多くのユーザが利用しているデータレイクのようです。発表は Ingest, Prepare, Organize, Discover の4つに分け、それぞれの課題が3つずつと、その解決方法を紹介しています。

Ingest のところでは、当初は REST API でデータをデータレイクに入れていた方法から、"Stream Hub" を構築した流れが紹介されています。また、Kinesis を利用したデータ検証(validation) と、最初に入ってきた(landed)データを圧縮、データフォーマット変換する流れを EMR で構築しています。

Prepare の課題の1つにデータエンジニアへの依存が問題として上がっています。テーブル作成、データ作成のスケジューリングなど、アナリストもデータサイエンティストも、その他データを扱う人々が様々なことをデータエンジニアに依存するようになり、次第にボトルネックとなっていきます。

Discover の課題は分かりやすくて、用途に適した可視化ツールが要望されていること、突発的な高負荷クエリでデータ分析が行えなくなること、どのテーブルが信頼できるものかというデータの信頼性を担保したい、という3つです。2つめは Presto クラスタを廃止して Athena へ移行することで、使用していないリソースに対してコストを払う必要がなくなりました。高負荷クエリが他のクエリ実行に影響を及ぼすことも Athena なら無くなるでしょうし、運用負荷も下がったようです。Athena に対しては課題も3つ紹介されていて、"Early Adopter" だったので Presto との比較情報がなかったこと、鍵認証を利用した JDBC 接続しか対応していないこと(すみませんあまり理解していないです、、)、コスト管理が手間であることが挙げられています。

EMR や Presto などを AWS 上でクラスタ構築している状態から、AWS が同等のマネージドサービスを提供した段階で移行を検討し、それを実行しているんだなという印象です。質疑応答の質問は動画だと聞こえないのですが、、受け答えの中では、実際の運用では理想通りではなくてクラスタ平行稼働したりしている、というような答えがあります。

2019/02/27 Athena の結果を Parquet 形式で出力したい

今日は Athena でのクエリ結果の出力方法についてです。

ユースケースがハマるなら CTAS で、データサイズが小さいなら Python 側で処理して、そうでなければ現状は Athena を使うのは難しそうです。データの加工に Glue Job を使えば、SparkSQL を発行して Parquet ファイルを S3 に出力できるので、私の場合は Glue Job が適切そうです。

Athena 側で処理:CTAS

dev.classmethod.jp

Athena で CREATE TABLE AS という、クエリ結果を元にテーブル作成する機能が2018年10月頃に追加されました。

docs.aws.amazon.com

これを実行すると、Glue データカタログにテーブルが追加され、S3 にファイルが書き込まれます。WITH 句を使うことで出力フォーマットもテキストから Parquet へ変更できるなど使い勝手が良いです。ただ、今回はパーティションを日毎に追加していきたかったのですが、CTAS だとすでに同名のテーブルが存在している場合はエラーになってしまいました。

Python 側で処理:Dativa 社の dativatools

Dativa という会社が、Athena のクライアントを拡張して、まさに上に記載したことを実現しているライブラリを提供していました。pip でインストールできます。ただ Python のライブラリで実現

www.dativa.com

Athena を ETL で使うのは諦める

Athena を使う用途って大規模データに対して集計したいという場合が多いと思うのですが、それに対して形式を指定して出力する術が非常に限られているのが現状のようです。ETL 用途で利用する場合は柔軟性が高い Glue Job を使うのが私が調べた限りは良さそうです。EMR クラスタを自前で立てて・・・というのも選択肢かもしれません。

2019/02/26 StepFunctions + Athena 動作の読み解き

昨日も少し触れましたが、StepFunctions から Athena を呼んで ETL 処理をさせたいと思っています。AWS の方に、以下のリポジトリが参考になると紹介していただきました:

github.com

以下の図が動作概要図です:

f:id:bynatures:20190226181513p:plain

この図、一見分かったような気になりますが少し複雑でした。具体的には「2. Trigger based on a schedule」というところです。Step Functions の状態遷移図に対しての操作が、CloudWatch からの定期実行なんですね。CloudWatch から Lambda が起動され、状態遷移図を見に行って、実行できるものがあれば実行する、といった流れです。ですので Step Functions 自体はあくまでも状態遷移図を定義するだけで、実行してステートを動かすのはこのサンプルでは CloudWatch から定期実行された Lambda となります。

Athena との連携であれば、Athena へクエリを投げたあとに待機しないといけないのですが(Lambda に実行時間制限があるので待機はできない)、これは DynamoDB を使って実現されています。発行したクエリID を DynamoDB へ保存し、CloudWatch から定期実行される(ポーリングされる)Lambda で、DynamoDB に保存されているクエリID群を取得してステータスチェックしに行きます。

サンプルの athenarunner.py にはハンドラが1つしかないのですが、その中で「クエリの実行」「クエリのステータス確認」どちらも行っています。 start_athena_queriescheck_athena_queries メソッドの部分ですね:

def handler(event, context):

    logger.debug('*** Athena Runner lambda function starting ***')

    try:

        # Get config (including a single activity ARN) from local file
        config = load_config()

        # One round of starting Athena queries
        start_athena_queries(config)

        # One round of checking on Athena queries
        check_athena_queries(config)

        logger.debug('*** Master Athena Runner terminating ***')

    except Exception as e:
        logger.critical('*** ERROR: Athena runner lambda function failed ***')
        logger.critical(e.message)
        raise
CloudWatch を使わないクエリ状態のチェック

慣れればよいのかもしれませんが、登録とチェックを一つのハンドラで毎回ポーリングのたびに呼び出されるのがやや妙に思えました。Step Functions から Lambda を直接呼び出してクエリ登録まで行い、アクティビティタスク(CloudWatch からの Lambda でのポーリング)でクエリのステータス確認を行う、というのがシンプルな気がするなぁ、ということで、もう少しドキュメントを読んでみると、こんなことが書いてありました:

aws.amazon.com

重要な質問が出るかもしれません。なぜETLランナーはStep Functionsのステートマシンから独立して実行され、タスクをポーリングするのですか? 代わりにStep FunctionsステートマシンからAWS Lambdaを直接呼び出すことはできませんか? その後、その機能を開始し、完了するまでETLジョブを監視することはできませんか?

その答えは、AWS Lambdaは、1リクエストあたり最大実行時間が300秒、つまり5分であることです。(中略)CloudWatchイベントを通じてポーリングスケジュールを管理したくない場合は、ETLワークフローのステートマシンにポーリングループを実装できます。

CloudWatch でのポーリングではなく、状態遷移図側でループを書いてもよい、ということですね。クエリを実行するごとに状態遷移図が複雑になりますが、大規模な状態遷移図でなければこちらの方が楽かもしれません。

2019/02/25 AWS サーバレスアーキテクチャ諸々

先週から調べていたことの簡単なまとめです。

AWS SAM

サーバレスアーキテクチャに適した形で CloudFormation を使えるようにする拡張です。SAM = Serverless Application Model。サンプルを動かしていたのですが、IAM の設定がどうしてもうまくいかずにひとまず調査終了。Lambda を deploy する時に、Lambda の中で利用する S3 ファイルへの権限チェックが走るのですが、ここで弾かれてしまいました。CloudFormation での deploy が失敗するとロールバックされて IAM も消えてしまうので調査も難しく、S3 バケットへの権限を追加したりしましたがうまくいきませんでした。

CloudFormation の拡張なので、CloudFormation と並行して使えそうです。Lambda や StepFunctions などのコンポーネントが中心になる場合は利用すると良さそうです。

docs.aws.amazon.com

SQS から Lambda を呼ぶ

StepFunctions 間の連携を考えていて、SQS はどうかということで調べていました。去年から、SQS が発火させるイベントとして Lambda が使えるのでこれが便利そう。ただ、イベントドリブンにしようとするのは難しいようです。例えば「イベントA が完了したあとにイベントB, C を動かしたい」という一対多のグラフが考えられますが、SQS はあくまでもキューなので、B か C どちらかがキューから取り出すとそのイベントは消えてしまいます。イベント B, C どちらも呼び出すような Lambda を間に挟むとか、ちょっと工夫が必要です。

aws.amazon.com

https://forums.aws.amazon.com/message.jspa?messageID=857349

NoSQL まとめ

今まで見てきた資料で一番詳しくまとまっていました。Amazon DynamoDB がなんなのか調べていました。

www.slideshare.net

StepFunctions のアクティビティステート

StepFunctions はタスクを実行するものではなくて、あくまでステートの管理をするだけ、とどこかで読みましたが、StepFunctions から直接 Lambda をキックすることができるので忘れていました。しかし「アクティビティステート」という機能を経由して、StepFunctions に記述されているタスクをワーカーに処理させることができます。

docs.aws.amazon.com

例えば Athena のクエリを発行して結果を利用する場合、Lambda 自体に実行時間制限があるので実行待機することはできません。これをアクティビティステートを経由して定期的にワーカーを動かしてAthena クエリの実行結果をチェックすることができます。

Lambda を使う場合はアクティビティステートではなく、状態遷移図側でループを書いて仕舞えばよいのですが、Athena クエリを発行するたびに状態遷移図にループが挟まるのが残念ですね。。まだ実際に書いていないので、実際に使ってみます。

2019/02/19 コンウェイの法則

2月頭に箱根に行って、「ガラスの森美術館」に行こうとしたら閉館しており、隣に建っている「星の王子さまミュージアム」に行きました。そこで買ってきた星の王子様を週末読み終えたのですが、意外なバッドエンド?に驚きました。王子様が自殺する経緯について解釈が色々あるようで、そこが名作たる所以のようです。意外な結末についてぼんやり数日考えてやっと納得できました。文体が親しみやすいのに一回読んだだけだと理解できない不思議な本だなと感じます。

バラと王子様の関係はモラハラにあるという指摘も多いようです。それは正しいかなと思いつつも、バラと王子様の関係がたとえ歪だとしても唯一無二で、お互いにとって大切な存在だった、だからこそ王子様は、か弱いバラを残してきてしまい、そしておそらく亡くなっているだろうことを嘆いて天国、星空で再開したんだというのが僕の理解です。悲しいお話ですけど、悲しさばかりを伝えたい本や挿絵ではない気がしますのでそんな解釈をしてみました。

コンウェイの法則

medium.com

有名な法則ですが、今いる環境の問題がこの法則によるものかもしれないと思って興味を持ち始めました。ただ、まだ腹落ちしていません。コンウェイの法則自体が問題を引き起こしかねないのか、ただ単に現象を説明しているだけなのか・・・分野的にはソフトウェア工学なんでしょうか。開発・運用を分けることによる問題を DevOps で解決しようとしたり、横断組織を作ってみたり。何かを語るにはまだ知識が足らないですが、最近部署移動してきた身としては部署と部署の関係がソフトウェアに関係するというのはとても興味深いし、もっと知りたいです。

AWS SAM

StepFunctions はワークフローの可視化ができるのが大きなメリットで、それに対して CloudFormation を適用すると手動運用ができなくなり、都合が悪いと感じていました。しかしサーバレスアーキテクチャ用に CloudFormation を拡張した AWS SAM というものがあると知りました。(打ち合わせでサムサム言われて何のことだと思ってその場で調べました。。)

Lambda などのコンポーネントが YAML で書きやすくなっているように見受けられます、明日もう少し調べてみます。

dev.classmethod.jp

dekotech.dekokun.info

2019/02/15 Glue Job の同時実行数について

Glue Job から Spark ジョブを呼ぶことで、大規模データに対する ETL 処理をしたいなと思っているのですが、Glue Job に同時実行数の制限があることに気がつきました:

docs.aws.amazon.com

Glue Job にはテンプレート部分を記述して、実際に動かす SparkSQL はパラメータとして Glue Job に与えようと思ったのですが、同時実行数3だと苦しいです。追加料金で増やしてもらうことはできそうですが、数十・数百にまで上げてもらえるものなんでしょうか。

同時実行数は大幅に上げられなくとも、キューイングしてジョブ待機させておければ良いので、そのような解決方法が取れるか AWS Forum で質問中です。ニッチな課題かもしれないので、回答があるか不安ですが。。:

https://forums.aws.amazon.com/message.jspa?messageID=889894#889894

Glue Job の状態を見て必要に応じて sleep するという処理をしている方がいるようですが、多数のジョブが sleep だけでジョブ実行を待機するのは場当たり的すぎますし。。SQS にキューイングさせて、そこから Glue Job へ登録する何かがいて、ジョブ完了時は CloudWatch イベントを発火させて・・・というとずいぶん複雑になってしまいます。来週 AWS の方とお話する機会があるので聞いてみようと思います。

それでは良い週末を。