Membangun koneksi aliran perubahan menggunakan Dataflow

Halaman ini menunjukkan cara membuat pipeline Dataflow yang menggunakan dan meneruskan data perubahan Spanner dengan menggunakan aliran perubahan. Anda dapat menggunakan contoh kode di halaman ini untuk membangun pipeline kustom.

Konsep inti

Berikut adalah beberapa konsep inti untuk pipeline Dataflow untuk aliran perubahan.

Dataflow

Dataflow adalah layanan serverless, cepat, dan hemat biaya yang mendukung pemrosesan streaming dan batch. Layanan ini menyediakan portabilitas dengan tugas pemrosesan yang ditulis menggunakan library open source Apache Beam dan mengotomatiskan penyediaan infrastruktur serta pengelolaan cluster. Dataflow menyediakan streaming mendekati real-time saat membaca dari aliran data perubahan.

Anda dapat menggunakan Dataflow untuk menggunakan aliran perubahan Spanner dengan konektor SpannerIO, yang menawarkan abstraksi melalui Spanner API untuk membuat kueri aliran perubahan. Dengan konektor ini, Anda tidak perlu mengelola siklus proses partisi aliran perubahan, yang diperlukan saat Anda menggunakan Spanner API secara langsung. Konektor ini memberi Anda aliran data perubahan catatan sehingga Anda dapat lebih fokus pada logika aplikasi, dan tidak terlalu fokus pada detail API tertentu dan partisi aliran perubahan dinamis. Sebaiknya gunakan konektor SpannerIO, bukan Spanner API dalam sebagian besar situasi saat Anda perlu membaca data aliran perubahan.

Template Dataflow adalah pipeline Dataflow siap pakai yang menerapkan kasus penggunaan umum. Lihat Template Dataflow untuk mengetahui ringkasannya.

Pipeline Dataflow

Pipeline Dataflow aliran perubahan Spanner terdiri dari empat bagian utama:

  1. Database Spanner dengan aliran perubahan
  2. Konektor SpannerIO
  3. Transformasi dan sink buatan pengguna
  4. Penulis I/O sink Apache Beam

gambar

Aliran data perubahan Spanner

Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan.

Konektor Apache Beam SpannerIO

Ini adalah konektor SpannerIO yang dijelaskan di bagian Dataflow sebelumnya. Konektor ini adalah konektor I/O sumber yang memancarkan PCollection rekaman perubahan data ke tahap selanjutnya dalam pipeline. Waktu peristiwa untuk setiap rekaman perubahan data yang dipancarkan adalah stempel waktu penerapan. Perhatikan bahwa rekaman yang dikeluarkan tidak berurutan, dan konektor SpannerIO menjamin tidak akan ada rekaman yang terlambat.

Saat bekerja dengan aliran perubahan, Dataflow menggunakan pembuatan titik pemeriksaan. Akibatnya, setiap pekerja mungkin menunggu hingga interval titik pemeriksaan yang dikonfigurasi untuk mem-buffer perubahan sebelum mengirim perubahan untuk diproses lebih lanjut.

Transformasi yang ditentukan pengguna

Transformasi yang ditentukan pengguna memungkinkan pengguna menggabungkan, mengubah, atau memodifikasi data pemrosesan dalam pipeline Dataflow. Kasus penggunaan umum untuk hal ini adalah penghapusan informasi identitas pribadi, pemenuhan persyaratan format data hilir, dan pengurutan. Lihat dokumentasi resmi Apache Beam untuk panduan pemrograman tentang transformasi.

Penulis I/O sink Apache Beam

Apache Beam berisi konektor I/O bawaan yang dapat digunakan untuk menulis dari pipeline Dataflow ke sink data seperti BigQuery. Sebagian besar tujuan data umum didukung secara native.

Template dataflow

Template Dataflow menyediakan metode untuk membuat tugas Dataflow berdasarkan image Docker bawaan untuk kasus penggunaan umum menggunakan konsol Google Cloud , CLI Google Cloud , atau panggilan Rest API.

Untuk aliran perubahan Spanner, kami menyediakan tiga template flex Dataflow:

