Halaman ini menunjukkan cara membuat pipeline Dataflow yang menggunakan dan meneruskan data perubahan Spanner dengan menggunakan aliran perubahan. Anda dapat menggunakan contoh kode di halaman ini untuk membangun pipeline kustom.
Konsep inti
Berikut adalah beberapa konsep inti untuk pipeline Dataflow untuk aliran perubahan.
Dataflow
Dataflow adalah layanan serverless, cepat, dan hemat biaya yang mendukung pemrosesan streaming dan batch. Layanan ini menyediakan portabilitas dengan tugas pemrosesan yang ditulis menggunakan library open source Apache Beam dan mengotomatiskan penyediaan infrastruktur serta pengelolaan cluster. Dataflow menyediakan streaming mendekati real-time saat membaca dari aliran data perubahan.
Anda dapat menggunakan Dataflow untuk menggunakan aliran perubahan Spanner dengan konektor SpannerIO, yang menawarkan abstraksi melalui Spanner API untuk membuat kueri aliran perubahan. Dengan konektor ini, Anda tidak perlu mengelola siklus proses partisi aliran perubahan, yang diperlukan saat Anda menggunakan Spanner API secara langsung. Konektor ini memberi Anda aliran data perubahan catatan sehingga Anda dapat lebih fokus pada logika aplikasi, dan tidak terlalu fokus pada detail API tertentu dan partisi aliran perubahan dinamis. Sebaiknya gunakan konektor SpannerIO, bukan Spanner API dalam sebagian besar situasi saat Anda perlu membaca data aliran perubahan.
Template Dataflow adalah pipeline Dataflow siap pakai yang menerapkan kasus penggunaan umum. Lihat Template Dataflow untuk mengetahui ringkasannya.
Pipeline Dataflow
Pipeline Dataflow aliran perubahan Spanner terdiri dari empat bagian utama:
- Database Spanner dengan aliran perubahan
- Konektor SpannerIO
- Transformasi dan sink buatan pengguna
- Penulis I/O sink Apache Beam
Aliran data perubahan Spanner
Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan.
Konektor Apache Beam SpannerIO
Ini adalah konektor SpannerIO yang dijelaskan di bagian Dataflow sebelumnya.
Konektor ini adalah konektor I/O sumber yang memancarkan PCollection rekaman perubahan data ke tahap selanjutnya dalam pipeline. Waktu peristiwa
untuk setiap rekaman perubahan data yang dipancarkan
adalah stempel waktu penerapan. Perhatikan bahwa rekaman yang dikeluarkan
tidak berurutan, dan konektor SpannerIO menjamin tidak akan ada
rekaman yang terlambat.
Saat bekerja dengan aliran perubahan, Dataflow menggunakan pembuatan titik pemeriksaan. Akibatnya, setiap pekerja mungkin menunggu hingga interval titik pemeriksaan yang dikonfigurasi untuk mem-buffer perubahan sebelum mengirim perubahan untuk diproses lebih lanjut.
Transformasi yang ditentukan pengguna
Transformasi yang ditentukan pengguna memungkinkan pengguna menggabungkan, mengubah, atau memodifikasi data pemrosesan dalam pipeline Dataflow. Kasus penggunaan umum untuk hal ini adalah penghapusan informasi identitas pribadi, pemenuhan persyaratan format data hilir, dan pengurutan. Lihat dokumentasi resmi Apache Beam untuk panduan pemrograman tentang transformasi.
Penulis I/O sink Apache Beam
Apache Beam berisi konektor I/O bawaan yang dapat digunakan untuk menulis dari pipeline Dataflow ke sink data seperti BigQuery. Sebagian besar tujuan data umum didukung secara native.
Template dataflow
Template Dataflow menyediakan metode untuk membuat tugas Dataflow berdasarkan image Docker bawaan untuk kasus penggunaan umum menggunakan konsol Google Cloud , CLI Google Cloud , atau panggilan Rest API.
Untuk aliran perubahan Spanner, kami menyediakan tiga template flex Dataflow:
Batasan berikut berlaku saat Anda menggunakan template Spanner change streams to Pub/Sub:
Pub/Sub memiliki batasan ukuran pesan 10 MB. Untuk mengetahui informasi selengkapnya, lihat Kuota dan batas Pub/Sub.
Template Spanner change streams to Pub/Sub tidak mendukung penanganan pesan berukuran besar karena batasan Pub/Sub.
Menetapkan Izin IAM untuk template Dataflow
Sebelum membuat tugas Dataflow dengan tiga template fleksibel yang tercantum, pastikan Anda memiliki izin IAM yang diperlukan untuk akun layanan berikut:
Jika tidak memiliki izin IAM yang diperlukan, Anda harus menentukan akun layanan worker yang dikelola pengguna untuk membuat tugas Dataflow. Untuk mengetahui informasi selengkapnya, lihat Keamanan dan izin Dataflow.
Saat Anda mencoba menjalankan tugas dari template fleksibel Dataflow tanpa semua izin yang diperlukan, tugas Anda mungkin gagal dengan error failed to read the result file error atau permission denied on resource error. Untuk mengetahui informasi selengkapnya, lihat Memecahkan masalah Template Flex.
Membangun pipeline Dataflow
Bagian ini membahas konfigurasi awal konektor, dan memberikan contoh untuk integrasi umum dengan fitur aliran perubahan Spanner.
Untuk mengikuti langkah-langkah ini, Anda memerlukan lingkungan pengembangan Java untuk Dataflow. Untuk mengetahui informasi selengkapnya, lihat Membuat pipeline Dataflow menggunakan Java.
Membuat aliran perubahan
Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan. Untuk melanjutkan langkah berikutnya, Anda harus memiliki database Spanner dengan aliran perubahan yang dikonfigurasi.
Memberikan hak istimewa kontrol akses yang sangat terperinci
Jika Anda ingin pengguna kontrol akses terperinci menjalankan tugas Dataflow, pastikan pengguna diberi akses ke peran database yang memiliki hak istimewa SELECT pada aliran perubahan dan hak istimewa EXECUTE pada fungsi bernilai tabel aliran perubahan. Pastikan juga bahwa
principal menentukan peran database dalam konfigurasi SpannerIO atau dalam
template flex Dataflow.
Untuk mengetahui informasi selengkapnya, lihat Tentang kontrol akses terperinci.
Tambahkan konektor SpannerIO sebagai dependensi
Konektor Apache Beam SpannerIO merangkum kompleksitas penggunaan aliran perubahan secara langsung menggunakan Cloud Spanner API, yang memancarkan PCollection dari rekaman data aliran perubahan ke tahap selanjutnya dalam pipeline.
Objek ini dapat digunakan di tahap lain dalam pipeline Dataflow pengguna. Integrasi aliran data perubahan adalah bagian dari
konektor SpannerIO. Agar dapat menggunakan konektor SpannerIO, dependensi
harus ditambahkan ke file pom.xml Anda:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>
Membuat database metadata
Konektor perlu melacak setiap partisi saat menjalankan pipeline Apache Beam. Metadata ini disimpan dalam tabel Spanner yang dibuat oleh konektor selama inisialisasi. Anda menentukan database tempat tabel ini akan dibuat saat mengonfigurasi konektor.
Seperti yang dijelaskan dalam Praktik terbaik aliran perubahan, sebaiknya buat database baru untuk tujuan ini, daripada mengizinkan konektor menggunakan database aplikasi Anda untuk menyimpan tabel metadatanya.
Pemilik tugas Dataflow yang menggunakan konektor SpannerIO harus memiliki izin IAM berikut yang ditetapkan dengan database metadata ini:
spanner.databases.updateDdlspanner.databases.beginReadOnlyTransactionspanner.databases.beginOrRollbackReadWriteTransactionspanner.databases.readspanner.databases.selectspanner.databases.writespanner.sessions.createspanner.sessions.get
Mengonfigurasi konektor
Konektor aliran data perubahan Spanner dapat dikonfigurasi sebagai berikut:
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
startTime.getSeconds() + (10 * 60),
startTime.getNanos()
);
SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-meta-instance-id")
.withMetadataDatabase("my-meta-database-id")
.withMetadataTable("my-meta-table-name")
.withRpcPriority(RpcPriority.MEDIUM)
.withInclusiveStartAt(startTime)
.withInclusiveEndAt(endTime);
Berikut adalah deskripsi opsi readChangeStream():
Konfigurasi Spanner (Wajib)
Digunakan untuk mengonfigurasi project, instance, dan database tempat aliran perubahan dibuat dan harus dikueri. Juga secara opsional menentukan peran database yang akan digunakan saat pokok IAM yang menjalankan tugas Dataflow adalah pengguna kontrol akses terperinci. Tugas mengasumsikan peran database ini untuk akses ke aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Tentang kontrol akses terperinci.
Nama aliran perubahan (Wajib diisi)
Nama ini mengidentifikasi aliran perubahan secara unik. Nama di sini harus sama dengan nama yang digunakan saat membuatnya.
ID instance metadata (Opsional)
Ini adalah instance untuk menyimpan metadata yang digunakan oleh konektor untuk mengontrol penggunaan data API aliran perubahan.
ID database metadata (Wajib)
Ini adalah database untuk menyimpan metadata yang digunakan oleh konektor untuk mengontrol penggunaan data API aliran perubahan.
Nama tabel metadata (Opsional)
Tindakan ini hanya boleh digunakan saat memperbarui pipeline yang ada.
Ini adalah nama tabel metadata yang sudah ada dan akan digunakan oleh konektor. Ini digunakan oleh konektor untuk menyimpan metadata guna mengontrol penggunaan data API aliran perubahan. Jika opsi ini tidak ada, Spanner akan membuat tabel baru dengan nama yang dibuat saat inisialisasi konektor.
Prioritas RPC (Opsional)
Prioritas permintaan yang akan
digunakan untuk kueri aliran perubahan. Jika parameter ini dihilangkan, high
priority akan digunakan.
InclusiveStartAt (Wajib)
Perubahan dari stempel waktu yang diberikan ditampilkan kepada pemanggil.
InclusiveEndAt (Opsional)
Perubahan hingga stempel waktu tertentu akan dikembalikan ke pemanggil. Jika parameter ini dihilangkan, perubahan akan dipancarkan tanpa batas.
Menambahkan transformasi dan sink untuk memproses data perubahan
Setelah langkah-langkah sebelumnya selesai, konektor SpannerIO yang dikonfigurasi siap
untuk memancarkan PCollection objek DataChangeRecord.
Lihat Contoh transformasi dan sink untuk beberapa contoh konfigurasi pipeline yang memproses data streaming ini dengan berbagai cara.
Perhatikan bahwa rekaman aliran perubahan yang dikeluarkan oleh konektor SpannerIO tidak berurutan. Hal ini karena PCollection tidak memberikan jaminan pengurutan apa pun. Jika Anda memerlukan streaming yang diurutkan, Anda harus mengelompokkan dan mengurutkan rekaman sebagai transformasi dalam pipeline: lihat Contoh: Urutkan menurut kunci. Anda dapat memperluas contoh ini untuk mengurutkan data berdasarkan kolom data apa pun, seperti berdasarkan ID transaksi.
Contoh transformasi dan tujuan
Anda dapat menentukan transformasi Anda sendiri dan menentukan tujuan untuk menulis data. Dokumentasi Apache Beam menyediakan berbagai transformasi yang dapat diterapkan, serta konektor I/O siap pakai untuk menulis data ke dalam sistem eksternal.
Contoh: Urutkan berdasarkan kunci
Contoh kode ini memancarkan rekaman perubahan data yang diurutkan berdasarkan stempel waktu commit dan dikelompokkan berdasarkan kunci utama menggunakan konektor Dataflow.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new BreakRecordByModFn()))
.apply(ParDo.of(new KeyByIdFn()))
.apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
// Subsequent processing goes here
Contoh kode ini menggunakan status dan timer untuk menyimpan data dalam buffer untuk setiap kunci, dan menetapkan waktu habis masa berlaku timer ke beberapa waktu T yang dikonfigurasi pengguna di masa mendatang (ditentukan dalam fungsi BufferKeyUntilOutputTimestamp). Saat tanda air Dataflow melewati waktu T, kode ini akan menghapus semua data dalam buffer dengan stempel waktu kurang dari T, mengurutkan data ini berdasarkan stempel waktu penerapan, dan menampilkan pasangan nilai kunci dengan:
- Kuncinya adalah kunci input, yaitu kunci utama yang di-hash ke array bucket berukuran 1.000.
- Nilainya adalah record perubahan data yang diurutkan dan di-buffer untuk kunci.
Untuk setiap kunci, kami memiliki jaminan berikut:
- Timer dijamin akan diaktifkan sesuai urutan stempel waktu masa berlaku.
- Tahap hilir dijamin akan menerima elemen dalam urutan yang sama dengan urutan pembuatannya.
Misalnya, dengan kunci nilai 100, timer akan diaktifkan pada T1 dan T10, sehingga menghasilkan kumpulan data perubahan pada setiap stempel waktu. Karena catatan perubahan data yang dihasilkan pada T1 dibuat sebelum catatan perubahan data yang dihasilkan pada T10, catatan perubahan data yang dihasilkan pada T1 juga dijamin akan diterima oleh tahap berikutnya sebelum catatan perubahan data yang dihasilkan pada T10. Mekanisme ini membantu kami menjamin pengurutan stempel waktu commit yang ketat per kunci utama untuk pemrosesan hilir.
Proses ini akan berulang hingga pipeline berakhir dan semua rekaman perubahan data telah diproses (atau akan berulang tanpa batas jika tidak ada waktu berakhir yang ditentukan).
Perhatikan bahwa contoh kode ini menggunakan status dan timer, bukan jendela, untuk melakukan pengurutan per kunci. Alasannya adalah jendela tidak dijamin diproses secara berurutan. Artinya, jendela yang lebih lama dapat diproses setelah jendela yang lebih baru, yang dapat menyebabkan pemrosesan tidak berurutan.
BreakRecordByModFn
Setiap catatan perubahan data dapat berisi beberapa modifikasi. Setiap mod mewakili penyisipan, pembaruan, atau penghapusan ke satu nilai kunci utama. Fungsi ini memecah setiap rekaman perubahan data menjadi rekaman perubahan data terpisah, satu per modifikasi.
private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
DataChangeRecord> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
outputReceiver) {
record.getMods().stream()
.map(
mod ->
new DataChangeRecord(
record.getPartitionToken(),
record.getCommitTimestamp(),
record.getServerTransactionId(),
record.isLastRecordInTransactionInPartition(),
record.getRecordSequence(),
record.getTableName(),
record.getRowType(),
Collections.singletonList(mod),
record.getModType(),
record.getValueCaptureType(),
record.getNumberOfRecordsInTransaction(),
record.getNumberOfPartitionsInTransaction(),
record.getTransactionTag(),
record.isSystemTransaction(),
record.getMetadata()))
.forEach(outputReceiver::output);
}
}
KeyByIdFn
Fungsi ini menggunakan DataChangeRecord dan menghasilkan DataChangeRecord yang dikunci oleh kunci utama Spanner yang di-hash ke nilai bilangan bulat.
private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
// NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
// Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
// of states and timers for performance purposes.
// Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
// On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
private static final int NUMBER_OF_BUCKETS = 1000;
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
// Hash the received keys into a bucket in order to have a
// deterministic number of buffers and timers.
String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);
outputReceiver.output(KV.of(bucketIndex, record));
}
}
BufferKeyUntilOutputTimestamp
Timer dan buffer adalah per kunci. Fungsi ini menyimpan setiap rekaman perubahan data hingga watermark melewati stempel waktu saat kita ingin menampilkan rekaman perubahan data yang disimpan.
Kode ini menggunakan timer perulangan untuk menentukan kapan harus mengosongkan buffer:
- Saat melihat catatan perubahan data untuk kunci pertama kalinya, timer akan disetel untuk diaktifkan pada stempel waktu penerapan catatan perubahan data +
incrementIntervalSeconds(opsi yang dapat dikonfigurasi pengguna). - Saat timer diaktifkan, timer akan menambahkan semua catatan perubahan data dalam buffer dengan stempel waktu yang kurang dari waktu habis masa berlaku timer ke
recordsToOutput. Jika buffer memiliki catatan perubahan data yang stempel waktunya lebih besar dari atau sama dengan waktu habis masa berlaku timer, catatan perubahan data tersebut akan ditambahkan kembali ke buffer, bukan dikeluarkan. Kemudian, timer berikutnya disetel ke waktu habis masa berlaku timer saat ini ditambahincrementIntervalInSeconds. - Jika
recordsToOutputtidak kosong, fungsi akan mengurutkan rekaman perubahan data direcordsToOutputberdasarkan stempel waktu penerapan dan ID transaksi, lalu menampilkannya.
private static class BufferKeyUntilOutputTimestamp extends
DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>> {
private static final Logger LOG =
LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);
private final long incrementIntervalInSeconds = 2;
private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
this.incrementIntervalInSeconds = incrementIntervalInSeconds;
}
@SuppressWarnings("unused")
@TimerId("timer")
private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("keyString")
private final StateSpec<ValueState<String>> keyString =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void process(
@Element KV<String, DataChangeRecord> element,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
buffer.add(element.getValue());
// Only set the timer if this is the first time we are receiving a data change
// record with this key.
String elementKey = keyString.read();
if (elementKey == null) {
Instant commitTimestamp =
new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
Instant outputTimestamp =
commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
timer.set(outputTimestamp);
keyString.write(element.getKey());
}
}
@OnTimer("timer")
public void onExpiry(
OnTimerContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
if (!buffer.isEmpty().read()) {
String elementKey = keyString.read();
final List<DataChangeRecord> records =
StreamSupport.stream(buffer.read().spliterator(), false)
.collect(Collectors.toList());
buffer.clear();
List<DataChangeRecord> recordsToOutput = new ArrayList<>();
for (DataChangeRecord record : records) {
Instant recordCommitTimestamp =
new Instant(record.getCommitTimestamp().toSqlTimestamp());
final String recordString =
record.getMods().get(0).getNewValuesJson().isEmpty()
? "Deleted record"
: record.getMods().get(0).getNewValuesJson();
// When the watermark passes time T, this means that all records with
// event time < T have been processed and successfully committed. Since the
// timer fires when the watermark passes the expiration time, we should
// only output records with event time < expiration time.
if (recordCommitTimestamp.isBefore(context.timestamp())) {
LOG.info(
"Outputting record with key {} and value {} at expiration " +
"timestamp {}",
elementKey,
recordString,
context.timestamp().toString());
recordsToOutput.add(record);
} else {
LOG.info(
"Expired at {} but adding record with key {} and value {} back to " +
"buffer due to commit timestamp {}",
context.timestamp().toString(),
elementKey,
recordString,
recordCommitTimestamp.toString());
buffer.add(record);
}
}
// Output records, if there are any to output.
if (!recordsToOutput.isEmpty()) {
// Order the records in place, and output them. The user would need
// to implement DataChangeRecordComparator class that sorts the
// data change records by commit timestamp and transaction ID.
Collections.sort(recordsToOutput, new DataChangeRecordComparator());
context.outputWithTimestamp(
KV.of(elementKey, recordsToOutput), context.timestamp());
LOG.info(
"Expired at {}, outputting records for key {}",
context.timestamp().toString(),
elementKey);
} else {
LOG.info("Expired at {} with no records", context.timestamp().toString());
}
}
Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
LOG.info("Setting next timer to {}", nextTimer.toString());
timer.set(nextTimer);
} else {
LOG.info(
"Timer not being set since the buffer is empty: ");
keyString.clear();
}
}
}
Mengurutkan transaksi
Pipeline ini dapat diubah untuk mengurutkan berdasarkan ID transaksi dan stempel waktu penerapan. Untuk melakukannya, simpan sementara data untuk setiap pasangan ID transaksi / stempel waktu commit, bukan untuk setiap kunci Spanner. Hal ini memerlukan modifikasi kode di KeyByIdFn.
Contoh: Menggabungkan transaksi
Contoh kode ini membaca catatan perubahan data, mengumpulkan semua catatan perubahan data yang termasuk dalam transaksi yang sama menjadi satu elemen, dan menghasilkan elemen tersebut. Perhatikan bahwa transaksi yang dihasilkan oleh kode contoh ini tidak diurutkan berdasarkan stempel waktu penerapan.
Contoh kode ini menggunakan buffer untuk menyusun transaksi dari rekaman perubahan data. Setelah menerima catatan perubahan data yang termasuk dalam transaksi untuk pertama kalinya, operasi ini akan membaca kolom numberOfRecordsInTransaction dalam catatan perubahan data, yang menjelaskan perkiraan jumlah catatan perubahan data yang termasuk dalam transaksi tersebut. Proses ini akan menyimpan sementara catatan perubahan data yang termasuk dalam transaksi tersebut hingga jumlah catatan yang disimpan sementara cocok dengan numberOfRecordsInTransaction, lalu menampilkan catatan perubahan data yang dikelompokkan.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new KeyByTransactionIdFn()))
.apply(ParDo.of(new TransactionBoundaryFn()))
// Subsequent processing goes here
KeyByTransactionIdFn
Fungsi ini menerima DataChangeRecord dan menghasilkan DataChangeRecord yang dikunci oleh ID transaksi.
private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
outputReceiver.output(KV.of(record.getServerTransactionId(), record));
}
}
TransactionBoundaryFn
Buffer TransactionBoundaryFn menerima key-value pair
{TransactionId, DataChangeRecord} dari KeyByTransactionIdFn dan
mem-buffer-nya dalam grup berdasarkan TransactionId. Jika jumlah
data yang di-buffer sama dengan jumlah data yang ada dalam
seluruh transaksi, fungsi ini akan mengurutkan objek DataChangeRecord
dalam grup berdasarkan urutan data dan menghasilkan key-value pair
{CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.
Di sini, kita mengasumsikan bahwa SortKey adalah class yang ditentukan pengguna yang merepresentasikan pasangan {CommitTimestamp, TransactionId}. Untuk mengetahui informasi selengkapnya tentang
SortKey, lihat contoh penerapan.
private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>> {
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@StateId("count") ValueState<Integer> countState) {
final KV<String, DataChangeRecord> element = context.element();
final DataChangeRecord record = element.getValue();
buffer.add(record);
int count = (countState.read() != null ? countState.read() : 0);
count = count + 1;
countState.write(count);
if (count == record.getNumberOfRecordsInTransaction()) {
final List<DataChangeRecord> sortedRecords =
StreamSupport.stream(buffer.read().spliterator(), false)
.sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
.collect(Collectors.toList());
final Instant commitInstant =
new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
.getTime());
context.outputWithTimestamp(
KV.of(
new SortKey(sortedRecords.get(0).getCommitTimestamp(),
sortedRecords.get(0).getServerTransactionId()),
sortedRecords),
commitInstant);
buffer.clear();
countState.clear();
}
}
}
Contoh: Memfilter menurut tag transaksi
Saat transaksi yang mengubah data pengguna diberi tag, tag yang sesuai dan jenisnya akan disimpan sebagai bagian dari DataChangeRecord. Contoh ini menunjukkan cara memfilter rekaman aliran perubahan berdasarkan tag transaksi yang ditentukan pengguna serta tag sistem:
Pemfilteran tag yang ditentukan pengguna untuk my-tx-tag:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
!record.isSystemTransaction()
&& record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
// Subsequent processing goes here
Pemfilteran tag sistem/audit TTL:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
record.isSystemTransaction()
&& record.getTransactionTag().equals("RowDeletionPolicy")))
// Subsequent processing goes here
Contoh: Ambil seluruh baris
Contoh ini berfungsi dengan tabel Spanner bernama Singer yang memiliki definisi berikut:
CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024)
) PRIMARY KEY (SingerId);
Dalam mode pengambilan nilai OLD_AND_NEW_VALUES default dari aliran perubahan,
saat ada update pada baris Spanner, catatan perubahan data
yang diterima hanya akan berisi kolom yang diubah. Kolom yang dilacak tetapi tidak berubah tidak akan disertakan dalam rekaman. Kunci utama mod dapat digunakan untuk melakukan pembacaan snapshot Spanner pada stempel waktu commit dari catatan perubahan data untuk mengambil kolom yang tidak berubah atau bahkan mengambil seluruh baris.
Perhatikan bahwa kebijakan retensi database mungkin perlu diubah ke nilai yang lebih besar atau sama dengan kebijakan retensi aliran perubahan agar pembacaan snapshot berhasil.
Perhatikan juga bahwa penggunaan jenis pengambilan nilai NEW_ROW adalah cara yang direkomendasikan dan lebih efisien untuk melakukannya, karena jenis ini menampilkan semua kolom yang dilacak dari baris secara default dan tidak memerlukan pembacaan snapshot tambahan ke Spanner.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
// Assume we have a change stream "my-change-stream" that watches Singers table.
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
// Subsequent processing goes here
ToFullRowJsonFn
Transformasi ini akan melakukan pembacaan yang tidak valid pada stempel waktu commit setiap data yang diterima, dan memetakan seluruh baris ke JSON.
public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
// Since each instance of this DoFn will create its own session pool and will
// perform calls to Spanner sequentially, we keep the number of sessions in
// the pool small. This way, we avoid wasting resources.
private static final int MIN_SESSIONS = 1;
private static final int MAX_SESSIONS = 5;
private final String projectId;
private final String instanceId;
private final String databaseId;
private transient DatabaseClient client;
private transient Spanner spanner;
public ToFullRowJsonFn(SpannerConfig spannerConfig) {
this.projectId = spannerConfig.getProjectId().get();
this.instanceId = spannerConfig.getInstanceId().get();
this.databaseId = spannerConfig.getDatabaseId().get();
}
@Setup
public void setup() {
SessionPoolOptions sessionPoolOptions = SessionPoolOptions
.newBuilder()
.setMinSessions(MIN_SESSIONS)
.setMaxSessions(MAX_SESSIONS)
.build();
SpannerOptions options = SpannerOptions
.newBuilder()
.setProjectId(projectId)
.setSessionPoolOption(sessionPoolOptions)
.build();
DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
spanner = options.getService();
client = spanner.getDatabaseClient(id);
}
@Teardown
public void teardown() {
spanner.close();
}
@ProcessElement
public void process(
@Element DataChangeRecord element,
OutputReceiver<String> output) {
com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
element.getMods().forEach(mod -> {
JSONObject keysJson = new JSONObject(mod.getKeysJson());
JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
ModType modType = element.getModType();
JSONObject jsonRow = new JSONObject();
long singerId = keysJson.getLong("SingerId");
jsonRow.put("SingerId", singerId);
if (modType == ModType.INSERT) {
// For INSERT mod, get non-primary key columns from mod.
jsonRow.put("FirstName", newValuesJson.get("FirstName"));
jsonRow.put("LastName", newValuesJson.get("LastName"));
} else if (modType == ModType.UPDATE) {
// For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
try (ResultSet resultSet = client
.singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
.read(
"Singers",
KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
Arrays.asList("FirstName", "LastName"))) {
if (resultSet.next()) {
jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
JSONObject.NULL : resultSet.getString("FirstName"));
jsonRow.put("LastName", resultSet.isNull("LastName") ?
JSONObject.NULL : resultSet.getString("LastName"));
}
}
} else {
// For DELETE mod, there is nothing to do, as we already set SingerId.
}
output.output(jsonRow.toString());
});
}
}
Kode ini membuat klien database Spanner untuk melakukan pengambilan baris lengkap, dan mengonfigurasi kumpulan sesi agar hanya memiliki beberapa sesi, yang melakukan pembacaan dalam satu instance ToFullReowJsonFn secara berurutan.
Dataflow memastikan untuk memunculkan banyak instance fungsi ini, masing-masing dengan kumpulan kliennya sendiri.
Contoh: Spanner ke Pub/Sub
Dalam skenario ini, pemanggil melakukan streaming rekaman ke Pub/Sub secepat mungkin, tanpa pengelompokan atau penggabungan. Hal ini cocok untuk memicu pemrosesan downstream, seperti streaming semua baris baru yang dimasukkan ke dalam tabel Spanner ke Pub/Sub untuk pemrosesan lebih lanjut.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(PubsubIO.writeStrings().to("my-topic"));
Perhatikan bahwa sink Pub/Sub dapat dikonfigurasi untuk memastikan semantik tepat satu kali.
Contoh: Spanner ke Cloud Storage
Dalam skenario ini, pemanggil mengelompokkan semua rekaman dalam jendela tertentu dan menyimpan grup tersebut dalam file Cloud Storage terpisah. Hal ini cocok untuk analisis dan pengarsipan pada satu titik waktu, yang terpisah dari periode retensi Spanner.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO
.write()
.to("gs://my-bucket/change-stream-results-")
.withSuffix(".txt")
.withWindowedWrites()
.withNumShards(1));
Perhatikan bahwa sink Cloud Storage menyediakan semantik setidaknya sekali secara default. Dengan pemrosesan tambahan, pesan dapat diubah agar memiliki semantik tepat satu kali.
Kami juga menyediakan template Dataflow untuk kasus penggunaan ini: lihat Menghubungkan aliran perubahan ke Cloud Storage.
Contoh: Spanner ke BigQuery (tabel buku besar)
Di sini, pemanggil melakukan streaming rekaman perubahan ke BigQuery. Setiap catatan perubahan data ditampilkan sebagai satu baris di BigQuery. Ini sangat cocok untuk analisis. Kode ini menggunakan fungsi yang ditentukan sebelumnya, di bagian Fetch full row, untuk mengambil seluruh baris data dan menuliskannya ke BigQuery.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
.apply(BigQueryIO
.<String>write()
.to("my-bigquery-table")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withSchema(new TableSchema().setFields(Arrays.asList(
new TableFieldSchema()
.setName("SingerId")
.setType("INT64")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("FirstName")
.setType("STRING")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("LastName")
.setType("STRING")
.setMode("REQUIRED")
)))
.withAutoSharding()
.optimizedWrites()
.withFormatFunction((String element) -> {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = null;
try {
jsonNode = objectMapper.readTree(element);
} catch (IOException e) {
e.printStackTrace();
}
return new TableRow()
.set("SingerId", jsonNode.get("SingerId").asInt())
.set("FirstName", jsonNode.get("FirstName").asText())
.set("LastName", jsonNode.get("LastName").asText());
}
)
);
Perhatikan bahwa sink BigQuery menyediakan semantik setidaknya sekali secara default. Dengan pemrosesan tambahan, pesan dapat diubah agar memiliki semantik tepat satu kali.
Kami juga menyediakan template Dataflow untuk kasus penggunaan ini; lihat Menghubungkan aliran perubahan ke BigQuery.
Memantau pipeline
Ada dua kelas metrik yang tersedia untuk memantau pipeline Dataflow aliran perubahan.
Metrik Dataflow standar
Dataflow menyediakan beberapa metrik untuk memastikan tugas Anda berjalan dengan baik, seperti keaktualan data, kelambatan sistem, throughput tugas, pemakaian CPU worker, dan lainnya. Anda dapat menemukan informasi selengkapnya di Menggunakan Monitoring untuk pipeline Dataflow.
Untuk pipeline aliran perubahan, ada dua metrik utama yang harus diperhitungkan: latensi sistem dan keaktualan data.
Latensi sistem akan memberi tahu Anda durasi waktu maksimum saat ini (dalam detik) item data diproses atau menunggu pemrosesan.
Keaktualan data akan menunjukkan jumlah waktu antara sekarang (real time) dan watermark output. Stempel waktu output T menunjukkan bahwa semua elemen dengan waktu peristiwa (tepat) sebelum T telah diproses untuk perhitungan. Dengan kata lain, keaktualan data mengukur seberapa baru pipeline, terkait pemrosesan peristiwa yang telah diterimanya.
Jika pipeline kekurangan sumber daya, Anda dapat melihat efek tersebut dalam dua metrik ini. Latensi sistem akan meningkat, karena item harus menunggu lebih lama sebelum diproses. Keaktualan data juga akan meningkat karena pipeline tidak akan dapat mengimbangi jumlah data yang diterima.
Metrik aliran perubahan kustom
Metrik ini ditampilkan di Cloud Monitoring dan mencakup:
- Latensi yang dikelompokkan (histogram) antara saat suatu rekaman di-commit di Spanner hingga saat rekaman tersebut dikeluarkan ke dalam PCollection oleh konektor. Metrik ini dapat digunakan untuk melihat masalah performa (latensi) pada pipeline.
- Jumlah total catatan data yang dibaca. Ini adalah indikasi keseluruhan jumlah rekaman yang dikeluarkan oleh konektor. Jumlah ini harus terus meningkat, mencerminkan tren penulisan di database Spanner yang mendasarinya.
- Jumlah partisi yang sedang dibaca. Harus selalu ada partisi yang dibaca. Jika angka ini nol, berarti terjadi error dalam pipeline.
- Total jumlah kueri yang dikeluarkan selama eksekusi konektor. Ini adalah indikasi keseluruhan kueri aliran perubahan yang dibuat ke instance Spanner selama eksekusi pipeline. Metrik ini dapat digunakan untuk memperkirakan beban dari konektor ke database Spanner.
Memperbarui pipeline yang ada
Pipeline yang sedang berjalan yang menggunakan konektor SpannerIO untuk memproses aliran perubahan dapat diupdate jika pemeriksaan
kompatibilitas tugas berhasil. Untuk melakukannya, Anda harus menetapkan parameter nama tabel metadata tugas baru secara eksplisit saat memperbaruinya. Gunakan nilai opsi pipeline metadataTable dari tugas yang Anda perbarui.
Jika Anda menggunakan template Dataflow yang disediakan Google, tetapkan
nama tabel menggunakan parameter spannerMetadataTableName. Anda juga dapat mengubah
tugas yang ada untuk menggunakan tabel metadata secara eksplisit dengan metode
withMetadataTable(your-metadata-table-name) dalam
konfigurasi konektor. Setelah selesai, Anda dapat mengikuti
petunjuk di Meluncurkan tugas pengganti dari
dokumentasi Dataflow untuk memperbarui tugas yang sedang berjalan.
Praktik terbaik untuk aliran perubahan dan Dataflow
Berikut adalah beberapa praktik terbaik untuk membangun koneksi aliran perubahan dengan menggunakan Dataflow.
Menggunakan database metadata terpisah
Sebaiknya buat database terpisah untuk konektor SpannerIO yang akan digunakan untuk penyimpanan metadata, daripada mengonfigurasinya untuk menggunakan database aplikasi Anda.
Untuk mengetahui informasi selengkapnya, lihat Mempertimbangkan database metadata terpisah.
Menentukan ukuran cluster
Aturan praktis untuk jumlah awal pekerja dalam tugas aliran perubahan Spanner adalah satu pekerja per 1.000 penulisan per detik. Perhatikan bahwa perkiraan ini dapat bervariasi bergantung pada beberapa faktor, seperti ukuran setiap transaksi, jumlah rekaman aliran perubahan yang dihasilkan dari satu transaksi dan transformasi, agregasi, atau tujuan lainnya yang digunakan dalam pipeline.
Setelah penyiapan sumber daya awal, penting untuk melacak metrik yang disebutkan dalam Memantau pipeline, untuk memastikan kondisi pipeline baik. Sebaiknya lakukan eksperimen dengan ukuran pool pekerja awal dan pantau cara pipeline menangani beban, dengan menambah jumlah node jika perlu. Penggunaan CPU adalah metrik utama untuk memeriksa apakah beban sudah tepat dan apakah diperlukan lebih banyak node.
Batasan umum
Ada beberapa batasan umum saat menggunakan aliran data perubahan Spanner dengan Dataflow:
Penskalaan otomatis
Dukungan penskalaan otomatis untuk pipeline apa pun yang menyertakan SpannerIO.readChangeStream
memerlukan Apache Beam 2.39.0 atau yang lebih tinggi.
Jika Anda menggunakan Apache Beam versi sebelum 2.39.0, pipeline yang menyertakan
SpannerIO.readChangeStream harus secara eksplisit menentukan algoritma
penskalaan otomatis sebagai NONE, seperti yang dijelaskan dalam Penskalaan otomatis horizontal.
Untuk menskalakan pipeline Dataflow secara manual, bukan menggunakan penskalaan otomatis, lihat Menskalakan pipeline streaming secara manual.
Runner V2
Konektor aliran perubahan Spanner memerlukan
Dataflow Runner V2.
Hal ini harus ditentukan secara manual selama eksekusi atau error akan terjadi. Anda dapat menentukan Runner V2 dengan mengonfigurasi tugas menggunakan
--experiments=use_unified_worker,use_runner_v2.
Snapshot
Konektor aliran perubahan Spanner tidak mendukung Snapshot Dataflow.
Menyelesaikan
Konektor aliran perubahan Spanner tidak mendukung menghentikan tugas. Anda hanya dapat membatalkan tugas yang ada.
Anda juga dapat memperbarui pipeline yang ada tanpa perlu menghentikannya.
OpenCensus
Untuk menggunakan OpenCensus guna memantau pipeline, tentukan versi 0.28.3 atau yang lebih baru.
NullPointerException saat pipeline dimulai
Bug di Apache Beam versi 2.38.0 dapat menyebabkan NullPointerException saat memulai pipeline dalam kondisi tertentu. Tindakan ini akan mencegah tugas Anda dimulai, dan menampilkan pesan error ini:
java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null
Untuk mengatasi masalah ini, gunakan Apache Beam versi 2.39.0 atau yang lebih baru, atau
tentukan versi beam-sdks-java-core secara manual sebagai 2.37.0:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.37.0</version>
</dependency>