Faça a gestão das streams

Nesta página, vai saber como usar a API Datastream para:

  • Crie streams
  • Obtenha informações sobre streams e objetos de stream
  • Atualizar streams iniciando, pausando, retomando e modificando-as, bem como iniciando e parando o preenchimento para objetos de stream
  • Recupere streams com falhas definitivas
  • Ative o streaming de objetos grandes para streams Oracle
  • Elimine streams

Existem duas formas de usar a API Datastream. Pode fazer chamadas à API REST ou usar a CLI (interface de linha de comandos) do Google Cloud.

Para ver informações de alto nível sobre a utilização da Google Cloud CLI para gerir streams do Datastream, consulte o artigo Streams do Datastream da CLI gcloud.

Crie uma stream

Nesta secção, vai aprender a criar uma stream que é usada para transferir dados da sua origem para um destino. Os exemplos que se seguem não são exaustivos, mas realçam funcionalidades específicas do fluxo de dados. Para abordar o seu exemplo de utilização específico, use estes exemplos juntamente com a documentação de referência da API Datastream.

Esta secção aborda os seguintes exemplos de utilização:

Exemplo 1: transmita objetos específicos para o BigQuery

Neste exemplo, vai aprender a:

  • Faça streaming do MySQL para o BigQuery
  • Inclua um conjunto de objetos na stream
  • Defina o modo de escrita para a stream como apenas anexar
  • Preencher todos os objetos incluídos na stream

Segue-se um pedido para extrair todas as tabelas de schema1 e duas tabelas específicas de schema2: tableA e tableC. Os eventos são escritos num conjunto de dados no BigQuery.

O pedido não inclui o parâmetro customerManagedEncryptionKey. Por isso, o sistema de gestão de chaves interno é usado para encriptar os seus dados em vez da CMEK. Google Cloud

O parâmetro backfillAll associado à realização do preenchimento retroativo do histórico (ou da captura instantânea) está definido como um dicionário vazio ({}), o que significa que o Datastream preenche retroativamente os dados do histórico de todas as tabelas incluídas na stream.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

Para mais informações sobre como usar gcloud para criar uma stream, consulte a documentação do Google Cloud SDK.

Exemplo 2: exclua objetos específicos de uma stream com uma origem PostgreSQL

Neste exemplo, vai aprender a:

  • Faça streaming do PostgreSQL para o BigQuery
  • Exclua objetos da stream
  • Exclua objetos do preenchimento

O código seguinte mostra um pedido para criar um fluxo usado para transferir dados de uma base de dados PostgreSQL de origem para o BigQuery. Quando cria uma stream a partir de uma base de dados PostgreSQL de origem, tem de especificar dois campos adicionais específicos do PostgreSQL no seu pedido:

  • replicationSlot: um espaço de replicação é um pré-requisito para configurar uma base de dados PostgreSQL para replicação. Tem de criar um espaço de replicação para cada stream.
  • publication: uma publicação é um grupo de tabelas a partir do qual quer replicar alterações. O nome da publicação tem de existir na base de dados antes de iniciar uma stream. No mínimo, a publicação tem de incluir as tabelas especificadas na lista includeObjects da stream.

O parâmetro backfillAll associado à realização do preenchimento histórico (ou da captura instantânea) está definido para excluir uma tabela.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Para mais informações sobre como usar gcloud para criar uma stream, consulte a documentação do Google Cloud SDK.

Exemplo 3: especifique o modo de escrita apenas de anexação para uma stream

Quando faz streaming para o BigQuery, pode definir o modo de escrita: merge ou appendOnly. Para mais informações, consulte o artigo Configure o modo de escrita.

Se não especificar o modo de escrita no seu pedido para criar uma stream, é usado o modo merge predefinido.

O pedido seguinte mostra como definir o modo appendOnly quando cria uma stream do MySQL para o BigQuery.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

Para mais informações sobre como usar gcloud para criar uma stream, consulte a documentação do Google Cloud SDK.

Exemplo 4: faça streaming para um projeto diferente no BigQuery

Se criou os recursos da visualização de propriedade num projeto, mas quer transmitir para um projeto diferente no BigQuery, pode fazê-lo através de um pedido semelhante ao seguinte.

Se especificar sourceHierarchyDatasets para o conjunto de dados de destino, tem de preencher o campo projectId.