Batasan berikut berlaku saat Anda menggunakan template Spanner change streams to Pub/Sub:

Menetapkan Izin IAM untuk template Dataflow

Sebelum membuat tugas Dataflow dengan tiga template fleksibel yang tercantum, pastikan Anda memiliki izin IAM yang diperlukan untuk akun layanan berikut:

Jika tidak memiliki izin IAM yang diperlukan, Anda harus menentukan akun layanan worker yang dikelola pengguna untuk membuat tugas Dataflow. Untuk mengetahui informasi selengkapnya, lihat Keamanan dan izin Dataflow.

Saat Anda mencoba menjalankan tugas dari template fleksibel Dataflow tanpa semua izin yang diperlukan, tugas Anda mungkin gagal dengan error failed to read the result file error atau permission denied on resource error. Untuk mengetahui informasi selengkapnya, lihat Memecahkan masalah Template Flex.

Membangun pipeline Dataflow

Bagian ini membahas konfigurasi awal konektor, dan memberikan contoh untuk integrasi umum dengan fitur aliran perubahan Spanner.

Untuk mengikuti langkah-langkah ini, Anda memerlukan lingkungan pengembangan Java untuk Dataflow. Untuk mengetahui informasi selengkapnya, lihat Membuat pipeline Dataflow menggunakan Java.

Membuat aliran perubahan

Untuk mengetahui detail tentang cara membuat aliran perubahan, lihat Membuat aliran perubahan. Untuk melanjutkan langkah berikutnya, Anda harus memiliki database Spanner dengan aliran perubahan yang dikonfigurasi.

Memberikan hak istimewa kontrol akses yang sangat terperinci

Jika Anda ingin pengguna kontrol akses terperinci menjalankan tugas Dataflow, pastikan pengguna diberi akses ke peran database yang memiliki hak istimewa SELECT pada aliran perubahan dan hak istimewa EXECUTE pada fungsi bernilai tabel aliran perubahan. Pastikan juga bahwa principal menentukan peran database dalam konfigurasi SpannerIO atau dalam template flex Dataflow.

Untuk mengetahui informasi selengkapnya, lihat Tentang kontrol akses terperinci.

Tambahkan konektor SpannerIO sebagai dependensi

Konektor Apache Beam SpannerIO merangkum kompleksitas penggunaan aliran perubahan secara langsung menggunakan Cloud Spanner API, yang memancarkan PCollection dari rekaman data aliran perubahan ke tahap selanjutnya dalam pipeline.

Objek ini dapat digunakan di tahap lain dalam pipeline Dataflow pengguna. Integrasi aliran data perubahan adalah bagian dari konektor SpannerIO. Agar dapat menggunakan konektor SpannerIO, dependensi harus ditambahkan ke file pom.xml Anda:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Membuat database metadata

Konektor perlu melacak setiap partisi saat menjalankan pipeline Apache Beam. Metadata ini disimpan dalam tabel Spanner yang dibuat oleh konektor selama inisialisasi. Anda menentukan database tempat tabel ini akan dibuat saat mengonfigurasi konektor.

Seperti yang dijelaskan dalam Praktik terbaik aliran perubahan, sebaiknya buat database baru untuk tujuan ini, daripada mengizinkan konektor menggunakan database aplikasi Anda untuk menyimpan tabel metadatanya.

Pemilik tugas Dataflow yang menggunakan konektor SpannerIO harus memiliki izin IAM berikut yang ditetapkan dengan database metadata ini:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Mengonfigurasi konektor

Konektor aliran data perubahan Spanner dapat dikonfigurasi sebagai berikut:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Berikut adalah deskripsi opsi readChangeStream():

Konfigurasi Spanner (Wajib)

Digunakan untuk mengonfigurasi project, instance, dan database tempat aliran perubahan dibuat dan harus dikueri. Juga secara opsional menentukan peran database yang akan digunakan saat pokok IAM yang menjalankan tugas Dataflow adalah pengguna kontrol akses terperinci. Tugas mengasumsikan peran database ini untuk akses ke aliran perubahan. Untuk mengetahui informasi selengkapnya, lihat Tentang kontrol akses terperinci.

