En las empresas con muchas fuentes de datos aisladas, puede ser difícil acceder a los datos de la empresa en toda la organización, sobre todo en tiempo real. Esto provoca que el acceso a los datos sea limitado y lento, lo que impide que la organización pueda introspeccionar.
Datastream proporciona acceso casi en tiempo real a los datos de cambios de varias fuentes de datos locales y basadas en la nube para crear acceso a los datos de la organización. Datastream proporciona una API de consumo unificada que democratiza el acceso de la organización a los datos empresariales más recientes disponibles en toda la organización, lo que permite crear escenarios integrados casi en tiempo real.
Un ejemplo de este tipo de situación es la transferencia de datos de una base de datos de origen a un servicio de almacenamiento o una cola de mensajes basados en la nube, y la transformación de estos datos en un formato que puedan leer otras aplicaciones y servicios que se comuniquen con este servicio de almacenamiento o esta cola de mensajes.
En este tutorial, aprenderá a usar Datastream para transferir esquemas, tablas y datos de una base de datos de Oracle de origen a una carpeta de un segmento de Cloud Storage. Cloud Storage es un servicio web para almacenar y acceder a datos en Google Cloud. El servicio combina el rendimiento y la escalabilidad de la nube de Google con funciones avanzadas de seguridad y uso compartido.
Como parte de la transferencia de esta información a una carpeta del segmento de Cloud Storage de destino, Datastream traduce esta información a Avro. Avro se define mediante un esquema escrito en JavaScript Object Notation (JSON). De esta forma, puede leer datos de diferentes fuentes de datos de forma uniforme.
Configurar variables de entorno
En este procedimiento, definirá las siguientes variables:
$PROJECT
: esta variable está asociada a tu Google Cloud proyecto. Todos los recursos de Google Cloud que asignes y utilices deben pertenecer a un proyecto.$TOKEN
: esta variable está asociada a un token de acceso. El token de acceso proporciona una sesión que Cloud Shell usa para realizar tareas en Datastream mediante APIs REST.
Inicia tu aplicación de Cloud Shell.
Después de autenticarte en tu aplicación con tu cuenta de Google, introduce el siguiente comando:
gcloud auth login
En la petición
Do you want to continue (Y/n)?
, introduceY
.Abre un navegador web y copia la URL en él.
Autentícate en el SDK de Google Cloud con tu cuenta de Google. Aparecerá un código en la página Iniciar sesión. Este código es tu token de acceso.
Copia el token de acceso, pégalo en el parámetro
Enter verification code:
de tu aplicación Cloud Shell y pulsaEnter
.En la petición, introduce
PROJECT=\"YOUR_PROJECT_NAME\"
para definir la variable de entorno$PROJECT
en tu proyecto Google Cloud.En la petición, introduce
gcloud config set project YOUR_PROJECT_NAME
para definir el proyecto en el que quieres trabajar como tu proyecto Google Cloud.La línea de comandos se actualiza para reflejar el proyecto activo y respeta este formato:
USERNAME@cloudshell:~ (YOUR_PROJECT_NAME)$
En la petición, introduce
TOKEN=$(gcloud auth print-access-token)
para obtener el token de acceso y guárdalo como una variable.En la petición, introduce los siguientes comandos para asegurarte de que las variables
$PROJECT
y$TOKEN
se han definido correctamente:echo $PROJECT
echo $TOKEN
Ahora que has definido tus variables, puedes enviar solicitudes a Datastream para crear y gestionar tanto perfiles de conexión como un flujo.
Crear y gestionar perfiles de conexión
En esta sección, creará y gestionará perfiles de conexión para una base de datos Oracle de origen y un segmento de destino en Cloud Storage.
Cuando creas estos perfiles de conexión, se crean registros que contienen información sobre la base de datos de origen y el segmento de Cloud Storage de destino. Datastream usa la información de los perfiles de conexión para transferir datos de la base de datos de origen a una carpeta del contenedor de destino.
Crear y gestionar perfiles de conexión implica lo siguiente:
- Crear perfiles de conexión para una base de datos Oracle de origen y un segmento de destino en Cloud Storage
- Obtener información sobre un perfil de conexión
- Modificar un perfil de conexión
- Realizar una llamada a la API Discover en el perfil de conexión de Oracle de origen. Esta llamada te permite consultar la base de datos para ver los objetos asociados a ella. Estos objetos incluyen los esquemas y las tablas que contienen los datos de la base de datos. Cuando usa Datastream para configurar un flujo, puede que no quiera extraer todos los objetos de la base de datos, sino un subconjunto de los objetos (por ejemplo, solo determinadas tablas y esquemas de la base de datos). Usa la API Discover para encontrar (o descubrir) el subconjunto de objetos de la base de datos que quieras extraer.
Crear perfiles de conexión
En este procedimiento, creará dos perfiles de conexión: uno a una base de datos Oracle de origen y otro a un segmento de destino en Cloud Storage.
- Crea un perfil de conexión a una base de datos de Oracle de origen. En el mensaje, introduce el siguiente comando:
ORACLE="{\"displayName\":\"DISPLAY_NAME\",\"oracle_profile\":{\"hostname\":\"HOSTNAME\",\" username\":\"USERNAME\",\"password\":\"PASSWORD\",\"database_service\":\"DATABASE_SERVICE\",\" port\":"PORT_NUMBER\"},\"no_connectivity\":{}}"
En la siguiente tabla se indican los valores de los parámetros de la base de datos de Oracle de origen:
Valor de parámetro | Reemplazar con |
---|---|
DISPLAY_NAME | Nombre visible del perfil de conexión a la base de datos de origen. |
HOSTNAME | Nombre de host del servidor de la base de datos de origen. |
USERNAME | Nombre de usuario de la cuenta de la base de datos de origen (por ejemplo, ROOT). |
PASSWORD | La contraseña de la cuenta de la base de datos de origen. |
DATABASE_SERVICE | El servicio que asegura que la base de datos de origen esté protegida y monitorizada. En el caso de las bases de datos Oracle, el servicio de base de datos suele ser ORCL. |
PORT_NUMBER | El número de puerto reservado para la base de datos de origen. En el caso de una base de datos Oracle, el número de puerto suele ser 1521. |
En la petición, introduce el comando
echo $ORACLE | jq
para ver el perfil de conexión de origen que has creado en texto fácil de leer.{ "displayName": "DISPLAY_NAME", "oracle_profile": { "hostname": "HOSTNAME", "username": "USERNAME", "password": "PASSWORD", "database_service": "DATABASE_SERVICE", "port": PORT_NUMBER }, "no_connectivity": {} }
Envía el perfil de conexión de Oracle para que se pueda crear. En el mensaje, introduce el siguiente comando:
curl -X POST -d $ORACLE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles? connection_profile_id=SOURCE_CONNECTION_PROFILE_ID En la siguiente tabla se describen los valores de los parámetros de este comando:
Valor de parámetro Reemplazar con DATASTREAM_API_VERSION La versión actual de la API DataStream (por ejemplo, v1
).PROJECT_PATH La ruta completa de tu proyecto Google Cloud (por ejemplo, projects/$PROJECT/locations/YOUR_PROJECT_LOCATION
).SOURCE_CONNECTION_PROFILE_ID El identificador único reservado para este perfil de conexión (por ejemplo, cp-1). Comprueba que veas las siguientes líneas de código:
{ "name": "PROJECT_PATH/operations/operation-SOURCE_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "datastream.googleapis.com/DATASREAM_VERSION/PROJECT_PATH/connectionProfiles/
SOURCE_CONNECTION_PROFILE_ID" , "verb": "create", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }Crea un perfil de conexión a un segmento de destino de Cloud Storage. En el mensaje, introduce el siguiente comando:
GOOGLECLOUDSTORAGE="{\"displayName\":\"DISPLAY_NAME\",\"gcs_profile\":{\"bucket_name\":\"BUCKET_NAME\",
\"root_path\":\"/FOLDER_PATH\"},\"no_connectivity\":{}}" En la siguiente tabla se describen los valores de los parámetros del segmento de destino:
Valor de parámetro Reemplazar con DISPLAY_NAME Nombre visible del perfil de conexión al bucket de destino. BUCKET_NAME Nombre del segmento de destino. FOLDER_PATH La carpeta del segmento de destino en la que Datastream transferirá los datos de la base de datos de origen (por ejemplo, /root/path). En la petición, introduce el comando
echo $GOOGLECLOUDSTORAGE | jq
para ver el perfil de conexión de destino que has creado en texto fácil de leer.{ "displayName": "DISPLAY_NAME", "gcs_profile": { "bucket_name": "BUCKET_NAME", "root_path": "/FOLDER_PATH" }, "no_connectivity": {} }
Envía el perfil de conexión de Cloud Storage para que se pueda crear. En el mensaje, introduce el siguiente comando:
curl -X POST -d $GOOGLECLOUDSTORAGE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles? connection_profile_id=DESTINATION_CONNECTION_PROFILE_ID Comprueba que veas las siguientes líneas de código:
{ "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.
OperationMetadata" , "createTime": "DATE_AND_TIME_STAMP", "target": "datastream.googleapis.com/DATASTREAM_VERSION/PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "verb": "create", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }Confirma que se han creado ambos perfiles de conexión. En el mensaje, introduce el siguiente comando:
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles Verifica que recibes dos resultados para los perfiles de conexión de origen y de destino.
{ "connectionProfiles": [ { "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "gcsProfile": { "bucketName": "BUCKET_NAME", "rootPath": "FOLDER_PATH" }, "noConnectivity": {} }, { "name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "oracleProfile": { "hostname": "HOSTNAME", "port": PORT_NUMBER, "username": "USERNAME", "databaseService": "DATABASE_SERVICE" }, "noConnectivity": {} } ] }
Gestionar perfiles de conexión
En este procedimiento, gestionará los perfiles de conexión que ha creado para una base de datos Oracle de origen y un segmento de destino en Cloud Storage. Entre los datos que recoge se incluyen los siguientes:
- Obtener información sobre el perfil de conexión de Cloud Storage de destino
- Modificando este perfil de conexión. En este tutorial, cambiarás la carpeta del segmento de Cloud Storage de destino a /root/tutorial. Datastream transfiere los datos de la base de datos de origen a esta carpeta.
- Llamar a la API Discover en el perfil de conexión de Oracle de origen
Recupera información sobre el perfil de conexión de Cloud Storage de destino. En el mensaje, introduce el siguiente comando:
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ DESTINATION_CONNECTION_PROFILE_ID Verifica que veas información sobre este perfil de conexión.
{ "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "gcsProfile": { "bucketName": "BUCKET_NAME", "rootPath": "FOLDER_PATH" }, "noConnectivity": {} }
Modifica este perfil de conexión. Para ello, primero define una variable UPDATE. Esta variable contiene los valores del perfil de conexión que quiere cambiar. En este tutorial, cambiarás la carpeta del segmento de destino a /root/tutorial.
Para definir la variable, introduce el siguiente comando en la petición:
UPDATE="{\"gcsProfile\":{\"rootPath\":\"/root/tutorial\"}}"
En el mensaje, introduce el siguiente comando:
curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ DESTINATION_CONNECTION_PROFILE_ID?update_mask=gcsProfile.rootPath Comprueba que veas las siguientes líneas de código:
{ "name": "PROJECT_PATH/operations/operation-DESTINATION_CONNECTION_PROFILE_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "verb": "update", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
Confirma que el perfil de conexión se ha modificado. En el mensaje, introduce el siguiente comando:
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles/ DESTINATION_CONNECTION_PROFILE_ID Verifica que la carpeta del segmento de destino del perfil de conexión de Cloud Storage ahora sea /root/tutorial.
{ "name": "PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "gcsProfile": { "bucketName": "BUCKET_NAME", "rootPath": "/root/tutorial" }, "noConnectivity": {} }
Usa la API Discover de Datastream para descubrir los esquemas y las tablas de la base de datos de Oracle de origen. Datastream proporciona acceso a esta base de datos a través del perfil de conexión de origen.
Descubre los esquemas de la base de datos de Oracle. En el mensaje, introduce el siguiente comando:
curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/
YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover Verifica que Datastream obtiene todos los esquemas de tu base de datos.
Recupera las tablas de un esquema de tu base de datos. En este tutorial, usarás la API Discover para obtener las tablas del esquema ROOT. Sin embargo, puede descubrir las tablas de cualquier esquema de su base de datos.
En el mensaje, introduce el siguiente comando:
curl -X POST -d "{\"connection_profile_name\":\"projects/YOUR_PROJECT_NUMBER/locations/
YOUR_PROJECT_LOCATION/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID\", \"oracle_rdbms\":{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}}" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/connectionProfiles:discover - Comprueba que Datastream obtiene todas las tablas del esquema que has especificado (en este tutorial, el esquema ROOT).
Ahora que has creado y gestionado perfiles de conexión para una base de datos Oracle de origen y un segmento de destino en Cloud Storage, puedes crear y gestionar un flujo en Datastream.
Crear y gestionar un flujo
En esta sección, crearás y gestionarás un flujo. Datastream usa este flujo para transferir datos, esquemas y tablas de la base de datos de origen a una carpeta del segmento de Cloud Storage de destino.
Crear y gestionar un flujo incluye lo siguiente:
- Validar un flujo para asegurarse de que se ejecutará correctamente y de que supera todas las comprobaciones de validación. Entre estas comprobaciones se incluyen las siguientes:
- Si la fuente está configurada correctamente para permitir que Datastream transmita datos desde ella.
- Si la transmisión puede conectarse tanto al origen como al destino.
- La configuración integral del flujo.
- Se va a crear el flujo con las siguientes listas:
- Una lista de permitidos. En esta lista se especifican las tablas y los esquemas de la base de datos de origen que Datastream puede transferir a una carpeta del segmento de destino de Cloud Storage. En este tutorial, se trata de la carpeta /root/tutorial.
- Una lista de rechazo. En esta lista se especifican las tablas y los esquemas de la base de datos de origen que Datastream no puede transferir a la carpeta del segmento de destino de Cloud Storage.
- Obtener información sobre el flujo
- Modificar el flujo
- Iniciar la transmisión para que Datastream pueda transferir datos, esquemas y tablas de la base de datos de origen a una carpeta del segmento de Cloud Storage de destino.
- Usar la API Fetch Errors para detectar errores asociados al flujo
- Pausando la emisión. Cuando se pausa un flujo, Datastream no extrae ningún dato nuevo de la base de datos de origen al segmento de destino.
- Reanudar el flujo pausado para que Datastream pueda seguir transfiriendo datos al segmento de destino.
Crear un flujo
En este procedimiento, se crea un flujo desde la base de datos Oracle de origen a una carpeta del segmento de Cloud Storage de destino. El flujo que crees incluirá tanto una lista de permitidos como una lista de rechazados.
Definir una variable SCHEMAS. Esta variable define los esquemas que contienen los datos y las tablas que quieres que Datastream recupere de la base de datos de origen y transfiera a la carpeta /root/tutorial del segmento de destino de Cloud Storage. En este tutorial, asociará la variable SCHEMAS al esquema ROOT.
En el mensaje, introduce el siguiente comando:
SCHEMAS="{\"oracleSchemas\":[{\"schema\":\"ROOT\"}]}"
En la petición, introduce el comando echo $SCHEMAS | jq para ver el esquema ROOT que has definido para esta variable en un texto fácil de leer.
Crea un flujo. En el mensaje, introduce el siguiente comando:
STREAM="{\"display_name\":\"DISPLAY_NAME\",\"source_config\":{\"source_connection_profile_name\":\"
PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" ,\"oracle_source_config\":{\"allowlist\":$SCHEMAS,\"rejectlist\":{}}},\"destination_config\":{\"destination_connection_profile_name\" :\"PROJECT_PATH/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID\",\"gcs_destination_config\": {\"file_rotation_mb\":5,\"file_rotation_interval\":{\"seconds\":15},\"avro_file_format\":{}}, \"backfill_all\":{}}}" En la petición, introduce el comando
echo $STREAM | jq
para ver el flujo que has creado en un texto fácil de leer.{ "display_name": "DISPLAY_NAME", "source_config": { "source_connection_profile_name": "PROJECT_PATH/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID", "oracle_source_config": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destination_config": { "destination_connection_profile_name": "PROJECT_PATH/connectionProfiles/
DESTINATION_CONNECTION_PROFILE_ID" , "gcs_destination_config": { "file_rotation_mb": 5, "file_rotation_interval": { "seconds": 15 }, "avro_file_format": {} } }, "backfill_all": {} }Usa esta tabla para entender los siguientes parámetros de la emisión:
Parámetro Descripción allowlist Los esquemas, que contienen tablas y datos, que se transferirán de la base de datos de origen a una carpeta del segmento de destino de Cloud Storage. En este tutorial, todas las tablas y los datos del esquema ROOT (y solo este esquema) se transferirán a la carpeta /root/tutorial del segmento de destino. rejectlist Los esquemas que contengan tablas y datos no se transferirán a una carpeta del segmento de destino de Cloud Storage. En este tutorial, el valor {} significa que no se impedirá que se transfieran tablas ni datos de la base de datos de origen al bucket de destino. file_rotation_mb Tamaño (en MB) de los archivos que contienen los datos que se transfieren de la base de datos de origen a una carpeta del segmento de Cloud Storage de destino. En este tutorial, a medida que se recuperan los datos de la base de datos de origen, se escriben en archivos de 5 MB. Si algún dato supera este tamaño, se segmentará en varios archivos de 5 MB. file_rotation_interval Número de segundos que transcurrirán antes de que Datastream cierre un archivo de una carpeta del segmento de destino de Cloud Storage y abra otro archivo para contener los datos que se transfieran desde la base de datos de origen. En este tutorial, el intervalo de rotación de archivos se ha definido en 15 segundos. avro_file_format El formato de los archivos que Datastream transferirá de la base de datos de origen a una carpeta del segmento de destino de Cloud Storage. En este tutorial, el formato de archivo es Avro.
backfill_all Este parámetro está asociado al relleno histórico. Si asignas a este parámetro un diccionario vacío ({}), Datastream rellenará los datos históricos:
- El historial de datos, además de los cambios que se produzcan en los datos, de la base de datos de origen a la de destino.
- Esquemas y tablas de la fuente al destino.
Valida el flujo para asegurarte de que se ejecutará correctamente y de que supera todas las comprobaciones de validación. En el mensaje, introduce el siguiente comando:
curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
"https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id= STREAM_ID&validate_only=true" Comprueba que veas la línea de código
{}
. Esto indica que la emisión ha superado todas las comprobaciones de validación y que no hay errores asociados a ella.Envía la emisión para que se pueda crear. En el mensaje, introduce el siguiente comando:
curl -X POST -d $STREAM -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams?stream_id=STREAM_ID Comprueba que veas las siguientes líneas de código:
{ "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "create", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
Confirma que se ha creado la emisión. En el mensaje, introduce el siguiente comando:
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams Verifica que recibes un resultado devuelto para el flujo que has creado.
{ "streams": [ { "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 5, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} } ] }
Gestionar la emisión
En este procedimiento, se usa el flujo que has creado para transferir datos de una base de datos de Oracle de origen a una carpeta de un segmento de destino de Cloud Storage. Entre los datos que recoge se incluyen los siguientes:
- Obtener información sobre el flujo
- Modificar el flujo
- Iniciar la emisión
- Usar la API Fetch Errors para detectar errores asociados al flujo
- Pausar y reanudar la emisión
Recupera información sobre la emisión. En el mensaje, introduce el siguiente comando:
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID Verifica que veas información sobre este flujo.
{ "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 5, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} }Modifica este flujo. Para ello, primero define una variable UPDATE. Esta variable contiene los valores del flujo que quiere cambiar. En este tutorial, cambia el tamaño (en MB) de los archivos que contienen los datos que se transfieren de la base de datos de origen a una carpeta del segmento de destino de Cloud Storage (de 5 a 100 MB). A medida que se recuperan los datos de la base de datos de origen, se escriben en archivos de 100 MB. Si algún dato supera este tamaño, se segmentará en varios archivos de 100 MB.
Para definir la variable, introduce el siguiente comando en la petición:
UPDATE="{\"destination_config\":{\"gcs_destination_config\":{\"file_rotation_mb\":100}}}"
En el mensaje, introduce el siguiente comando:
curl -X PATCH -d $UPDATE -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID/ ?update_mask=destination_config.gcs_destination_config.file_rotation_mb Comprueba que veas las siguientes líneas de código:
{ "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "update", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
Confirma que la emisión se ha modificado. En el mensaje, introduce el siguiente comando:
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID Comprueba que el valor del parámetro fileRotationMb del perfil de conexión de Cloud Storage ahora sea
100
.{ "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "CREATED", "backfillAll": {} }Inicia la emisión. Para ello:
Cambia la variable
UPDATE
. En el mensaje, introduce el siguiente comando:UPDATE="{\"state\":\"RUNNING\"}"
A continuación, introduce el siguiente comando:
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID ?updateMask=state
Comprueba que veas las siguientes líneas de código.
{ "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "start", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
Al cabo de unos minutos, recupera información sobre el flujo para confirmar que se ha iniciado:
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID Verifica que el estado del flujo haya cambiado de
CREATED
aRUNNING
.{ "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "RUNNING", "backfillAll": {} }Usar la API Fetch Errors para obtener los errores asociados al flujo.
En el mensaje, introduce el siguiente comando:
curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/ STREAM_ID:fetchErrors Comprueba que veas las siguientes líneas de código:
{ "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "fetchErrors", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
En el mensaje, introduce el siguiente comando:
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/operations/ operation-FETCH_ERRORS_OPERATION_ID Comprueba que veas las siguientes líneas de código:
{ "name": "PROJECT_PATH/operations/operation-FETCH_ERRORS_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION
.OperationMetadata" , "createTime": "DATE_AND_TIME_STAMP", "endTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "fetchErrors", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": true, "response": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.FetchErrorsResponse" } }
Pausar la emisión. Para ello:
Cambia la variable
UPDATE
. En el mensaje, introduce el siguiente comando:UPDATE="{\"state\":\"PAUSED\"}"
A continuación, introduce el siguiente comando:
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID ?updateMask=state
Comprueba que veas las siguientes líneas de código.
{ "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "start", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
Recupera información sobre la emisión para confirmar que está en pausa.
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID Verifica que el estado del flujo haya cambiado de
RUNNING
aPAUSED
.{ "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "PAUSED", "backfillAll": {} }Reanuda el flujo pausado. Para ello:
Cambia la variable
UPDATE
. En el mensaje, introduce el siguiente comando:UPDATE="{\"state\":\"RUNNING\"}"
A continuación, introduce el siguiente comando:
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID ?updateMask=state
Comprueba que veas las siguientes líneas de código.
{ "name": "PROJECT_PATH/operations/operation-STREAM_OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.datastream.DATASTREAM_API_VERSION.OperationMetadata", "createTime": "DATE_AND_TIME_STAMP", "target": "PROJECT_PATH/streams/STREAM_ID", "verb": "start", "requestedCancellation": false, "apiVersion": "DATASTREAM_API_VERSION" }, "done": false }
Al cabo de unos segundos, recupera información sobre el flujo para confirmar que se está ejecutando de nuevo.
curl -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json"
https://datastream.googleapis.com/DATASTREAM_API_VERSION/PROJECT_PATH/streams/STREAM_ID Verifica que el estado del flujo haya cambiado de
PAUSED
aRUNNING
.{ "name": "PROJECT_PATH/streams/STREAM_ID", "createTime": "DATE_AND_TIME_STAMP", "updateTime": "DATE_AND_TIME_STAMP", "displayName": "DISPLAY_NAME", "sourceConfig": { "sourceConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION
/connectionProfiles/SOURCE_CONNECTION_PROFILE_ID" , "oracleSourceConfig": { "allowlist": { "oracleSchemas": [ { "schema": "ROOT" } ] }, "rejectlist": {} } }, "destinationConfig": { "destinationConnectionProfileName": "projects/YOUR_PROJECT_NUMBER/locations/YOUR_PROJECT_LOCATION/connectionProfiles/DESTINATION_CONNECTION_PROFILE_ID" , "gcsDestinationConfig": { "fileRotationMb": 100, "fileRotationInterval": "15s" "avroFileFormat": {} } }, "state": "RUNNING", "backfillAll": {} }
Ahora que ha creado y gestionado una emisión, ha confirmado que no hay errores asociados a ella y que su estado es RUNNING
, puede verificar que puede transferir datos de la base de datos de origen a una carpeta del segmento de destino de Cloud Storage.
Verificar la emisión
En este procedimiento, confirmas que Datastream:
- Transfiere los datos de todas las tablas asociadas al esquema
ROOT
de la base de datos Oracle de origen a la carpeta/root/tutorial
del segmento de Cloud Storage de destino. - Traduce los datos al formato de archivo Avro.
Ve a la página Navegador de almacenamiento de Cloud Storage.
Haz clic en el enlace que contiene tu segmento.
Si la pestaña OBJECTS no está activa, haz clic en ella.
Haz clic en la carpeta raíz y, a continuación, en la carpeta tutorial.
Compruebe que ve carpetas que representan tablas del esquema
ROOT
de su base de datos de Oracle de origen.Haz clic en una de las carpetas de la tabla y desglosa la información hasta que veas los datos asociados a la tabla.
Haga clic en el archivo que representa los datos y, a continuación, en DESCARGAR.
Abre el archivo con una herramienta de Avro (por ejemplo, Avro Viewer) para comprobar que el contenido se puede leer. De esta forma, se confirma que Datastream también ha traducido los datos al formato de archivo Avro.