ほんとの手

主に開発や仕事のメモクリップ

dbt で Python UDF をテストファーストで作成する

dbt で Python UDF をテストファーストで作成する方法についての記事です。データウェアハウスは Snowflake で動作確認しています。

作成

macro で UDF を作成する

dbt で UDF を作成する方法について調べると、コミュニティの記事が見つかります。この内容を踏襲し、UDF を作成するマクロを作成してこれを実行することで dbt 実行時に UDF を自動で作成できるようにします。

// macro/create_function/python/create_function_add_col.sql

{% macro create_function_add_col() %}
create or replace function {{target.schema}}.f_py_add_col(
  json_str varchar,
  col_name varchar,
  col_value varchar
)
  returns varchar
  language python
  runtime_version = '3.8'
  handler = 'main'
as $$

import json

def main(json_str, col_name):
  try:
    obj = json.loads(json_str)
  except:
    return None

  obj[col_name] = 'col_value'
  return json.dumps(obj)

$$;
{% endmacro %}
  • macros/create_function/python/ 配下にマクロファイルを作成する(フォルダ名はお好み)
  • {{ target.schema }} で作成先のスキーマを指定する
  • create or replace function にして、関数が存在していても必ず置き換えるようにする
  • UDF の定義の記述については DWH の仕様に従う
    • snowflake だと、runtime_version の指定が必須、とかがあった
    • パラメータ、戻り値を varchar でやり取りしているが、VARIANT でもいいかも

pre_hook で UDF を利用するモデルの前処理で関数を作成する

UDF を利用する前に上記の macro を実行して関数を定義しておく必要がありますが、dbt_project.yml の on-run-start ではなく利用するモデルの config で pre_hook でマクロ実行することにより事前の定義を行うことにします。どちらが良いかは諸説ありそうですが、私の場合はどのモデルで利用しているかの関連性が明確になるようにモデル内の config で定義するようにしています。

// models/ ... /<model_name>.sql

{{ config(pre_hook=[create_function_add_col()]) }}

...

テスト

singular テストで、テストファーストで関数を実装する

UDF の実装はデータの準備、関数の定義、クエリの実行、UDF 内の関数の実行と複数のステップがあり、一つずつ挙動を確認しながら実装していくと時間が掛かります。そこで、dbt の singular テストを先に作成してテストが通るように実装していくことで実行→修正のサイクルを短く早く回していけるようにします。

// tests/singular/macros/create_function/python/create_function_add_col.sql

{# NOTE: 事前に create_function_add_col() の実行が必要 #}

{% set inputs = [
    '{"col_1": "a"}',
    '{"col_1": "a", "col_2": "b"}'
] %}

with

test1_src as (
    {# test: 値が1つある #}
    select
        '{{ inputs[0] }}'::varchar as input,
        '{"col_1": "a", "col_a": "col_value"}'::varchar as expected

    {# test: 値が2つある #}
    union all select
        '{{ inputs[1] }}'::varchar as input,
        '{"col_1": "a", "col_2": "b", "col_a": "col_value"}'::varchar as expected
),

test1_result as (
    select
        *,
        {{target.schema}}.f_py_add_col(input, 'col_a') as result
    from test1_src
    where
        coalesce(result, '') != expected
),

final as (
    select * from test1_result
    -- 他のテストを追加した場合は最後に union する
    -- union all
    -- select * from test2_result
)

select * from final
  • tests/singular/macros/create_function/python/ 配下に singular テストファイルを作成する(フォルダ名はお好み)
  • 入力するデータは JSON 文字列でベタ書きする。jinja を使って配列化するなど効率よく記述してもよい
  • CTE で分けて、src に input, expected、result に result の値を記述して UDF の入力値、期待する返り値、実際の結果を出力する。結果が一致しない ( != expected ) 場合は singular テストが fail する
  • dbt test -s create_function_add_col を実行し、テストを修正・追加したり UDF を修正したりして実装を進めていく

モデルで UDF を利用する

UDF の実装が完了したら、モデル側で利用します。

// models/ ... /<model_name>.sql

{{ config(pre_hook=[create_function_add_col()]) }}

with
import as (select * from {{ ref('ref_model') }}),

add_col as (
    select
        *,
        parse_json({{target.schema}}.f_py_add_col(col_1, 'new_col')) as col_2
    from import
),

extracted as (
    select
        t1.*,
        (row_number() over (partition by t1.id order by t2.value::varchar)) + 1 as index,
        t2.value::varchar as t2_val
    from
        add_col t1,
        lateral flatten(INPUT => t1.col_2) as t2
)

select * from extracted
  • UDF 実行時も頭に {{ target.schema }} をつけてスキーマを指定する
  • VARIANT を lateral flatten を使って複数行に展開する
  • 展開元のデータの順序を扱いたい場合、row_number() を使って行番号を出力する

dbt で UDF を作成・利用すると、モデルと同様にコード管理できるようになったりテストが書きやすくなってよいですね。dbt での UDF 利用のご紹介でした。