Se especificar singleTargetDataset para o conjunto de dados de destino, preencha o campo datasetId no formato projectId:datasetId.

REST

Para sourceHierarchyDatasets:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream1
{
  "displayName": "My cross-project stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        },
        "projectId": "myProjectId2"
      }
    }
  },
  "backfillAll": {}
}

Para singleTargetDataset:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream2
{
  "displayName": "My cross-project stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "singleTargetDataset": {
        "datasetId": "myProjectId2:myDatasetId"
      },
    }
  },
  "backfillAll": {}
}

gcloud

Para sourceHierarchyDatasets:

  datastream streams create crossProjectBqStream1 --location=us-central1
  --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json
  --destination=destination-cp --bigquery-destination-config=source_hierarchy_cross_project_config.json
  --backfill-none
  

O conteúdo do ficheiro de configuração source_hierarchy_cross_project_config.json:

  {"sourceHierarchyDatasets": {"datasetTemplate": {"location": "us-central1", "datasetIdPrefix": "prefix_"}, "projectId": "myProjectId2"}}
  

Para singleTargetDataset:

  datastream streams create crossProjectBqStream --location=us-central1
  --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json
  --destination=destination-cp --bigquery-destination-config=single_target_cross_project_config.json
  --backfill-none
  

O conteúdo do ficheiro de configuração single_target_cross_project_config.json:

  {"singleTargetDataset": {"datasetId": "myProjectId2:myDatastetId"}}
  

Para mais informações sobre como usar gcloud para criar uma stream, consulte a documentação do Google Cloud SDK.

Exemplo 5: faça streaming para um destino do Cloud Storage

Neste exemplo, vai aprender a:

  • Faça streaming da Oracle para o Cloud Storage
  • Defina um conjunto de objetos a incluir na stream
  • Defina a CMEK para encriptar dados em repouso

O pedido seguinte mostra como criar uma stream que escreve os eventos num contentor no Cloud Storage.

Neste exemplo de pedido, os eventos são escritos no formato de saída JSON e é criado um novo ficheiro a cada 100 MB ou 30 segundos (substituindo os valores predefinidos de 50 MB e 60 segundos).

Para o formato JSON, pode:

  • Inclua um ficheiro de esquema de tipos unificados no caminho. Como resultado, o Datastream escreve dois ficheiros no Cloud Storage: um ficheiro de dados JSON e um ficheiro de esquema Avro. O ficheiro de esquema tem o mesmo nome que o ficheiro de dados, com uma extensão .schema.

  • Ative a compressão gzip para que o Datastream comprima os ficheiros escritos no Cloud Storage.

Ao usar o parâmetro backfillNone, o pedido especifica que apenas as alterações em curso são transmitidas para o destino, sem preenchimento.

O pedido especifica o parâmetro da chave de encriptação gerida pelo cliente, que lhe permite controlar as chaves usadas para encriptar os dados em repouso num Google Cloud projeto. O parâmetro refere-se à CMEK que o Datastream usa para encriptar os dados transmitidos em fluxo contínuo da origem para o destino. Também especifica o conjunto de chaves para a CMEK.

Para mais informações acerca dos conjuntos de chaves, consulte o artigo Recursos do Cloud KMS. Para mais informações sobre a proteção dos seus dados através de chaves de encriptação, consulte o Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Para mais informações sobre como usar gcloud para criar uma stream, consulte a documentação do Google Cloud SDK.

Exemplo 6: fazer stream para uma tabela gerida do BigLake

Neste exemplo, vai saber como configurar uma stream para replicar dados de uma base de dados MySQL para uma tabela Iceberg do BigLake no modo append-only. Antes de criar o pedido, certifique-se de que concluiu os seguintes passos:

  • Ter um contentor do Cloud Storage onde quer armazenar os seus dados
  • Crie uma associação de recursos da nuvem
  • Conceda à ligação de recursos da nuvem acesso ao contentor do Cloud Storage

