Ereignisgesteuerte Übertragungen aus Cloud Storage

Der Storage Transfer Service kann auf Ereignisbenachrichtigungen in Google Cloud reagieren, um automatisch Daten zu übertragen, die in einem Cloud Storage-Bucket hinzugefügt oder aktualisiert wurden. Weitere Informationen zu den Vorteilen ereignisgesteuerter Übertragungen.

Bei ereignisgesteuerten Übertragungen aus Cloud Storage werden Pub/Sub-Benachrichtigungen verwendet, um zu erfahren, wann Objekte im Quell-Bucket geändert oder hinzugefügt wurden. Das Löschen von Objekten wird nicht erkannt. Wenn Sie ein Objekt an der Quelle löschen, wird das zugehörige Objekt im Ziel-Bucket nicht gelöscht.

Bei ereignisgesteuerten Übertragungen wird immer ein Cloud Storage-Bucket als Ziel verwendet.

Berechtigungen konfigurieren

Zusätzlich zu den für alle Übertragungsjobs erforderlichen Berechtigungen benötigen ereignisgesteuerte Übertragungen die Pub/Sub Subscriber Rolle.

  1. So finden Sie den Namen des Storage Transfer Service-Dienst-Agents für Ihr Projekt:

    1. Rufen Sie die googleServiceAccounts.get Referenzseite auf.

      Es wird ein interaktives Steuerfeld mit dem Titel Diese Methode testen geöffnet.

    2. Geben Sie im Steuerfeld unter Anfrageparameter Ihre Projekt-ID ein. Das hier angegebene Projekt muss das Projekt sein, das Sie zum Verwalten des Storage Transfer Service verwenden. Dieses kann sich vom Projekt des Quell-Buckets unterscheiden.

    3. Klicken Sie auf Ausführen.

    Die E-Mail-Adresse des Dienst-Agents wird als Wert von accountEmail zurückgegeben. Kopieren Sie diesen Wert.

    Die E-Mail des Dienst-Agents hat das Format project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Weisen Sie dem Storage Transfer Service-Dienst-Agent die Rolle Pub/Sub Subscriber zu.

    Cloud Console

    Folgen Sie der Anleitung unter Zugriff über die Google Cloud Console steuern, um dem Storage Transfer Service-Dienst die Rolle Pub/Sub Subscriber zuzuweisen. Die Rolle kann auf Thema-, Abo- oder Projektebene zugewiesen werden.

    gcloud CLI

    Folgen Sie der Anleitung unter Richtlinie festlegen, um die folgende Bindung hinzuzufügen:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

Pub/Sub konfigurieren

  1. Prüfen Sie, ob Sie die Voraussetzungen für die Verwendung von Pub/Sub mit Cloud Storage erfüllen.

  2. Erstellen Sie eine Pub/Sub-Benachrichtigung für den Cloud Storage-Quell-Bucket.

    Sie können Pub/Sub-Benachrichtigungen nicht mit der Google Cloud Console verwalten. Verwenden Sie stattdessen die gcloud CLI oder eine der verfügbaren Clientbibliotheken.

    gcloud storage buckets notifications create gs://SOURCE_BUCKET_NAME --topic=TOPIC_NAME
  3. Erstellen Sie ein Pull-Abo für das Thema. Sie müssen für jeden Übertragungsjob ein separates Abo erstellen.

    Das folgende Beispiel zeigt den Google Cloud CLI-Befehl zum Erstellen eines Pull-Abos. Eine Anleitung für die Console und Clientbibliotheks-Code finden Sie unter Pull-Abo erstellen.

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

Übertragungsjob erstellen

Sie können die REST API oder die Google Cloud Console verwenden, um einen ereignisbasierten Übertragungsjob zu erstellen.

Der Name des Übertragungsjobs darf keine vertraulichen Informationen wie personenidentifizierbare Informationen oder Sicherheitsdaten enthalten. Ressourcennamen können an die Namen anderer Google Cloud-Ressourcen weitergegeben werden und für Google-interne Systeme außerhalb Ihres Projekts sichtbar sein.

Cloud Console

  1. Rufen Sie in der Google Cloud Console die Seite Übertragungsjob erstellen auf.

    Übertragungsjob erstellen aufrufen

  2. Wählen Sie sowohl für die Quelle als auch für das Ziel Cloud Storage aus.

  3. Wählen Sie als Planungsmodus die Option Ereignisgesteuert aus und klicken Sie auf Nächster Schritt.

  4. Wählen Sie den Quell-Bucket für diese Übertragung aus.

  5. Geben Sie im Abschnitt Ereignisstream den Namen des Abos ein:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Definieren Sie optional Filter und klicken Sie dann auf Nächster Schritt.

  7. Wählen Sie den Ziel-Bucket für diese Übertragung aus.

  8. Geben Sie optional eine Start- und Endzeit für die Übertragung ein. Wenn Sie keine Zeit angeben, wird die Übertragung sofort gestartet und ausgeführt, bis sie manuell beendet wird.

  9. Geben Sie Übertragungsoptionen an. Weitere Informationen finden Sie auf der Seite Übertragungen erstellen.

  10. Klicken Sie auf Erstellen.

