Transferencias impulsadas por eventos desde AWS S3

El Servicio de transferencia de almacenamiento puede escuchar las notificaciones de eventos en AWS para transferir automáticamente los datos que se agregaron o actualizaron en la ubicación de origen a un bucket de Cloud Storage. Obtén más información sobre los beneficios de las transferencias controladas por eventos.

Las transferencias controladas por eventos escuchan las notificaciones de eventos de Amazon S3 que se envían a Amazon SQS para saber cuándo se modificaron o agregaron objetos en el bucket de origen. No se detectan las eliminaciones de objetos; si borras un objeto en la fuente, no se borra el objeto asociado en el bucket de destino.

Las transferencias controladas por eventos siempre usan un bucket de Cloud Storage como destino.

Antes de comenzar

Sigue las instrucciones para otorgar los permisos necesarios en tu bucket de Cloud Storage de destino:

Crea una cola de SQS

  1. En la consola de AWS, ve a la página Simple Queue Service.

  2. Haz clic en Crear cola.

  3. Ingresa un Nombre para esta cola.

  4. En la sección Política de acceso, selecciona Avanzado. Se muestra un objeto JSON.

    Regiones estándar de AWS

    {
      "Version": "2008-10-17",
      "Id": "\_\_default\_policy\_ID",
      "Statement": [
        {
          "Sid": "\_\_owner\_statement",
          "Effect": "Allow",
          "Principal": {
            "AWS": "01234567890"
          },
          "Action": [
            "SQS:*"
          ],
          "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
        }
      ]
    }

    Regiones de AWS GovCloud

    {
      "Version": "2008-10-17",
      "Id": "\_\_default\_policy\_ID",
      "Statement": [
        {
          "Sid": "\_\_owner\_statement",
          "Effect": "Allow",
          "Principal": {
            "AWS": "01234567890"
          },
          "Action": [
            "SQS:*"
          ],
          "Resource": "arn:aws-us-gov:sqs:us-gov-west-1:01234567890:test"
        }
      ]
    }

    Copia los valores de AWS y Resource. Estos son únicos para cada proyecto.

  5. Pega tus valores específicos de AWS y Resource del paso anterior en el siguiente fragmento de JSON:

    Regiones estándar de AWS

    {
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
      {
        "Sid": "example-statement-ID",
        "Effect": "Allow",
        "Principal": {
          "Service": "s3.amazonaws.com"
        },
        "Action": "SQS:SendMessage",
        "Resource": "RESOURCE",
        "Condition": {
          "StringEquals": {
            "aws:SourceAccount": "AWS"
          },
          "ArnLike": {
            "aws:SourceArn": "arn:aws:s3:::S3_BUCKET_NAME"
          }
        }
      }
    ]
    }

    Regiones de AWS GovCloud

    {
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
      {
        "Sid": "example-statement-ID",
        "Effect": "Allow",
        "Principal": {
          "Service": "s3.amazonaws.com"
        },
        "Action": "SQS:SendMessage",
        "Resource": "RESOURCE",
        "Condition": {
          "StringEquals": {
            "aws:SourceAccount": "AWS"
          },
          "ArnLike": {
            "aws:SourceArn": "arn:aws-us-gov:s3:::S3_BUCKET_NAME"
          }
        }
      }
    ]
    }

    Reemplaza S3_BUCKET_NAME por el nombre del bucket de origen de S3.

  6. Copia este fragmento de JSON completado y úsalo para reemplazar el JSON que se muestra en la sección Política de acceso.

  7. Haz clic en Crear cola.

Una vez que se complete, anota el Amazon Resource Name (ARN) de la cola.

Habilita las notificaciones en tu bucket de S3

  1. En la consola de AWS, ve a la página S3.

  2. En la lista Buckets, selecciona tu bucket de origen.

  3. Selecciona la pestaña Propiedades.

  4. En la sección Notificaciones de eventos, haz clic en Crear notificación de eventos.

  5. Especifica un nombre para este evento.

  6. En la sección Tipos de eventos, selecciona Todos los eventos de creación de objetos.

  7. Como Destino , selecciona Cola de SQS y la cola que creaste para esta transferencia.

  8. Haz clic en Guardar cambios.

Configura permisos

Sigue las instrucciones en Configura el acceso a una fuente: Amazon S3 para crear un ID de clave de acceso y una clave secreta, o un rol de identidad federada.

Mientras sigues las instrucciones, usa el siguiente JSON cuando se te indique que especifiques un rol personalizado o una política de confianza personalizada:

