Streaming pipeline. Reads data from Spanner change streams and writes it to a source database.
Template parameters
| Parameter | Description |
|---|---|
| `changeStreamName` | The name of the Spanner change stream that the pipeline reads from. |
| `instanceId` | The name of the Spanner instance where the change stream is present. |
| `databaseId` | The name of the Spanner database that the change stream monitors. |
| `spannerProjectId` | The name of the Spanner project. |
| `metadataInstance` | The instance that stores the metadata used by the connector to control the consumption of the change stream API data. |
| `metadataDatabase` | The database that stores the metadata used by the connector to control the consumption of the change stream API data. |
| `sourceShardsFilePath` | Path to a Cloud Storage file containing connection profile information for source shards. |
| `startTimestamp` | Optional: The starting timestamp for reading changes. Defaults to empty. |
| `endTimestamp` | Optional: The end timestamp for reading changes. If no timestamp is provided, the pipeline reads indefinitely. Defaults to empty. |
| `shadowTablePrefix` | Optional: The prefix used to name shadow tables. Default: `shadow_`. |
| `sessionFilePath` | Optional: Session file path in Cloud Storage that contains mapping information from HarbourBridge. |
| `filtrationMode` | Optional: Mode of filtration. Specifies how to drop certain records based on a criterion. Supported modes are: `none` (filter nothing), `forward_migration` (filter records written using the forward migration pipeline). Defaults to `forward_migration`. |
| `shardingCustomJarPath` | Optional: Custom JAR file location in Cloud Storage that contains the customization logic for fetching the shard ID. If you set this parameter, also set the `shardingCustomClassName` parameter. Defaults to empty. |
| `shardingCustomClassName` | Optional: Fully qualified name of the class that contains the custom shard ID implementation. If `shardingCustomJarPath` is specified, this parameter is required. Defaults to empty. |
| `shardingCustomParameters` | Optional: String containing any custom parameters to be passed to the custom sharding class. Defaults to empty. |
| `sourceDbTimezoneOffset` | Optional: The timezone offset from UTC for the source database. Example value: `+10:00`. Defaults to `+00:00`. |
| `dlqGcsPubSubSubscription` | Optional: The Pub/Sub subscription used in a Cloud Storage notification policy for the DLQ retry directory when running in regular mode. The name must be in the format `projects/<project-id>/subscriptions/<subscription-name>`. When set, `deadLetterQueueDirectory` and `dlqRetryMinutes` are ignored. |
| `skipDirectoryName` | Optional: Records skipped from reverse replication are written to this directory. Default directory name is `skip`. |
| `maxShardConnections` | Optional: The maximum number of connections that a given shard can accept. Defaults to `10000`. |
| `deadLetterQueueDirectory` | Optional: The path used when storing the error queue output. The default path is a directory under the Dataflow job's temp location. |
| `dlqMaxRetryCount` | Optional: The maximum number of times that temporary errors can be retried through the dead-letter queue. Defaults to `500`. |
| `runMode` | Optional: The run mode type. Supported values: `regular`, `retryDLQ`. Default: `regular`. Specify `retryDLQ` to retry severe dead-letter queue records only. |
| `dlqRetryMinutes` | Optional: The number of minutes between dead-letter queue retries. Defaults to `10`. |
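
Some of the constraints in the table can be checked before a job is launched. The following is a minimal sketch and is not part of the template: the class `ParameterChecks` and its method names are hypothetical, and it assumes the parameters are held in a plain `Map<String, String>`. It checks that `shardingCustomClassName` is present whenever `shardingCustomJarPath` is set, and parses `sourceDbTimezoneOffset` with the JDK's `ZoneOffset`.

```java
import java.time.ZoneOffset;
import java.util.Map;

/** Hypothetical pre-launch checks; not part of the template itself. */
public class ParameterChecks {

  /** True when shardingCustomClassName is set whenever shardingCustomJarPath is set. */
  public static boolean shardingParamsConsistent(Map<String, String> params) {
    String jarPath = params.getOrDefault("shardingCustomJarPath", "");
    String className = params.getOrDefault("shardingCustomClassName", "");
    return jarPath.isEmpty() || !className.isEmpty();
  }

  /** Parses sourceDbTimezoneOffset, falling back to the documented default of +00:00. */
  public static ZoneOffset sourceDbOffset(Map<String, String> params) {
    // ZoneOffset.of throws DateTimeException when the value is malformed.
    return ZoneOffset.of(params.getOrDefault("sourceDbTimezoneOffset", "+00:00"));
  }

  public static void main(String[] args) {
    Map<String, String> params = Map.of(
        "shardingCustomJarPath", "gs://my-bucket/sharding.jar", // placeholder path
        "sourceDbTimezoneOffset", "+10:00");
    // The JAR path is set but the class name is missing, so this prints false.
    System.out.println(shardingParamsConsistent(params));
    // +10:00 is ten hours ahead of UTC, so this prints 36000.
    System.out.println(sourceDbOffset(params).getTotalSeconds());
  }
}
```

Running such checks locally surfaces a missing class name or a malformed offset before the Dataflow job is submitted.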
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.
  For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Spanner Change Streams to Source Database template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud CLI
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
  --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/ \
  --project=PROJECT_ID \
  --region=REGION_NAME \
  --parameters \
changeStreamName=CHANGE_STREAM_NAME,\
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
spannerProjectId=SPANNER_PROJECT_ID,\
metadataInstance=METADATA_INSTANCE,\
metadataDatabase=METADATA_DATABASE,\
sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use

  You can use the following values:
  - latest, to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
- CHANGE_STREAM_NAME: the name of the change stream to read from
- INSTANCE_ID: the Spanner instance ID
- DATABASE_ID: the Spanner database ID
- SPANNER_PROJECT_ID: the Spanner project ID
- METADATA_INSTANCE: the Spanner instance that stores metadata when reading from change streams
- METADATA_DATABASE: the Spanner database that stores metadata when reading from change streams
- SOURCE_SHARDS_FILE_PATH: the path to the Cloud Storage file containing the source shard details
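
When the command is launched from a program instead of typed by hand, the `--parameters` value has to be assembled as a single comma-separated list of `key=value` pairs. The following is a minimal sketch of that assembly: the class `GcloudCommandBuilder` and its `build` method are hypothetical helpers, the template path is passed in by the caller, and parameter values are assumed to contain no commas.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that assembles the gcloud arguments; it does not run them. */
public class GcloudCommandBuilder {

  public static List<String> build(
      String jobName,
      String templatePath,
      String projectId,
      String region,
      Map<String, String> parameters) {
    // Join the template parameters as key=value pairs separated by commas.
    // Assumption: no value contains a comma, so no escaping is applied.
    String joined = parameters.entrySet().stream()
        .map(e -> e.getKey() + "=" + e.getValue())
        .collect(Collectors.joining(","));

    List<String> args = new ArrayList<>(List.of(
        "gcloud", "dataflow", "flex-template", "run", jobName,
        "--template-file-gcs-location=" + templatePath,
        "--project=" + projectId,
        "--region=" + region));
    args.add("--parameters");
    args.add(joined);
    return args;
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps the parameters in insertion order.
    Map<String, String> parameters = new LinkedHashMap<>();
    parameters.put("changeStreamName", "CHANGE_STREAM_NAME");
    parameters.put("instanceId", "INSTANCE_ID");
    parameters.put("databaseId", "DATABASE_ID");
    parameters.put("spannerProjectId", "SPANNER_PROJECT_ID");
    parameters.put("metadataInstance", "METADATA_INSTANCE");
    parameters.put("metadataDatabase", "METADATA_DATABASE");
    parameters.put("sourceShardsFilePath", "SOURCE_SHARDS_FILE_PATH");

    List<String> command =
        build("JOB_NAME", "TEMPLATE_PATH", "PROJECT_ID", "REGION_NAME", parameters);
    System.out.println(String.join(" ", command));
  }
}
```

The returned list can be handed to `new ProcessBuilder(command).inheritIO().start()` on a machine where the gcloud CLI is installed and authenticated.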
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "changeStreamName": "CHANGE_STREAM_NAME",
      "instanceId": "INSTANCE_ID",
      "databaseId": "DATABASE_ID",
      "spannerProjectId": "SPANNER_PROJECT_ID",
      "metadataInstance": "METADATA_INSTANCE",
      "metadataDatabase": "METADATA_DATABASE",
      "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/",
    "environment": { "maxWorkers": "10" }
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use

  You can use the following values:
  - latest, to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
- CHANGE_STREAM_NAME: the name of the change stream to read from
- INSTANCE_ID: the Spanner instance ID
- DATABASE_ID: the Spanner database ID
- SPANNER_PROJECT_ID: the Spanner project ID
- METADATA_INSTANCE: the Spanner instance that stores metadata when reading from change streams
- METADATA_DATABASE: the Spanner database that stores metadata when reading from change streams
- SOURCE_SHARDS_FILE_PATH: the path to the Cloud Storage file containing the source shard details
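
The same request can be prepared from Java with the JDK's built-in HTTP client (Java 11 or later). The following is a minimal sketch under stated assumptions: the class `LaunchRequestBuilder` and its methods are hypothetical, the JSON is assembled by hand with only basic escaping, and the caller supplies an OAuth 2.0 access token (for example, one printed by `gcloud auth print-access-token`). The sketch builds the request but does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that prepares the flexTemplates:launch request. */
public class LaunchRequestBuilder {

  /** Wraps a value in double quotes, escaping backslashes and quotes only. */
  static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  /** Builds the JSON body in the shape shown in the request above. */
  public static String body(
      String jobName, String containerSpecGcsPath, Map<String, String> parameters) {
    String params = parameters.entrySet().stream()
        .map(e -> quote(e.getKey()) + ": " + quote(e.getValue()))
        .collect(Collectors.joining(", "));
    return "{\"launchParameter\": {"
        + "\"jobName\": " + quote(jobName) + ", "
        + "\"parameters\": {" + params + "}, "
        + "\"containerSpecGcsPath\": " + quote(containerSpecGcsPath) + ", "
        + "\"environment\": {\"maxWorkers\": \"10\"}"
        + "}}";
  }

  /** Builds the POST request; accessToken is supplied by the caller. */
  public static HttpRequest request(
      String projectId, String location, String accessToken, String body) {
    URI uri = URI.create(
        "https://dataflow.googleapis.com/v1b3/projects/" + projectId
            + "/locations/" + location + "/flexTemplates:launch");
    return HttpRequest.newBuilder(uri)
        .header("Authorization", "Bearer " + accessToken)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) {
    Map<String, String> parameters = new LinkedHashMap<>();
    parameters.put("changeStreamName", "CHANGE_STREAM_NAME");
    parameters.put("instanceId", "INSTANCE_ID");
    parameters.put("databaseId", "DATABASE_ID");
    parameters.put("spannerProjectId", "SPANNER_PROJECT_ID");
    parameters.put("metadataInstance", "METADATA_INSTANCE");
    parameters.put("metadataDatabase", "METADATA_DATABASE");
    parameters.put("sourceShardsFilePath", "SOURCE_SHARDS_FILE_PATH");

    String json = body("JOB_NAME", "CONTAINER_SPEC_GCS_PATH", parameters);
    HttpRequest req = request("PROJECT_ID", "LOCATION", "ACCESS_TOKEN", json);
    System.out.println(req.method() + " " + req.uri());
    System.out.println(json);
  }
}
```

To send the request, pass it to `HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString())`. A JSON library is a safer choice than hand-built strings when parameter values may contain control characters.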
Template source code
Java
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.POSTGRES_SOURCE_TYPE;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.RUN_MODE_REGULAR;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.RUN_MODE_RETRY_ALL_DLQ;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.RUN_MODE_RETRY_DLQ;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.PubSubNotifiedDlqIO;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.CassandraConnectionConfig;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConfigParser;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConnectionConfig;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ISecretManagerAccessor;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.sourceddl.CassandraInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.MySqlInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.PostgreSQLInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
import com.google.cloud.teleport.v2.templates.SpannerToSourceDb.Options;
import com.google.cloud.teleport.v2.templates.changestream.TrimmedShardedDataChangeRecord;
import com.google.cloud.teleport.v2.templates.constants.Constants;
import com.google.cloud.teleport.v2.templates.transforms.AssignShardIdFn;
import com.google.cloud.teleport.v2.templates.transforms.ConvertChangeStreamErrorRecordToFailsafeElementFn;
import com.google.cloud.teleport.v2.templates.transforms.ConvertDlqRecordToTrimmedShardedDataChangeRecordFn;
import com.google.cloud.teleport.v2.templates.transforms.FilterRecordsFn;
import com.google.cloud.teleport.v2.templates.transforms.PreprocessRecordsFn;
import com.google.cloud.teleport.v2.templates.transforms.SourceWriterTransform;
import com.google.cloud.teleport.v2.templates.transforms.SpannerInformationSchemaProcessorTransform;
import com.google.cloud.teleport.v2.templates.transforms.UpdateDlqMetricsFn;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerServiceFactoryImpl;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** This pipeline reads Spanner Change streams data and writes them to a source DB. */
@Template(
name = "Spanner_to_SourceDb",
category = TemplateCategory.STREAMING,
displayName = "Spanner Change Streams to Source Database",
description =
"Streaming pipeline. Reads data from Spanner Change Streams and"
+ " writes them to a source.",
optionsClass = Options.class,
flexContainerName = "spanner-to-sourcedb",
contactInformation = "https://cloud.google.com/support",
hidden = false,
streaming = true)
public class SpannerToSourceDb {
private static final Logger LOG = LoggerFactory.getLogger(SpannerToSourceDb.class);
// JDBC Drivers
private static final String MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver";
private static final String POSTGRESQL_DRIVER = "org.postgresql.Driver";
// JDBC URL Prefixes
private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql://";
private static final String POSTGRESQL_JDBC_PREFIX = "jdbc:postgresql://";
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options extends PipelineOptions, StreamingOptions {
@TemplateParameter.Text(
order = 1,
optional = false,
description = "Name of the change stream to read from",
helpText =
"This is the name of the Spanner change stream that the pipeline will read from.")
String getChangeStreamName();
void setChangeStreamName(String value);
@TemplateParameter.Text(
order = 2,
optional = false,
description = "Cloud Spanner Instance Id.",
helpText =
"This is the name of the Cloud Spanner instance where the changestream is present.")
String getInstanceId();
void setInstanceId(String value);
@TemplateParameter.Text(
order = 3,
optional = false,
description = "Cloud Spanner Database Id.",
helpText =
"This is the name of the Cloud Spanner database that the changestream is monitoring")
String getDatabaseId();
void setDatabaseId(String value);
@TemplateParameter.ProjectId(
order = 4,
optional = false,
description = "Cloud Spanner Project Id.",
helpText = "This is the name of the Cloud Spanner project.")
String getSpannerProjectId();
void setSpannerProjectId(String projectId);
@TemplateParameter.Text(
order = 5,
optional = false,
description = "Cloud Spanner Instance to store metadata when reading from changestreams",
helpText =
"This is the instance to store the metadata used by the connector to control the"
+ " consumption of the change stream API data.")
String getMetadataInstance();
void setMetadataInstance(String value);
@TemplateParameter.Text(
order = 6,
optional = false,
description = "Cloud Spanner Database to store metadata when reading from changestreams",
helpText =
"This is the database to store the metadata used by the connector to control the"
+ " consumption of the change stream API data.")
String getMetadataDatabase();
void setMetadataDatabase(String value);
@TemplateParameter.Text(
order = 36,
optional = true,
description = "Cloud Spanner Database to store change stream connector metadata",
helpText =
"This is the database to store the metadata used by the change stream connector. "
+ "If not provided, it defaults to the metadata database.")
String getChangeStreamMetadataDatabase();
void setChangeStreamMetadataDatabase(String value);
@TemplateParameter.Text(
order = 7,
optional = true,
description = "Cloud Spanner metadata table name",
helpText =
"The Spanner change streams connector metadata table name to use. If not provided,"
+ " Spanner automatically creates the streams connector metadata table during the pipeline flow"
+ " change. You must provide this parameter when updating an existing pipeline to ensure"
+ " that the metadata table from the original job is carried over.")
String getSpannerMetadataTableName();
void setSpannerMetadataTableName(String value);
@TemplateParameter.Text(
order = 8,
optional = true,
description = "Changes are read from the given timestamp",
helpText = "Read changes from the given timestamp.")
@Default.String("")
String getStartTimestamp();
void setStartTimestamp(String value);
@TemplateParameter.Text(
order = 9,
optional = true,
description = "Changes are read until the given timestamp",
helpText =
"Read changes until the given timestamp. If no timestamp provided, reads indefinitely.")
@Default.String("")
String getEndTimestamp();
void setEndTimestamp(String value);
@TemplateParameter.Text(
order = 10,
optional = true,
description = "Cloud Spanner shadow table prefix.",
helpText = "The prefix used to name shadow tables. Default: `shadow_`.")
@Default.String("rev_shadow_")
String getShadowTablePrefix();
void setShadowTablePrefix(String value);
@TemplateParameter.GcsReadFile(
order = 11,
optional = false,
description = "Path to GCS file containing the Source shard details",
helpText = "Path to GCS file containing connection profile info for source shards.")
String getSourceShardsFilePath();
void setSourceShardsFilePath(String value);
@TemplateParameter.GcsReadFile(
order = 12,
optional = true,
description = "Session File Path in Cloud Storage",
helpText =
"Session file path in Cloud Storage that contains mapping information from"
+ " HarbourBridge")
String getSessionFilePath();
void setSessionFilePath(String value);
@TemplateParameter.Enum(
order = 13,
optional = true,
enumOptions = {@TemplateEnumOption("none"), @TemplateEnumOption("forward_migration")},
description = "Filtration mode",
helpText =
"Mode of Filtration, decides how to drop certain records based on a criteria. Currently"
+ " supported modes are: none (filter nothing), forward_migration (filter records"
+ " written via the forward migration pipeline). Defaults to forward_migration.")
@Default.String("forward_migration")
String getFiltrationMode();
void setFiltrationMode(String value);
@TemplateParameter.GcsReadFile(
order = 14,
optional = true,
description = "Custom jar location in Cloud Storage",
helpText =
"Custom jar location in Cloud Storage that contains the customization logic"
+ " for fetching shard id.")
@Default.String("")
String getShardingCustomJarPath();
void setShardingCustomJarPath(String value);
@TemplateParameter.Text(
order = 15,
optional = true,
description = "Custom class name",
helpText =
"Fully qualified class name having the custom shard id implementation. It is a"
+ " mandatory field in case shardingCustomJarPath is specified")
@Default.String("")
String getShardingCustomClassName();
void setShardingCustomClassName(String value);
@TemplateParameter.Text(
order = 16,
optional = true,
description = "Custom sharding logic parameters",
helpText =
"String containing any custom parameters to be passed to the custom sharding class.")
@Default.String("")
String getShardingCustomParameters();
void setShardingCustomParameters(String value);
@TemplateParameter.Text(
order = 17,
optional = true,
description = "SourceDB timezone offset",
helpText =
"This is the timezone offset from UTC for the source database. Example value: +10:00")
@Default.String("+00:00")
String getSourceDbTimezoneOffset();
void setSourceDbTimezoneOffset(String value);
@TemplateParameter.PubsubSubscription(
order = 18,
optional = true,
description =
"The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
+ " retry directory when running in regular mode.",
helpText =
"The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
+ " retry directory when running in regular mode. The name should be in the format"
+ " of projects/<project-id>/subscriptions/<subscription-name>. When set, the"
+ " deadLetterQueueDirectory and dlqRetryMinutes are ignored.")
String getDlqGcsPubSubSubscription();
void setDlqGcsPubSubSubscription(String value);
@TemplateParameter.Text(
order = 19,
optional = true,
description = "Directory name for holding skipped records",
helpText =
"Records skipped from reverse replication are written to this directory. Default"
+ " directory name is skip.")
@Default.String("skip")
String getSkipDirectoryName();
void setSkipDirectoryName(String value);
@TemplateParameter.Long(
order = 20,
optional = true,
description = "Maximum connections per shard.",
helpText = "This will come from shard file eventually.")
@Default.Long(10000)
Long getMaxShardConnections();
void setMaxShardConnections(Long value);
@TemplateParameter.Text(
order = 21,
optional = true,
description = "Dead letter queue directory.",
helpText =
"The file path used when storing the error queue output. "
+ "The default file path is a directory under the Dataflow job's temp location.")
@Default.String("")
String getDeadLetterQueueDirectory();
void setDeadLetterQueueDirectory(String value);
@TemplateParameter.Integer(
order = 22,
optional = true,
description = "Dead letter queue maximum retry count",
helpText =
"The max number of times temporary errors can be retried through DLQ. Defaults to 500.")
@Default.Integer(500)
Integer getDlqMaxRetryCount();
void setDlqMaxRetryCount(Integer value);
@TemplateParameter.Enum(
order = 23,
optional = true,
description = "Run mode - currently supported are : regular, retryDLQ, or retryAllDLQ",
enumOptions = {
@TemplateEnumOption(RUN_MODE_REGULAR),
@TemplateEnumOption(RUN_MODE_RETRY_DLQ),
@TemplateEnumOption(RUN_MODE_RETRY_ALL_DLQ)
},
helpText =
"This is the run mode type. Default is regular. Use `retryDLQ` mode to process exclusively severe error files concurrently with your reverse migration pipeline. Use `retryAllDLQ` mode only when the regular pipeline is stopped. This mode processes both retry and severe directories. Do NOT run `retryAllDLQ` concurrently with any active pipeline as it will cause conflicts.")
@Default.String(RUN_MODE_REGULAR)
String getRunMode();
void setRunMode(String value);
@TemplateParameter.Integer(
order = 24,
optional = true,
description = "Dead letter queue retry minutes",
helpText = "The number of minutes between dead letter queue retries. Defaults to 10.")
@Default.Integer(10)
Integer getDlqRetryMinutes();
void setDlqRetryMinutes(Integer value);
@TemplateParameter.Enum(
order = 25,
optional = true,
description = "Source database type, ex: mysql",
enumOptions = {
@TemplateEnumOption("mysql"),
@TemplateEnumOption("cassandra"),
@TemplateEnumOption("postgresql")
},
helpText = "The type of source database to reverse replicate to.")
@Default.String("mysql")
String getSourceType();
void setSourceType(String value);
@TemplateParameter.GcsReadFile(
order = 26,
optional = true,
description = "Custom transformation jar location in Cloud Storage",
helpText =
"Custom jar location in Cloud Storage that contains the custom transformation logic for processing records"
+ " in reverse replication.")
@Default.String("")
String getTransformationJarPath();
void setTransformationJarPath(String value);
@TemplateParameter.Text(
order = 27,
optional = true,
description = "Custom class name for transformation",
helpText =
"Fully qualified class name having the custom transformation logic. It is a"
+ " mandatory field in case transformationJarPath is specified")
@Default.String("")
String getTransformationClassName();
void setTransformationClassName(String value);
@TemplateParameter.Text(
order = 28,
optional = true,
description = "Custom parameters for transformation",
helpText =
"String containing any custom parameters to be passed to the custom transformation class.")
@Default.String("")
String getTransformationCustomParameters();
void setTransformationCustomParameters(String value);
@TemplateParameter.Text(
order = 29,
optional = true,
description = "Table name overrides from spanner to source",
regexes =
"^\\[([[:space:]]*\\{[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
example = "[{Singers, Vocalists}, {Albums, Records}]",
helpText =
"These are the table name overrides from spanner to source. They are written in the"
+ " following format: [{SpannerTableName1, SourceTableName1}, {SpannerTableName2, SourceTableName2}]."
+ " This example shows mapping Singers table to Vocalists and Albums table to Records.")
@Default.String("")
String getTableOverrides();
void setTableOverrides(String value);
@TemplateParameter.Text(
order = 30,
optional = true,
description = "Column name overrides from spanner to source",
regexes =
"^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
example =
"[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]",
helpText =
"These are the column name overrides from spanner to source. They are written in the"
+ " following format: [{SpannerTableName1.SpannerColumnName1, SpannerTableName1.SourceColumnName1}, {SpannerTableName2.SpannerColumnName1, SpannerTableName2.SourceColumnName1}]."
+ " Note that the SpannerTableName should remain the same in both the spanner and source pair. To override table names, use tableOverrides."
+ " The example shows mapping SingerName to TalentName and AlbumName to RecordName in Singers and Albums table respectively.")
@Default.String("")
String getColumnOverrides();
void setColumnOverrides(String value);
@TemplateParameter.GcsReadFile(
order = 31,
optional = true,
description = "File based overrides from spanner to source",
helpText =
"A file which specifies the table and the column name overrides from spanner to source.")
@Default.String("")
String getSchemaOverridesFilePath();
void setSchemaOverridesFilePath(String value);
@TemplateParameter.Text(
order = 32,
optional = true,
description = "Directory name for holding filtered records",
helpText =
"Records filtered from reverse replication are written to this directory. Default"
+ " directory name is filteredEvents.")
@Default.String("filteredEvents")
String getFilterEventsDirectoryName();
void setFilterEventsDirectoryName(String value);
@TemplateParameter.Boolean(
order = 33,
optional = true,
description = "Boolean setting if reverse migration is sharded",
helpText =
"Sets the template to a sharded migration. If source shard template contains more"
+ " than one shard, the value will be set to true. This value defaults to false.")
@Default.Boolean(false)
Boolean getIsShardedMigration();
void setIsShardedMigration(Boolean value);
@TemplateParameter.Text(
order = 34,
optional = true,
description = "Failure injection parameter",
helpText = "Failure injection parameter. Only used for testing.")
@Default.String("")
String getFailureInjectionParameter();
void setFailureInjectionParameter(String value);
@TemplateParameter.Enum(
order = 35,
enumOptions = {
@TemplateEnumOption("LOW"),
@TemplateEnumOption("MEDIUM"),
@TemplateEnumOption("HIGH")
},
optional = true,
description = "Priority for Spanner RPC invocations",
helpText =
"The request priority for Cloud Spanner calls. The value must be one of:"
+ " [`HIGH`,`MEDIUM`,`LOW`]. Defaults to `HIGH`.")
@Default.Enum("HIGH")
RpcPriority getSpannerPriority();
void setSpannerPriority(RpcPriority value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
LOG.info("Starting Spanner change streams to sink");
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
boolean isRetryDLQMode = RUN_MODE_RETRY_DLQ.equals(options.getRunMode());
options.setStreaming(!isRetryDLQMode);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
long startTime = System.currentTimeMillis();
Pipeline pipeline = Pipeline.create(options);
pipeline
.getOptions()
.as(DataflowPipelineWorkerPoolOptions.class)
.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
// calculate the max connections per worker
int maxNumWorkers =
pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getMaxNumWorkers() > 0
? pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getMaxNumWorkers()
: 1;
int connectionPoolSizePerWorker =
calculateConnectionPoolSizePerWorker(options.getMaxShardConnections(), maxNumWorkers);
String workerMachineType =
pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getWorkerMachineType();
DataflowWorkerMachineTypeUtils.validateMachineSpecs(workerMachineType, 4);
// Prepare Spanner config
SpannerConfig spannerConfig =
SpannerConfig.create()
.withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
.withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
.withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()))
.withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()));
// Create shadow tables
// Note that there is a limit on the number of tables that can be created per DB: 5000.
// If we create shadow tables per shard, there will be an explosion of tables.
// Anyway the shadow table has Spanner PK so no need to again separate by the shard
// Lookup by the Spanner PK should be sufficient.
// Prepare Spanner metadata config
SpannerConfig spannerMetadataConfig =
SpannerConfig.create()
.withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
.withInstanceId(ValueProvider.StaticValueProvider.of(options.getMetadataInstance()))
.withDatabaseId(ValueProvider.StaticValueProvider.of(options.getMetadataDatabase()))
.withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()));
// Fetch DDLs and create shadow tables in a DoFn to avoid launcher-side timeout.
PCollectionTuple ddlTuple =
pipeline.apply(
"Process Information Schema",
new SpannerInformationSchemaProcessorTransform(
spannerConfig, spannerMetadataConfig, options.getShadowTablePrefix()));
final PCollectionView<Ddl> ddlView =
ddlTuple
.get(SpannerInformationSchemaProcessorTransform.MAIN_DDL_TAG)
.apply("View Main DDL", View.asSingleton());
DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
final PCollectionView<Ddl> shadowTableDdlView =
ddlTuple
.get(SpannerInformationSchemaProcessorTransform.SHADOW_TABLE_DDL_TAG)
.apply("View Shadow DDL", View.asSingleton());
List<Shard> shards = getShardList(options.getSourceType(), options.getSourceShardsFilePath());
// Cassandra is always a single-shard migration.
// For JDBC sources, the shard count and the isShardedMigration option decide the mode below.
String shardingMode =
options.getSourceType().equals(CASSANDRA_SOURCE_TYPE)
? Constants.SHARDING_MODE_SINGLE_SHARD
: Constants.SHARDING_MODE_MULTI_SHARD;
if (MYSQL_SOURCE_TYPE.equals(options.getSourceType())) {
validateMySQLNotReadOnly(shards);
}
SourceSchema sourceSchema = fetchSourceSchema(options, shards);
LOG.info("Source schema: {}", sourceSchema);
if (shards.size() == 1 && !options.getIsShardedMigration()) {
shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
Shard shard = shards.get(0);
if (shard.getLogicalShardId() == null) {
shard.setLogicalShardId(Constants.DEFAULT_SHARD_ID);
LOG.info(
"Logical shard id was not found, hence setting it to: {}", Constants.DEFAULT_SHARD_ID);
}
}
buildPipeline(
pipeline,
options,
sourceSchema,
shards,
ddlView,
shadowTableDdlView,
spannerConfig,
spannerMetadataConfig,
connectionPoolSizePerWorker,
shardingMode,
startTime,
maxNumWorkers);
return pipeline.run();
}
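/**
* Builds the pipeline graph. In regular mode it reads Spanner change stream records and merges
* them with records re-consumed from the dead-letter queue (DLQ); in the retry modes it
* processes DLQ records only. Records are assigned a shard id, written to the source database,
* and any permanent, retryable, or skipped records are written to their DLQ directories.
*/
static void buildPipeline(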
Pipeline pipeline,
Options options,
SourceSchema sourceSchema,
List<Shard> shards,
PCollectionView<Ddl> ddlView,
PCollectionView<Ddl> shadowTableDdlView,
SpannerConfig spannerConfig,
SpannerConfig spannerMetadataConfig,
int connectionPoolSizePerWorker,
String shardingMode,
long startTime,
int maxNumWorkers) {
DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
boolean isRegularMode = RUN_MODE_REGULAR.equals(options.getRunMode());
PCollectionTuple reconsumedElements = null;
DeadLetterQueueManager dlqManager = buildDlqManager(options);
int reshuffleBucketSize =
maxNumWorkers
* (debugOptions.getNumberOfWorkerHarnessThreads() > 0
? debugOptions.getNumberOfWorkerHarnessThreads()
: Constants.DEFAULT_WORKER_HARNESS_THREAD_COUNT);
if (isRegularMode && (!Strings.isNullOrEmpty(options.getDlqGcsPubSubSubscription()))) {
reconsumedElements =
dlqManager.getReconsumerDataTransformForFiles(
pipeline.apply(
"Read retry from PubSub",
new PubSubNotifiedDlqIO(
options.getDlqGcsPubSubSubscription(),
// file paths to ignore when re-consuming for retry
new ArrayList<String>(
Arrays.asList(
"/severe/",
"/tmp_retry",
"/tmp_severe/",
".temp",
"/tmp_skip/",
"/" + options.getSkipDirectoryName())))));
} else {
if (isRegularMode) {
reconsumedElements =
dlqManager.getReconsumerDataTransform(
pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
} else { // retryDLQ or retryAllDLQ mode
PCollection<String> oneShotRecords =
pipeline.apply("Read severe from OneShot", dlqManager.dlqOneShotReconsumer(startTime));
if (RUN_MODE_RETRY_DLQ.equals(options.getRunMode())) {
reconsumedElements = dlqManager.getReconsumerDataTransform(oneShotRecords);
} else {
// retryAllDLQ mode: Drain both the severe (one-shot) and retry (continuous) buckets
PCollection<String> continuousRecords =
pipeline.apply(
"Read retry from Continuous",
dlqManager.dlqReconsumer(options.getDlqRetryMinutes()));
PCollection<String> allRecords =
PCollectionList.of(continuousRecords)
.and(oneShotRecords)
.apply("Flatten DLQ Records", Flatten.pCollections());
reconsumedElements = dlqManager.getReconsumerDataTransform(allRecords);
}
}
}
PCollection<FailsafeElement<String, String>> dlqJsonStrRecords =
reconsumedElements
.get(DeadLetterQueueManager.RETRYABLE_ERRORS)
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
PCollection<TrimmedShardedDataChangeRecord> dlqRecords =
dlqJsonStrRecords.apply(
"Convert DLQ records to TrimmedShardedDataChangeRecord",
ParDo.of(new ConvertDlqRecordToTrimmedShardedDataChangeRecordFn()));
PCollection<TrimmedShardedDataChangeRecord> mergedRecords = null;
if (options.getFailureInjectionParameter() != null
&& !options.getFailureInjectionParameter().isBlank()) {
spannerConfig =
SpannerServiceFactoryImpl.createSpannerService(
spannerConfig, options.getFailureInjectionParameter());
}
if (isRegularMode) {
PCollection<TrimmedShardedDataChangeRecord> changeRecordsFromDB =
pipeline
// Emits a PCollection<DataChangeRecord> of Spanner change stream data.
.apply(getReadChangeStreamDoFn(options, spannerConfig))
.apply("Reshuffle", Reshuffle.viaRandomKey())
.apply("Filteration", ParDo.of(new FilterRecordsFn(options.getFiltrationMode())))
.apply("Preprocess", ParDo.of(new PreprocessRecordsFn()));
mergedRecords =
PCollectionList.of(changeRecordsFromDB)
.and(dlqRecords)
.apply("Flatten", Flatten.pCollections());
} else {
mergedRecords = dlqRecords;
}
CustomTransformation customTransformation =
CustomTransformation.builder(
options.getTransformationJarPath(), options.getTransformationClassName())
.setCustomParameters(options.getTransformationCustomParameters())
.build();
if (options.getFailureInjectionParameter() != null
&& !options.getFailureInjectionParameter().isBlank()) {
spannerMetadataConfig =
SpannerServiceFactoryImpl.createSpannerService(
spannerMetadataConfig, options.getFailureInjectionParameter());
}
SourceWriterTransform.Result sourceWriterOutput =
mergedRecords
// Emits PCollection<KV<Long, TrimmedShardedDataChangeRecord>>: Spanner change stream data
// keyed by the primary key modulo the parallelism.
.apply(
"AssignShardId",
ParDo.of(
new AssignShardIdFn(
spannerConfig,
ddlView,
sourceSchema,
shardingMode,
shards.get(0).getLogicalShardId(),
options.getSkipDirectoryName(),
options.getShardingCustomJarPath(),
options.getShardingCustomClassName(),
options.getShardingCustomParameters(),
options.getMaxShardConnections() * shards.size(),
options.getSourceType(),
options.getSessionFilePath(),
options.getSchemaOverridesFilePath(),
options.getTableOverrides(),
// All shards are currently assumed to accept the same source type.
options.getColumnOverrides()))
.withSideInputs(ddlView))
.setCoder(
KvCoder.of(VarLongCoder.of(), AvroCoder.of(TrimmedShardedDataChangeRecord.class)))
.apply("Reshuffle2", Reshuffle.of())
.apply(
"Write to source",
new SourceWriterTransform(
shards,
spannerMetadataConfig,
options.getSourceDbTimezoneOffset(),
ddlView,
shadowTableDdlView,
sourceSchema,
options.getShadowTablePrefix(),
options.getSkipDirectoryName(),
connectionPoolSizePerWorker,
options.getSourceType(),
customTransformation,
options.getSessionFilePath(),
options.getSchemaOverridesFilePath(),
options.getTableOverrides(),
options.getColumnOverrides()));
PCollection<FailsafeElement<String, String>> dlqPermErrorRecords =
reconsumedElements
.get(DeadLetterQueueManager.PERMANENT_ERRORS)
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
PCollection<FailsafeElement<String, String>> permErrorsFromSourceWriter =
sourceWriterOutput
.permanentErrors()
.setCoder(StringUtf8Coder.of())
.apply(
"Reshuffle3", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
.apply(
"Convert permanent errors from source writer to DLQ format",
ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
PCollection<FailsafeElement<String, String>> permanentErrors =
PCollectionList.of(dlqPermErrorRecords)
.and(permErrorsFromSourceWriter)
.apply(Flatten.pCollections())
.apply("Reshuffle", Reshuffle.viaRandomKey());
permanentErrors
.apply("Update DLQ metrics", ParDo.of(new UpdateDlqMetricsFn(isRegularMode)))
.apply(
"DLQ: Write Severe errors to GCS",
MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of())
.apply(
"Write To DLQ for severe errors",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
.withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_severe/")
.setIncludePaneInfo(true)
.build());
PCollection<FailsafeElement<String, String>> retryErrors =
sourceWriterOutput
.retryableErrors()
.setCoder(StringUtf8Coder.of())
.apply(
"Reshuffle4", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
.apply(
"Convert retryable errors from source writer to DLQ format",
ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
retryErrors
.apply(
"DLQ: Write retryable Failures to GCS",
MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of())
.apply(
"Write To DLQ for retryable errors",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime())
.withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/")
.setIncludePaneInfo(true)
.build());
PCollection<FailsafeElement<String, String>> skippedRecords =
sourceWriterOutput
.skippedSourceWrites()
.setCoder(StringUtf8Coder.of())
.apply(
"Reshuffle5", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
.apply(
"Convert skipped records from source writer to DLQ format",
ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
skippedRecords
.apply(
"Write skipped records to GCS", MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of())
.apply(
"Writing skipped records to GCS",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(
options.getDeadLetterQueueDirectory() + "/" + options.getSkipDirectoryName())
.withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_skip/")
.setIncludePaneInfo(true)
.build());
}
/**
* Returns a list of shards based on the source type and source shards file path. This should be
* removed in Phase 2 of Standardizing config.
*
* @param sourceType The type of the source database.
* @param sourceShardsFilePath The GCS path to the source shards configuration file.
* @return A list of shards.
*/
public static List<Shard> getShardList(String sourceType, String sourceShardsFilePath) {
ISecretManagerAccessor secretManagerAccessor = new SecretManagerAccessorImpl();
SourceConfigParser sourceConfigParser = new SourceConfigParser(secretManagerAccessor);
SourceConnectionConfig sourceConnectionConfig;
try {
// Parse the source shards configuration file into the matching SourceConnectionConfig.
sourceConnectionConfig =
sourceConfigParser.parseConfiguration(sourceType, sourceShardsFilePath);
} catch (Exception e) {
LOG.error("Error parsing source config", e);
throw new RuntimeException("Error parsing source config", e);
}
List<Shard> shards;
if (sourceConnectionConfig instanceof JdbcShardConfig) {
shards = ((JdbcShardConfig) sourceConnectionConfig).getShardConfigs();
LOG.info("JDBC shard config is parsed.");
} else if (sourceConnectionConfig instanceof CassandraConnectionConfig) {
CassandraConfigFileReader cassandraConfigFileReader = new CassandraConfigFileReader();
shards =
cassandraConfigFileReader.getCassandraShard(
((CassandraConnectionConfig) sourceConnectionConfig).getOptionsMap());
LOG.info("Cassandra shard config is parsed.");
} else {
String errorMessage =
"Invalid source config for source type: "
+ sourceType
+ ". Source config parsed to: "
+ sourceConnectionConfig.getClass()
+ ". Source config file path: "
+ sourceShardsFilePath;
LOG.error(errorMessage);
throw new RuntimeException(errorMessage);
}
Preconditions.checkArgument(
shards != null && !shards.isEmpty(), "Shard list should have at least 1 element.");
return shards;
}
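/**
* Builds the Spanner change stream read transform. Reading starts at the configured start
* timestamp, or at the current time if none is set. Change stream metadata is stored in the
* change stream metadata database if one is configured, otherwise in the metadata database.
*
* @param options The pipeline options.
* @param spannerConfig The config of the Spanner database that the change stream monitors.
* @return The configured {@link SpannerIO.ReadChangeStream} transform.
*/
public static SpannerIO.ReadChangeStream getReadChangeStreamDoFn(
Options options, SpannerConfig spannerConfig) {
Timestamp startTime = Timestamp.now();
// Guard against a null as well as an empty start timestamp.
if (!Strings.isNullOrEmpty(options.getStartTimestamp())) {
startTime = Timestamp.parseTimestamp(options.getStartTimestamp());
}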
String changeStreamMetadataDb = options.getChangeStreamMetadataDatabase();
if (Strings.isNullOrEmpty(changeStreamMetadataDb)) {
changeStreamMetadataDb = options.getMetadataDatabase();
}
LOG.info("Using database {} for change stream metadata.", changeStreamMetadataDb);
SpannerIO.ReadChangeStream readChangeStreamDoFn =
SpannerIO.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName(options.getChangeStreamName())
.withMetadataInstance(options.getMetadataInstance())
.withMetadataDatabase(changeStreamMetadataDb)
.withInclusiveStartAt(startTime)
.withRpcPriority(options.getSpannerPriority());
if (options.getSpannerMetadataTableName() != null
&& !options.getSpannerMetadataTableName().isEmpty()) {
readChangeStreamDoFn =
readChangeStreamDoFn.withMetadataTable(options.getSpannerMetadataTableName());
}
if (!Strings.isNullOrEmpty(options.getEndTimestamp())) {
return readChangeStreamDoFn.withInclusiveEndAt(
Timestamp.parseTimestamp(options.getEndTimestamp()));
}
return readChangeStreamDoFn;
}
static DeadLetterQueueManager buildDlqManager(Options options) {
// Normalize the temp location so that it always ends with a slash.
String tempLocation = options.as(DataflowPipelineOptions.class).getTempLocation();
if (!tempLocation.endsWith("/")) {
tempLocation += "/";
}
String dlqDirectory =
options.getDeadLetterQueueDirectory().isEmpty()
? tempLocation + "dlq/"
: options.getDeadLetterQueueDirectory();
LOG.info("Dead-letter queue directory: {}", dlqDirectory);
options.setDeadLetterQueueDirectory(dlqDirectory);
return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetryCount(), true);
}
static Connection createJdbcConnection(
Shard shard, String driverClassName, String jdbcUrlPrefix) {
try {
String sourceConnectionUrl =
new StringBuilder()
.append(jdbcUrlPrefix)
.append(shard.getHost())
.append(":")
.append(shard.getPort())
.append("/")
.append(shard.getDbName())
.toString();
HikariConfig config = new HikariConfig();
config.setJdbcUrl(sourceConnectionUrl);
config.setUsername(shard.getUserName());
config.setPassword(shard.getPassword());
config.setDriverClassName(driverClassName);
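// Note: the data source is never closed here. Closing the returned connection only
// returns it to this pool.
HikariDataSource ds = new HikariDataSource(config);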
return ds.getConnection();
} catch (java.sql.SQLException e) {
LOG.error("SQL error while creating JDBC connection", e);
throw new RuntimeException(e);
}
}
/**
* Creates a {@link CqlSession} for the given {@link CassandraShard}.
*
* @param cassandraShard The shard containing connection details.
* @return A {@link CqlSession} instance.
*/
static CqlSession createCqlSession(CassandraShard cassandraShard) {
CqlSessionBuilder builder = CqlSession.builder();
DriverConfigLoader configLoader =
CassandraDriverConfigLoader.fromOptionsMap(cassandraShard.getOptionsMap());
builder.withConfigLoader(configLoader);
return builder.build();
}
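/**
* Fails fast if any MySQL shard reports {@code @@read_only = 1}.
*
* @param shards The shards to check.
*/
static void validateMySQLNotReadOnly(List<Shard> shards) {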
for (Shard shard : shards) {
try (Connection conn = createJdbcConnection(shard, MYSQL_DRIVER, MYSQL_JDBC_PREFIX)) {
if (conn != null) {
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT @@read_only")) {
if (rs != null && rs.next() && rs.getInt(1) == 1) {
throw new RuntimeException(
"MySQL destination is in read-only mode for shard: " + shard.getLogicalShardId());
}
}
}
} catch (SQLException e) {
LOG.error(
"Error checking MySQL read-only status for shard {}: {}",
shard.getLogicalShardId(),
e.getMessage());
throw new RuntimeException("Error checking MySQL read-only status", e);
}
}
}
static SourceSchema fetchSourceSchema(Options options, List<Shard> shards) {
try {
return getSourceSchema(options, shards);
} catch (SQLException e) {
throw new RuntimeException("Unable to discover JDBC schema", e);
}
}
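/**
* Scans the source schema for the configured source type. Only the first shard is scanned.
* Any source type other than MySQL or PostgreSQL is treated as Cassandra.
*/
static SourceSchema getSourceSchema(Options options, List<Shard> shards) throws SQLException {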
if (options.getSourceType().equals(MYSQL_SOURCE_TYPE)) {
try (Connection connection =
createJdbcConnection(shards.get(0), MYSQL_DRIVER, MYSQL_JDBC_PREFIX)) {
return new MySqlInformationSchemaScanner(connection, shards.get(0).getDbName()).scan();
}
} else if (options.getSourceType().equals(POSTGRES_SOURCE_TYPE)) {
try (Connection connection =
createJdbcConnection(shards.get(0), POSTGRESQL_DRIVER, POSTGRESQL_JDBC_PREFIX)) {
return new PostgreSQLInformationSchemaScanner(
connection, shards.get(0).getDbName(), shards.get(0).getNamespace())
.scan();
}
} else {
try (CqlSession session = createCqlSession((CassandraShard) shards.get(0))) {
return new CassandraInformationSchemaScanner(
session, ((CassandraShard) shards.get(0)).getKeySpaceName())
.scan();
}
}
}
static int calculateConnectionPoolSizePerWorker(Long maxShardConnections, int maxNumWorkers) {
int connectionPoolSizePerWorker = (int) (maxShardConnections / maxNumWorkers);
if (connectionPoolSizePerWorker < 1) {
throw new IllegalArgumentException(
"Max Dataflow workers "
+ maxNumWorkers
+ " is more than max per shard connections: "
+ maxShardConnections
+ ". This can lead to more"
+ " database connections than desired. Either reduce the max allowed workers or"
+ " increase the max shard connections.");
}
return connectionPoolSizePerWorker;
}
}