Em seguida, pode usar o seguinte pedido para criar a sua stream:

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream
{
  "displayName": "MySQL to BigLake stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          {
            "database": "my-mysql-database"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id",
    "bigqueryDestinationConfig": {
      "blmtConfig": {
        "bucket": "my-gcs-bucket-name",
        "rootPath": "my/folder",
        "connectionName": "my-project-id.us-central1.my-bigquery-connection-name",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG"
        },
      "singleTargetDataset": {
        "datasetId": "my-project-id:my-bigquery-dataset-id"
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

datastream streams create mysqlBigLakeStream --location=us-central1
--display-name=mysql-to-bl-stream --source=source --mysql-source-config=mysql_source_config.json
--destination=destination --bigquery-destination-config=bl_config.json
--backfill-none

O conteúdo do ficheiro de configuração de origem mysql_source_config.json:

{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{"database":"my-mysql-database"}]}}

O conteúdo do ficheiro de configuração bl_config.json:

{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }

Terraform

resource "google_datastream_stream" "stream" {
  stream_id    = "mysqlBlStream"
  location     = "us-central1"
  display_name = "MySQL to BigLake stream"

  source_config {
    source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp"
    mysql_source_config {
      include_objects {
        mysql_databases {
          database = "my-mysql-database"
        }
      }
    }
  }

  destination_config {
    destination_connection_profile = "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id"
    bigquery_destination_config {
      single_target_dataset {
        dataset_id = "my-project-id:my-bigquery-dataset-id"
      }
      blmt_config {
        bucket          = "my-gcs-bucket-name"
        table_format    = "ICEBERG"
        file_format     = "PARQUET"
        connection_name = "my-project-id.us-central1.my-bigquery-connection-name"
        root_path       = "my/folder"
      }
      append_only {}
    }
  }

  backfill_none {}
}
    

Valide a definição de uma stream

Antes de criar uma stream, pode validar a respetiva definição. Desta forma, pode garantir que todas as verificações de validação são aprovadas e que a stream é executada com êxito quando for criada.

A validação de verificações de streams:

  • Se a origem está configurada corretamente para permitir que o fluxo de dados transmita dados a partir da mesma.
  • Se a stream consegue estabelecer ligação à origem e ao destino.
  • A configuração integral da stream.

Para validar uma stream, adicione &validate_only=true ao URL antes do corpo do pedido:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Depois de fazer este pedido, vê as verificações de validação que o Datastream executa para a sua origem e destino, juntamente com a indicação de se as verificações são aprovadas ou reprovadas. Para qualquer verificação de validação que não seja aprovada, são apresentadas informações sobre o motivo da falha e o que fazer para retificar o problema.

Por exemplo, suponhamos que tem uma chave de encriptação gerida pelo cliente (CMEK) que quer que o Datastream use para encriptar os dados transmitidos em stream da origem para o destino. Como parte da validação da stream, o Datastream verifica se a chave existe e se o Datastream tem autorizações para usar a chave. Se qualquer uma destas condições não for cumprida, quando validar a stream, é devolvida a seguinte mensagem de erro:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Para resolver este problema, verifique se a chave que indicou existe e se a conta de serviço do Datastream tem a autorização cloudkms.cryptoKeys.get para a chave.

Depois de fazer as correções adequadas, faça o pedido novamente para garantir que todas as verificações de validação são aprovadas. No exemplo anterior, a verificação CMEK_VALIDATE_PERMISSIONS já não devolve uma mensagem de erro, mas tem um estado de PASSED.

Receba informações sobre uma stream

O código seguinte mostra um pedido para obter informações sobre uma stream. Estas informações incluem:

  • O nome da stream (identificador exclusivo)
  • Um nome intuitivo para a stream (nome a apresentar)
  • Datas/horas de quando a stream foi criada e atualizada pela última vez
  • Informações sobre os perfis de ligação de origem e destino associados à stream
  • O estado da stream

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

A resposta é apresentada da seguinte forma:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Para mais informações sobre a utilização de gcloud para obter informações sobre a sua stream, consulte a documentação do Google Cloud SDK.

Streams em direto

O código seguinte mostra um pedido para obter uma lista de todas as streams no projeto e na localização especificados.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Para mais informações sobre a utilização de gcloud para obter informações sobre todas as suas streams, consulte a documentação do Google Cloud SDK.

Listar objetos de uma stream

O código seguinte mostra um pedido para obter informações sobre todos os objetos de uma stream.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Para mais informações sobre a utilização de gcloud para obter informações sobre todos os objetos da sua stream, consulte a documentação do Google Cloud SDK.

A lista de objetos devolvida pode ser semelhante à seguinte:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Para mais informações sobre a utilização de gcloud para listar objetos de uma stream, consulte a documentação do SDK do Google Cloud.

Inicie uma stream

O código seguinte mostra um pedido para iniciar uma stream.

Ao usar o parâmetro updateMask no pedido, apenas os campos que especificar têm de ser incluídos no corpo do pedido. Para iniciar uma stream, altere o valor no campo state de CREATED para RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Para mais informações sobre como usar gcloud para iniciar a sua transmissão, consulte a documentação do SDK do Google Cloud.

Pause uma stream

O código seguinte mostra um pedido para pausar uma stream em execução.

Para este exemplo, o campo especificado para o parâmetro updateMask é o campo state. Ao pausar a stream, altera o respetivo estado de RUNNING para PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Para mais informações sobre a utilização de gcloud para pausar a stream, consulte a documentação do Google Cloud SDK.

Retome uma stream

O código seguinte mostra um pedido para retomar uma stream pausada.

Para este exemplo, o campo especificado para o parâmetro updateMask é o campo state. Ao retomar a stream, está a alterar o respetivo estado de PAUSED para RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Para mais informações sobre como usar gcloud para retomar a sua stream, consulte a documentação do Google Cloud SDK.

Recupere uma stream

Pode recuperar uma stream com falha permanente através do método RunStream. Cada tipo de base de dados de origem tem a sua própria definição do que é possível fazer com as operações de recuperação de streams. Para mais informações, consulte o artigo Recupere uma stream.

Recupere uma stream para uma origem MySQL ou Oracle

Os exemplos de código seguintes mostram pedidos para recuperar uma stream para uma origem MySQL ou Oracle a partir de várias posições de ficheiros de registo:

REST

Recuperar uma stream a partir da posição atual. Esta é a opção predefinida:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Recupere uma stream a partir da posição disponível seguinte:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Recuperar uma stream a partir da posição mais recente:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Recuperar uma stream a partir de uma posição específica (replicação baseada em binlog do MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Substitua o seguinte:

  • NAME_OF_THE_LOG_FILE: o nome do ficheiro de registo a partir do qual quer recuperar a sua stream
  • POSITION: a posição no ficheiro de registo a partir da qual quer recuperar a stream. Se não fornecer o valor, o Datastream recupera a stream a partir do início do ficheiro.

Por exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Recuperar uma stream a partir de uma posição específica (replicação baseada em GTID do MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

Substitua GTID_SET por um ou mais GTIDs únicos ou intervalos de GTIDs a partir dos quais quer recuperar a sua stream.

Por exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3"
      }
    }
  }
}

