创建迁移工作流

在 BigQuery Migration API 中创建迁移工作流。迁移工作流包含一系列子任务,这些子任务用于将数据仓库迁移到 BigQuery。

代码示例

Node.js

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 BigQuery Node.js API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为客户端库设置身份验证

const {MigrationServiceClient} = require('@google-cloud/bigquery-migration').v2;
const {status} = require('@grpc/grpc-js');

const client = new MigrationServiceClient();

/**
 * Creates a migration workflow.
 *
 * Batch translate Teradata SQL scripts and DDL into BigQuery-compatible SQL.
 * It configures a translation task that reads input files from a
 * Google Cloud Storage source bucket and writes the converted output to a target bucket.
 *
 * @param {string} projectId The Google Cloud project ID (for example, 'example-project-id').
 * @param {string} location The Google Cloud location (for example, 'us').
 * @param {string} gcsSourcePath The Cloud Storage path for the source data (for example, 'gs://example-bucket/example-input-folder').
 * @param {string} gcsTargetPath The Cloud Storage path for the translated results (for example, 'gs://example-bucket/example-output-folder').
 */
async function createMigrationWorkflow(
  projectId,
  location,
  gcsSourcePath,
  gcsTargetPath,
) {
  const parent = client.locationPath(projectId, location);
  const migrationWorkflow = {
    displayName: 'Example BTEQ Migration Workflow',
    tasks: {
      'translation-task': {
        type: 'Translation_Teradata2BQ',
        translationConfigDetails: {
          gcsSourcePath,
          gcsTargetPath,
          sourceDialect: {
            teradataDialect: {
              mode: 'SQL',
            },
          },
          targetDialect: {
            bigqueryDialect: {},
          },
        },
      },
    },
  };

  const request = {
    parent,
    migrationWorkflow,
  };

  try {
    const [workflow] = await client.createMigrationWorkflow(request);
    console.log(`Created migration workflow: ${workflow.name}`);
    console.log(`  Display Name: ${workflow.displayName}`);
    console.log(`  State: ${workflow.state}`);
    console.log('  Tasks:');
    for (const taskName in workflow.tasks) {
      const task = workflow.tasks[taskName];
      console.log(`    - ${taskName}:`);
      console.log(`        Type: ${task.type}`);
      console.log(`        State: ${task.state}`);
    }
  } catch (err) {
    if (err.code === status.INVALID_ARGUMENT) {
      console.error(
        `Error: Invalid argument provided for creating Migration '${migrationWorkflow.displayName}'. ` +
          `Details: ${err.message}. Please check the request parameters and ensure they are valid.`,
      );
    } else {
      console.error('Error creating migration workflow:', err);
    }
  }
}

Python

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 BigQuery Python API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为客户端库设置身份验证

from google.api_core import exceptions
from google.cloud import bigquery_migration_v2
from google.cloud.bigquery_migration_v2.types import (
    migration_entities,
    translation_config,
)


def create_migration_workflow(
    project_id: str, location: str, gcs_source_path: str, gcs_target_path: str
) -> None:
    """Creates a Teradata SQL translation workflow

    Creates a migration workflow to batch translate Teradata SQL scripts
    and DDL into BigQuery-compatible SQL. It configures a translation task
    that reads input files from a Google Cloud Storage source bucket and
    writes the converted output to a target bucket.

    Args:
        project_id: The Google Cloud project ID.
        location: The geographic location of the migration workflow (for example, us).
        gcs_source_path: The Cloud Storage path for a directory of files to
            translate in a batch (for example, gs://example-bucket/example-input-folder/).
        gcs_target_path: The Cloud Storage path to write back the corresponding
            input files to (for example, gs://example-bucket/example-output-folder/).
    """
    client = bigquery_migration_v2.MigrationServiceClient()

    parent = f"projects/{project_id}/locations/{location}"

    source_dialect = bigquery_migration_v2.Dialect()
    source_dialect.teradata_dialect = bigquery_migration_v2.TeradataDialect(
        mode=bigquery_migration_v2.TeradataDialect.Mode.SQL
    )
    target_dialect = bigquery_migration_v2.Dialect()
    target_dialect.bigquery_dialect = bigquery_migration_v2.BigQueryDialect()

    translation_config_details = bigquery_migration_v2.TranslationConfigDetails(
        gcs_source_path=gcs_source_path,
        gcs_target_path=gcs_target_path,
        source_dialect=source_dialect,
        target_dialect=target_dialect,
    )

    task = migration_entities.MigrationTask(
        type="Translation_Teradata2BQ",
        translation_config_details=translation_config_details,
    )

    workflow = migration_entities.MigrationWorkflow(
        display_name="Example Teradata to BigQuery Migration Workflow",
        tasks={"translation-task": task},
    )

    try:
        response = client.create_migration_workflow(
            parent=parent,
            migration_workflow=workflow,
        )
        print(f"Created migration workflow: {response.name}")
        print(f"Display name: {response.display_name}")
        print(f"State: {response.state.name}")
    except exceptions.AlreadyExists as e:
        print(f"Migration workflow already exists: {e}")

后续步骤

如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅Google Cloud 示例浏览器