Transferências baseadas em eventos do AWS S3

O Serviço de transferência do Cloud Storage pode detectar notificações de eventos na AWS para transferir automaticamente dados que foram adicionados ou atualizados no local de origem para um bucket do Cloud Storage. Saiba mais sobre os benefícios das transferências baseadas em eventos.

As transferências baseadas em eventos detectam notificações de eventos do Amazon S3 enviadas ao Amazon SQS para saber quando os objetos no bucket de origem foram modificados ou adicionados. As exclusões de objetos não são detectadas. A exclusão de um objeto na origem não exclui o objeto associado no bucket de destino.

As transferências baseadas em eventos sempre usam um bucket do Cloud Storage como destino.

Antes de começar

Siga as instruções para conceder as permissões necessárias no bucket do Cloud Storage de destino:

Criar uma fila do SQS

  1. No console da AWS, acesse a página Simple Queue Service.

  2. Clique em Criar fila.

  3. Insira um Nome para essa fila.

  4. Na seção Política de acesso, selecione Avançado. Um objeto JSON é exibido.

    Regiões padrão da AWS

    {
      "Version": "2008-10-17",
      "Id": "\_\_default\_policy\_ID",
      "Statement": [
        {
          "Sid": "\_\_owner\_statement",
          "Effect": "Allow",
          "Principal": {
            "AWS": "01234567890"
          },
          "Action": [
            "SQS:*"
          ],
          "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
        }
      ]
    }

    Regiões da AWS GovCloud

    {
      "Version": "2008-10-17",
      "Id": "\_\_default\_policy\_ID",
      "Statement": [
        {
          "Sid": "\_\_owner\_statement",
          "Effect": "Allow",
          "Principal": {
            "AWS": "01234567890"
          },
          "Action": [
            "SQS:*"
          ],
          "Resource": "arn:aws-us-gov:sqs:us-gov-west-1:01234567890:test"
        }
      ]
    }

    Copie os valores de AWS e Resource. Eles são exclusivos para cada projeto.

  5. Cole os valores específicos de AWS e Resource da etapa anterior no snippet JSON a seguir:

    Regiões padrão da AWS

    {
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
      {
        "Sid": "example-statement-ID",
        "Effect": "Allow",
        "Principal": {
          "Service": "s3.amazonaws.com"
        },
        "Action": "SQS:SendMessage",
        "Resource": "RESOURCE",
        "Condition": {
          "StringEquals": {
            "aws:SourceAccount": "AWS"
          },
          "ArnLike": {
            "aws:SourceArn": "arn:aws:s3:::S3_BUCKET_NAME"
          }
        }
      }
    ]
    }

    Regiões da AWS GovCloud

    {
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
      {
        "Sid": "example-statement-ID",
        "Effect": "Allow",
        "Principal": {
          "Service": "s3.amazonaws.com"
        },
        "Action": "SQS:SendMessage",
        "Resource": "RESOURCE",
        "Condition": {
          "StringEquals": {
            "aws:SourceAccount": "AWS"
          },
          "ArnLike": {
            "aws:SourceArn": "arn:aws-us-gov:s3:::S3_BUCKET_NAME"
          }
        }
      }
    ]
    }

    Substitua S3_BUCKET_NAME pelo nome do bucket de origem do S3.

  6. Copie esse snippet JSON concluído e use-o para substituir o JSON exibido na seção Política de acesso.

  7. Clique em Criar fila.

Quando for concluído, anote o nome de recurso da Amazon (ARN) na fila.

Ativar notificações no bucket S3

  1. No console da AWS, acesse a página S3.

  2. Na lista Buckets, selecione o bucket de origem.

  3. Selecione a guia Propriedades.

  4. Na seção Notificações de eventos, clique em Criar notificações de eventos.

  5. Especifique um nome para esse evento.

  6. Na seção Tipos de evento, selecione Todos os eventos de criação de objeto.

  7. Em Destino, selecione Fila SQS e selecione a fila criada para essa transferência.

  8. Clique em Salvar alterações.

Configurar permissões

Siga as instruções em Configurar o acesso a uma origem: Amazon S3 para criar um ID de chave de acesso e uma chave secreta, ou um papel de identidade federada.

Ao seguir as instruções, use o JSON a seguir quando for solicitado a especificar um papel personalizado ou uma política de confiança personalizada:

Regiões padrão da AWS

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "sqs:DeleteMessage",
              "sqs:ChangeMessageVisibility",
              "sqs:ReceiveMessage",
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws:s3:::S3_BUCKET_NAME",
              "arn:aws:s3:::S3_BUCKET_NAME/*",
              "AWS_QUEUE_ARN"
          ]
      }
  ]
}