Recuperar uma stream a partir de uma posição específica (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Substitua scn pelo número de alteração do sistema (SCN) no ficheiro de registo de repetição a partir do qual quer recuperar a sua stream. Este campo é obrigatório.

Por exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Para mais informações sobre as opções de recuperação disponíveis, consulte o artigo Recupere uma stream.

gcloud

A recuperação de um stream através do gcloud não é suportada.

Recupere uma stream para uma origem PostgreSQL

O seguinte exemplo de código mostra um pedido para recuperar uma stream para uma origem do PostgreSQL. Durante a recuperação, a stream começa a ler a partir do primeiro número de sequência de registo (LSN) no espaço de replicação configurado para a stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Por exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Se quiser alterar o espaço de replicação, atualize primeiro a stream com o novo nome do espaço de replicação:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

A recuperação de um stream através do gcloud não é suportada.

Recupere uma stream para uma origem do SQL Server

Os seguintes exemplos de código mostram pedidos de exemplo para recuperar uma stream para uma origem do SQL Server.

REST

Recuperar uma stream a partir da primeira posição disponível:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Por exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

Recupere uma stream a partir de um número de sequência de registo preferencial:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

Por exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

A recuperação de um stream através do gcloud não é suportada.

Inicie ou retome uma stream a partir de uma posição específica

Pode iniciar uma stream ou retomar uma stream pausada a partir de uma posição específica para origens MySQL e Oracle. Isto pode ser útil quando quiser fazer o preenchimento com uma ferramenta externa ou iniciar a CDC a partir de uma posição que indicar. Para uma origem do MySQL, tem de indicar uma posição do binlog ou um conjunto GTID. Para uma origem do Oracle, tem de indicar um número de alteração do sistema (SCN) no ficheiro de registo de refazer.

O código seguinte mostra um pedido para iniciar ou retomar uma stream já criada a partir de uma posição específica.

Inicie ou retome uma stream a partir de uma posição binlog específica (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Substitua o seguinte:

  • NAME_OF_THE_LOG_FILE: O nome do ficheiro de registo a partir do qual quer iniciar a stream.
  • POSITION: a posição no ficheiro de registo a partir da qual quer iniciar a sua stream. Se não fornecer o valor, o fluxo de dados começa a ler a partir do início do ficheiro.

Por exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

Não é possível iniciar nem retomar um stream a partir de uma posição específica com o comando gcloud. Para obter informações sobre a utilização de gcloud para iniciar ou retomar uma stream, consulte a documentação do Cloud SDK.

Inicie ou retome uma stream a partir de um conjunto de GTIDs específico (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

Substitua GTID_SET por um ou mais GTIDs únicos ou intervalos de GTIDs a partir dos quais quer iniciar ou retomar a sua stream.

Por exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7"
      }
    }
  }
}