Nach dem Erstellen wird der Übertragungsjob ausgeführt und ein Ereignis-Listener wartet auf Benachrichtigungen zum Pub/Sub-Abo. Auf der Seite mit den Jobdetails wird jede Stunde ein Vorgang angezeigt. Außerdem sind Details zu den für jeden Job übertragenen Daten enthalten.

REST

Wenn Sie eine ereignisgesteuerte Übertragung mit der REST API erstellen möchten, senden Sie das folgende JSON-Objekt an den transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime und eventStreamExpirationTime sind optional. Wenn die Startzeit nicht angegeben wird, beginnt die Übertragung sofort. Wenn die Endzeit nicht angegeben wird, wird die Übertragung fortgesetzt, bis sie manuell beendet wird.

Clientbibliotheken

Go

Informationen zum Installieren und Verwenden der Clientbibliothek für den Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Storage Transfer Service Go API Referenzdokumentation.

Richten Sie zur Authentifizierung beim Storage Transfer Service die Standardanmeldedaten für Anwendungen (ADC) ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Informationen zum Installieren und Verwenden der Clientbibliothek für den Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Storage Transfer Service Java API Referenzdokumentation.

Richten Sie zur Authentifizierung beim Storage Transfer Service die Standardanmeldedaten für Anwendungen (ADC) ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Informationen zum Installieren und Verwenden der Clientbibliothek für den Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Storage Transfer Service Node.js API Referenzdokumentation.

Richten Sie zur Authentifizierung beim Storage Transfer Service die Standardanmeldedaten für Anwendungen (ADC) ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Informationen zum Installieren und Verwenden der Clientbibliothek für den Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Storage Transfer Service Python API Referenzdokumentation.

Richten Sie zur Authentifizierung beim Storage Transfer Service die Standardanmeldedaten für Anwendungen (ADC) ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

Ereignisgesteuerte Übertragung überwachen

Wenn Sie eine ereignisgesteuerte Übertragung erstellen, erstellt der Storage Transfer Service einen Übertragungsjob. Sobald die Startzeit erreicht ist, wird ein Übertragungsvorgang gestartet und ein Ereignis-Listener wartet auf Benachrichtigungen aus der Pub/Sub-Warteschlange.

Der Übertragungsvorgang wird etwa 24 Stunden lang mit dem Status in progress ausgeführt. Nach 24 Stunden wird der Vorgang beendet und ein neuer Vorgang beginnt. Alle 24 Stunden wird ein neuer Vorgang erstellt, bis die Endzeit des Übertragungsjobs erreicht ist oder der Job manuell beendet wird.

Wenn eine Dateiübertragung ausgeführt wird, wenn der Vorgang beendet werden soll, wird der aktuelle Vorgang so lange fortgesetzt, bis die Datei vollständig übertragen wurde. Ein neuer Vorgang wird gestartet und die beiden Vorgänge werden gleichzeitig ausgeführt, bis der alte Vorgang beendet ist. Alle Ereignisse, die in diesem Zeitraum erkannt werden, werden vom neuen Vorgang verarbeitet.

So rufen Sie den aktuellen Vorgang und alle abgeschlossenen Vorgänge auf:

Google Cloud Console

  1. Rufen Sie in der Google Cloud Console die Seite Storage Transfer Service auf.

    Storage Transfer Service aufrufen

  2. Wählen Sie in der Jobliste entweder den Tab Alle oder Cloud-to-Cloud aus.

  3. Klicken Sie auf die Job-ID für Ihre Übertragung. In der Spalte Planungsmodus werden alle ereignisgesteuerten Übertragungen im Vergleich zu Batchübertragungen identifiziert.

  4. Wählen Sie den Tab Vorgänge aus. Details zum aktuellen Vorgang werden angezeigt und abgeschlossene Vorgänge sind in der Tabelle Ausführungsverlauf aufgeführt. Klicken Sie auf einen abgeschlossenen Vorgang, um weitere Details zu sehen.

gcloud

Verwenden Sie gcloud transfer jobs monitor, um den Fortschritt eines Jobs in Echtzeit zu überwachen. Die Antwort zeigt den aktuellen Vorgang, die Startzeit des Jobs, die Menge der übertragenen Daten, die übersprungenen Byte und die Anzahl der Fehler.

gcloud transfer jobs monitor JOB_NAME

So rufen Sie den Namen des aktuellen Vorgangs ab:

gcloud transfer jobs describe JOB_NAME --format="value(latestOperationName)"

So listen Sie aktuelle und abgeschlossene Vorgänge auf:

gcloud transfer operations list --job-names=JOB_NAME

So rufen Sie Details zu einem Vorgang auf:

gcloud transfer operations describe OPERATION_NAME