Nama aliran perubahan (Wajib diisi)

Nama ini mengidentifikasi aliran perubahan secara unik. Nama di sini harus sama dengan nama yang digunakan saat membuatnya.

ID instance metadata (Opsional)

Ini adalah instance untuk menyimpan metadata yang digunakan oleh konektor untuk mengontrol penggunaan data API aliran perubahan.

ID database metadata (Wajib)

Ini adalah database untuk menyimpan metadata yang digunakan oleh konektor untuk mengontrol penggunaan data API aliran perubahan.

Nama tabel metadata (Opsional)

Tindakan ini hanya boleh digunakan saat memperbarui pipeline yang ada.

Ini adalah nama tabel metadata yang sudah ada dan akan digunakan oleh konektor. Ini digunakan oleh konektor untuk menyimpan metadata guna mengontrol penggunaan data API aliran perubahan. Jika opsi ini tidak ada, Spanner akan membuat tabel baru dengan nama yang dibuat saat inisialisasi konektor.

Prioritas RPC (Opsional)

Prioritas permintaan yang akan digunakan untuk kueri aliran perubahan. Jika parameter ini dihilangkan, high priority akan digunakan.

InclusiveStartAt (Wajib)

Perubahan dari stempel waktu yang diberikan ditampilkan kepada pemanggil.

InclusiveEndAt (Opsional)

Perubahan hingga stempel waktu tertentu akan dikembalikan ke pemanggil. Jika parameter ini dihilangkan, perubahan akan dipancarkan tanpa batas.

Menambahkan transformasi dan sink untuk memproses data perubahan

Setelah langkah-langkah sebelumnya selesai, konektor SpannerIO yang dikonfigurasi siap untuk memancarkan PCollection objek DataChangeRecord. Lihat Contoh transformasi dan sink untuk beberapa contoh konfigurasi pipeline yang memproses data streaming ini dengan berbagai cara.

Perhatikan bahwa rekaman aliran perubahan yang dikeluarkan oleh konektor SpannerIO tidak berurutan. Hal ini karena PCollection tidak memberikan jaminan pengurutan apa pun. Jika Anda memerlukan streaming yang diurutkan, Anda harus mengelompokkan dan mengurutkan rekaman sebagai transformasi dalam pipeline: lihat Contoh: Urutkan menurut kunci. Anda dapat memperluas contoh ini untuk mengurutkan data berdasarkan kolom data apa pun, seperti berdasarkan ID transaksi.

Contoh transformasi dan tujuan

Anda dapat menentukan transformasi Anda sendiri dan menentukan tujuan untuk menulis data. Dokumentasi Apache Beam menyediakan berbagai transformasi yang dapat diterapkan, serta konektor I/O siap pakai untuk menulis data ke dalam sistem eksternal.

Contoh: Urutkan berdasarkan kunci

Contoh kode ini memancarkan rekaman perubahan data yang diurutkan berdasarkan stempel waktu commit dan dikelompokkan berdasarkan kunci utama menggunakan konektor Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Contoh kode ini menggunakan status dan timer untuk menyimpan data dalam buffer untuk setiap kunci, dan menetapkan waktu habis masa berlaku timer ke beberapa waktu T yang dikonfigurasi pengguna di masa mendatang (ditentukan dalam fungsi BufferKeyUntilOutputTimestamp). Saat tanda air Dataflow melewati waktu T, kode ini akan menghapus semua data dalam buffer dengan stempel waktu kurang dari T, mengurutkan data ini berdasarkan stempel waktu penerapan, dan menampilkan pasangan nilai kunci dengan:

  • Kuncinya adalah kunci input, yaitu kunci utama yang di-hash ke array bucket berukuran 1.000.
  • Nilainya adalah record perubahan data yang diurutkan dan di-buffer untuk kunci.

Untuk setiap kunci, kami memiliki jaminan berikut:

  • Timer dijamin akan diaktifkan sesuai urutan stempel waktu masa berlaku.
  • Tahap hilir dijamin akan menerima elemen dalam urutan yang sama dengan urutan pembuatannya.