gcloud

Não é possível iniciar nem retomar um stream a partir de uma posição específica com o comando gcloud. Para obter informações sobre a utilização de gcloud para iniciar ou retomar uma stream, consulte a documentação do Cloud SDK.

Inicie ou retome uma stream a partir de um número de alteração do sistema específico no ficheiro de registo de refazer (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Substitua scn pelo número de alteração do sistema (SCN) no ficheiro de registo de repetição a partir do qual quer iniciar a sua stream. Este campo é obrigatório.

Por exemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

Não é possível iniciar nem retomar um stream a partir de uma posição específica com o comando gcloud. Para obter informações sobre como usar gcloud para iniciar uma stream, consulte a documentação do Cloud SDK.

Modifique uma stream

O código seguinte mostra um pedido para atualizar a configuração de rotação de ficheiros de uma stream para rodar o ficheiro a cada 75 MB ou 45 segundos.

Para este exemplo, os campos especificados para o parâmetro updateMask incluem os campos fileRotationMb e fileRotationInterval, representados pelas flags destinationConfig.gcsDestinationConfig.fileRotationMb e destinationConfig.gcsDestinationConfig.fileRotationInterval, respetivamente.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

O código seguinte mostra um pedido para incluir um ficheiro de esquema de tipos unificados no caminho dos ficheiros que o Datastream escreve no Cloud Storage. Como resultado, o Datastream escreve dois ficheiros: um ficheiro de dados JSON e um ficheiro de esquema Avro.

Para este exemplo, o campo especificado é o campo jsonFileFormat, representado pela flag destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

O código seguinte mostra um pedido para o Datastream replicar os dados existentes, além das alterações contínuas aos dados, da base de dados de origem para o destino.

A secção oracleExcludedObjects do código mostra as tabelas e os esquemas que estão restritos de serem preenchidos novamente no destino.

Para este exemplo, todas as tabelas e esquemas vão ser preenchidos com dados históricos, exceto a tabelaA no esquema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

Para mais informações sobre a utilização de gcloud para modificar a sua stream, consulte a documentação do Google Cloud SDK.

Inicie o repreenchimento de um objeto de uma stream

Uma stream no Datastream pode preencher dados do histórico, bem como transmitir alterações contínuas para um destino. As alterações contínuas são sempre transmitidas de uma origem para um destino. No entanto, pode especificar se quer que os dados do histórico sejam transmitidos.

Se quiser que os dados do histórico sejam transmitidos a partir da origem para o destino, use o parâmetro backfillAll.

A stream de dados também lhe permite transmitir dados do histórico apenas para tabelas de base de dados específicas. Para o fazer, use o parâmetro backfillAll e exclua as tabelas para as quais não quer dados do histórico.

Se quiser que apenas as alterações contínuas sejam transmitidas para o destino, use o parâmetro backfillNone. Se quiser que o Datastream transmita uma imagem instantânea de todos os dados existentes da origem para o destino, tem de iniciar o preenchimento manual dos objetos que contêm estes dados.

Outro motivo para iniciar o preenchimento de dados para um objeto é se os dados estiverem dessincronizados entre a origem e o destino. Por exemplo, um utilizador pode eliminar dados no destino inadvertidamente, e os dados ficam perdidos. Neste caso, iniciar o preenchimento de dados para o objeto serve como um "mecanismo de reposição" porque todos os dados são transmitidos para o destino de uma só vez. Como resultado, os dados são sincronizados entre a origem e o destino.

Antes de poder iniciar o preenchimento alternativo de um objeto de uma stream, tem de obter informações sobre o objeto.

Cada objeto tem um OBJECT_ID, que identifica o objeto de forma exclusiva. Usa o OBJECT_ID para iniciar o preenchimento para a stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Para mais informações sobre a utilização de gcloud para iniciar o preenchimento alternativo de um objeto da sua stream, consulte a documentação do SDK do Google Cloud.

Pare o repreenchimento de um objeto de uma stream

Depois de iniciar o preenchimento alternativo para um objeto de uma stream, pode parar o preenchimento alternativo para o objeto. Por exemplo, se um utilizador modificar um esquema de base de dados, o esquema ou os dados podem ficar danificados. Não quer que este esquema ou dados sejam transmitidos para o destino e, por isso, interrompe o preenchimento dos dados antigos do objeto.

Também pode parar o preenchimento para um objeto para fins de equilíbrio de carga. A stream de dados pode executar vários preenchimentos em simultâneo. Isto pode colocar uma carga adicional na fonte. Se o carregamento for significativo, pare o repreenchimento para cada objeto e, em seguida, inicie o repreenchimento dos objetos, um a um.

Antes de poder parar o preenchimento para um objeto de uma stream, tem de fazer um pedido para obter informações sobre todos os objetos de uma stream. Cada objeto devolvido tem um OBJECT_ID, que identifica o objeto de forma exclusiva. Use o elemento OBJECT_ID para parar o preenchimento de substituição da stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Para mais informações sobre como usar gcloud para parar o preenchimento alternativo de um objeto da sua stream, consulte a documentação do Google Cloud SDK.

Altere o número máximo de tarefas de CDC simultâneas

O código seguinte mostra como definir o número máximo de tarefas de captura de dados de alterações (CDC) simultâneas para um fluxo do MySQL como 7.

Para este exemplo, o campo especificado para o parâmetro updateMask é o campo maxConcurrentCdcTasks. Ao definir o valor como 7, está a alterar o número máximo de tarefas de CDC simultâneas do valor anterior para 7. Pode usar valores de 0 a 50 (inclusive). Se não definir o valor ou se o definir como 0, a predefinição do sistema de 5 tarefas é definida para a stream.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

Para mais informações sobre a utilização do gcloud, consulte a documentação do SDK do Google Cloud.

Altere o número máximo de tarefas de preenchimento simultâneas

O código seguinte mostra como definir o número máximo de tarefas de preenchimento para um fluxo MySQL como 25.

Para este exemplo, o campo especificado para o parâmetro updateMask é o campo maxConcurrentBackfillTasks. Ao definir o valor como 25, está a alterar o número máximo de tarefas de preenchimento de reserva simultâneas do valor anterior para 25. Pode usar valores de 0 a 50 (inclusive). Se não definir o valor ou se o definir como 0, a predefinição do sistema de 16 tarefas é definida para a stream.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

Para mais informações sobre a utilização do gcloud, consulte a documentação do SDK do Google Cloud.

Ative o streaming de objetos grandes para origens Oracle

Pode ativar o streaming de objetos grandes, como objetos binários grandes (BLOB), objetos grandes de carateres (CLOB) e objetos grandes de carateres nacionais (NCLOB) para streams com origens Oracle. A flag streamLargeObjects permite-lhe incluir objetos grandes em streams novas e existentes. A flag está definida ao nível do fluxo. Não precisa de especificar as colunas de tipos de dados de objetos grandes.

O exemplo seguinte mostra como criar um fluxo que lhe permite transmitir objetos grandes.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Para mais informações sobre como usar gcloud para atualizar uma stream, consulte a documentação do SDK do Google Cloud.

Elimine uma stream

O código seguinte mostra um pedido para eliminar uma stream.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Para mais informações sobre como usar gcloud para eliminar a sua stream, consulte a documentação do Google Cloud SDK.

O que se segue?