Modello Datastream to Spanner

Il modello Datastream to Spanner è una pipeline di streaming che legge eventi Datastream da un bucket Cloud Storage e li scrive in un database Spanner. È destinato alla migrazione dei dati dalle origini Datastream a Spanner. Specifica il parametro gcsPubSubSubscription per leggere i dati dalle notifiche Pub/Sub OPPURE fornisci il parametro inputFilePattern per leggere direttamente i dati dai file in Cloud Storage.

Tutte le tabelle richieste per la migrazione devono esistere nel database Spanner di destinazione prima dell'esecuzione del modello. Pertanto, la migrazione dello schema da un database di origine a Spanner di destinazione deve essere completata prima della migrazione dei dati. I dati possono esistere nelle tabelle prima della migrazione. Questo modello non propaga le modifiche dello schema Datastream al database Spanner.

La coerenza dei dati è garantita solo alla fine della migrazione, quando tutti i dati sono stati scritti in Spanner. Per archiviare le informazioni sull'ordinamento di ogni record scritto in Spanner, questo modello crea una tabella aggiuntiva (denominata tabella shadow) per ogni tabella nel database Spanner. Questa tabella viene utilizzata per garantire la coerenza alla fine della migrazione. Le tabelle shadow non vengono eliminate dopo la migrazione e possono essere utilizzate per scopi di convalida alla fine della migrazione.

Eventuali errori che si verificano durante l'operazione, come mancata corrispondenza dello schema, file JSON in formato errato o errori derivanti dall'esecuzione delle trasformazioni, vengono registrati in una coda di errori. La coda di errori è una cartella Cloud Storage che memorizza tutti gli eventi Datastream che hanno riscontrato errori, insieme al motivo dell'errore in formato di testo. Gli errori possono essere temporanei o permanenti e vengono archiviati nelle cartelle Cloud Storage appropriate nella coda di errori. Viene eseguito automaticamente un nuovo tentativo per gli errori temporanei, ma non per quelli permanenti. In caso di errori permanenti, hai la possibilità di apportare correzioni agli eventi di modifica e spostarli nel bucket di cui è possibile eseguire un nuovo tentativo durante l'esecuzione del modello.

Requisiti della pipeline

  • Uno stream Datastream nello stato In esecuzione o Non avviato.
  • Un bucket Cloud Storage in cui vengono replicati gli eventi Datastream.
  • Un database Spanner con tabelle esistenti. Queste tabelle possono essere vuote o contenere dati.

Parametri del modello

Parametri obbligatori

  • instanceId: l'istanza Spanner in cui vengono replicate le modifiche.
  • databaseId: il database Spanner in cui vengono replicate le modifiche.