Misalnya, dengan kunci nilai 100, timer akan diaktifkan pada T1 dan T10, sehingga menghasilkan kumpulan data perubahan pada setiap stempel waktu. Karena catatan perubahan data yang dihasilkan pada T1 dibuat sebelum catatan perubahan data yang dihasilkan pada T10, catatan perubahan data yang dihasilkan pada T1 juga dijamin akan diterima oleh tahap berikutnya sebelum catatan perubahan data yang dihasilkan pada T10. Mekanisme ini membantu kami menjamin pengurutan stempel waktu commit yang ketat per kunci utama untuk pemrosesan hilir.

Proses ini akan berulang hingga pipeline berakhir dan semua rekaman perubahan data telah diproses (atau akan berulang tanpa batas jika tidak ada waktu berakhir yang ditentukan).

Perhatikan bahwa contoh kode ini menggunakan status dan timer, bukan jendela, untuk melakukan pengurutan per kunci. Alasannya adalah jendela tidak dijamin diproses secara berurutan. Artinya, jendela yang lebih lama dapat diproses setelah jendela yang lebih baru, yang dapat menyebabkan pemrosesan tidak berurutan.

BreakRecordByModFn

Setiap catatan perubahan data dapat berisi beberapa modifikasi. Setiap mod mewakili penyisipan, pembaruan, atau penghapusan ke satu nilai kunci utama. Fungsi ini memecah setiap rekaman perubahan data menjadi rekaman perubahan data terpisah, satu per modifikasi.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Fungsi ini menggunakan DataChangeRecord dan menghasilkan DataChangeRecord yang dikunci oleh kunci utama Spanner yang di-hash ke nilai bilangan bulat.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Timer dan buffer adalah per kunci. Fungsi ini menyimpan setiap rekaman perubahan data hingga watermark melewati stempel waktu saat kita ingin menampilkan rekaman perubahan data yang disimpan.

Kode ini menggunakan timer perulangan untuk menentukan kapan harus mengosongkan buffer:

  1. Saat melihat catatan perubahan data untuk kunci pertama kalinya, timer akan disetel untuk diaktifkan pada stempel waktu penerapan catatan perubahan data + incrementIntervalSeconds (opsi yang dapat dikonfigurasi pengguna).
  2. Saat timer diaktifkan, timer akan menambahkan semua catatan perubahan data dalam buffer dengan stempel waktu yang kurang dari waktu habis masa berlaku timer ke recordsToOutput. Jika buffer memiliki catatan perubahan data yang stempel waktunya lebih besar dari atau sama dengan waktu habis masa berlaku timer, catatan perubahan data tersebut akan ditambahkan kembali ke buffer, bukan dikeluarkan. Kemudian, timer berikutnya disetel ke waktu habis masa berlaku timer saat ini ditambah incrementIntervalInSeconds.
  3. Jika recordsToOutput tidak kosong, fungsi akan mengurutkan rekaman perubahan data di recordsToOutput berdasarkan stempel waktu penerapan dan ID transaksi, lalu menampilkannya.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Mengurutkan transaksi

Pipeline ini dapat diubah untuk mengurutkan berdasarkan ID transaksi dan stempel waktu penerapan. Untuk melakukannya, simpan sementara data untuk setiap pasangan ID transaksi / stempel waktu commit, bukan untuk setiap kunci Spanner. Hal ini memerlukan modifikasi kode di KeyByIdFn.

Contoh: Menggabungkan transaksi

Contoh kode ini membaca catatan perubahan data, mengumpulkan semua catatan perubahan data yang termasuk dalam transaksi yang sama menjadi satu elemen, dan menghasilkan elemen tersebut. Perhatikan bahwa transaksi yang dihasilkan oleh kode contoh ini tidak diurutkan berdasarkan stempel waktu penerapan.

Contoh kode ini menggunakan buffer untuk menyusun transaksi dari rekaman perubahan data. Setelah menerima catatan perubahan data yang termasuk dalam transaksi untuk pertama kalinya, operasi ini akan membaca kolom numberOfRecordsInTransaction dalam catatan perubahan data, yang menjelaskan perkiraan jumlah catatan perubahan data yang termasuk dalam transaksi tersebut. Proses ini akan menyimpan sementara catatan perubahan data yang termasuk dalam transaksi tersebut hingga jumlah catatan yang disimpan sementara cocok dengan numberOfRecordsInTransaction, lalu menampilkan catatan perubahan data yang dikelompokkan.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Fungsi ini menerima DataChangeRecord dan menghasilkan DataChangeRecord yang dikunci oleh ID transaksi.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

