/home/by-natures/dev*

データ界隈で働くエンジニアとしての技術的なメモと、たまに普通の日記。

2023/05/02 dbt テンプレート・マクロ・テスト導入, 文字コード問題(未解決)

先日書いた記事では、 CSV(UTF8) 形式のファイルを dbt-duckdb プラグインで読み込み、集計した結果を Parquet ファイルとして S3 へ保存する処理を dbt で実装しました:

bynatures.hatenadiary.jp

この時はひとまず動くことを目標にしていたので、今日は dbt の設定見直しをしていました。dbt で更に外部の DWH が必要ない dbt + DuckDB は手元でさくさく試すことが出来て楽しいです。

もくじ:

冗長な宣言のマクロ&テンプレート化

元データでは年代別の人口がカラムに分かれており、同じ処理を複数行に渡って記述していました。前回は途中諸略しましたが、1970年〜2010年で5年ごとに9ヵ年のカラムが存在します:

WITH original_data AS (
  SELECT "市区町村名"     AS CityName,
         "市区町村コード"   AS CityCode,
         "1970年"        AS Population1970,
         "1975年"        AS Population1975,
         "1980年"        AS Population1980,
         "1985年"        AS Population1985,
         "1990年"        AS Population1990,
         "1995年"        AS Population1995,
         "2000年"        AS Population2000,
         "2005年"        AS Population2005,
         "2010年"        AS Population2010
  FROM {{ source('external_source', 'population_census_jp_2014') }}
)
SELECT CityName,
       CityCode,
       CAST(REPLACE(Population1970, ',', '') AS INT) AS Population1970,
       CAST(REPLACE(Population1975, ',', '') AS INT) AS Population1975,
       CAST(REPLACE(Population1980, ',', '') AS INT) AS Population1980,
       CAST(REPLACE(Population1985, ',', '') AS INT) AS Population1985,
       CAST(REPLACE(Population1990, ',', '') AS INT) AS Population1990,
       CAST(REPLACE(Population1995, ',', '') AS INT) AS Population1995,
       CAST(REPLACE(Population2000, ',', '') AS INT) AS Population2000,
       CAST(REPLACE(Population2005, ',', '') AS INT) AS Population2005,
       CAST(REPLACE(Population2010, ',', '') AS INT) AS Population2010
FROM original_data

これを Jinja テンプレートを使ってループを組んでみました(カンマは末尾から先頭に持ってくることで、ループ終了処理を省いています):

{% set years = ["1970", "1975", "1980", "1985", "1990", "1995", "2000", "2005", "2010"] %}

WITH original_data AS (
  SELECT  "市区町村名"   AS CityName
         ,"市区町村コード" AS CityCode
         {% for year in years %}
           ,"{{year}}年" AS Population{{year}}
         {% endfor %}
  FROM {{ source('external_source', 'population_census_jp_2014') }}
)
SELECT  STRING_SPLIT(CityName, ' ')[1] AS PrefectureName
       ,STRING_SPLIT(CityName, ' ')[2] AS CityName
       ,CityCode
       {% for year in years %}
         ,{{ convert_str2int("Population"+year) }} AS Population{{year}}
       {% endfor %}
FROM original_data

プログラムとして見ると冗長性が省かれて良いはずなのですが、SQLで簡単にデータ変換処理をしようという dbt の思想からすると、本来実行される SQL とかなりかけ離れていて複雑な感じもします。業務でマクロやテンプレートを使う場合は不必要に複雑にしない&冗長性を省くバランスを取るために、ポリシー策定も必要になるかもしれません。

convert_str2intCAST(REPLACE(Population1970, ',', '') AS INT) AS Population1970 を定義にもつマクロです。Jinja のループで冗長性は十分に省けていますが、これは dbt のマクロを使ってみたかったので分離させました:

% cat macros/convert_str2int.sql
{% macro convert_str2int(column_name) %}
    CAST(REPLACE({{ column_name }}, ',', '') AS INT)
{% endmacro %}

