Ereignisgesteuerte Übertragungen von AWS S3

Storage Transfer Service kann Ereignisbenachrichtigungen in AWS abrufen, um automatisch Daten, die am Quellspeicherort hinzugefügt oder aktualisiert wurden, in einen Cloud Storage-Bucket zu übertragen. Weitere Informationen zu den Vorteilen von ereignisgesteuerten Übertragungen

Bei ereignisgesteuerten Übertragungen wird auf Amazon S3-Ereignisbenachrichtigungen gewartet, die an Amazon SQS gesendet werden, um zu erfahren, wann Objekte im Quell-Bucket geändert oder hinzugefügt wurden. Das Löschen von Objekten wird nicht erkannt. Wenn Sie ein Objekt in der Quelle löschen, wird das zugehörige Objekt im Ziel-Bucket nicht gelöscht.

Bei ereignisgesteuerten Übertragungen wird immer ein Cloud Storage-Bucket als Ziel verwendet.

Hinweis

Folgen Sie der Anleitung, um die erforderlichen Berechtigungen für Ihren Cloud Storage-Ziel-Bucket zu erteilen:

SQS-Warteschlange erstellen

  1. Rufen Sie in der AWS Console die Seite Simple Queue Service auf.

  2. Klicken Sie auf Warteschlange erstellen.

  3. Geben Sie einen Namen für diese Warteschlange ein.

  4. Wählen Sie im Abschnitt Zugriffsrichtlinie die Option Erweitert aus. Ein JSON-Objekt wird angezeigt.

    Standardmäßige AWS-Regionen

    {
      "Version": "2008-10-17",
      "Id": "\_\_default\_policy\_ID",
      "Statement": [
        {
          "Sid": "\_\_owner\_statement",
          "Effect": "Allow",
          "Principal": {
            "AWS": "01234567890"
          },
          "Action": [
            "SQS:*"
          ],
          "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
        }
      ]
    }

    AWS GovCloud-Regionen

    {
      "Version": "2008-10-17",
      "Id": "\_\_default\_policy\_ID",
      "Statement": [
        {
          "Sid": "\_\_owner\_statement",
          "Effect": "Allow",
          "Principal": {
            "AWS": "01234567890"
          },
          "Action": [
            "SQS:*"
          ],
          "Resource": "arn:aws-us-gov:sqs:us-gov-west-1:01234567890:test"
        }
      ]
    }

    Kopieren Sie die Werte von AWS und Resource. Sie sind für jedes Projekt eindeutig.

  5. Fügen Sie Ihre spezifischen Werte für AWS und Resource aus dem vorherigen Schritt in das folgende JSON-Snippet ein:

    Standardmäßige AWS-Regionen

    {
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
      {
        "Sid": "example-statement-ID",
        "Effect": "Allow",
        "Principal": {
          "Service": "s3.amazonaws.com"
        },
        "Action": "SQS:SendMessage",
        "Resource": "RESOURCE",
        "Condition": {
          "StringEquals": {
            "aws:SourceAccount": "AWS"
          },
          "ArnLike": {
            "aws:SourceArn": "arn:aws:s3:::S3_BUCKET_NAME"
          }
        }
      }
    ]
    }

    AWS GovCloud-Regionen

    {
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
      {
        "Sid": "example-statement-ID",
        "Effect": "Allow",
        "Principal": {
          "Service": "s3.amazonaws.com"
        },
        "Action": "SQS:SendMessage",
        "Resource": "RESOURCE",
        "Condition": {
          "StringEquals": {
            "aws:SourceAccount": "AWS"
          },
          "ArnLike": {
            "aws:SourceArn": "arn:aws-us-gov:s3:::S3_BUCKET_NAME"
          }
        }
      }
    ]
    }

    Ersetzen Sie S3_BUCKET_NAME durch den Namen des S3-Quell-Buckets.

  6. Kopieren Sie dieses vollständige JSON-Snippet und ersetzen Sie damit das JSON, das im Bereich Zugriffsrichtlinie angezeigt wird.

  7. Klicken Sie auf Warteschlange erstellen.

Notieren Sie sich nach Abschluss den Amazon Resource Name (ARN) der Warteschlange.

Benachrichtigungen für Ihren S3-Bucket aktivieren

  1. Rufen Sie in der AWS Console die Seite S3 auf.

  2. Wählen Sie in der Liste Buckets den Quell-Bucket aus.

  3. Wählen Sie den Tab Eigenschaften aus.

  4. Klicken Sie im Bereich Ereignisbenachrichtigungen auf Ereignisbenachrichtigung erstellen.

  5. Geben Sie einen Namen für dieses Ereignis an.

  6. Wählen Sie im Bereich Ereignistypen die Option Alle Ereignisse zum Erstellen von Objekten aus.

  7. Wählen Sie als Ziel die Option SQS-Warteschlange aus und wählen Sie die Warteschlange aus, die Sie für diese Übertragung erstellt haben.

  8. Klicken Sie auf Änderungen speichern.

Berechtigungen konfigurieren

Folgen Sie der Anleitung unter Zugriff auf eine Quelle konfigurieren: Amazon S3, um entweder eine Zugriffsschlüssel-ID und einen geheimen Schlüssel oder eine Rolle für die Verbundidentität zu erstellen.

Verwenden Sie beim Befolgen der Anleitung den folgenden JSON-Code, wenn Sie aufgefordert werden, eine benutzerdefinierte Rolle oder eine benutzerdefinierte Vertrauensrichtlinie anzugeben:

Standardmäßige AWS-Regionen

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "sqs:DeleteMessage",
              "sqs:ChangeMessageVisibility",
              "sqs:ReceiveMessage",
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws:s3:::S3_BUCKET_NAME",
              "arn:aws:s3:::S3_BUCKET_NAME/*",
              "AWS_QUEUE_ARN"
          ]
      }
  ]
}