Após a criação, observe as seguintes informações:

  • Para um usuário, anote o ID da chave de acesso e a chave secreta.
  • Para um papel de identidade federada, anote o Amazon Resource Name (ARN), que tem o seguinte formato: arn:aws:iam::AWS_ACCOUNT:role/AWS_ROLE_NAME

Regiões da AWS GovCloud

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "sqs:DeleteMessage",
              "sqs:ChangeMessageVisibility",
              "sqs:ReceiveMessage",
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws-us-gov:s3:::S3_BUCKET_NAME",
              "arn:aws-us-gov:s3:::S3_BUCKET_NAME/*",
              "AWS_QUEUE_ARN"
          ]
      }
  ]
}

Após a criação, observe as seguintes informações:

  • Para um usuário, anote o ID da chave de acesso e a chave secreta.
  • Para um papel de identidade federada, anote o Amazon Resource Name (ARN), que tem o seguinte formato: arn:aws-us-gov:iam::AWS_ACCOUNT:role/AWS_ROLE_NAME

Criar um job de transferência

Use a API REST ou o Google Cloud console do Cloud para criar um job de transferência baseado em eventos.

Console do Cloud

  1. Acesse a página Criar job de transferência no Google Cloud console do Cloud.

    Acesse Criar job de transferência

  2. Selecione Amazon S3 como o tipo de origem e Cloud Storage como destino.

  3. No Modo de programação , selecione Baseado em eventos e clique em Próxima etapa.

  4. Insira o nome do bucket S3. O nome do bucket é o nome exibido no AWS Management Console. Por exemplo, my-aws-bucket.

  5. Selecione seu método de autenticação e insira as informações solicitadas, que você criou e anotou na seção anterior.

  6. Digite o ARN da fila do Amazon SQS que você criou anteriormente. Ele usa um dos seguintes formatos:

    • Para regiões padrão da AWS: arn:aws:sqs:AWS_REGION:AWS_ACCOUNT:AWS_QUEUE_NAME
    • Para regiões da AWS GovCloud: arn:aws-us-gov:sqs:AWS_REGION:AWS_ACCOUNT:AWS_QUEUE_NAME
  7. Se quiser, defina qualquer filtro e clique em Próxima etapa.

  8. Selecione o bucket de destino do Cloud Storage e, opcionalmente, o caminho.

  9. Também é possível digitar um horário de início e de término para a transferência. Se você não especificar um horário, a transferência será iniciada imediatamente e será executada até ser interrompida manualmente.

  10. Especifique as opções de transferência. Veja mais informações na página Criar transferências.

  11. Clique em Criar.

Depois de criado, o job de transferência começa a ser executado e um listener de eventos aguarda notificações na fila do SQS. A página de detalhes do job mostra uma operação a cada hora e inclui detalhes sobre os dados transferidos para cada job.

REST

Para criar uma transferência baseada em eventos usando a API REST, envie o seguinte objeto JSON para o transferJobs.create:

{
"description": "DESCRIPTION",
"status": "ENABLED",
"projectId": "PROJECT_ID",
"transferSpec": {
  "awsS3DataSource": {
    "bucketName": "S3_BUCKET_NAME",
    "roleArn": "AWS_ROLE_ARN"
  },
  "gcsDataSink": {
    "bucketName": "GCS_BUCKET_NAME"
  }
},
"eventStream": {
  "name": "AWS_QUEUE_ARN",
  "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
  "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
}
}

Os marcadores de posição no JSON anterior usam os seguintes valores:

  • DESCRIPTION é uma descrição do job de transferência.
  • PROJECT_ID é o ID do projeto na nuvem do Google Cloud em que o job de transferência é criado.
  • S3_BUCKET_NAME é o nome do bucket de origem do Amazon S3.
  • AWS_ROLE_ARN é o ARN do papel de identidade federada que você criou. Por exemplo, arn:aws:iam::1234567891011:role/aws-role-name para regiões padrão da AWS ou arn:aws-us-gov:iam::1234567891011:role/aws-role-name para regiões da AWS GovCloud.
  • GCS_BUCKET_NAME é o nome do bucket de destino do Cloud Storage.
  • AWS_QUEUE_ARN é o ARN da fila do SQS. Por exemplo, arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue para regiões padrão da AWS ou arn:aws-us-gov:sqs:us-gov-east-1:1234567890:event-queue para regiões da AWS GovCloud.

O eventStreamStartTime e o eventStreamExpirationTime são opcionais. Se o horário de início for omitido, a transferência começará imediatamente. Se o horário de término for omitido, a transferência continuará até que seja interrompida manualmente.

Bibliotecas de cliente

Go

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Go do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Java do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Node.js do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Python do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")