Datastream BigQuery 迁移工具包


本页介绍了使用 Datastream 从 Dataflow Datastream to BigQuery 模板迁移到 Datastream 内置的 BigQuery 复制解决方案时的最佳实践。

准备工作

本页面中的说明假定:

  • 您熟悉并已安装 Docker
  • 您知道如何从 GitHub 等服务克隆代码库。
  • 您知道如何在 Datastream 中运行数据流。
  • 您已安装 Google Cloud CLI

迁移工具包概览

Datastream BigQuery 迁移工具包是由 Google Cloud提供的开源软件。借助该工具包,您可以从 Dataflow Datastream 到 BigQuery 模板迁移,但您也可以在从其他流水线迁移时使用该工具包,如后续的从其他流水线迁移部分中所述。

如需使用该工具包迁移 BigQuery 表,您需要执行以下步骤:

  1. 创建、启动和暂停以 BigQuery 为目标的数据流。
  2. 对每个需要迁移的 BigQuery 表运行迁移。
  3. 恢复数据流。

该工具包会执行以下操作:

  • 使用 Datastream 发现 API 检索源表架构。
  • 根据检索到的架构创建与 Datastream 兼容的 BigQuery 表。
  • 提取您要迁移的 BigQuery 表的架构,以确定必要的数据类型转换。
  • 将原表中的所有现有行复制到新表中,包括适当的列类型转换

如需详细了解该工具包的结构及其使用的实参,请参阅该工具包的 README.md 文件

设置迁移工具包

如需使用 Docker 运行迁移工具包,请执行以下步骤:

  1. 克隆代码库并切换到该代码库的目录:

    git clone https://github.com/GoogleCloudPlatform/datastream-bigquery-migration-toolkit &&
    cd datastream-bigquery-migration-toolkit
    
  2. 构建映像:

    docker build -t migration-service .
    
  3. 使用 Google Cloud CLI 凭据进行身份验证:

    docker run -ti \
    --name gcloud-config migration-service gcloud auth application-default login
    
  4. 设置 Google Cloud 项目媒体资源:

    docker run -ti --volumes-from gcloud-config migration-service \
    gcloud config set project PROJECT_ID
    

    PROJECT_ID 替换为您的 Google Cloud项目的标识符。