Regiones estándar de AWS

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "sqs:DeleteMessage",
              "sqs:ChangeMessageVisibility",
              "sqs:ReceiveMessage",
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws:s3:::S3_BUCKET_NAME",
              "arn:aws:s3:::S3_BUCKET_NAME/*",
              "AWS_QUEUE_ARN"
          ]
      }
  ]
}

Una vez creado, anota la siguiente información:

  • Para un usuario, anota el ID de clave de acceso y la clave secreta.
  • Para un rol de identidad federada, anota el Amazon Resource Name (ARN), que tiene el siguiente formato: arn:aws:iam::AWS_ACCOUNT:role/AWS_ROLE_NAME

Regiones de AWS GovCloud

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "sqs:DeleteMessage",
              "sqs:ChangeMessageVisibility",
              "sqs:ReceiveMessage",
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws-us-gov:s3:::S3_BUCKET_NAME",
              "arn:aws-us-gov:s3:::S3_BUCKET_NAME/*",
              "AWS_QUEUE_ARN"
          ]
      }
  ]
}

Una vez creado, anota la siguiente información:

  • Para un usuario, anota el ID de clave de acceso y la clave secreta.
  • Para un rol de identidad federada, anota el Amazon Resource Name (ARN), que tiene el siguiente formato: arn:aws-us-gov:iam::AWS_ACCOUNT:role/AWS_ROLE_NAME

Crear un trabajo de transferencia

Puedes usar la Google Cloud consola, Google Cloud CLI o la API de REST para crear un trabajo de transferencia basado en eventos.

Consola de Cloud

  1. Ve a la página Crear trabajo de transferencia en la Google Cloud consola.

    Ir a Crear trabajo de transferencia

  2. Selecciona Amazon S3 como el tipo de fuente y Cloud Storage como el destino.

  3. Como Modo de programación , selecciona Controlado por eventos y haz clic en Siguiente paso.

  4. Ingresa el nombre de tu bucket de S3. El nombre del bucket es el nombre que aparece en la consola de administración de AWS. Por ejemplo, my-aws-bucket.

  5. Selecciona tu método de autenticación y, luego, ingresa la información solicitada que creaste y anotaste en la sección anterior.

  6. Ingresa el ARN de la cola de Amazon SQS que creaste antes. Usa uno de los siguientes formatos:

    • Para las regiones estándar de AWS: arn:aws:sqs:AWS_REGION:AWS_ACCOUNT:AWS_QUEUE_NAME
    • Para las regiones de AWS GovCloud: arn:aws-us-gov:sqs:AWS_REGION:AWS_ACCOUNT:AWS_QUEUE_NAME
  7. De manera opcional, define los filtros y, luego, haz clic en Siguiente paso.

  8. Selecciona el bucket de Cloud Storage de destino y, de manera opcional, la ruta de acceso.

  9. De manera opcional, ingresa una hora de inicio y finalización para la transferencia. Si no especificas una hora, la transferencia comenzará de inmediato y se ejecutará hasta que se detenga de forma manual.

  10. Especifica las opciones de transferencia. Hay más información disponible en la página Crear transferencias.

  11. Haz clic en Crear.

Una vez creado, el trabajo de transferencia comienza a ejecutarse y un objeto de escucha de eventos espera notificaciones en la cola de SQS. En la página de detalles del trabajo, se muestra una operación por hora y se incluyen detalles sobre los datos transferidos para cada trabajo.

gcloud

Para crear un trabajo de transferencia controlado por eventos con Google Cloud CLI, usa el comando gcloud transfer jobs create con la marca --event-stream-name:

gcloud transfer jobs create \
  s3://S3_BUCKET_NAME \
  gs://GCS_BUCKET_NAME \
  --source-creds-file=SOURCE_CREDS_FILE \
  --event-stream-name=AWS_QUEUE_ARN \
  --event-stream-starts=EVENT_STREAM_STARTS \
  --event-stream-expires=EVENT_STREAM_EXPIRES