テスト追加

テストも models フォルダ内に追加してみます。組み込みの Null チェックと、CityCode のユニーク制約チェックを設定しました:

% cat models/population_census/population_by_city.yml 
version: 2

models:
  - name: population_by_city
    columns:
      - name: PrefectureName
        tests:
          - not_null
      - name: CityName
        tests:
          - not_null
      - name: CityCode
        tests:
          - not_null
          - unique

dbt test でテスト実行をします。CI/CD を組む場合は dbt run の前に実行する感じになるでしょうか:

% dbt test
11:03:56  Running with dbt=1.4.6
11:03:56  Found 3 models, 4 tests, 0 snapshots, 0 analyses, 298 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics
11:03:56  
11:03:56  Concurrency: 1 threads (target='dev')
11:03:56  
11:03:56  1 of 4 START test not_null_population_by_city_CityCode ......................... [RUN]
11:03:56  1 of 4 PASS not_null_population_by_city_CityCode ............................... [PASS in 0.10s]
11:03:56  2 of 4 START test not_null_population_by_city_CityName ......................... [RUN]
11:03:56  2 of 4 PASS not_null_population_by_city_CityName ............................... [PASS in 0.08s]
11:03:56  3 of 4 START test not_null_population_by_city_PrefectureName ................... [RUN]
11:03:56  3 of 4 PASS not_null_population_by_city_PrefectureName ......................... [PASS in 0.08s]
11:03:56  4 of 4 START test unique_population_by_city_CityCode ........................... [RUN]
11:03:56  4 of 4 PASS unique_population_by_city_CityCode ................................. [PASS in 0.08s]
11:03:56  
11:03:56  Finished running 4 tests in 0 hours 0 minutes and 0.60 seconds (0.60s).
11:03:56  
11:03:56  Completed successfully
11:03:56  
11:03:56  Done. PASS=4 WARN=0 ERROR=0 SKIP=0 TOTAL=4

(未解決)Shift JIS 形式 CSV ファイルの読み込み

一番解決したい問題は文字コード問題です。

DuckDB では CSV ファイルの読み込みに ENCODING オプションをつけられるのですが、これが現在は UTF-8 しか対応していません。今回利用したオープンデータは Shift JIS 形式で提供されており、これをそのまま CSV で読み込もうとするとエラーとなります:

11:09:39  Runtime Error in model population_by_city_sjis (models/population_census/population_by_city_sjis.sql)
11:09:39    Invalid Input Error: Error in file "s3://<S3_BUCKET>/file01.csv" at line 8 in column "0": Invalid unicode (byte sequence mismatch) detected in CSV file. Parser options:
11:09:39      file=s3://dbt-duckdb-test-hitoshi-tsuda/file01.csv
11:09:39      delimiter=','
11:09:39      quote='"' (auto detected)
11:09:39      escape='' (auto detected)
11:09:39      header=1
11:09:39      sample_size=20480
11:09:39      ignore_errors=0
11:09:39      all_varchar=0

公式ドキュメントでも明確に UTF8 のみを受け付けるとあり、テストコードまで存在しているので意図的にそうしていると読み取れます:

ENCODING If this option is used, its value must be UTF8. With any other encoding an error will be thrown.

CSV Import/Export - DuckDB

   def test_encoding_wrong(self, duckdb_cursor):
        with pytest.raises(duckdb.BinderException, match="Copy is only supported for UTF-8 encoded files, ENCODING 'UTF-8'"):
            rel = duckdb_cursor.read_csv(TestFile('quote_escape.csv'), encoding=";")

duckdb/test_read_csv.py at master · duckdb/duckdb · GitHub

単発での取り込みなら手元で文字コード変換をして取り込むCSVファイルを生成すればよいのですが、実業務で外部システムの出力するファイルが UTF-8 以外であるケースはあると思うので、dbt で CSVダウンロード&文字コード変換処理を入れたりできるのか、引き続き調べます。