העברות מבוססות-אירוע מ-Cloud Storage

‫Storage Transfer Service יכול להאזין להתראות על אירועים ב-Google Cloud כדי להעביר באופן אוטומטי נתונים שנוספו או עודכנו בקטגוריה של Cloud Storage. מידע נוסף על היתרונות של העברות מבוססות-אירועים

העברות מבוססות-אירועים מ-Cloud Storage משתמשות בהתראות Pub/Sub כדי לדעת מתי אובייקטים בקטגוריית המקור שונו או נוספו. מחיקות של אובייקטים לא מזוהות. מחיקה של אובייקט במקור לא מוחקת את האובייקט המשויך בקטגוריית היעד.

העברות מבוססות-אירועים תמיד משתמשות בקטגוריה של Cloud Storage כיעד.

הגדרת ההרשאות

בנוסף להרשאות שנדרשות לכל העברה, העברות מבוססות-אירועים דורשות את התפקיד Pub/Sub Subscriber.

  1. כדי למצוא את השם של סוכן השירות של Storage Transfer Service בפרויקט:

    1. עוברים אל דף העזר של googleServiceAccounts.get.

      תיפתח חלונית אינטראקטיבית עם הכותרת Try this method (נסו את השיטה הזו).

    2. בחלונית, בקטע Request parameters, מזינים את מזהה הפרויקט. הפרויקט שאתם מציינים כאן צריך להיות הפרויקט שבו אתם משתמשים כדי לנהל את Storage Transfer Service, ויכול להיות שהוא שונה מהפרויקט של דלי המקור.

    3. לוחצים על Execute.

    כתובת האימייל של סוכן השירות מוחזרת כערך של accountEmail. מעתיקים את הערך הזה.

    כתובת האימייל של סוכן השירות היא בפורמט project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. מקצים את התפקיד Pub/Sub Subscriber לסוכן השירות של Storage Transfer Service.

    מסוף Cloud

    פועלים לפי ההוראות במאמר שליטה בגישה דרך מסוף Google Cloud כדי להעניק את התפקיד Pub/Sub Subscriber לשירות Storage Transfer Service. אפשר להעניק את התפקיד ברמת הנושא, המינוי או הפרויקט.

    gcloud CLI

    פועלים לפי ההוראות במאמר הגדרת מדיניות כדי להוסיף את הקישור הבא:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

הגדרת Pub/Sub

  1. חשוב לוודא שאתם עומדים בדרישות המוקדמות לשימוש ב-Pub/Sub עם Cloud Storage.

  2. יוצרים התראת Pub/Sub לקטגוריית המקור של Cloud Storage.

    אי אפשר לנהל את ההתראות של Pub/Sub באמצעות מסוף Google Cloud . במקום זאת, צריך להשתמש ב-CLI של gcloud או באחת מספריות הלקוח הזמינות.

    gcloud storage buckets notifications create gs://SOURCE_BUCKET_NAME --topic=TOPIC_NAME
  3. יוצרים מינוי שליפה לנושא. צריך ליצור מינוי נפרד לכל משימת העברה.

    בדוגמה הבאה מוצגת הפקודה ב-Google Cloud CLI ליצירת מינוי מסוג pull. הוראות למסוף וקוד של ספריית לקוח מופיעים במאמר בנושא יצירת מינוי שליפה.

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

יצירת העברת נתונים

אפשר להשתמש ב-API בארכיטקטורת REST או במסוף Google Cloud כדי ליצור העברה מבוססת-אירועים.

אל תכללו בשם של עבודת ההעברה מידע רגיש כמו פרטים אישיים מזהים (PII) או נתוני אבטחה. יכול להיות ששמות המשאבים יועברו לשמות של משאבים אחרים ב-Google Cloud, ויוצגו למערכות פנימיות של Google מחוץ לפרויקט שלכם.

מסוף Cloud

  1. נכנסים לדף Create transfer job במסוף Google Cloud .

    מעבר אל Create transfer job

  2. בוחרים באפשרות Cloud Storage כמקור וכייעד.

  3. בקטע מצב תזמון בוחרים באפשרות מבוסס-אירועים ולוחצים על השלב הבא.

  4. בוחרים את דלי המקור להעברה.

  5. בקטע Event stream, מזינים את שם המינוי:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. אפשר להגדיר מסננים ואז ללחוץ על השלב הבא.

  7. בוחרים את דלי היעד להעברה הזו.

  8. אפשר גם להזין שעת התחלה ושעת סיום להעברה. אם לא מציינים שעה, ההעברה תתחיל באופן מיידי ותפעל עד שתופסק באופן ידני.

  9. מציינים את אפשרויות ההעברה הרצויות. מידע נוסף זמין בדף יצירת העברות.

  10. לוחצים על יצירה.

