Transferências baseadas em eventos do AWS S3

O Storage Transfer Service pode ouvir notificações de eventos na AWS para transferir automaticamente dados que foram adicionados ou atualizados na localização de origem para um contentor do Cloud Storage. Saiba mais sobre as vantagens das transferências baseadas em eventos.

As transferências orientadas por eventos monitorizam as notificações de eventos do Amazon S3 enviadas para o Amazon SQS para saber quando os objetos no contentor de origem foram modificados ou adicionados. As eliminações de objetos não são detetadas. Se eliminar um objeto na origem, não elimina o objeto associado no contentor de destino.

As transferências baseadas em eventos usam sempre um contentor do Cloud Storage como destino.

Antes de começar

Siga as instruções para conceder as autorizações necessárias no contentor do Cloud Storage de destino:

Crie uma fila SQS

  1. Na consola da AWS, aceda à página Simple Queue Service.

  2. Clique em Criar fila.

  3. Introduza um nome para esta fila.

  4. Na secção Política de acesso, selecione Avançadas. É apresentado um objeto JSON:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    Os valores de AWS e Resource são únicos para cada projeto.

  5. Copie os valores específicos de AWS e Resource do JSON apresentado para o seguinte fragmento JSON:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    Os valores dos marcadores de posição no JSON anterior usam o seguinte formato:

    • AWS é um valor numérico que representa o seu projeto dos Amazon Web Services. Por exemplo, "aws:SourceAccount": "1234567890".
    • RESOURCE é um número de recurso da Amazon (ARN) que identifica esta fila. Por exemplo, "Resource": "arn:aws:sqs:us-west-2:01234567890:test".
    • S3_BUCKET_ARN é um ARN que identifica o contentor de origem. Por exemplo, "aws:SourceArn": "arn:aws:s3:::example-aws-bucket". Pode encontrar o ARN de um contentor no separador Propriedades da página de detalhes do contentor na consola da AWS.
  6. Substitua o JSON apresentado na secção Política de acesso pelo JSON atualizado acima.

  7. Clique em Criar fila.

Quando terminar, tome nota do Nome do recurso da Amazon (ARN) da fila. O ARN tem o seguinte formato:

arn:aws:sqs:us-east-1:1234567890:event-queue"

Ative as notificações no seu contentor S3

  1. Na consola do AWS, aceda à página S3.

  2. Na lista Recipientes, selecione o recipiente de origem.

  3. Selecione o separador Propriedades.

  4. Na secção Notificações de eventos, clique em Criar notificação de evento.

  5. Especifique um nome para este evento.

  6. Na secção Tipos de eventos, selecione Todos os eventos de criação de objetos.

  7. Como Destino, selecione Fila SQS e selecione a fila que criou para esta transferência.

  8. Clique em Guardar alterações.

Configure autorizações

Siga as instruções em Configure o acesso a uma origem: Amazon S3 para criar um ID da chave de acesso e uma chave secreta, ou uma função de identidade federada.

Substitua o JSON de autorizações personalizadas pelo seguinte:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::S3_BUCKET_NAME",
                "arn:aws:s3:::S3_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

Depois de criado, anote as seguintes informações:

  • Para um utilizador, anote o ID da chave de acesso e a chave secreta.
  • Para uma função de identidade federada, tenha em atenção o nome de recurso da Amazon (ARN), que tem o formato arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME.

Crie uma tarefa de transferência

Pode usar a API REST ou a Google Cloud consola para criar uma tarefa de transferência baseada em eventos.

Cloud Console

  1. Aceda à página Criar tarefa de transferência na Google Cloud consola.

    Aceda a Criar tarefa de transferência

  2. Selecione Amazon S3 como o tipo de origem e armazenamento na nuvem como o destino.

  3. No Modo de agendamento, selecione Orientado por eventos e clique em Passo seguinte.

  4. Introduza o nome do contentor do S3. O nome do contentor é o nome apresentado na AWS Management Console. Por exemplo, my-aws-bucket.

  5. Selecione o seu método de autenticação e introduza as informações pedidas, que criou e anotou na secção anterior.

  6. Introduza o ARN da fila do Amazon SQS que criou anteriormente. Usa o seguinte formato:

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. Opcionalmente, defina filtros e, de seguida, clique em Passo seguinte.

  8. Selecione o contentor do Cloud Storage de destino e, opcionalmente, o caminho.

  9. Opcionalmente, introduza uma hora de início e fim para a transferência. Se não especificar uma hora, a transferência começa imediatamente e é executada até ser interrompida manualmente.

  10. Especifique as opções de transferência. Estão disponíveis mais informações na página Crie transferências.

  11. Clique em Criar.

Depois de criado, o trabalho de transferência começa a ser executado e um ouvinte de eventos aguarda notificações na fila SQS. A página de detalhes do trabalho mostra uma operação por hora e inclui detalhes sobre os dados transferidos para cada trabalho.

REST

Para criar uma transferência orientada por eventos através da API REST, envie o seguinte objeto JSON para o ponto final transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

O eventStreamStartTime e o eventStreamExpirationTime são opcionais. Se a hora de início for omitida, a transferência começa imediatamente. Se a hora de fim for omitida, a transferência continua até ser interrompida manualmente.

Bibliotecas cliente

Go

Para saber como instalar e usar a biblioteca cliente do Serviço de transferência de armazenamento, consulte o artigo Bibliotecas cliente do Serviço de transferência de armazenamento. Para mais informações, consulte a documentação de referência da API Go do Storage Transfer Service.

Para se autenticar no serviço de transferência de armazenamento, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Para saber como instalar e usar a biblioteca cliente do Serviço de transferência de armazenamento, consulte o artigo Bibliotecas cliente do Serviço de transferência de armazenamento. Para mais informações, consulte a documentação de referência da API Java do Storage Transfer Service.

Para se autenticar no serviço de transferência de armazenamento, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Para saber como instalar e usar a biblioteca cliente do Serviço de transferência de armazenamento, consulte o artigo Bibliotecas cliente do Serviço de transferência de armazenamento. Para mais informações, consulte a documentação de referência da API Node.js do Storage Transfer Service.

Para se autenticar no serviço de transferência de armazenamento, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Para saber como instalar e usar a biblioteca cliente do Serviço de transferência de armazenamento, consulte o artigo Bibliotecas cliente do Serviço de transferência de armazenamento. Para mais informações, consulte a documentação de referência da API Python do Storage Transfer Service.

Para se autenticar no serviço de transferência de armazenamento, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")