Notieren Sie sich nach dem Erstellen die folgenden Informationen:

  • Notieren Sie sich für einen Nutzer die Zugriffsschlüssel-ID und den geheimen Schlüssel.
  • Notieren Sie sich für eine föderierte Identitätsrolle den Amazon Resource Name (ARN) im folgenden Format: arn:aws:iam::AWS_ACCOUNT:role/AWS_ROLE_NAME

AWS GovCloud-Regionen

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "sqs:DeleteMessage",
              "sqs:ChangeMessageVisibility",
              "sqs:ReceiveMessage",
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws-us-gov:s3:::S3_BUCKET_NAME",
              "arn:aws-us-gov:s3:::S3_BUCKET_NAME/*",
              "AWS_QUEUE_ARN"
          ]
      }
  ]
}

Notieren Sie sich nach dem Erstellen die folgenden Informationen:

  • Notieren Sie sich für einen Nutzer die Zugriffsschlüssel-ID und den geheimen Schlüssel.
  • Notieren Sie sich für eine Federated Identity-Rolle den Amazon Resource Name (ARN) im folgenden Format: arn:aws-us-gov:iam::AWS_ACCOUNT:role/AWS_ROLE_NAME

Übertragungsjob erstellen

Sie können die REST API oder die Google Cloud Console verwenden, um einen ereignisbasierten Übertragungsjob zu erstellen.

Cloud Console

  1. Rufen Sie in der Google Cloud -Console die Seite Übertragungsjob erstellen auf.

    Übertragungsjob erstellen

  2. Wählen Sie Amazon S3 als Quelltyp und Cloud Storage als Ziel aus.

  3. Wählen Sie als Planungsmodus die Option Ereignisgesteuert aus und klicken Sie auf Nächster Schritt.

  4. Geben Sie den Namen Ihres S3-Buckets ein. Der Bucket-Name ist der Name, der in der AWS Management Console angezeigt wird. Beispiel: my-aws-bucket.

  5. Wählen Sie Ihre Authentifizierungsmethode aus und geben Sie die angeforderten Informationen ein, die Sie im vorherigen Abschnitt erstellt und notiert haben.

  6. Geben Sie den ARN der Amazon SQS-Warteschlange ein, die Sie zuvor erstellt haben. Dazu wird eines der folgenden Formate verwendet:

    • Für Standard-AWS-Regionen: arn:aws:sqs:AWS_REGION:AWS_ACCOUNT:AWS_QUEUE_NAME
    • Für AWS GovCloud-Regionen: arn:aws-us-gov:sqs:AWS_REGION:AWS_ACCOUNT:AWS_QUEUE_NAME
  7. Optional: Definieren Sie Filter und klicken Sie dann auf Nächster Schritt.

  8. Wählen Sie den Cloud Storage-Ziel-Bucket und optional den Pfad aus.

  9. Optional: Geben Sie eine Start- und Endzeit für die Übertragung ein. Wenn Sie keine Zeit angeben, beginnt die Übertragung sofort und wird fortgesetzt, bis sie manuell beendet wird.

  10. Geben Sie Übertragungsoptionen an. Weitere Informationen finden Sie auf der Seite Übertragungen erstellen.

  11. Klicken Sie auf Erstellen.

Nach der Erstellung wird der Übertragungsjob ausgeführt und ein Event-Listener wartet auf Benachrichtigungen in der SQS-Warteschlange. Auf der Detailseite für Jobs wird für jede Stunde ein Vorgang angezeigt. Sie enthält Details zu den für jeden Job übertragenen Daten.

REST

Senden Sie das folgende JSON-Objekt an den Endpunkt transferJobs.create, um eine ereignisgesteuerte Übertragung mit der REST API zu erstellen:

{
"description": "DESCRIPTION",
"status": "ENABLED",
"projectId": "PROJECT_ID",
"transferSpec": {
  "awsS3DataSource": {
    "bucketName": "S3_BUCKET_NAME",
    "roleArn": "AWS_ROLE_ARN"
  },
  "gcsDataSink": {
    "bucketName": "GCS_BUCKET_NAME"
  }
},
"eventStream": {
  "name": "AWS_QUEUE_ARN",
  "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
  "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
}
}

Die Platzhalter im vorherigen JSON verwenden die folgenden Werte:

  • DESCRIPTION ist eine Beschreibung des Übertragungsjobs.
  • PROJECT_ID ist die ID des Google Cloud-Projekts, in dem der Übertragungsjob erstellt wird.
  • S3_BUCKET_NAME ist der Name des Amazon S3-Quell-Buckets.
  • AWS_ROLE_ARN ist der ARN der Rolle für föderierte Identitäten, die Sie erstellt haben. Beispiel: arn:aws:iam::1234567891011:role/aws-role-name für Standard-AWS-Regionen oder arn:aws-us-gov:iam::1234567891011:role/aws-role-name für AWS GovCloud-Regionen.
  • GCS_BUCKET_NAME ist der Name des Cloud Storage-Ziel-Buckets.
  • AWS_QUEUE_ARN ist der ARN der SQS-Warteschlange. Beispiel: arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue für Standard-AWS-Regionen oder arn:aws-us-gov:sqs:us-gov-east-1:1234567890:event-queue für AWS GovCloud-Regionen.

Die Parameter eventStreamStartTime und eventStreamExpirationTime sind optional. Wenn die Startzeit weggelassen wird, beginnt die Übertragung sofort. Wenn die Endzeit weggelassen wird, wird die Übertragung fortgesetzt, bis sie manuell beendet wird.

Clientbibliotheken

Go

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Go API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Java API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Python API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")