WITH
-- Creation time of the daily snapshot table `<base_table>_YYYYMMDD` for the
-- Asia/Tokyo calendar day before past_time. The aggregate has no GROUP BY, so
-- this CTE always yields exactly one row; the time is NULL when that snapshot
-- table does not exist.
snapshot_validation AS (
  SELECT
    '<base_table>' AS table_id,
    MAX(creation_time) AS snapshot_validation_time
  FROM
    `<snapshot_dataset>.INFORMATION_SCHEMA.TABLES`
  WHERE
    -- Exact name match: this is the same table nearest_snapshot reads through
    -- `<base_table>_*` + _TABLE_SUFFIX. An unanchored regex "contains" test
    -- would also match unrelated tables such as `old_<base_table>_YYYYMMDD`
    -- or `<base_table>_YYYYMMDD_bak`.
    table_name = CONCAT('<base_table>', '_', FORMAT_TIMESTAMP("%Y%m%d", TIMESTAMP_SUB(past_time, INTERVAL 1 DAY), "Asia/Tokyo"))),
-- Validation record(s) for the tracked changetracking table, read from the
-- changetracking validation table. streaming_validation_time is presumably the
-- earliest time changetracking recorded data for this table (it is reported as
-- the lower bound for time travel) - confirm against the validation table.
-- Yields one row per matching record, or none if the table is not registered.
streaming_data_validation AS (
  SELECT
    v.table_id,
    v.min_bigquery_insert_time AS streaming_validation_time
  FROM
    `<changetracking validation table ID>` AS v
  WHERE
    v.table_id = '<changetracking_table>'
    AND v.dataset_id = '<changetracking_dataset>'),
-- Puts both validation timestamps on exactly one row (snapshot_validation is
-- an aggregate without GROUP BY, so it always has one row).
-- The two sources describe different tables ('<base_table>' and
-- '<changetracking_table>'), so joining them on table_id only matches when
-- those names happen to be equal; otherwise this CTE would be empty.
-- The changetracking timestamp is taken with MIN in a scalar subquery. A
-- missing validation record therefore becomes NULL (and can be reported)
-- instead of dropping the row, and duplicate records cannot multiply it.
validation AS (
  SELECT
    a.table_id,
    a.snapshot_validation_time,
    (
      SELECT
        MIN(b.streaming_validation_time)
      FROM
        streaming_data_validation AS b) AS streaming_validation_time
  FROM
    snapshot_validation AS a),
-- All rows of the daily snapshot for the Asia/Tokyo day before past_time.
-- primary_key is built from the templated primary-key column list so rows can
-- be matched against changetracking records.
-- NOTE(review): CONCAT joins the key columns without a separator, so composite
-- keys such as ('1', '23') and ('12', '3') collide. It also returns NULL when
-- any key column is NULL. Consider a delimiter if that matters for this table.
nearest_snapshot AS (
  SELECT
    *,
    CONCAT(${join(",", primary_key)}) AS primary_key
  FROM
    `<snapshot_dataset>.<base_table>_*` AS snapshot_table
  WHERE
    _TABLE_SUFFIX = FORMAT_TIMESTAMP("%Y%m%d", TIMESTAMP_SUB(past_time, INTERVAL 1 DAY), "Asia/Tokyo")),
-- Changetracking records inserted in the two days up to and including
-- past_time.
-- primary_key uses exactly the same expression as nearest_snapshot, so both
-- sides produce keys of the same type and format and can be compared. This
-- assumes the changetracking table carries the base table's key columns; the
-- final SELECT already relies on it containing every base-table column.
changetracking_for_two_days_until_specified_time AS (
  SELECT
    changetracking.*,
    CONCAT(${join(",", primary_key)}) AS primary_key
  FROM
    `<changetracking_dataset>.<changetracking_table>` AS changetracking
  WHERE
    -- Closed interval on purpose: a change inserted exactly at past_time is
    -- part of the state "as of" past_time.
    changetracking.bigquery_insert_time >= TIMESTAMP_SUB(past_time, INTERVAL 2 DAY)
    AND changetracking.bigquery_insert_time <= past_time),
-- Exactly one changetracking record per primary key: the one with the highest
-- changetrack_ver in the window.
-- changetrack_ver is compared numerically (CAST to INT64) on every use. The
-- previous self-join compared the raw column with the INT64 maximum, which is
-- a type mismatch when the column is a STRING.
-- Ties on the same version are broken deterministically by the later
-- changetrack_start_time, then the later bigquery_insert_time. Ordering by
-- the partition key itself picks an arbitrary row.
-- Rows with a NULL key or NULL version are excluded, as the equality join on
-- those columns also excluded them before.
changetracking_without_duplication AS (
  SELECT
    * EXCEPT (version_rank)
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (
        PARTITION BY primary_key
        ORDER BY
          CAST(changetrack_ver AS INT64) DESC,
          changetrack_start_time DESC,
          bigquery_insert_time DESC) AS version_rank
    FROM
      changetracking_for_two_days_until_specified_time
    WHERE
      primary_key IS NOT NULL
      AND changetrack_ver IS NOT NULL)
  WHERE
    version_rank = 1),
-- Single-row guard that aborts the query with a descriptive error when time
-- travel to past_time is not possible.
-- Aggregating over `validation` (no GROUP BY) guarantees exactly one row even
-- if `validation` is empty, so the checks are evaluated rather than silently
-- yielding nothing. IF only evaluates its ERROR branch when the condition is
-- not TRUE. A NULL streaming_validation_time is spelled out in the message
-- instead of turning the whole message into NULL.
time_travel_guard AS (
  SELECT
    IF(
      MAX(snapshot_validation_time) IS NOT NULL,
      TRUE,
      ERROR("Cannot time-travel since snapshot data does not exist for the specified time."))
    AND IF(
      past_time > MAX(streaming_validation_time),
      TRUE,
      ERROR(CONCAT(
        "Cannot time-travel since recording changetracking had not started at the time. check nearest daily snapshot directly. Specify time after: ",
        IFNULL(CAST(MAX(streaming_validation_time) AS STRING), "(unknown: no changetracking validation record found)")))) AS checks_passed
  FROM
    validation),
-- Snapshot rows whose key has no change record in the window; keys that do
-- have one are replaced by that record in the final UNION ALL.
-- NOT EXISTS is used instead of NOT IN: with NOT IN, a single NULL
-- primary_key in the subquery would filter out every snapshot row.
nearest_snapshot_except_what_changetracking_included AS (
  SELECT
    s.*
  FROM
    nearest_snapshot AS s
  WHERE
    NOT EXISTS (
      SELECT
        1
      FROM
        changetracking_without_duplication AS ct
      WHERE
        ct.primary_key = s.primary_key))
-- Table state as of past_time: untouched snapshot rows plus the latest
-- non-deleted change per key. Both branches are gated by the same guard, so
-- an invalid past_time raises an error instead of returning partial data.
SELECT
  ... -- columns in the base table (cannot use *) to align with changetracking
FROM
  nearest_snapshot_except_what_changetracking_included
WHERE
  (SELECT checks_passed FROM time_travel_guard)
UNION ALL
SELECT
  ... -- columns in the base table (cannot use *) since changetracking_without_duplication has more columns
FROM
  changetracking_without_duplication
WHERE
  changetrack_type != 'D'
  AND (SELECT checks_passed FROM time_travel_guard)