אחרי שיוצרים את משימת ההעברה, היא מתחילה לפעול ומאזין האירועים מחכה להתראות במינוי Pub/Sub. בדף פרטי המשרה מוצגת פעולה אחת בכל שעה, והוא כולל פרטים על הנתונים שהועברו בכל משרה.

REST

כדי ליצור העברה מבוססת-אירועים באמצעות API בארכיטקטורת REST, שולחים את אובייקט ה-JSON הבא לנקודת הקצה transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

הערכים eventStreamStartTime ו-eventStreamExpirationTime הם אופציונליים. אם לא מציינים את זמן ההתחלה, ההעברה מתחילה באופן מיידי. אם לא מציינים את זמן הסיום, ההעברה נמשכת עד שמפסיקים אותה ידנית.

ספריות לקוח

Go

מידע על התקנת ספריית הלקוח של Storage Transfer Service והשימוש בה מופיע במאמר ספריות הלקוח של Storage Transfer Service. מידע נוסף מופיע במאמרי העזרה של Storage Transfer Service Go API.

כדי לבצע אימות ב-Storage Transfer Service, אתם צריכים להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

מידע על התקנת ספריית הלקוח של Storage Transfer Service והשימוש בה מופיע במאמר ספריות הלקוח של Storage Transfer Service. מידע נוסף מופיע במאמרי העזרה של Storage Transfer Service Java API.

כדי לבצע אימות ב-Storage Transfer Service, אתם צריכים להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

מידע על התקנת ספריית הלקוח של Storage Transfer Service והשימוש בה מופיע במאמר ספריות הלקוח של Storage Transfer Service. מידע נוסף מופיע במאמרי העזרה של Storage Transfer Service Node.js API.

כדי לבצע אימות ב-Storage Transfer Service, אתם צריכים להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

מידע על התקנת ספריית הלקוח של Storage Transfer Service והשימוש בה מופיע במאמר ספריות הלקוח של Storage Transfer Service. מידע נוסף מופיע במאמרי העזרה של Storage Transfer Service Python API.

כדי לבצע אימות ב-Storage Transfer Service, אתם צריכים להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

מעקב אחרי העברה מבוססת-אירועים

כשיוצרים העברה מבוססת-אירועים, Storage Transfer Service יוצרת משימת העברה. כשמגיעים לשעת ההתחלה, מתחילה לפעול פעולת העברה ו-event listener ממתין להתראות מתור Pub/Sub.

פעולת ההעברה פועלת עם הסטטוס in progress במשך כ-24 שעות. אחרי 24 שעות, הפעולה מסתיימת ומתחילה פעולה חדשה. מבצעים חדש נוצר כל 24 שעות עד שמגיעים לשעת הסיום של משימת ההעברה או עד שמפסיקים את המשימה באופן ידני.

אם העברת קובץ מתבצעת בזמן שבו הפעולה אמורה להסתיים, הפעולה הנוכחית תמשיך להתבצע עד שהקובץ יועבר במלואו. פעולה חדשה מתחילה, ושתי הפעולות פועלות בו-זמנית עד שהפעולה הישנה מסתיימת. כל האירועים שמזוהים במהלך התקופה הזו מטופלים על ידי הפעולה החדשה.

כדי לראות את הפעולה הנוכחית ואת הפעולות שהושלמו:

מסוף Google Cloud

  1. נכנסים לדף Storage Transfer Service במסוף Google Cloud .

    מעבר אל Storage Transfer Service

  2. ברשימת המשימות, בוחרים בכרטיסייה הכול או בכרטיסייה Cloud-to-cloud.

  3. לוחצים על מזהה העבודה של ההעברה. בעמודה מצב התזמון מופיעים כל ההעברות שמבוססות על אירועים לעומת העברות באצווה.

  4. בוחרים בכרטיסייה פעולות. פרטים על הפעולה הנוכחית מוצגים בטבלה Run history. כדי לראות פרטים נוספים, לוחצים על פעולה שהושלמה.

gcloud

כדי לעקוב אחרי התקדמות העבודה בזמן אמת, משתמשים ב-gcloud transfer jobs monitor. בתשובה מוצגת הפעולה הנוכחית, שעת ההתחלה של העבודה, כמות הנתונים שהועברו, בייטים שדילגו עליהם ומספר השגיאות.

gcloud transfer jobs monitor JOB_NAME

כדי לאחזר את שם הפעולה הנוכחי:

gcloud transfer jobs describe JOB_NAME --format="value(latestOperationName)"

כדי להציג רשימה של פעולות נוכחיות ופעולות שהושלמו:

gcloud transfer operations list --job-names=JOB_NAME

כדי לראות פרטים על פעולה:

gcloud transfer operations describe OPERATION_NAME