Konektor Bigtable HBase Beam

Untuk membantu Anda menggunakan Bigtable dalam pipeline Dataflow, dua konektor I/O Bigtable Beam open source tersedia.

Jika Anda melakukan migrasi dari HBase ke Bigtable atau aplikasi Anda memanggil HBase API, gunakan konektor Bigtable HBase Beam (CloudBigtableIO) yang dibahas di halaman ini.

Dalam semua kasus lainnya, Anda harus menggunakan konektor Bigtable Beam (BigtableIO) bersama dengan klien Cloud Bigtable untuk Java, yang berfungsi dengan Cloud Bigtable API. Untuk mulai menggunakan konektor tersebut, lihat Konektor Bigtable Beam.

Untuk mengetahui informasi selengkapnya tentang model pemrograman Apache Beam, lihat dokumentasi Beam.

Mulai menggunakan HBase

Konektor Bigtable HBase Beam ditulis dalam Java dan dibangun di klien HBase Bigtable untuk Java. Kompatibel dengan Dataflow SDK 2.x untuk Java, yang didasarkan pada Apache Beam. Kode sumber konektor ada di GitHub dalam repositori googleapis/java-bigtable-hbase.

Halaman ini memberikan ringkasan cara menggunakan transformasi Read dan Write.

Menyiapkan autentikasi

Untuk menggunakan contoh Java di halaman ini dalam lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.

  1. Instal Google Cloud CLI.

  2. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  3. Jika Anda menggunakan shell lokal, buat kredensial autentikasi lokal untuk akun pengguna Anda:

    gcloud auth application-default login

    Anda tidak perlu melakukan langkah ini jika menggunakan Cloud Shell.

    Jika error autentikasi ditampilkan, dan Anda menggunakan penyedia identitas (IdP) eksternal, konfirmasi bahwa Anda telah login ke gcloud CLI dengan identitas gabungan Anda.

Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

Untuk mengetahui informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Menyiapkan Kredensial Default Aplikasi untuk kode yang berjalan di Google Cloud .

Menambahkan konektor ke project Maven

Untuk menambahkan konektor Bigtable HBase Beam ke project Maven, tambahkan artefak Maven ke file pom.xml Anda sebagai dependensi:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Tentukan konfigurasi Bigtable

Buat antarmuka opsi untuk mengizinkan input guna menjalankan pipeline Anda:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Saat membaca dari atau menulis ke Bigtable, Anda harus menyediakan objek konfigurasi CloudBigtableConfiguration. Objek ini menentukan ID project dan ID instance untuk tabel Anda, serta nama tabel itu sendiri:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Untuk membaca, berikan objek konfigurasi CloudBigtableScanConfiguration, yang memungkinkan Anda menentukan objek Scan Apache HBase yang membatasi dan memfilter hasil pembacaan. Lihat Membaca dari Bigtable untuk mengetahui detailnya.

Membaca dari Bigtable

Untuk membaca dari tabel Bigtable, Anda menerapkan transformasi Read ke hasil operasi CloudBigtableIO.read. Transformasi Read menampilkan PCollection objek Result HBase, dengan setiap elemen dalam PCollection mewakili satu baris dalam tabel.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Secara default, operasi CloudBigtableIO.read menampilkan semua baris dalam tabel Anda. Anda dapat menggunakan objek Scan HBase untuk membatasi pembacaan ke rentang kunci baris dalam tabel, atau untuk menerapkan filter ke hasil pembacaan. Untuk menggunakan objek Scan, sertakan di CloudBigtableScanConfiguration.

Misalnya, Anda dapat menambahkan Scan yang hanya menampilkan pasangan nilai kunci pertama dari setiap baris dalam tabel, yang berguna saat menghitung jumlah baris dalam tabel:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Menulis ke Bigtable

Untuk menulis ke tabel Bigtable, Anda apply operasi CloudBigtableIO.writeToTable. Anda harus melakukan operasi ini pada objek Mutation PCollection HBase, yang dapat mencakup objek Put dan Delete.

Tabel Bigtable harus sudah ada dan harus memiliki grup kolom yang sesuai. Konektor Dataflow tidak membuat tabel dan grup kolom secara langsung. Anda dapat menggunakan cbt CLI untuk membuat tabel dan menyiapkan column family, atau Anda dapat melakukannya secara terprogram.

Sebelum menulis ke Bigtable, Anda harus membuat pipeline Dataflow agar operasi put dan delete dapat diserialisasi melalui jaringan:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

Secara umum, Anda harus melakukan transformasi, seperti ParDo, untuk memformat data output Anda menjadi kumpulan objek HBase Put atau Delete. Contoh berikut menunjukkan transformasi DoFn yang mengambil nilai saat ini dan menggunakannya sebagai kunci baris untuk Put. Kemudian, Anda dapat menulis objek Put ke Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Untuk mengaktifkan kontrol alur penulisan batch, tetapkan BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL ke true. Fitur ini otomatis membatasi laju traffic untuk permintaan penulisan batch dan memungkinkan penskalaan otomatis Bigtable menambahkan atau menghapus node secara otomatis untuk menangani tugas Dataflow Anda.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Berikut adalah contoh penulisan lengkap, termasuk variasi yang memungkinkan kontrol alur penulisan batch.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}