La plantilla de transmisión de datos a SQL es una canalización de transmisión que lee datos de Datastream y los replica en cualquier base de datos de MySQL o PostgreSQL. La plantilla lee los datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en tablas de réplica de PostgreSQL. Especifica el parámetro gcsPubSubSubscription para leer datos de las notificaciones de Pub/Sub O proporciona el parámetro inputFilePattern para leer datos directamente de los archivos en Cloud Storage.
La plantilla no es compatible con el lenguaje de definición de datos (DDL) y espera que todas las tablas ya existan en la base de datos. La replicación usa transformaciones con estado de Dataflow para filtrar los datos inactivos y garantizar la coherencia dentro de los datos desordenados. Por ejemplo, si ya se pasó una versión más reciente de una fila, se ignorará una versión tardía de esa fila. El lenguaje de manipulación de datos (DML) que se ejecuta es el mejor intento de replicar perfectamente los datos de origen o destino. Las declaraciones DML ejecutadas siguen las siguientes reglas:
- Si existe una clave primaria, las operaciones de inserción y actualización usan una sintaxis de upsert (es decir,
INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE). - Si las claves primarias existen, las eliminaciones se replican como un DML borrado.
- Si no existe una clave primaria, se insertan las operaciones de inserción y actualización en la tabla.
- Si no existen claves primarias, se ignoran las eliminaciones.
Si usas las utilidades de Oracle para Postgres, agrega ROWID en SQL como la clave primaria cuando no exista ninguna.
Requisitos de la canalización
- Una transmisión de Datastream que está lista para replicar los datos o ya los está replicando.
- Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
- Se propagó una base de datos de PostgreSQL con el esquema requerido.
- Se configura el acceso a la red entre los trabajadores de Dataflow y PostgreSQL.
Parámetros de la plantilla
Parámetros obligatorios
- inputFilePattern: Es la ubicación del archivo para que los archivos de Datastream se repliquen en Cloud Storage. Esta ubicación suele ser la ruta de acceso raíz de la transmisión.
- databaseHost: Es el host de SQL al que se realizará la conexión.
- databaseUser: Es el usuario de SQL con todos los permisos necesarios para escribir en todas las tablas en la replicación.
- databasePassword: La contraseña para el usuario de SQL.
Parámetros opcionales
- gcsPubSubSubscription: Es la suscripción de Pub/Sub con las notificaciones de archivos de Datastream. Por ejemplo,
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID> - inputFileFormat: El formato del archivo de salida que produce Datastream. Por ejemplo,
avroojson. La configuración predeterminada esavro. - streamName: El nombre o la plantilla del flujo que se consultará para obtener la información del esquema. El valor predeterminado es
{_metadata_stream}. - rfcStartDateTime: La fecha y hora de inicio que se usa para recuperar desde Cloud Storage (https://tools.ietf.org/html/rfc3339). El valor predeterminado es: 1970-01-01T00:00:00.00Z.
- dataStreamRootUrl: URL raíz de la API de Datastream. La configuración predeterminada es https://datastream.googleapis.com/.
- databaseType: El tipo de base de datos en la que se escribirá (por ejemplo, Postgres). La configuración predeterminada es: postgres.
- databasePort: Es el puerto de la base de datos de SQL al que se realizará la conexión. El valor predeterminado es
5432. - databaseName: El nombre de la base de datos de SQL a la que se realizará la conexión. El valor predeterminado es
postgres. - defaultCasing: Es un botón de activación para el comportamiento de mayúsculas y minúsculas de la tabla. Por ejemplo,(es decir, LOWERCASE = mytable -> mytable, UPPERCASE = mytable -> MYTABLECAMEL = my_table -> myTable, SNAKE = myTable -> my_table. La configuración predeterminada es LOWERCASE.
- columnCasing: Es un botón de activación para el uso de mayúsculas y minúsculas en el nombre de la columna objetivo. LOWERCASE (predeterminado): my_column -> my_column. MAYÚSCULAS: my_column -> MY_COLUMN. CAMEL: my_column -> myColumn. SNAKE: myColumn -> my_column.
- schemaMap: Un mapa de claves o valores usados para dictar los cambios de nombre del esquema (es decir, old_name:new_name,CaseError:case_error). La configuración predeterminada es vacía.
- customConnectionString: Es la cadena de conexión opcional que se usará en lugar de la cadena de la base de datos predeterminada.
- numThreads: Determina el paralelismo clave del paso de formato a DML. Específicamente, el valor se pasa a Reshuffle.withNumBuckets. El valor predeterminado es 100.
- databaseLoginTimeout: Es el tiempo de espera en segundos para los intentos de acceso a la base de datos. Esto ayuda a evitar que se cuelgue la conexión cuando varios trabajadores intentan conectarse de forma simultánea.
- datastreamSourceType: Anula la detección del tipo de fuente para los datos de CDC de Datastream. Cuando se especifica, este valor se usará en lugar de derivar el tipo de fuente del campo read_method. Los valores válidos incluyen "mysql", "postgresql", "oracle", etcétera. Este parámetro es útil cuando el campo read_method contiene "cdc" y el tipo de fuente real no se puede determinar automáticamente.
- orderByIncludesIsDeleted: La ordenación por configuraciones de datos debe incluir la priorización de los datos que no se borraron. La configuración predeterminada es "false".
Ejecuta la plantilla
Console
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es
us-central1.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Datastream to SQL template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ databaseHost=DATABASE_HOST,\ databaseUser=DATABASE_USER,\ databasePassword=DATABASE_PASSWORD
Reemplaza lo siguiente:
PROJECT_ID: El ID del proyecto Google Cloud en el que deseas ejecutar el trabajo de DataflowJOB_NAME: Es el nombre del trabajo que elijasREGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1VERSION: the version of the template that you want to useYou can use the following values:
latestto use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo:gs://bucket/path/to/data/GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo:projects/my-project-id/subscriptions/my-subscription-id.DATABASE_HOST: Es la IP del host de SQL.DATABASE_USER: Es tu usuario de SQL.DATABASE_PASSWORD: Es tu contraseña de SQL.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "databaseHost": "DATABASE_HOST", "databaseUser": "DATABASE_USER", "databasePassword": "DATABASE_PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL", } }
Reemplaza lo siguiente:
PROJECT_ID: El ID del proyecto Google Cloud en el que deseas ejecutar el trabajo de DataflowJOB_NAME: Es el nombre del trabajo que elijasLOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1VERSION: the version of the template that you want to useYou can use the following values:
latestto use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo:gs://bucket/path/to/data/GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo:projects/my-project-id/subscriptions/my-subscription-id.DATABASE_HOST: Es la IP del host de SQL.DATABASE_USER: Es tu usuario de SQL.DATABASE_PASSWORD: Es tu contraseña de SQL.
¿Qué sigue?
- Obtén información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas que proporciona Google.