从 Dataflow 迁移到 Datastream 内置解决方案

  1. 创建以 BigQuery 为目标位置的 Datastream 数据流

  2. 开始播放视频,然后立即暂停。这样一来,Datastream 就能捕获在迁移开始之前捕获更改的位置。

  3. 排空 Datastream 和 Dataflow 流水线:

    1. 暂停现有的 Cloud Storage 目标数据流。
    2. 检查数据流的总延迟时间指标,并等待至少与当前延迟时间一样长的时间,以确保所有正在传输的事件都已写入目标位置。
    3. 排空 Dataflow 作业
  4. 执行迁移:

    1. dry_run 模式下运行迁移。在 dry_run 模式下,您可以生成 CREATE TABLE DDL 语句和用于复制数据的 SQL 语句,而无需执行这些语句:

      docker run -v output:/output -ti --volumes-from gcloud-config \
      migration-service python3 ./migration/migrate_table.py dry_run \
      --project-id PROJECT_ID \
      --stream-id STREAM_ID \
      --datastream-region STREAM_REGION \
      --source-schema-name SOURCE_SCHEMA_NAME \
      --source-table-name SOURCE_TABLE_NAME \
      --bigquery-source-dataset-name BIGQUERY_SOURCE_DATASET_NAME \
      --bigquery-source-table-name BIGQUERY_SOURCE_TABLE_NAME
      

      替换以下内容:

      • PROJECT_ID:您的 Google Cloud项目的唯一标识符。
      • STREAM_ID:BigQuery 目标数据流的唯一标识符。
      • STREAM_REGION:您的直播的位置,例如 us-central1
      • SOURCE_SCHEMA_NAME:源架构的名称。
      • SOURCE_TABLE_NAME:源表的名称。
      • BIGQUERY_SOURCE_DATASET_NAME:现有 BigQuery 数据集的名称。
      • BIGQUERY_SOURCE_TABLE_NAME:现有 BigQuery 表的名称。
    2. 检查 output/create_target_tableoutput/copy_rows 下的 .sql 文件。 以下是将在您的 Google Cloud 项目中执行的 SQL 命令:

      docker run -v output:/output -ti migration-service find output/create_target_table \
      -type f -print -exec cat {} \;
      
      docker run -v output:/output -ti migration-service find output/copy_rows \
      -type f -print -exec cat {} \;
      
    3. 如需执行 SQL 命令,请在 full 模式下运行迁移。借助 full 模式,您可以在 BigQuery 中创建表并复制现有 BigQuery 表中的所有行:

      docker run -v output:/output -ti --volumes-from gcloud-config \
      migration-service python3 ./migration/migrate_table.py full \
      --project-id PROJECT_ID \
      --stream-id STREAM_ID \
      --datastream-region STREAM_REGION \
      --source-schema-name SOURCE_SCHEMA_NAME \
      --source-table-name SOURCE_TABLE_NAME \
      --bigquery-source-dataset-name BIGQUERY_SOURCE_DATASET_NAME \
      --bigquery-source-table-name BIGQUERY_SOURCE_TABLE_NAME
      
  5. 恢复暂停的数据流。

  6. 打开 Google Cloud 日志浏览器,并使用以下查询查找 Datastream 日志:

    resource.type="datastream.googleapis.com/Stream"
    resource.labels.stream_id=STREAM_ID
    

    查找以下日志,其中 %d 是一个数字:

    Completed writing %d records into..
    

    此日志表明新数据流已成功将数据加载到 BigQuery 中。 只有在有 CDC 数据要加载时才会显示。

从其他流水线迁移

该工具包还可让您从其他流水线迁移到 Datastream 内置的 BigQuery 解决方案。该工具包可以使用 dry_run 模式,根据源数据库架构为与 Datastream 兼容的 BigQuery 表生成 CREATE TABLE DDL:

  docker run -v output:/output -ti --volumes-from gcloud-config \
  migration-service python3 ./migration/migrate_table.py dry_run \
  --project-id PROJECT_ID \
  --stream-id STREAM_ID \
  --datastream-region STREAM_REGION \
  --source-schema-name SOURCE_SCHEMA_NAME \
  --source-table-name SOURCE_TABLE_NAME \
  --bigquery-source-dataset-name BIGQUERY_SOURCE_DATASET_NAME \
  --bigquery-source-table-name BIGQUERY_SOURCE_TABLE_NAME

由于 BigQuery 表架构可能各不相同,因此很难提供用于复制行的通用 SQL 语句。您可以使用源表 DDL 目录 (output/source_table_ddl) 和目标表 DDL 目录 (output/create_target_table) 中的架构来创建 SQL 语句,并在源列上添加适当的转换

以下是您可以使用的 SQL 语句格式示例:

  INSERT INTO DESTINATION_TABLE (DESTINATION_COLUMN1, DESTINATION_COLUMN2...)
  SELECT SOURCE_COLUMN1, SOURCE_COLUMN2
  FROM SOURCE_TABLE;

替换以下内容:

  • DESTINATION_TABLE:BigQuery 中目标表的名称。
  • DESTINATION_COLUMN:目标表中的列名称。
  • SOURCE_COLUMN:源表中的列名称。
  • SOURCE_TABLE:源表的名称。

限制

该工具包具有以下限制:

  • 现有 BigQuery 表和新 BigQuery 表中的列名称需要匹配(忽略元数据列)。如果您应用会更改现有表格中列名称的 Dataflow 用户定义的函数 (UDF)(例如,通过添加前缀或更改大小写),迁移会失败。
  • 不支持跨区域和跨项目迁移。
  • 迁移操作是逐个表进行的。
  • 仅支持 Oracle 和 MySQL 数据源。