Transferts basés sur des événements depuis AWS S3

Le service de transfert de stockage peut écouter les notifications d'événements dans AWS pour transférer automatiquement les données qui ont été ajoutées ou mises à jour dans l'emplacement source vers un bucket Cloud Storage. En savoir plus sur les avantages des transferts basés sur des événements

Les transferts basés sur des événements écoutent les notifications d'événement Amazon S3 envoyées à Amazon SQS pour savoir quand des objets du bucket source ont été modifiés ou ajoutés. Les suppressions d'objets ne sont pas détectées. La suppression d'un objet à la source n'entraîne pas la suppression de l'objet associé dans le bucket de destination.

Les transferts basés sur les événements utilisent toujours un bucket Cloud Storage comme destination.

Avant de commencer

Suivez les instructions pour accorder les autorisations requises sur votre bucket Cloud Storage de destination :

Créer une file d'attente SQS

  1. Dans la console AWS, accédez à la page Simple Queue Service.

  2. Cliquez sur Créer une file d'attente.

  3. Saisissez un nom pour cette file d'attente.

  4. Dans la section Règle d'accès, sélectionnez Avancée. Un objet JSON s'affiche.

    Régions AWS standards

    {
      "Version": "2008-10-17",
      "Id": "\_\_default\_policy\_ID",
      "Statement": [
        {
          "Sid": "\_\_owner\_statement",
          "Effect": "Allow",
          "Principal": {
            "AWS": "01234567890"
          },
          "Action": [
            "SQS:*"
          ],
          "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
        }
      ]
    }

    Régions AWS GovCloud

    {
      "Version": "2008-10-17",
      "Id": "\_\_default\_policy\_ID",
      "Statement": [
        {
          "Sid": "\_\_owner\_statement",
          "Effect": "Allow",
          "Principal": {
            "AWS": "01234567890"
          },
          "Action": [
            "SQS:*"
          ],
          "Resource": "arn:aws-us-gov:sqs:us-gov-west-1:01234567890:test"
        }
      ]
    }

    Copiez les valeurs de AWS et Resource. Ils sont uniques pour chaque projet.

  5. Collez vos valeurs spécifiques de AWS et Resource de l'étape précédente dans l'extrait JSON suivant :

    Régions AWS standards

    {
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
      {
        "Sid": "example-statement-ID",
        "Effect": "Allow",
        "Principal": {
          "Service": "s3.amazonaws.com"
        },
        "Action": "SQS:SendMessage",
        "Resource": "RESOURCE",
        "Condition": {
          "StringEquals": {
            "aws:SourceAccount": "AWS"
          },
          "ArnLike": {
            "aws:SourceArn": "arn:aws:s3:::S3_BUCKET_NAME"
          }
        }
      }
    ]
    }

    Régions AWS GovCloud

    {
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
      {
        "Sid": "example-statement-ID",
        "Effect": "Allow",
        "Principal": {
          "Service": "s3.amazonaws.com"
        },
        "Action": "SQS:SendMessage",
        "Resource": "RESOURCE",
        "Condition": {
          "StringEquals": {
            "aws:SourceAccount": "AWS"
          },
          "ArnLike": {
            "aws:SourceArn": "arn:aws-us-gov:s3:::S3_BUCKET_NAME"
          }
        }
      }
    ]
    }

    Remplacez S3_BUCKET_NAME par le nom du bucket source S3.

  6. Copiez cet extrait JSON complet et utilisez-le pour remplacer le code JSON affiché dans la section Règle d'accès.

  7. Cliquez sur Créer une file d'attente.

Une fois l'opération terminée, notez le nom de ressource Amazon (ARN) de la file d'attente.

Activez les notifications sur votre bucket S3.

  1. Dans la console AWS, accédez à la page S3.

  2. Dans la liste Buckets, sélectionnez votre bucket source.

  3. Sélectionnez l'onglet Propriétés.

  4. Dans la section Notifications relatives à un événement, cliquez sur Créer une notification d'événement.

  5. Spécifiez un nom pour cet événement.

  6. Dans la section Types d'événements, sélectionnez Tous les événements de création d'objet.

  7. Dans Destination, sélectionnez File d'attente SQS, puis la file d'attente que vous avez créée pour ce transfert.

  8. Cliquez sur Enregistrer les modifications.

Configurer les autorisations

Suivez les instructions de la section Configurer l'accès à une source : Amazon S3 pour créer un ID de clé d'accès et une clé secrète, ou un rôle d'identité fédérée.

Lorsque vous suivez les instructions, utilisez le code JSON suivant lorsque vous êtes invité à spécifier un rôle personnalisé ou une règle de confiance personnalisée :

Régions AWS standards

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "sqs:DeleteMessage",
              "sqs:ChangeMessageVisibility",
              "sqs:ReceiveMessage",
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws:s3:::S3_BUCKET_NAME",
              "arn:aws:s3:::S3_BUCKET_NAME/*",
              "AWS_QUEUE_ARN"
          ]
      }
  ]
}

Une fois le compte créé, notez les informations suivantes :

  • Pour un utilisateur, notez l'ID de clé d'accès et la clé secrète.
  • Pour un rôle d'identité fédérée, notez le nom de ressource Amazon (ARN), qui se présente au format suivant : arn:aws:iam::AWS_ACCOUNT:role/AWS_ROLE_NAME