Reemplaza los marcadores de posición por tus valores reales:

  • S3_BUCKET_NAME: El nombre de tu bucket de origen de Amazon S3.
  • GCS_BUCKET_NAME: Tu bucket de destino de Cloud Storage.
  • SOURCE_CREDS_FILE: La ruta de acceso relativa a un archivo local en tu máquina que contiene tus credenciales de AWS. Según tu método de autenticación, este archivo debe contener el ID de clave de acceso y la clave secreta, o el ARN del rol de identidad federada. Para obtener más información, consulta Configura el acceso a una fuente: Amazon S3.
  • AWS_QUEUE_ARN: El ARN de tu cola de Amazon SQS. Por ejemplo, arn:aws:sqs:us-east-1:123456789012:my-queue para las regiones estándar de AWS o arn:aws-us-gov:sqs:us-gov-west-1:123456789012:my-gov-queue para las regiones de AWS GovCloud.
  • EVENT_STREAM_STARTS: Cuándo comenzar a escuchar eventos con el formato de fecha y hora %Y-%m-%dT%H:%M:%S%z (p.ej., 2020-04-12T06:42:12+04:00). Si no se configura, el trabajo comienza a ejecutarse y a escuchar eventos cuando se envía correctamente el comando de creación del trabajo.
  • EVENT_STREAM_EXPIRES: Cuándo dejar de escuchar eventos. Si no se configura, el trabajo continúa hasta que se detiene de forma manual.

Para obtener una lista completa de los campos admitidos, consulta la gcloud transfer jobs create referencia.

REST

Para crear una transferencia controlada por eventos con la API de REST, envía el siguiente objeto JSON al transferJobs.create:

{
"description": "DESCRIPTION",
"status": "ENABLED",
"projectId": "PROJECT_ID",
"transferSpec": {
  "awsS3DataSource": {
    "bucketName": "S3_BUCKET_NAME",
    "roleArn": "AWS_ROLE_ARN"
  },
  "gcsDataSink": {
    "bucketName": "GCS_BUCKET_NAME"
  }
},
"eventStream": {
  "name": "AWS_QUEUE_ARN",
  "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
  "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
}
}

Los marcadores de posición en el JSON anterior usan los siguientes valores:

  • DESCRIPTION es una descripción del trabajo de transferencia.
  • PROJECT_ID es el ID del proyecto de Google Cloud en el que se crea el trabajo de transferencia.
  • S3_BUCKET_NAME es el nombre del bucket de origen de Amazon S3.
  • AWS_ROLE_ARN es el ARN del rol de identidad federada que creaste. Por ejemplo, arn:aws:iam::1234567891011:role/aws-role-name para las regiones estándar de AWS o arn:aws-us-gov:iam::1234567891011:role/aws-role-name para las regiones de AWS GovCloud.
  • GCS_BUCKET_NAME es el nombre del bucket de destino de Cloud Storage.
  • AWS_QUEUE_ARN es el ARN de la cola de SQS. Por ejemplo, arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue para las regiones estándar de AWS o arn:aws-us-gov:sqs:us-gov-east-1:1234567890:event-queue para las regiones de AWS GovCloud.

eventStreamStartTime y eventStreamExpirationTime son opcionales. Si se omite la hora de inicio, la transferencia comienza de inmediato; si se omite la hora de finalización, la transferencia continúa hasta que se detiene de forma manual.

Bibliotecas cliente

Go

Si deseas obtener información para instalar y usar la biblioteca cliente del Servicio de transferencia de almacenamiento, consulta las Bibliotecas cliente del Servicio de transferencia de almacenamiento. Para obtener más información, consulta la documentación de referencia de la APIGo de Servicio de transferencia de almacenamiento.

Para autenticarte en el Servicio de transferencia de almacenamiento, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Si deseas obtener información para instalar y usar la biblioteca cliente del Servicio de transferencia de almacenamiento, consulta las Bibliotecas cliente del Servicio de transferencia de almacenamiento. Para obtener más información, consulta la documentación de referencia de la APIJava del Servicio de transferencia de almacenamiento.

Para autenticarte en el Servicio de transferencia de almacenamiento, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Si deseas obtener información para instalar y usar la biblioteca cliente del Servicio de transferencia de almacenamiento, consulta las Bibliotecas cliente del Servicio de transferencia de almacenamiento. Para obtener más información, consulta la documentación de referencia de la API Node.js del Servicio de transferencia de almacenamiento.

Para autenticarte en el Servicio de transferencia de almacenamiento, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Si deseas obtener información para instalar y usar la biblioteca cliente del Servicio de transferencia de almacenamiento, consulta las Bibliotecas cliente del Servicio de transferencia de almacenamiento. Para obtener más información, consulta la documentación de referencia de la API del Servicio de transferencia de almacenamiento.Python

Para autenticarte en el Servicio de transferencia de almacenamiento, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")