Parametri facoltativi

  • inputFilePattern: il percorso del file Cloud Storage che contiene i file Datastream da replicare. In genere, si tratta del percorso root di uno stream. Il supporto per questa funzionalità è stato disattivato. Utilizza questa funzionalità solo per riprovare a inserire le voci che finiscono in una coda di messaggi non recapitabili grave.
  • inputFileFormat: il formato del file di output prodotto da Datastream. Ad esempio avro,json. Il valore predefinito è avro.
  • sessionFilePath: il percorso del file di sessione in Cloud Storage che contiene le informazioni di mapping di HarbourBridge.
  • projectId: l'ID progetto Spanner.
  • spannerHost: l'endpoint Cloud Spanner da chiamare nel modello. Ad esempio, https://batch-spanner.googleapis.com. Il valore predefinito è: https://batch-spanner.googleapis.com.
  • gcsPubSubSubscription: l'abbonamento Pub/Sub utilizzato in un criterio di notifica di Cloud Storage. Per il nome, utilizza il formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • streamName: il nome o il modello dello stream per cui eseguire il polling per le informazioni sullo schema e il tipo di origine.
  • shadowTablePrefix: il prefisso utilizzato per denominare le tabelle shadow. Valore predefinito: shadow_.
  • shouldCreateShadowTables: questo flag indica se le tabelle shadow devono essere create nel database Cloud Spanner. Il valore predefinito è: true.
  • rfcStartDateTime: la data e l'ora di inizio utilizzate per il recupero da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: il numero di file DataStream da leggere contemporaneamente. Il valore predefinito è: 30.
  • deadLetterQueueDirectory: il percorso del file utilizzato per archiviare l'output della coda di errori. Il percorso del file predefinito è una directory nella località temporanea del job Dataflow.
  • dlqRetryMinutes: il numero di minuti tra i nuovi tentativi della coda di messaggi non recapitabili. Il valore predefinito è 10.
  • dlqMaxRetryCount: il numero massimo di volte in cui è possibile riprovare a eseguire gli errori temporanei tramite la coda di messaggi non recapitabili. Il valore predefinito è 500.
  • dataStreamRootUrl: l'URL root dell'API Datastream. Il valore predefinito è: https://datastream.googleapis.com/.
  • datastreamSourceType: il tipo di database di origine a cui si connette Datastream. Esempio: mysql/oracle. Deve essere impostato durante il test senza un'esecuzione effettiva di Datastream.
  • roundJsonDecimals: se questo flag è impostato, arrotonda i valori decimali nelle colonne JSON a un numero che può essere archiviato senza perdita di precisione. Il valore predefinito è: false.
  • runMode: il tipo di modalità di esecuzione. Il valore predefinito è regolare. Utilizza la modalità retryDLQ per elaborare contemporaneamente solo i file di errori gravi con la pipeline di migrazione live. Utilizza la modalità retryAllDLQ solo quando la pipeline normale è arrestata. Questa modalità elabora sia le directory di nuovi tentativi sia quelle gravi. NON eseguire retryAllDLQ contemporaneamente a qualsiasi pipeline attiva, in quanto ciò causerebbe conflitti.
  • transformationContextFilePath: il percorso del file di contesto di trasformazione in spazio di archiviazione sul cloud utilizzato per popolare i dati utilizzati nelle trasformazioni eseguite durante le migrazioni. Ad esempio, l'ID shard al nome del database per identificare il database da cui è stata eseguita la migrazione di una riga.
  • directoryWatchDurationInMinutes: la durata per cui la pipeline deve continuare a eseguire il polling di una directory in GCS. I file di output di Datastream sono disposti in una struttura di directory che mostra il timestamp dell'evento raggruppato per minuti. Questo parametro deve essere approssimativamente uguale al ritardo massimo che potrebbe verificarsi tra l'evento nel database di origine e lo stesso evento scritto in GCS da Datastream. Il percentile 99,9 è pari a 10 minuti. Il valore predefinito è: 10.
  • spannerPriority: la priorità della richiesta per le chiamate Cloud Spanner. Il valore deve essere uno dei seguenti: [HIGH,MEDIUM,LOW]. Il valore predefinito è HIGH.
  • dlqGcsPubSubSubscription: l'abbonamento Pub/Sub utilizzato in un criterio di notifica di Cloud Storage per la directory di nuovi tentativi della coda di messaggi non recapitabili quando viene eseguita in modalità normale. Per il nome, utilizza il formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. Se impostati, deadLetterQueueDirectory e dlqRetryMinutes vengono ignorati.
  • transformationJarPath: il percorso del file JAR personalizzato in Cloud Storage per il file che contiene la logica di trasformazione personalizzata per l'elaborazione dei record nella migrazione in avanti. Il valore predefinito è vuoto.
  • transformationClassName: il nome della classe completo con la logica di trasformazione personalizzata. È un campo obbligatorio se viene specificato transformationJarPath. Il valore predefinito è vuoto.
  • transformationCustomParameters: una stringa contenente eventuali parametri personalizzati da passare alla classe di trasformazione personalizzata. Il valore predefinito è vuoto.
  • filteredEventsDirectory: il percorso del file in cui archiviare gli eventi filtrati tramite la trasformazione personalizzata. Il valore predefinito è una directory nella località temporanea del job Dataflow. Il valore predefinito è sufficiente nella maggior parte delle condizioni.
  • shardingContextFilePath: il percorso del file di contesto di sharding in spazio di archiviazione sul cloud viene utilizzato per popolare l'ID shard nel database Spanner per ogni shard di origine.È previsto un file JSON con il formato: {"StreamToDbAndShardMap": Map<stream_name, Map<db_name, shard_id>>}.
  • tableOverrides: le sostituzioni del nome della tabella dall'origine a Spanner. Vengono scritte nel seguente formato: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]. Questo esempio mostra il mapping della tabella Singers alla tabella Vocalists e della tabella Albums alla tabella Records. Ad esempio, [{Singers, Vocalists}, {Albums, Records}]. Il valore predefinito è vuoto.
  • columnOverrides: le sostituzioni del nome della colonna dall'origine a Spanner. Vengono scritte nel seguente formato: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]. Tieni presente che SourceTableName deve rimanere lo stesso nella coppia di origine e Spanner. Per sostituire i nomi delle tabelle, utilizza tableOverrides.L'esempio mostra il mapping di SingerName a TalentName e di AlbumName a RecordName rispettivamente nelle tabelle Singers e Albums. Ad esempio, [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]. Il valore predefinito è vuoto.
  • schemaOverridesFilePath: un file che specifica le sostituzioni del nome della tabella e della colonna dall'origine a Spanner. Il valore predefinito è vuoto.
  • shadowTableSpannerDatabaseId: un database separato facoltativo per le tabelle shadow. Se non specificato, le tabelle shadow verranno create nel database principale. Se specificato, assicurati che sia specificato anche shadowTableSpannerInstanceId. Il valore predefinito è vuoto.
  • shadowTableSpannerInstanceId: un'istanza separata facoltativa per le tabelle shadow. Se non specificato, le tabelle shadow verranno create nell'istanza principale. Se specificato, assicurati che sia specificato anche shadowTableSpannerDatabaseId. Il valore predefinito è vuoto.
  • failureInjectionParameter: il parametro di inserimento degli errori. Utilizzato solo per i test. Il valore predefinito è vuoto.

Esegui il modello

Console

  1. Vai alla pagina Dataflow Crea job da modello.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome job univoco.
  4. (Facoltativo) Per Endpoint regionale, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello Dataflow, seleziona il modello Cloud Datastream to Spanner.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Esegui il modello nella shell o nel terminale:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto in cui vuoi eseguire il job Dataflow Google Cloud
  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • GCS_FILE_PATH: il percorso Cloud Storage utilizzato per archiviare gli eventi Datastream. Ad esempio: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: l'istanza Spanner.
  • CLOUDSPANNER_DATABASE: il database Spanner.
  • DLQ: il percorso Cloud Storage per la directory della coda di errori.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta HTTP POST. Per saperne di più sull' API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto in cui vuoi eseguire il job Dataflow Google Cloud
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • GCS_FILE_PATH: il percorso Cloud Storage utilizzato per archiviare gli eventi Datastream. Ad esempio: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: l'istanza Spanner.
  • CLOUDSPANNER_DATABASE: il database Spanner.
  • DLQ: il percorso Cloud Storage per la directory della coda di errori.

Passaggi successivi