העברות מבוססות-אירועים מ-AWS S3

‫Storage Transfer Service יכול להאזין להתראות אירועים ב-AWS כדי להעביר באופן אוטומטי נתונים שנוספו או עודכנו במיקום המקור לקטגוריה של Cloud Storage. מידע נוסף על היתרונות של העברות מבוססות-אירועים

העברות מבוססות-אירועים מאזינות להתראות אירועים של Amazon S3 שנשלחות ל-Amazon SQS כדי לדעת מתי אובייקטים בקטגוריית המקור שונו או נוספו. מחיקות של אובייקטים לא מזוהות. מחיקה של אובייקט במקור לא מוחקת את האובייקט המשויך בקטגוריית היעד.

העברות מבוססות-אירועים תמיד משתמשות בקטגוריה של Cloud Storage כיעד.

לפני שמתחילים

פועלים לפי ההוראות כדי להעניק את ההרשאות הנדרשות בקטגוריה של Cloud Storage ביעד:

יצירת תור SQS

  1. במסוף AWS, עוברים לדף Simple Queue Service.

  2. לוחצים על יצירת תור.

  3. מזינים שם לתור הזה.

  4. בקטע מדיניות גישה, בוחרים באפשרות מתקדם. מוצג אובייקט JSON:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    הערכים של AWS ושל Resource הם ייחודיים לכל פרויקט.

  5. מעתיקים את הערכים הספציפיים של AWS ו-Resource מ-JSON שמוצג אל קטע ה-JSON הבא:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    הערכים של ה-placeholders ב-JSON שלמעלה הם בפורמט הבא:

    • AWS הוא ערך מספרי שמייצג את הפרויקט שלכם ב-Amazon Web Services. לדוגמה, "aws:SourceAccount": "1234567890".
    • RESOURCE הוא מספר משאב של אמזון (ARN) שמזהה את התור הזה. לדוגמה, "Resource": "arn:aws:sqs:us-west-2:01234567890:test".
    • S3_BUCKET_ARN הוא מספר ARN שמזהה את קטגוריית המקור. לדוגמה, "aws:SourceArn": "arn:aws:s3:::example-aws-bucket". אפשר למצוא את ה-ARN של קטגוריה בכרטיסייה מאפיינים בדף הפרטים של הקטגוריה במסוף AWS.
  6. מחליפים את ה-JSON שמוצג בקטע מדיניות הגישה ב-JSON המעודכן שלמעלה.

  7. לוחצים על יצירת תור.

בסיום, רושמים את שם משאב Amazon‏ (ARN) של התור. הפורמט של ה-ARN הוא:

arn:aws:sqs:us-east-1:1234567890:event-queue"

הפעלת התראות בדלי S3

  1. נכנסים לדף S3 במסוף AWS.

  2. ברשימה Buckets, בוחרים את קטגוריית המקור.

  3. לוחצים על הכרטיסייה מאפיינים.

  4. בקטע התראות לגבי אירועים, לוחצים על יצירת התראה לגבי אירוע.

  5. מציינים שם לאירוע.

  6. בקטע Event types (סוגי אירועים), בוחרים באפשרות All object create events (כל האירועים שקשורים ליצירת אובייקטים).

  7. בקטע יעד בוחרים באפשרות תור SQS ובוחרים את התור שיצרתם להעברה הזו.

  8. לוחצים על שמירת השינויים.

הגדרת ההרשאות

פועלים לפי ההוראות במאמר הגדרת גישה למקור: Amazon S3 כדי ליצור מזהה מפתח גישה ומפתח סודי, או תפקיד של זהות מאוחדת.

מחליפים את ה-JSON של ההרשאות המותאמות אישית בטקסט הבא:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::S3_BUCKET_NAME",
                "arn:aws:s3:::S3_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

אחרי שיוצרים את הקבוצה, חשוב לשים לב למידע הבא:

  • רושמים בצד את המזהה של מפתח הגישה ואת המפתח הסודי של המשתמש.
  • לתפקיד של זהות מאוחדת, שימו לב לשם משאב Amazon‏ (ARN), שהפורמט שלו הוא arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME.

יצירת העברת נתונים

אפשר להשתמש ב-API בארכיטקטורת REST או במסוף Google Cloud כדי ליצור העברה מבוססת-אירועים.

מסוף Cloud

  1. נכנסים לדף Create transfer job במסוף Google Cloud .

    מעבר אל Create transfer job

  2. בוחרים באפשרות Amazon S3 כסוג המקור ובאפשרות Cloud Storage כיעד.

  3. בקטע מצב תזמון בוחרים באפשרות מבוסס-אירועים ולוחצים על השלב הבא.

  4. מזינים את שם קטגוריית S3. שם ה-bucket הוא השם שמופיע במסוף הניהול של AWS. לדוגמה, my-aws-bucket.

  5. בוחרים את שיטת האימות ומזינים את המידע הנדרש, שיצרתם ורשמתם בקטע הקודם.

  6. מזינים את ה-ARN של תור Amazon SQS שיצרתם קודם. הפורמט הוא:

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. אפשר להגדיר מסננים ואז ללחוץ על השלב הבא.

  8. בוחרים את קטגוריית היעד של Cloud Storage ואת הנתיב (אם רוצים).

  9. אפשר גם להזין שעת התחלה ושעת סיום להעברה. אם לא מציינים זמן, ההעברה תתחיל באופן מיידי ותפעל עד שתופסק באופן ידני.

  10. מציינים את אפשרויות ההעברה הרצויות. מידע נוסף זמין בדף יצירת העברות.

  11. לוחצים על יצירה.

אחרי שיוצרים את העברת הנתונים, היא מתחילה לפעול ו-event listener מחכה להודעות בתור SQS. בדף פרטי המשרה מוצגת פעולה אחת בכל שעה, והוא כולל פרטים על הנתונים שהועברו לכל משרה.

REST

כדי ליצור העברה מבוססת-אירועים באמצעות API בארכיטקטורת REST, שולחים את אובייקט ה-JSON הבא לנקודת הקצה transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

הערכים eventStreamStartTime ו-eventStreamExpirationTime הם אופציונליים. אם לא מציינים את זמן ההתחלה, ההעברה מתחילה באופן מיידי. אם לא מציינים את זמן הסיום, ההעברה נמשכת עד שמפסיקים אותה ידנית.

ספריות לקוח

Go

מידע על התקנת ספריית הלקוח של Storage Transfer Service והשימוש בה מופיע במאמר ספריות הלקוח של Storage Transfer Service. מידע נוסף מופיע במאמרי העזרה של Storage Transfer Service Go API.

כדי לבצע אימות ב-Storage Transfer Service, אתם צריכים להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

מידע על התקנת ספריית הלקוח של Storage Transfer Service והשימוש בה מופיע במאמר ספריות הלקוח של Storage Transfer Service. מידע נוסף מופיע במאמרי העזרה של Storage Transfer Service Java API.

כדי לבצע אימות ב-Storage Transfer Service, אתם צריכים להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

מידע על התקנת ספריית הלקוח של Storage Transfer Service והשימוש בה מופיע במאמר ספריות הלקוח של Storage Transfer Service. מידע נוסף מופיע במאמרי העזרה של Storage Transfer Service Node.js API.

כדי לבצע אימות ב-Storage Transfer Service, אתם צריכים להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

מידע על התקנת ספריית הלקוח של Storage Transfer Service והשימוש בה מופיע במאמר ספריות הלקוח של Storage Transfer Service. מידע נוסף מופיע במאמרי העזרה של Storage Transfer Service Python API.

כדי לבצע אימות ב-Storage Transfer Service, אתם צריכים להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")