Le modèle Pub/Sub Avro vers BigQuery est un pipeline de streaming qui ingère les données Avro d'un abonnement Pub/Sub dans une table BigQuery. Toute erreur survenant lors de l'écriture dans la table BigQuery est traitée dans un sujet Pub/Sub non traité.
Avant d'exécuter un pipeline Dataflow pour ce scénario, déterminez si un abonnement Pub/Sub BigQuery avec une UDF répond à vos besoins.
Conditions requises pour ce pipeline
- L'abonnement Pub/Sub d'entrée doit exister.
- Le fichier de schéma des enregistrements Avro doit exister dans Cloud Storage.
- Le sujet Pub/Sub non traité doit exister.
- L'ensemble de données BigQuery de sortie doit exister.
Paramètres de modèle
Paramètres obligatoires
- schemaPath : emplacement Cloud Storage du fichier de schéma Avro. Exemple :
gs://path/to/my/schema.avsc - inputSubscription : abonnement d'entrée Pub/Sub à lire. Exemple :
projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID> - outputTableSpec : emplacement de la table de sortie BigQuery dans laquelle écrire la sortie. Par exemple,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Selon la propriétécreateDispositionspécifiée, la table de sortie peut être créée automatiquement à l'aide du schéma Avro fourni par l'utilisateur. - outputTopic : sujet Pub/Sub à utiliser pour les enregistrements non traités. Exemple :
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
Paramètres facultatifs
- useStorageWriteApiAtLeastOnce : spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), définissez ce paramètre sur "true". Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur
false. Ce paramètre ne s'applique que lorsque la valeur deuseStorageWriteApiest définie surtrue. La valeur par défaut estfalse. - writeDisposition : valeur WriteDisposition de BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Par exemple,
WRITE_APPEND,WRITE_EMPTYouWRITE_TRUNCATE. La valeur par défaut estWRITE_APPEND. - createDisposition : valeur CreateDisposition de BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Par exemple,
CREATE_IF_NEEDEDetCREATE_NEVER. La valeur par défaut estCREATE_IF_NEEDED. - useStorageWriteApi : si la valeur est définie sur "true", le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est
false. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si
useStorageWriteApiest défini surtrueetuseStorageWriteApiAtLeastOncesurfalse, vous devez définir ce paramètre. La valeur par défaut est 0. - storageWriteApiTriggeringFrequencySec : spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si
useStorageWriteApiest défini surtrueetuseStorageWriteApiAtLeastOncesurfalse, vous devez définir ce paramètre.
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Avro to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Remplacez les éléments suivants :
JOB_NAME: nom de job unique de votre choixREGION_NAME: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1VERSION: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latestpour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH: chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple,gs://MyBucket/file.avsc)SUBSCRIPTION_NAME: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE: nom de la table de sortie BigQueryDEADLETTER_TOPIC: sujet Pub/Sub à utiliser pour la file d'attente non traitée
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Remplacez les éléments suivants :
JOB_NAME: nom de job unique de votre choixLOCATION: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1VERSION: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latestpour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH: chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple,gs://MyBucket/file.avsc)SUBSCRIPTION_NAME: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE: nom de la table de sortie BigQueryDEADLETTER_TOPIC: sujet Pub/Sub à utiliser pour la file d'attente non traitée
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.