Régions AWS GovCloud

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "sqs:DeleteMessage",
              "sqs:ChangeMessageVisibility",
              "sqs:ReceiveMessage",
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws-us-gov:s3:::S3_BUCKET_NAME",
              "arn:aws-us-gov:s3:::S3_BUCKET_NAME/*",
              "AWS_QUEUE_ARN"
          ]
      }
  ]
}

Une fois le compte créé, notez les informations suivantes :

  • Pour un utilisateur, notez l'ID de clé d'accès et la clé secrète.
  • Pour un rôle d'identité fédérée, notez le nom de ressource Amazon (ARN), qui se présente au format suivant : arn:aws-us-gov:iam::AWS_ACCOUNT:role/AWS_ROLE_NAME.

Créer un job de transfert

Vous pouvez utiliser l'API REST ou la console Google Cloud pour créer un job de transfert basé sur des événements.

console Cloud

  1. Accédez à la page Créer un job de transfert dans la console Google Cloud .

    Accédez à Créer une tâche de transfert.

  2. Sélectionnez Amazon S3 comme type de source et Cloud Storage comme destination.

  3. Dans Scheduling mode (Mode de planification), sélectionnez Event-driven (Piloté par les événements), puis cliquez sur Next step (Étape suivante).

  4. Saisissez le nom de votre bucket S3. Le nom du bucket est celui qui apparaît dans AWS Management Console. Exemple : my-aws-bucket.

  5. Sélectionnez votre méthode d'authentification et saisissez les informations demandées, que vous avez créées et notées dans la section précédente.

  6. Saisissez l'ARN de la file d'attente Amazon SQS que vous avez créée précédemment. Il utilise l'un des formats suivants :

    • Pour les régions AWS standards : arn:aws:sqs:AWS_REGION:AWS_ACCOUNT:AWS_QUEUE_NAME
    • Pour les régions AWS GovCloud : arn:aws-us-gov:sqs:AWS_REGION:AWS_ACCOUNT:AWS_QUEUE_NAME
  7. Vous pouvez également définir des filtres, puis cliquer sur Étape suivante.

  8. Sélectionnez le bucket Cloud Storage de destination et, éventuellement, le chemin d'accès.

  9. Vous pouvez également saisir une heure de début et de fin pour le transfert. Si vous ne spécifiez pas d'heure, le transfert commencera immédiatement et s'exécutera jusqu'à ce qu'il soit arrêté manuellement.

  10. Spécifiez les options de transfert. Pour en savoir plus, consultez la page Créer des transferts.

  11. Cliquez sur Créer.

Une fois la tâche de transfert créée, elle commence à s'exécuter et un écouteur d'événements attend les notifications dans la file d'attente SQS. La page d'informations sur les tâches affiche une opération par heure et inclut des informations sur les données transférées pour chaque tâche.

REST

Pour créer un transfert basé sur des événements à l'aide de l'API REST, envoyez l'objet JSON suivant au point de terminaison transferJobs.create :

{
"description": "DESCRIPTION",
"status": "ENABLED",
"projectId": "PROJECT_ID",
"transferSpec": {
  "awsS3DataSource": {
    "bucketName": "S3_BUCKET_NAME",
    "roleArn": "AWS_ROLE_ARN"
  },
  "gcsDataSink": {
    "bucketName": "GCS_BUCKET_NAME"
  }
},
"eventStream": {
  "name": "AWS_QUEUE_ARN",
  "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
  "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
}
}

Les espaces réservés dans le JSON précédent utilisent les valeurs suivantes :

  • DESCRIPTION est une description de la tâche de transfert.
  • PROJECT_ID est l'ID du projet Google Cloud dans lequel le job de transfert est créé.
  • S3_BUCKET_NAME est le nom du bucket source Amazon S3.
  • AWS_ROLE_ARN est l'ARN du rôle d'identité fédérée que vous avez créé. Par exemple, arn:aws:iam::1234567891011:role/aws-role-name pour les régions AWS standards ou arn:aws-us-gov:iam::1234567891011:role/aws-role-name pour les régions AWS GovCloud.
  • GCS_BUCKET_NAME est le nom du bucket Cloud Storage de destination.
  • AWS_QUEUE_ARN est l'ARN de la file d'attente SQS. Par exemple, arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue pour les régions AWS standards ou arn:aws-us-gov:sqs:us-gov-east-1:1234567890:event-queue pour les régions AWS GovCloud.

Les éléments eventStreamStartTime et eventStreamExpirationTime sont facultatifs. Si l'heure de début est omise, le transfert commence immédiatement. Si l'heure de fin est omise, le transfert se poursuit jusqu'à ce qu'il soit arrêté manuellement.

Bibliothèques clientes

Go

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez la page Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez la documentation de référence de l'API Go du service de transfert de stockage.

Pour vous authentifier auprès du service de transfert de stockage, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez la page Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez la documentation de référence de l'API Java du service de transfert de stockage.

Pour vous authentifier auprès du service de transfert de stockage, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez la page Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez la documentation de référence de l'API Node.js du service de transfert de stockage.

Pour vous authentifier auprès du service de transfert de stockage, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez la page Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez la documentation de référence de l'API Python du service de transfert de stockage.

Pour vous authentifier auprès du service de transfert de stockage, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")