/home/by-natures/dev*

データ界隈で働くエンジニアとしての技術的なメモと、たまに普通の日記。

2024/06/28 Prefect のキャッシュ削除について

ブログを書くのがご無沙汰になってしまいました。6月から新しい環境でデータエンジニア/マネージャーという形で働いており、システム面や事業内容のキャッチアップをしている段階です。私にとっては新しいツールが多く、楽しさ半分、早くキャッチアップして戦力にならないと・・・というのがもう半分という具合です。

転職記など書こうと思っていたのですが、以前の会社はリモートワークが多かったことと数ヶ月から1年程度のプロジェクトで働いていたため、転職して心機一転という感じが薄くて文章にするほどでもないかなと思っています。ただ後で見返すと当時考えていたことが言語化されていて面白かったりするので、気が向いたら何か書くかもしれません。

在宅勤務の日は小ネタでも良いのでブログでも書こうかと思っており、今日は Prefect から小ネタです。

検証環境

prefect == 2.19.4

Prefect のワーカーは PREFECT_LOCAL_STORAGE_PATH 配下に結果をキャッシュする

PREFECT_LOCAL_STORAGE_PATH という変数があります。デフォルトでは ~/.prefect/storage に設定されていますが、各 Flow Run の結果キャッシュや、リトライのための情報が保存されているようです。

Profiles and Configuration - Prefect Docs

(抜粋) The PREFECT_LOCAL_STORAGE_PATH value specifies the default location of local storage for flow runs.

試しに以下のフローを実行してキャッシュが生成されるかを確かめます。定数を返すタスクを2つ繋げただけのフローです:

$ cat cache_test.py 
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash)
def task1():
    print(f"task1 is being called")
    return 1

@task(cache_key_fn=task_input_hash)
def task2():
    print(f"task2 is being called")
    return 2

@flow(retries=1, retry_delay_seconds=3, log_prints=True)
def execute_flow():
    return task1() + task2()

if __name__ == "__main__":
    print(execute_flow())

Prefect Cloud にログインした後、上記フローを実行します:

$ python cache_test.py 
00:21:02.849 | INFO    | prefect.engine - Created flow run 'psychedelic-viper' for flow 'execute-flow'
00:21:02.854 | INFO    | Flow run 'psychedelic-viper' - View at https://app.prefect.cloud/account/...(省略)
00:21:03.668 | INFO    | Flow run 'psychedelic-viper' - Created task run 'task1-0' for task 'task1'
00:21:03.670 | INFO    | Flow run 'psychedelic-viper' - Executing 'task1-0' immediately...
00:21:05.056 | INFO    | Task run 'task1-0' - task1 is being called
00:21:05.409 | INFO    | Task run 'task1-0' - Finished in state Completed()
00:21:05.700 | INFO    | Flow run 'psychedelic-viper' - Created task run 'task2-0' for task 'task2'
00:21:05.702 | INFO    | Flow run 'psychedelic-viper' - Executing 'task2-0' immediately...
00:21:06.329 | INFO    | Task run 'task2-0' - task2 is being called
00:21:06.641 | INFO    | Task run 'task2-0' - Finished in state Completed()
00:21:06.939 | INFO    | Flow run 'psychedelic-viper' - Finished in state Completed()
3

task1 is being called, task2 is being called がログ出力されており、各タスクが実行されたことが分かります。

各タスクには cache_key_fn パラメータをつけており、引数のハッシュ値に応じて結果をキャッシュするようにしているため、 PREFECT_LOCAL_STORAGE_PATH に該当する下記ディレクトリにはキャッシュが新たに保存されています:

$ ls ~/.prefect/storage/
798ed2da332646e8ad4f79d6898b72bc  d835d99101ad4ae891b7c4056c5d1bdb

フローを再実行すると、結果がキャッシュから取得されたことを示すログ Finished in state Cached(type=COMPLETED) が出力されていることが分かります:

