1
/
5

【TECH BLOG】BigQueryで時を遡って過去のテーブルを再構成する

はじめに

こんにちは、データシステム部データ基盤ブロックSREの纐纈です。

本記事では、過去に遡ってBigQueryのデータを参照する方法(以下、タイムトラベルと呼びます)をご紹介します。また、この機能はBigQueryが提供している、変更または削除されたデータにアクセスするタイムトラベルとは異なることをご了承ください。

開発背景

この機能は過去データを日次スナップショットより細かい粒度で見たい、また障害対応時に障害発生前などピンポイントで時間指定して参照したいという要望を受け、開発することになりました。

さらに、BigQueryからこの機能を作るのに役立ちそうなテーブル関数という機能がリリースされたのもきっかけとなりました。


BigQuery release notes | Google Cloud
Case-insensitive collation support for BigQuery is now available for Preview. Collation determines how strings are sorted and compared in collation-supported operations. If case-insensitive collation is used, case is ignored in comparison and sorting oper
https://cloud.google.com/bigquery/docs/release-notes#September_28_2021


テーブル関数とは、事前にパラメータを使って定義したクエリをエイリアスのようにテーブルとして保存して、そのテーブルに対して関数を実行するかのようにクエリを書ける機能です。例えば、以下のようにテーブル関数を定義するとします。

CREATE TABLE FUNCTIONS `some_dataset.foo_records_by_name`(name_param STRING) AS
SELECT * FROM `some_dataset.foo` WHERE name = name_param

その上で、このようなクエリを実行するとします。

SELECT * FROM `foo_records_by_name`('bar')

すると、事前に定義したテーブル関数がパラメータを代入して、結果としてこちらのクエリが実行されます。

SELECT * FROM `some_dataset.foo` WHERE name = 'bar'

短いクエリだと受けられる恩恵が少ないですが、長いクエリに対しては重宝される機能かと思います。

タイムトラベルの機能

SELECT * FROM `<table ID>`('2021-01-01')

テーブル関数を使用して上のようにクエリを打つと、指定した日時の状態のデータを参照できます。

実際に実行されているクエリは、こちらです。クエリ内のpast_timeはTIMESTAMP型で、テーブル関数から渡されるパラメータです。

WITH
  snapshot_validation AS (
  SELECT
    '<base_table>' AS table_id,
    MAX(creation_time) AS snapshot_validation_time,
  FROM
    `<snapshot_dataset>.INFORMATION_SCHEMA.TABLES`
  WHERE
    REGEXP_CONTAINS( table_name, CONCAT('<base_table>','_',FORMAT_TIMESTAMP("%Y%m%d", TIMESTAMP_SUB(past_time, INTERVAL 1 DAY), "Asia/Tokyo") ))),
  streaming_data_validation AS (
  SELECT
    table_id,
    min_bigquery_insert_time AS streaming_validation_time
  FROM
    `<changetracking validation table ID>`
  WHERE
    dataset_id = '<changetracking_dataset>'
    AND table_id = '<changetracking_table>'),
  validation AS (
  SELECT
    a.table_id,
    snapshot_validation_time,
    streaming_validation_time
  FROM
    snapshot_validation AS a
  INNER JOIN
    streaming_data_validation AS b
  ON
    a.table_id = b.table_id),
  nearest_snapshot AS (
  SELECT
    *,
    CONCAT(${join(",", primary_key)}) AS primary_key
  FROM
    `<snapshot_dataset>.<base_table>_*` AS snapshot_table
  WHERE
    _TABLE_SUFFIX IN (FORMAT_TIMESTAMP("%Y%m%d", TIMESTAMP_SUB(past_time, INTERVAL 1 DAY), "Asia/Tokyo"))),
  changetracking_for_two_days_until_specified_time AS (
    SELECT * FROM (
      SELECT
        *,
        id AS primary_key
      FROM
        `changetracking_dataset.changetracking_table`
      WHERE
        bigquery_insert_time BETWEEN TIMESTAMP_SUB(past_time, INTERVAL 2 DAY) AND past_time
    ) AS changetracking
  ),
  changetracking_latest_version_key_group AS (
  SELECT
    primary_key,
    MAX(CAST(changetrack_ver AS int64)) AS changetrack_ver,
    MAX(changetrack_start_time) AS changetrack_start_time
  FROM
    changetracking_for_two_days_until_specified_time
  GROUP BY
    primary_key ),
  changetracking_latest_version AS (
  SELECT
    a.*
  FROM
    changetracking_for_two_days_until_specified_time AS a
  INNER JOIN
    changetracking_latest_version_key_group AS b
  ON
    a.primary_key = b.primary_key
    AND a.changetrack_ver = b.changetrack_ver ),
  changetracking_without_duplication AS (
  SELECT
    *
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (PARTITION BY primary_key ORDER BY primary_key) AS row_number
    FROM
      changetracking_latest_version)
  WHERE
    row_number = 1 ),
  nearest_snapshot_except_what_changetracking_included AS (
  SELECT
    *
  FROM
    nearest_snapshot
  WHERE
    primary_key NOT IN (
    SELECT
      primary_key
    FROM
      streaming_diff ) )
  SELECT
    ... -- columns in the base table (cannot use *) to align with changetracking
  FROM
    nearest_snapshot_except_what_changetracking_included
  UNION ALL
  SELECT
    ... -- columns in the base table (cannot use *) since changetracking_without_duplication has more columns
  FROM
    changetracking_without_duplication
  WHERE
    changetrack_type != 'D'
  AND
  IF
    (snapshot_validation_time IS NOT NULL,
      TRUE,
      ERROR( CONCAT("Cannot time-travel since snapshot data does not exist for the specified time." ) ))
    AND
  IF
    (past_time > streaming_validation_time,
      TRUE,
      ERROR( CONCAT("Cannot time-travel since recording changetracking had not started at the time. check nearest daily snapshot directly. Specify time after: ", streaming_validation_time)))

このクエリの中では、パタメータに渡された日時をもとに以下の内容を実行しています。

  • 指定された日のテーブルコピーがあるかチェック
  • 差分データがあるかチェック
  • 日次で取っているテーブルのコピーからデータを取得する
  • テーブルコピーに記録されている最終時刻と指定した時間までの差分データを変更履歴ログから摘出する
  • 組み合わせて指定された時刻のテーブルの状態を再現する

そして、そのテーブルに対して元々のSELECT文のクエリを実行するという仕組みになっています。



使われているテーブルについて、簡単に説明します。

  • base_table:元となるテーブルで、このテーブルの過去データを見ることがタイムトラベル機能の目的です。
  • daily_snapshot:base_tableの日次テーブルコピー。データ基盤を構築するために、日次バッチによってBigQueryにテーブルデータを転送しており、その際にその日時点でのテーブルのコピーを取っています。データ転送用の日次バッチは日本時間0時に動かしていますが、必ずしも0時時点のデータとは限りません。テーブル定義はbase_tableと全く同じです。
  • change_tracking:base_tableの変更追跡ログ。これはSQL ServerのChange trackingという機能によって保存されているテーブルです。データベース上のテーブルに対してinsert, update, deleteの変更が入る度に、変更に関する情報が記録されています。

changetrackingのテーブルは、base_tableのカラムと変更追跡のカラム、また転送バッチが実行された時刻のカラムによって定義されています。この機能に使われている追加のカラムのみ、説明します。

続きはこちら

株式会社ZOZO's job postings

Weekly ranking

Show other rankings
Invitation from 株式会社ZOZO
If this story triggered your interest, have a chat with the team?