Buffer TransactionBoundaryFn menerima key-value pair {TransactionId, DataChangeRecord} dari KeyByTransactionIdFn dan mem-buffer-nya dalam grup berdasarkan TransactionId. Jika jumlah data yang di-buffer sama dengan jumlah data yang ada dalam seluruh transaksi, fungsi ini akan mengurutkan objek DataChangeRecord dalam grup berdasarkan urutan data dan menghasilkan key-value pair {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Di sini, kita mengasumsikan bahwa SortKey adalah class yang ditentukan pengguna yang merepresentasikan pasangan {CommitTimestamp, TransactionId}. Untuk mengetahui informasi selengkapnya tentang SortKey, lihat contoh penerapan.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Contoh: Memfilter menurut tag transaksi

Saat transaksi yang mengubah data pengguna diberi tag, tag yang sesuai dan jenisnya akan disimpan sebagai bagian dari DataChangeRecord. Contoh ini menunjukkan cara memfilter rekaman aliran perubahan berdasarkan tag transaksi yang ditentukan pengguna serta tag sistem:

Pemfilteran tag yang ditentukan pengguna untuk my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Pemfilteran tag sistem/audit TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Contoh: Ambil seluruh baris

Contoh ini berfungsi dengan tabel Spanner bernama Singer yang memiliki definisi berikut:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Dalam mode pengambilan nilai OLD_AND_NEW_VALUES default dari aliran perubahan, saat ada update pada baris Spanner, catatan perubahan data yang diterima hanya akan berisi kolom yang diubah. Kolom yang dilacak tetapi tidak berubah tidak akan disertakan dalam rekaman. Kunci utama mod dapat digunakan untuk melakukan pembacaan snapshot Spanner pada stempel waktu commit dari catatan perubahan data untuk mengambil kolom yang tidak berubah atau bahkan mengambil seluruh baris.

Perhatikan bahwa kebijakan retensi database mungkin perlu diubah ke nilai yang lebih besar atau sama dengan kebijakan retensi aliran perubahan agar pembacaan snapshot berhasil.

Perhatikan juga bahwa penggunaan jenis pengambilan nilai NEW_ROW adalah cara yang direkomendasikan dan lebih efisien untuk melakukannya, karena jenis ini menampilkan semua kolom yang dilacak dari baris secara default dan tidak memerlukan pembacaan snapshot tambahan ke Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Transformasi ini akan melakukan pembacaan yang tidak valid pada stempel waktu commit setiap data yang diterima, dan memetakan seluruh baris ke JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Kode ini membuat klien database Spanner untuk melakukan pengambilan baris lengkap, dan mengonfigurasi kumpulan sesi agar hanya memiliki beberapa sesi, yang melakukan pembacaan dalam satu instance ToFullReowJsonFn secara berurutan. Dataflow memastikan untuk memunculkan banyak instance fungsi ini, masing-masing dengan kumpulan kliennya sendiri.

Contoh: Spanner ke Pub/Sub

Dalam skenario ini, pemanggil melakukan streaming rekaman ke Pub/Sub secepat mungkin, tanpa pengelompokan atau penggabungan. Hal ini cocok untuk memicu pemrosesan downstream, seperti streaming semua baris baru yang dimasukkan ke dalam tabel Spanner ke Pub/Sub untuk pemrosesan lebih lanjut.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Perhatikan bahwa sink Pub/Sub dapat dikonfigurasi untuk memastikan semantik tepat satu kali.

Contoh: Spanner ke Cloud Storage

Dalam skenario ini, pemanggil mengelompokkan semua rekaman dalam jendela tertentu dan menyimpan grup tersebut dalam file Cloud Storage terpisah. Hal ini cocok untuk analisis dan pengarsipan pada satu titik waktu, yang terpisah dari periode retensi Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Perhatikan bahwa sink Cloud Storage menyediakan semantik setidaknya sekali secara default. Dengan pemrosesan tambahan, pesan dapat diubah agar memiliki semantik tepat satu kali.

Kami juga menyediakan template Dataflow untuk kasus penggunaan ini: lihat Menghubungkan aliran perubahan ke Cloud Storage.

Contoh: Spanner ke BigQuery (tabel buku besar)

Di sini, pemanggil melakukan streaming rekaman perubahan ke BigQuery. Setiap catatan perubahan data ditampilkan sebagai satu baris di BigQuery. Ini sangat cocok untuk analisis. Kode ini menggunakan fungsi yang ditentukan sebelumnya, di bagian Fetch full row, untuk mengambil seluruh baris data dan menuliskannya ke BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Perhatikan bahwa sink BigQuery menyediakan semantik setidaknya sekali secara default. Dengan pemrosesan tambahan, pesan dapat diubah agar memiliki semantik tepat satu kali.

Kami juga menyediakan template Dataflow untuk kasus penggunaan ini; lihat Menghubungkan aliran perubahan ke BigQuery.

Memantau pipeline

Ada dua kelas metrik yang tersedia untuk memantau pipeline Dataflow aliran perubahan.

Metrik Dataflow standar

Dataflow menyediakan beberapa metrik untuk memastikan tugas Anda berjalan dengan baik, seperti keaktualan data, kelambatan sistem, throughput tugas, pemakaian CPU worker, dan lainnya. Anda dapat menemukan informasi selengkapnya di Menggunakan Monitoring untuk pipeline Dataflow.

Untuk pipeline aliran perubahan, ada dua metrik utama yang harus diperhitungkan: latensi sistem dan keaktualan data.

Latensi sistem akan memberi tahu Anda durasi waktu maksimum saat ini (dalam detik) item data diproses atau menunggu pemrosesan.

Keaktualan data akan menunjukkan jumlah waktu antara sekarang (real time) dan watermark output. Stempel waktu output T menunjukkan bahwa semua elemen dengan waktu peristiwa (tepat) sebelum T telah diproses untuk perhitungan. Dengan kata lain, keaktualan data mengukur seberapa baru pipeline, terkait pemrosesan peristiwa yang telah diterimanya.

Jika pipeline kekurangan sumber daya, Anda dapat melihat efek tersebut dalam dua metrik ini. Latensi sistem akan meningkat, karena item harus menunggu lebih lama sebelum diproses. Keaktualan data juga akan meningkat karena pipeline tidak akan dapat mengimbangi jumlah data yang diterima.

Metrik aliran perubahan kustom

Metrik ini ditampilkan di Cloud Monitoring dan mencakup:

  • Latensi yang dikelompokkan (histogram) antara saat suatu rekaman di-commit di Spanner hingga saat rekaman tersebut dikeluarkan ke dalam PCollection oleh konektor. Metrik ini dapat digunakan untuk melihat masalah performa (latensi) pada pipeline.
  • Jumlah total catatan data yang dibaca. Ini adalah indikasi keseluruhan jumlah rekaman yang dikeluarkan oleh konektor. Jumlah ini harus terus meningkat, mencerminkan tren penulisan di database Spanner yang mendasarinya.
  • Jumlah partisi yang sedang dibaca. Harus selalu ada partisi yang dibaca. Jika angka ini nol, berarti terjadi error dalam pipeline.
  • Total jumlah kueri yang dikeluarkan selama eksekusi konektor. Ini adalah indikasi keseluruhan kueri aliran perubahan yang dibuat ke instance Spanner selama eksekusi pipeline. Metrik ini dapat digunakan untuk memperkirakan beban dari konektor ke database Spanner.

Memperbarui pipeline yang ada

Pipeline yang sedang berjalan yang menggunakan konektor SpannerIO untuk memproses aliran perubahan dapat diupdate jika pemeriksaan kompatibilitas tugas berhasil. Untuk melakukannya, Anda harus menetapkan parameter nama tabel metadata tugas baru secara eksplisit saat memperbaruinya. Gunakan nilai opsi pipeline metadataTable dari tugas yang Anda perbarui.

Jika Anda menggunakan template Dataflow yang disediakan Google, tetapkan nama tabel menggunakan parameter spannerMetadataTableName. Anda juga dapat mengubah tugas yang ada untuk menggunakan tabel metadata secara eksplisit dengan metode withMetadataTable(your-metadata-table-name) dalam konfigurasi konektor. Setelah selesai, Anda dapat mengikuti petunjuk di Meluncurkan tugas pengganti dari dokumentasi Dataflow untuk memperbarui tugas yang sedang berjalan.

Praktik terbaik untuk aliran perubahan dan Dataflow

Berikut adalah beberapa praktik terbaik untuk membangun koneksi aliran perubahan dengan menggunakan Dataflow.

Menggunakan database metadata terpisah

Sebaiknya buat database terpisah untuk konektor SpannerIO yang akan digunakan untuk penyimpanan metadata, daripada mengonfigurasinya untuk menggunakan database aplikasi Anda.

Untuk mengetahui informasi selengkapnya, lihat Mempertimbangkan database metadata terpisah.

Menentukan ukuran cluster

Aturan praktis untuk jumlah awal pekerja dalam tugas aliran perubahan Spanner adalah satu pekerja per 1.000 penulisan per detik. Perhatikan bahwa perkiraan ini dapat bervariasi bergantung pada beberapa faktor, seperti ukuran setiap transaksi, jumlah rekaman aliran perubahan yang dihasilkan dari satu transaksi dan transformasi, agregasi, atau tujuan lainnya yang digunakan dalam pipeline.

Setelah penyiapan sumber daya awal, penting untuk melacak metrik yang disebutkan dalam Memantau pipeline, untuk memastikan kondisi pipeline baik. Sebaiknya lakukan eksperimen dengan ukuran pool pekerja awal dan pantau cara pipeline menangani beban, dengan menambah jumlah node jika perlu. Penggunaan CPU adalah metrik utama untuk memeriksa apakah beban sudah tepat dan apakah diperlukan lebih banyak node.

Batasan umum

Ada beberapa batasan umum saat menggunakan aliran data perubahan Spanner dengan Dataflow:

Penskalaan otomatis

Dukungan penskalaan otomatis untuk pipeline apa pun yang menyertakan SpannerIO.readChangeStream memerlukan Apache Beam 2.39.0 atau yang lebih tinggi.

Jika Anda menggunakan Apache Beam versi sebelum 2.39.0, pipeline yang menyertakan SpannerIO.readChangeStream harus secara eksplisit menentukan algoritma penskalaan otomatis sebagai NONE, seperti yang dijelaskan dalam Penskalaan otomatis horizontal.

Untuk menskalakan pipeline Dataflow secara manual, bukan menggunakan penskalaan otomatis, lihat Menskalakan pipeline streaming secara manual.

Runner V2

Konektor aliran perubahan Spanner memerlukan Dataflow Runner V2. Hal ini harus ditentukan secara manual selama eksekusi atau error akan terjadi. Anda dapat menentukan Runner V2 dengan mengonfigurasi tugas menggunakan --experiments=use_unified_worker,use_runner_v2.

Snapshot

Konektor aliran perubahan Spanner tidak mendukung Snapshot Dataflow.

Menyelesaikan

Konektor aliran perubahan Spanner tidak mendukung menghentikan tugas. Anda hanya dapat membatalkan tugas yang ada.

Anda juga dapat memperbarui pipeline yang ada tanpa perlu menghentikannya.

OpenCensus

Untuk menggunakan OpenCensus guna memantau pipeline, tentukan versi 0.28.3 atau yang lebih baru.

NullPointerException saat pipeline dimulai

Bug di Apache Beam versi 2.38.0 dapat menyebabkan NullPointerException saat memulai pipeline dalam kondisi tertentu. Tindakan ini akan mencegah tugas Anda dimulai, dan menampilkan pesan error ini:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Untuk mengatasi masalah ini, gunakan Apache Beam versi 2.39.0 atau yang lebih baru, atau tentukan versi beam-sdks-java-core secara manual sebagai 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Informasi selengkapnya