$ python cache_test.py 
00:21:28.433 | INFO    | prefect.engine - Created flow run 'paper-lionfish' for flow 'execute-flow'
00:21:28.437 | INFO    | Flow run 'paper-lionfish' - View at https://app.prefect.cloud/account/...(省略)
00:21:29.212 | INFO    | Flow run 'paper-lionfish' - Created task run 'task1-0' for task 'task1'
00:21:29.213 | INFO    | Flow run 'paper-lionfish' - Executing 'task1-0' immediately...
00:21:30.177 | INFO    | Task run 'task1-0' - Finished in state Cached(type=COMPLETED)
00:21:30.719 | INFO    | Flow run 'paper-lionfish' - Created task run 'task2-0' for task 'task2'
00:21:30.720 | INFO    | Flow run 'paper-lionfish' - Executing 'task2-0' immediately...```
00:21:31.038 | INFO    | Task run 'task2-0' - Finished in state Cached(type=COMPLETED)
00:21:31.566 | INFO    | Flow run 'paper-lionfish' - Finished in state Completed()
3

キャッシュを直接消すとエラーになる

この PREFECT_LOCAL_STORAGE_PATH ですが、以下のスレッドで Prefect の公式担当者より消しても問題ないという趣旨の回答があります:

PREFECT_LOCAL_STORAGE_PATH keeps growing - Archive - Prefect Community

試しにキャッシュを消して、フローを再実行してみましょう(結果は見やすさや匿名化のため一部省略):

$ rm -rf ~/.prefect/storage
$ python cache_test.py
00:35:06.197 | INFO    | prefect.engine - Created flow run 'famous-buzzard' for flow 'execute-flow'
00:35:06.200 | INFO    | Flow run 'famous-buzzard' - View at https://app.prefect.cloud/account/...
00:35:07.144 | INFO    | Flow run 'famous-buzzard' - Created task run 'task1-0' for task 'task1'
00:35:07.145 | INFO    | Flow run 'famous-buzzard' - Executing 'task1-0' immediately...
00:35:08.145 | INFO    | Task run 'task1-0' - Finished in state Cached(type=COMPLETED)
00:35:08.395 | ERROR   | Flow run 'famous-buzzard' - Encountered exception during execution:
Traceback (most recent call last):
  File "/path/to/prefect/env/lib/python3.9/site-packages/prefect/engine.py", line 877, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/path/to/prefect/env/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/path/to/prefect/env/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/path/to/prefect/task_restart_test.py", line 16, in execute_flow
    return task1() + task2()
...
  File "/path/to/prefect/env/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/path/to/prefect/env/lib/python3.9/site-packages/prefect/states.py", line 98, in _get_state_result
    result = await state.data.get()
  File "/path/to/prefect/env/lib/python3.9/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
    return await fn(*args, **kwargs)
  File "/path/to/prefect/env/lib/python3.9/site-packages/prefect/results.py", line 630, in get
    blob = await self._read_blob(client=client)
  File "/path/to/prefect/env/lib/python3.9/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
    return await fn(*args, **kwargs)
  File "/path/to/prefect/env/lib/python3.9/site-packages/prefect/results.py", line 645, in _read_blob
    content = await storage_block.read_path(self.storage_key)
  File "/path/to/prefect/env/lib/python3.9/site-packages/prefect/filesystems.py", line 220, in read_path
    raise ValueError(f"Path {path} does not exist.")
ValueError: Path ~/.prefect/storage/798ed2da332646e8ad4f79d6898b72bc does not exist.

何やらエラーが出てしまいました。 Finished in state Cached(type=COMPLETED) がログ出力されていることからキャッシュヒットしたことが伺えますが、実際のキャッシュファイルは削除されているため、エラーが発生しています。

エラーを追うと、どうやら prefect/src/prefect/states.pyステートファイルから前回の処理結果を読み取ろうとしています。 つまりステート自体はどこか別のところ・・・恐らく Prefect Cloud 側で管理されているものと思われます。

解決策:キャッシュのリフレッシュオプションをつけて再実行

管理画面から Flow Run, Task Run など色々な情報を削除して再実行しましたが上記エラーは解消しませんでした。今のところ唯一成功したのは、キャッシュクリアの方法を用いて再実行し、キャッシュを更新する方法です

$ export PREFECT_TASKS_REFRESH_CACHE=true
$ python cache_test.py 
01:06:15.147 | INFO    | prefect.engine - Created flow run 'giga-flounder' for flow 'execute-flow'
01:06:15.151 | INFO    | Flow run 'giga-flounder' - View at https://app.prefect.cloud/account/...
01:06:16.179 | INFO    | Flow run 'giga-flounder' - Created task run 'task1-0' for task 'task1'
01:06:16.181 | INFO    | Flow run 'giga-flounder' - Executing 'task1-0' immediately...
01:06:17.488 | INFO    | Task run 'task1-0' - task1 is being called
01:06:17.767 | INFO    | Task run 'task1-0' - Finished in state Completed()
01:06:18.012 | INFO    | Flow run 'giga-flounder' - Created task run 'task2-0' for task 'task2'
01:06:18.013 | INFO    | Flow run 'giga-flounder' - Executing 'task2-0' immediately...
01:06:18.493 | INFO    | Task run 'task2-0' - task2 is being called
01:06:18.752 | INFO    | Task run 'task2-0' - Finished in state Completed()
01:06:19.031 | INFO    | Flow run 'giga-flounder' - Finished in state Completed()
3

今回はわざとキャッシュを削除したため起きたエラーでしたが、サーバ上の容量逼迫などでキャッシュファイルを削除する運用になっていると、もしかしたら運用上この問題が発生するかもしれません。

API は全て見れていないので、もしかしたら API から Prefect Cloud 側のステート削除をする方法があるかもしれません。もしご存知の方がいれば教えてください。