Transfer berbasis peristiwa dari Cloud Storage

Storage Transfer Service dapat memproses notifikasi peristiwa di Google Cloud untuk otomatis mentransfer data yang telah ditambahkan atau diperbarui di bucket Cloud Storage. Pelajari lebih lanjut manfaat transfer berbasis peristiwa.

Transfer berbasis peristiwa dari Cloud Storage menggunakan notifikasi Pub/Sub untuk mengetahui kapan objek di bucket sumber telah diubah atau ditambahkan. Penghapusan objek tidak terdeteksi; menghapus objek di sumber tidak akan menghapus objek terkait di bucket tujuan.

Transfer berbasis peristiwa selalu menggunakan bucket Cloud Storage sebagai tujuan.

Mengonfigurasi izin

Selain izin yang diperlukan untuk semua tugas transfer, transfer berbasis peristiwa memerlukan peran Pub/Sub Subscriber.

  1. Temukan nama agen layanan Storage Transfer Service untuk project Anda:

    1. Buka halaman referensi googleServiceAccounts.get.

      Panel interaktif akan terbuka, dengan judul Try this method.

    2. Di panel, di bagian Request parameters, masukkan ID project Anda. Project yang Anda tentukan di sini harus merupakan project yang Anda gunakan untuk mengelola Storage Transfer Service, yang mungkin berbeda dengan project bucket sumber.

    3. Klik Execute.

    Email agen layanan Anda akan ditampilkan sebagai nilai accountEmail. Salin nilai ini.

    Email agen layanan menggunakan format project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Berikan peran Pub/Sub Subscriber kepada agen layanan Storage Transfer Service.

    Cloud Console

    Ikuti petunjuk di Mengontrol akses melalui Google Cloud konsol untuk memberikan peran Pub/Sub Subscriber ke layanan Storage Transfer Service. Peran dapat diberikan di tingkat topik, langganan, atau project.

    CLI gcloud

    Ikuti petunjuk di Menetapkan kebijakan untuk menambahkan binding berikut:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

Mengonfigurasi Pub/Sub

  1. Pastikan Anda telah memenuhi Prasyarat untuk menggunakan Pub/Sub dengan Cloud Storage.

  2. Buat notifikasi Pub/Sub untuk bucket Cloud Storage sumber.

    Anda tidak dapat mengelola notifikasi Pub/Sub dengan Google Cloud konsol. Gunakan gcloud CLI atau salah satu library klien yang tersedia sebagai gantinya.

    gcloud storage buckets notifications create gs://SOURCE_BUCKET_NAME --topic=TOPIC_NAME
  3. Buat langganan pull untuk topik. Anda harus membuat langganan terpisah untuk setiap tugas transfer.

    Contoh berikut menunjukkan perintah Google Cloud CLI untuk membuat langganan pull. Untuk petunjuk konsol dan kode library klien, lihat Membuat langganan pull.

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

Membuat tugas transfer

Anda dapat menggunakan REST API atau Google Cloud konsol untuk membuat tugas transfer berbasis peristiwa.

Jangan menyertakan informasi sensitif seperti informasi identitas pribadi (PII) atau data keamanan dalam nama tugas transfer Anda. Nama resource dapat disebarkan ke nama resource Google Cloud lainnya dan dapat diekspos ke sistem internal Google di luar project Anda.

Cloud Console

  1. Buka halaman Create transfer job di Google Cloud konsol.

    Buka Create transfer job

  2. Pilih Cloud Storage sebagai sumber dan tujuan.

  3. Sebagai Scheduling mode , pilih Event-driven , lalu klik Next step.

  4. Pilih bucket sumber untuk transfer ini.

  5. Di bagian Event stream, masukkan nama langganan:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Secara opsional, tentukan filter apa pun, lalu klik Next step.

  7. Pilih bucket tujuan untuk transfer ini.

  8. Secara opsional, masukkan waktu mulai dan berakhir untuk transfer. Jika Anda tidak menentukan waktu, transfer akan segera dimulai dan akan berjalan hingga dihentikan secara manual.

  9. Tentukan opsi transfer apa pun. Informasi selengkapnya tersedia di halaman Membuat transfer.

  10. Klik Create.

Setelah dibuat, tugas transfer akan mulai berjalan dan pemroses peristiwa akan menunggu notifikasi di langganan Pub/Sub. Halaman detail tugas menampilkan satu operasi setiap jam, dan menyertakan detail data yang ditransfer untuk setiap tugas.

REST

Untuk membuat transfer berbasis peristiwa menggunakan REST API, kirim objek JSON berikut ke endpoint transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime dan eventStreamExpirationTime bersifat opsional. Jika waktu mulai dihilangkan, transfer akan segera dimulai; jika waktu berakhir dihilangkan, transfer akan berlanjut hingga dihentikan secara manual.

Library klien

Go

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat Storage Transfer Service Go API dokumentasi referensi.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Storage Transfer Service Java API.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat Storage Transfer Service Node.js API dokumentasi referensi.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat Storage Transfer Service Python API dokumentasi referensi.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

Memantau transfer berbasis peristiwa

Saat Anda membuat transfer berbasis peristiwa, Storage Transfer Service akan membuat tugas transfer. Setelah waktu mulai tercapai, operasi transfer akan mulai berjalan dan pemroses peristiwa akan menunggu notifikasi dari antrean Pub/Sub.

Operasi transfer berjalan, dengan status in progress, selama sekitar 24 jam. Setelah 24 jam, operasi akan selesai dan operasi baru akan dimulai. Operasi baru dibuat setiap 24 jam hingga waktu berakhir tugas transfer tercapai, atau hingga tugas dihentikan secara manual.

Jika transfer file sedang berlangsung saat operasi dijadwalkan selesai, operasi saat ini akan tetap berlangsung hingga file berhasil ditransfer sepenuhnya. Operasi baru akan dimulai, dan kedua operasi akan berjalan secara bersamaan hingga operasi lama selesai. Setiap peristiwa yang terdeteksi selama periode ini akan ditangani oleh operasi baru.

Untuk melihat operasi saat ini dan operasi yang telah selesai:

Google Cloud Konsol

  1. Buka halaman Storage Transfer Service di Google Cloud konsol.

    Buka Storage Transfer Service

  2. Dalam daftar tugas, pilih tab All, atau Cloud-to-cloud.

  3. Klik ID tugas untuk transfer Anda. Kolom Scheduling mode mengidentifikasi semua transfer berbasis peristiwa versus transfer batch.

  4. Pilih tab Operations. Detail ditampilkan untuk operasi saat ini, dan operasi yang telah selesai dicantumkan dalam tabel Run history. Klik operasi yang telah selesai untuk melihat detail tambahan.

gcloud

Untuk memantau progres tugas secara real time, gunakan gcloud transfer jobs monitor. Respons menunjukkan operasi saat ini, waktu mulai tugas, jumlah data yang ditransfer, byte yang dilewati, dan jumlah error.

gcloud transfer jobs monitor JOB_NAME

Untuk mengambil nama operasi saat ini:

gcloud transfer jobs describe JOB_NAME --format="value(latestOperationName)"

Untuk mencantumkan operasi saat ini dan yang telah selesai:

gcloud transfer operations list --job-names=JOB_NAME

Untuk melihat detail tentang operasi:

gcloud transfer operations describe OPERATION_NAME