Ler e anexar objetos em buckets zonais

Nesta página, descrevemos como ler e adicionar objetos armazenados em buckets zonais, que usam a classe de armazenamento Rapid Storage.

Nesta página, você aprende a realizar as seguintes operações:

  • Criar e gravar em um objeto anexável.

  • Ler objetos anexáveis.

  • Pausar, retomar e finalizar objetos anexáveis.

  • Ler a parte final de objetos anexáveis.

Antes de usar esta página, leia os seguintes recursos:

Fazer gravações anexáveis em objetos

Bibliotecas de cliente

C++

Para mais informações, consulte a documentação de referência da API Cloud Storage C++.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir faz o upload de um objeto anexável:

namespace gcs = google::cloud::storage;
// Starts an appendable upload, writes two chunks with an explicit flush in
// between, then finalizes the object and returns its metadata.
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name)
    -> google::cloud::future<google::storage::v2::Object> {
  // Keep a copy for logging: `object_name` is moved-from below, and reading
  // a moved-from std::string yields an unspecified value.
  auto const name = object_name;
  auto [writer, token] =
      (co_await client.StartAppendableObjectUpload(
           gcs::BucketName(std::move(bucket_name)), std::move(object_name)))
          .value();
  std::cout << "Appendable upload started for object " << name << "\n";

  token = (co_await writer.Write(std::move(token),
                                 gcs::WritePayload("Some data\n")))
              .value();
  std::cout << "Wrote initial data.\n";

  // Flush the buffered data to the service. This is not a terminal
  // operation. The writer can be used after the flush completes.
  // After a flush, the data is visible to readers.
  auto flush_status = co_await writer.Flush();
  if (!flush_status.ok()) throw std::runtime_error(flush_status.message());

  std::cout << "Flush completed. Persisted size is now "
            << absl::get<std::int64_t>(writer.PersistedState()) << "\n";

  // The writer is still open. We can write more data.
  token = (co_await writer.Write(std::move(token),
                                 gcs::WritePayload("Some more data\n")))
              .value();
  std::cout << "Wrote more data.\n";

  // Finalize the upload to make it a regular object.
  co_return (co_await writer.Finalize(std::move(token))).value();
};

Go

Saiba mais na documentação de referência Go da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir faz o upload de um objeto anexável:

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// createAndWriteAppendableObject creates and uploads a new appendable object in
// a rapid bucket. The object will not be finalized.
func createAndWriteAppendableObject(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Open a Writer for the object and append the first chunk of data.
	objWriter := client.Bucket(bucket).Object(object).NewWriter(ctx)
	if _, err := objWriter.Write([]byte("Some data\n")); err != nil {
		return fmt.Errorf("Writer.Write: %w", err)
	}

	// Flushing pushes buffered bytes to the service without ending the
	// upload; once flushed, the bytes become visible to readers and the
	// Writer remains usable.
	persisted, err := objWriter.Flush()
	if err != nil {
		return fmt.Errorf("Writer.Flush: %w", err)
	}
	fmt.Fprintf(w, "Flush completed. Persisted size is now %d\n", persisted)

	// Append a second chunk through the still-open Writer.
	if _, err := objWriter.Write([]byte("Some more data\n")); err != nil {
		return fmt.Errorf("Writer.Write: %w", err)
	}

	// Closing flushes any remaining buffered data. The object stays
	// unfinalized, which means another writer can later append to it.
	if err := objWriter.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}
	fmt.Fprintf(w, "Uploaded object %v\n", object)

	return nil
}

Java

Saiba mais na documentação de referência Java da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir faz o upload de um objeto anexável:


import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.FlushPolicy;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Paths;
import java.util.Locale;

public class CreateAndWriteAppendableObject {
  /** Uploads a local file as an appendable object and finalizes it on close. */
  public static void createAndWriteAppendableObject(
      String bucketName, String objectName, String filePath) throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The path to the file to upload
    // String filePath = "path/to/your/file";

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

      // Flush buffered data to the service in chunks of at most 64,000 bytes.
      FlushPolicy.MaxFlushSizeFlushPolicy policy = FlushPolicy.maxFlushSize(64 * 1000);
      BlobAppendableUploadConfig uploadConfig =
          BlobAppendableUploadConfig.of()
              .withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING)
              .withFlushPolicy(policy);
      BlobAppendableUpload upload = storage.blobAppendableUpload(blobInfo, uploadConfig);
      try (AppendableUploadWriteableByteChannel sink = upload.open();
          ReadableByteChannel source = FileChannel.open(Paths.get(filePath))) {
        ByteStreams.copy(source, sink);
        // The try-with-resources block closes `sink` here, which triggers
        // the finalization configured above.
      } catch (IOException ex) {
        throw new IOException("Failed to upload to object " + blobId.toGsUtilUri(), ex);
      }
      BlobInfo result = storage.get(blobId);
      System.out.printf(
          Locale.US,
          "Object %s successfully uploaded",
          result.getBlobId().toGsUtilUriWithGeneration());
    }
  }
}

Python

Saiba mais na documentação de referência Python da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir faz o upload de um objeto anexável:



async def storage_create_and_write_appendable_object(
    bucket_name, object_name, grpc_client=None
):
    """Uploads an appendable object to zonal bucket.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """

    client = grpc_client if grpc_client is not None else AsyncGrpcClient()
    obj_writer = AsyncAppendableObjectWriter(
        client=client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=0,  # throws `FailedPrecondition` if object already exists.
    )
    # Opening creates a brand-new, zero-byte appendable object ready to
    # receive appends.
    await obj_writer.open()

    # Each call to `.append` adds bytes at the current end of the object;
    # repeat as many times as needed.
    await obj_writer.append(b"Some data")

    # When all appends are done, shut down the gRPC bidirectional stream.
    await obj_writer.close()

    print(
        f"Appended object {object_name} created of size {obj_writer.persisted_size} bytes."
    )

Ler objetos

Bibliotecas de cliente

C++

Para mais informações, consulte a documentação de referência da API Cloud Storage C++.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir faz uma leitura de intervalo em um único objeto:

namespace gcs = google::cloud::storage;

// Helper coroutine to count newlines returned by an AsyncReader.
// This helps consume the data from the read operation. A valid token means
// more data is available; each Read() consumes the current token and
// returns the next one along with a payload.
auto count_newlines =
    [](gcs::AsyncReader reader,
       gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
  std::uint64_t count = 0;
  while (token.valid()) {
    auto [payload, t] = (co_await reader.Read(std::move(token))).value();
    token = std::move(t);
    // A payload's contents may be split across multiple buffers.
    for (auto const& buffer : payload.contents()) {
      count += std::count(buffer.begin(), buffer.end(), '\n');
    }
  }
  co_return count;
};

// Opens an object descriptor and issues one ranged read of 1024 bytes
// starting at offset 0, returning the number of newlines in that range.
auto coro =
    [&count_newlines](
        gcs::AsyncClient& client, std::string bucket_name,
        std::string object_name) -> google::cloud::future<std::uint64_t> {
  auto descriptor =
      (co_await client.Open(gcs::BucketName(std::move(bucket_name)),
                            std::move(object_name)))
          .value();

  // Ranged read of the first 1024 bytes of the object.
  auto [reader, token] = descriptor.Read(0, 1024);

  co_return co_await count_newlines(std::move(reader), std::move(token));
};

O exemplo a seguir faz uma leitura completa em um único objeto:

namespace gcs = google::cloud::storage;

// Coroutine that drains an AsyncReader and reports how many '\n'
// characters appeared in the streamed data.
auto count_newlines =
    [](gcs::AsyncReader reader,
       gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
  std::uint64_t total = 0;
  // A valid token means the stream still has data to deliver.
  for (; token.valid();) {
    auto [chunk, next] = (co_await reader.Read(std::move(token))).value();
    token = std::move(next);
    for (auto const& piece : chunk.contents()) {
      total += std::count(piece.begin(), piece.end(), '\n');
    }
  }
  co_return total;
};

// Opens the object and streams it from the very beginning to the end,
// returning the newline count over its full contents.
auto coro =
    [&count_newlines](
        gcs::AsyncClient& client, std::string bucket_name,
        std::string object_name) -> google::cloud::future<std::uint64_t> {
  auto descriptor =
      (co_await client.Open(gcs::BucketName(std::move(bucket_name)),
                            std::move(object_name)))
          .value();

  // ReadFromOffset(0) covers the whole object.
  auto [reader, token] = descriptor.ReadFromOffset(0);

  co_return co_await count_newlines(std::move(reader), std::move(token));
};

O exemplo a seguir executa leituras de intervalo em um único objeto:

namespace gcs = google::cloud::storage;

// Helper coroutine to count newlines returned by an AsyncReader.
// Each Read() consumes the current token and returns the next one; the
// loop ends when the token becomes invalid (stream exhausted).
auto count_newlines =
    [](gcs::AsyncReader reader,
       gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
  std::uint64_t count = 0;
  while (token.valid()) {
    auto [payload, t] = (co_await reader.Read(std::move(token))).value();
    token = std::move(t);
    // A payload's contents may be split across multiple buffers.
    for (auto const& buffer : payload.contents()) {
      count += std::count(buffer.begin(), buffer.end(), '\n');
    }
  }
  co_return count;
};

// Opens ONE object and issues three ranged reads on the same descriptor:
// two reads of 1024 bytes at offset 0, and one of 1024 bytes at offset
// 1024. Returns the combined newline count across all three reads.
auto coro =
    [&count_newlines](
        gcs::AsyncClient& client, std::string bucket_name,
        std::string object_name) -> google::cloud::future<std::uint64_t> {
  auto descriptor =
      (co_await client.Open(gcs::BucketName(std::move(bucket_name)),
                            std::move(object_name)))
          .value();

  // All three reads share the single open descriptor.
  auto [r1, t1] = descriptor.Read(0, 1024);
  auto [r2, t2] = descriptor.Read(0, 1024);
  auto [r3, t3] = descriptor.Read(1024, 1024);

  // Start the three counting coroutines first, then await each result.
  auto c1 = count_newlines(std::move(r1), std::move(t1));
  auto c2 = count_newlines(std::move(r2), std::move(t2));
  auto c3 = count_newlines(std::move(r3), std::move(t3));
  co_return (co_await std::move(c1)) + (co_await std::move(c2)) +
      (co_await std::move(c3));
};

O exemplo a seguir executa leituras de intervalo em vários objetos (uma leitura por objeto):

namespace gcs = google::cloud::storage;

// Helper coroutine to count newlines returned by an AsyncReader.
auto count_newlines =
    [](gcs::AsyncReader reader,
       gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
  std::uint64_t count = 0;
  // A valid token means the stream still has data to deliver.
  while (token.valid()) {
    auto [payload, t] = (co_await reader.Read(std::move(token))).value();
    token = std::move(t);
    for (auto const& buffer : payload.contents()) {
      count += std::count(buffer.begin(), buffer.end(), '\n');
    }
  }
  co_return count;
};

// Opens several objects, starts one ranged read per object, and prints each
// object's newline count as its read completes (completion order, not
// submission order).
auto coro = [&count_newlines](
                gcs::AsyncClient& client, std::string bucket_name,
                std::string object_name1, std::string object_name2,
                std::string object_name3) -> google::cloud::future<void> {
  // List of object names to read (passed as arguments)
  std::vector<std::string> object_names = {object_name1, object_name2,
                                           object_name3};
  std::vector<google::cloud::future<std::uint64_t>> futures;

  // Start ranged reads for all objects and collect futures
  // This example opens multiple objects, not one object multiple times.
  for (auto const& name : object_names) {
    auto descriptor =
        (co_await client.Open(gcs::BucketName(bucket_name), name)).value();
    auto [reader, token] = descriptor.Read(0, 1024);
    futures.push_back(count_newlines(std::move(reader), std::move(token)));
  }

  // Poll the outstanding futures, printing and erasing each as it becomes
  // ready. `futures` and `object_names` are kept in lockstep so index i
  // always refers to the same object in both vectors.
  while (!futures.empty()) {
    bool progress_made = false;
    for (std::size_t i = 0; i < futures.size(); ++i) {
      if (futures[i].is_ready()) {  // Check if the future is ready
        auto count = futures[i].get();
        std::cout << "Object " << object_names[i] << " read returned "
                  << count << " newlines\n";
        futures.erase(futures.begin() + i);
        object_names.erase(object_names.begin() + i);
        progress_made = true;
        break;  // Restart the loop after modifying the vectors
      }
    }
    if (!progress_made) {
      // Nothing completed on this pass; sleep briefly before polling again.
      std::this_thread::sleep_for(
          std::chrono::milliseconds(10));  // Avoid busy spin
    }
  }
};

Go

Saiba mais na documentação de referência Go da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir faz uma leitura de intervalo em um único objeto:

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// openObjectSingleRangedRead reads a single range from an object in a
// rapid bucket.
func openObjectSingleRangedRead(w io.Writer, bucket, object string) ([]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Request only bytes [0, 1024) of the object.
	reader, err := client.Bucket(bucket).Object(object).NewRangeReader(ctx, 0, 1024)
	if err != nil {
		return nil, fmt.Errorf("NewRangeReader: %w", err)
	}
	defer reader.Close()

	// Drain the reader into an in-memory buffer.
	content := new(bytes.Buffer)
	if _, err := io.Copy(content, reader); err != nil {
		return nil, fmt.Errorf("copying data: %v", err)
	}

	fmt.Fprintf(w, "Read the first 1024 bytes of %v into a buffer\n", object)

	return content.Bytes(), nil
}

O exemplo a seguir faz uma leitura completa em um único objeto:

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// openObjectReadFullObject reads a full object's data from a
// rapid bucket.
func openObjectReadFullObject(w io.Writer, bucket, object string) ([]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Read the entire object and copy its data into a buffer. NewReader
	// (unlike NewRangeReader) streams the object from start to finish.
	r, err := client.Bucket(bucket).Object(object).NewReader(ctx)
	if err != nil {
		return nil, fmt.Errorf("NewReader: %w", err)
	}
	defer r.Close()
	buf := new(bytes.Buffer)
	if _, err := io.Copy(buf, r); err != nil {
		return nil, fmt.Errorf("copying data: %v", err)
	}

	fmt.Fprintf(w, "Read the data of %v into a buffer\n", object)

	return buf.Bytes(), nil
}

O exemplo a seguir executa leituras de intervalo em um único objeto:

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// openObjectMultipleRangedRead opens a single object using
// MultiRangeDownloader to download multiple ranges.
func openObjectMultipleRangedRead(w io.Writer, bucket, object string) ([][]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Create the MultiRangeDownloader, which opens a stream to the object.
	mrd, err := client.Bucket(bucket).Object(object).NewMultiRangeDownloader(ctx)
	if err != nil {
		return nil, fmt.Errorf("NewMultiRangeDownloader: %w", err)
	}

	// Add some 1 KiB ranges to download. This call is non-blocking. The
	// provided callback is invoked when the range download is complete.
	startOffsets := []int64{0, 1024, 2048}
	var dataBufs [3]bytes.Buffer
	var errs []error
	for i, off := range startOffsets {
		mrd.Add(&dataBufs[i], off, 1024, func(off, length int64, err error) {
			if err != nil {
				errs = append(errs, err)
			} else {
				fmt.Fprintf(w, "downloaded range at offset %v", off)
			}
		})
	}

	// Wait for all downloads to complete, then close the stream. Closing
	// unconditionally (even when a range failed) avoids leaking the open
	// stream on the error path.
	mrd.Wait()
	closeErr := mrd.Close()
	if len(errs) > 0 {
		return nil, fmt.Errorf("one or more downloads failed; errors: %v", errs)
	}
	if closeErr != nil {
		return nil, fmt.Errorf("MultiRangeDownloader.Close: %w", closeErr)
	}

	fmt.Fprintf(w, "Read the ranges of %v into memory\n", object)

	// Collect the byte slices. Index into the array rather than ranging by
	// value, which would copy each bytes.Buffer.
	var byteSlices [][]byte
	for i := range dataBufs {
		byteSlices = append(byteSlices, dataBufs[i].Bytes())
	}

	return byteSlices, nil
}

Java

Saiba mais na documentação de referência Java da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir faz uma leitura de intervalo em um único objeto:


import com.google.api.core.ApiFuture;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.concurrent.TimeUnit;

public class OpenObjectSingleRangedRead {
  /** Reads {@code length} bytes starting at {@code offset} from a single object. */
  public static void openObjectSingleRangedRead(
      String bucketName, String objectName, long offset, int length) throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The beginning of the range
    // long offset = 0

    // The maximum number of bytes to read from the object.
    // int length = 64;

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      ApiFuture<BlobReadSession> sessionFuture = storage.blobReadSession(blobId);

      try (BlobReadSession session = sessionFuture.get(10, TimeUnit.SECONDS)) {
        // Describe the byte range to fetch.
        RangeSpec range = RangeSpec.of(offset, length);
        ApiFuture<byte[]> bytesFuture =
            session.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(range));

        // Block until the ranged read finishes.
        byte[] data = bytesFuture.get();

        System.out.println(
            "Successfully read "
                + data.length
                + " bytes from object "
                + objectName
                + " in bucket "
                + bucketName);
      }
    }
  }
}

O exemplo a seguir faz uma leitura completa em um único objeto:


import com.google.api.core.ApiFuture;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.ReadAsChannel;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ScatteringByteChannel;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

public class OpenObjectReadFullObject {
  /** Streams the entire object through a channel and reports the total bytes read. */
  public static void openObjectReadFullObject(String bucketName, String objectName)
      throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object to read
    // String objectName = "your-object-name";

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      ApiFuture<BlobReadSession> sessionFuture = storage.blobReadSession(blobId);

      try (BlobReadSession session = sessionFuture.get(10, TimeUnit.SECONDS)) {

        ReadAsChannel channelConfig = ReadProjectionConfigs.asChannel();
        try (ScatteringByteChannel source = session.readAs(channelConfig)) {
          // Reuse a single 64 KiB buffer; only the running total is kept.
          ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
          long total = 0;

          for (int n = source.read(chunk); n != -1; n = source.read(chunk)) {
            total += n;
            chunk.clear();
          }

          System.out.printf(
              Locale.US,
              "Successfully read a total of %d bytes from object %s%n",
              total,
              blobId.toGsUtilUri());
        }
      }
    }
  }
}

O exemplo a seguir executa leituras de intervalo em um único objeto:


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class OpenObjectMultipleRangedRead {
  /** Reads two byte ranges from the same object concurrently over one read session. */
  public static void openObjectMultipleRangedRead(
      String bucketName, String objectName, long offset1, int length1, long offset2, int length2)
      throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The beginning of the range 1
    // long offset = 0

    // The maximum number of bytes to read in range 1
    // int length = 16;

    // The beginning of the range 2
    // long offset = 16

    // The maximum number of bytes to read in range 2
    // int length = 32;

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      ApiFuture<BlobReadSession> sessionFuture = storage.blobReadSession(blobId);
      RangeSpec firstRange = RangeSpec.of(offset1, length1);
      RangeSpec secondRange = RangeSpec.of(offset2, length2);

      try (BlobReadSession session = sessionFuture.get(10, TimeUnit.SECONDS)) {
        // Issue both ranged reads without waiting on either.
        ApiFuture<byte[]> firstFuture =
            session.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(firstRange));
        ApiFuture<byte[]> secondFuture =
            session.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(secondRange));

        // Wait for both results at once.
        List<byte[]> allBytes =
            ApiFutures.allAsList(ImmutableList.of(firstFuture, secondFuture)).get();

        byte[] bytes1 = allBytes.get(0);
        byte[] bytes2 = allBytes.get(1);

        System.out.println(
            "Successfully read "
                + bytes1.length
                + " bytes from range 1 and "
                + bytes2.length
                + " bytes from range 2.");
      }
    }
  }
}

O exemplo a seguir executa leituras de intervalo em vários objetos:


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadAsFutureBytes;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class OpenMultipleObjectsRangedRead {
  /**
   * Issues the same single ranged read against every object in {@code objectNames} concurrently,
   * closing each read session in the background once its read completes.
   */
  public static void multipleObjectsSingleRangedRead(
      String bucketName, List<String> objectNames, long startOffset, int length) throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS objects to read
    // List<String> objectName = Arrays.asList("object-1", "object-2", "object-3");

    // The same range configuration is reused for every object.
    RangeSpec singleRange = RangeSpec.of(startOffset, length);
    ReadAsFutureBytes rangeConfig =
        ReadProjectionConfigs.asFutureBytes().withRangeSpec(singleRange);

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      List<ApiFuture<byte[]>> futuresToWaitOn = new ArrayList<>();

      System.out.printf(
          "Initiating single ranged read [%d, %d] on %d objects...%n",
          startOffset, startOffset + length - 1, objectNames.size());

      for (String objectName : objectNames) {
        BlobId blobId = BlobId.of(bucketName, objectName);
        ApiFuture<BlobReadSession> futureReadSession = storage.blobReadSession(blobId);

        // Chain the read onto the session future, and register a listener
        // that closes the session once the read finishes, whether it
        // succeeded or failed.
        ApiFuture<byte[]> readAndCloseFuture =
            ApiFutures.transformAsync(
                futureReadSession,
                (BlobReadSession session) -> {
                  ApiFuture<byte[]> readFuture = session.readAs(rangeConfig);

                  readFuture.addListener(
                      () -> {
                        try {
                          session.close();
                        } catch (java.io.IOException e) {
                          System.err.println(
                              "WARN: Background error while closing session: " + e.getMessage());
                        }
                      },
                      MoreExecutors.directExecutor());
                  return readFuture;
                },
                MoreExecutors.directExecutor());

        futuresToWaitOn.add(readAndCloseFuture);
      }
      // Block until every read has completed, with a 30-second overall deadline.
      ApiFutures.allAsList(futuresToWaitOn).get(30, TimeUnit.SECONDS);

      System.out.println("All concurrent single-ranged read operations are complete.");
    }
  }
}

Python

Saiba mais na documentação de referência Python da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir faz uma leitura de intervalo em um único objeto:

async def storage_open_object_single_ranged_read(
    bucket_name, object_name, start_byte, size, grpc_client=None
):
    """Downloads a range of bytes from an object.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    client = grpc_client if grpc_client is not None else AsyncGrpcClient()

    downloader = AsyncMultiRangeDownloader(client, bucket_name, object_name)

    try:
        # The downloader always opens the object in read mode.
        await downloader.open()

        # Any buffer or file-like object works as the destination for the
        # requested range.
        destination = BytesIO()
        await downloader.download_ranges([(start_byte, size, destination)])
    finally:
        if downloader.is_stream_open:
            await downloader.close()

    # The service returns at most up to the end of the object, so the
    # downloaded size may be smaller than requested.
    downloaded_size = destination.getbuffer().nbytes
    print(f"Downloaded {downloaded_size} bytes from {object_name}")

O exemplo a seguir faz uma leitura completa em um único objeto:

async def storage_open_object_read_full_object(
    bucket_name, object_name, grpc_client=None
):
    """Downloads the entire content of an object using a multi-range downloader.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    client = grpc_client if grpc_client is not None else AsyncGrpcClient()

    # Multi-range downloader for the target object.
    downloader = AsyncMultiRangeDownloader(client, bucket_name, object_name)

    try:
        # The downloader always opens the object in read mode.
        await downloader.open()

        # Any buffer or file-like object can serve as the destination.
        destination = BytesIO()
        # The special range (0, 0) requests the whole object, start to end.
        await downloader.download_ranges([(0, 0, destination)])
    finally:
        if downloader.is_stream_open:
            await downloader.close()

    downloaded_bytes = destination.getvalue()
    print(
        f"Downloaded all {len(downloaded_bytes)} bytes from object {object_name} in bucket {bucket_name}."
    )

O exemplo a seguir executa leituras de intervalo em um único objeto:

async def storage_open_object_multiple_ranged_read(
    bucket_name, object_name, grpc_client=None
):
    """Downloads multiple ranges of bytes from a single object into different buffers.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    mrd = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name)

    try:
        # Open the object, mrd always opens in read mode.
        await mrd.open()

        # Specify four different buffers to download ranges into.
        buffers = [BytesIO(), BytesIO(), BytesIO(), BytesIO()]

        # Define the ranges to download. Each range is a tuple of (start_byte, size, buffer).
        # All ranges will download 10 bytes from different starting positions.
        # We choose arbitrary start bytes for this example. An object should be large enough.
        # A user can choose any start byte between 0 and `object_size`.
        # If `start_bytes` is greater than `object_size`, mrd will throw an error.
        ranges = [
            (0, 10, buffers[0]),
            (20, 10, buffers[1]),
            (40, 10, buffers[2]),
            (60, 10, buffers[3]),
        ]

        await mrd.download_ranges(ranges)

    finally:
        # Close only when the stream actually opened: if `open()` raised, an
        # unconditional close would fail and mask the original error. This
        # also matches the other samples on this page.
        if mrd.is_stream_open:
            await mrd.close()

    # Print the downloaded content from each buffer.
    for i, output_buffer in enumerate(buffers):
        downloaded_size = output_buffer.getbuffer().nbytes
        print(
            f"Downloaded {downloaded_size} bytes into buffer {i + 1} from start byte {ranges[i][0]}: {output_buffer.getvalue()}"
        )

O exemplo a seguir executa leituras de intervalo em vários objetos:

async def storage_open_multiple_objects_ranged_read(
    bucket_name, object_names, grpc_client=None
):
    """Downloads a range of bytes from multiple objects concurrently.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    async def _read_object_head(name):
        """Reads the first 100 bytes of one object into an in-memory buffer."""
        downloader = AsyncMultiRangeDownloader(grpc_client, bucket_name, name)
        try:
            # The downloader always opens the object in read mode.
            await downloader.open()

            # The requested range lands in this buffer; any other file-like
            # object supplied by the caller would work as well.
            buffer = BytesIO()
            # Request bytes [0, 100) of the object.
            await downloader.download_ranges([(0, 100, buffer)])
        finally:
            if downloader.is_stream_open:
                await downloader.close()

        # The downloaded size may be smaller than requested: the download
        # stops at the end of the object.
        downloaded_size = buffer.getbuffer().nbytes
        print(f"Downloaded {downloaded_size} bytes from {name}")

    # Fan out one download coroutine per object and run them concurrently.
    await asyncio.gather(*(_read_object_head(n) for n in object_names))

Pausar e retomar um objeto

Bibliotecas de cliente

C++

Para mais informações, consulte a documentação de referência da API Cloud Storage C++.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir pausa e retoma um objeto anexável:

namespace gcs = google::cloud::storage;
// Demonstrates "pausing" an appendable upload by closing the writer without
// finalizing, then resuming it via ResumeAppendableObjectUpload and
// finalizing the object.
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name)
    -> google::cloud::future<google::storage::v2::Object> {
  // Start an appendable upload and write some data.
  auto [writer, token] = (co_await client.StartAppendableObjectUpload(
                              gcs::BucketName(bucket_name), object_name))
                             .value();
  std::cout << "Appendable upload started.\n";
  token = (co_await writer.Write(std::move(token),
                                 gcs::WritePayload("paused data\n")))
              .value();

  // The writer is closed, but the upload is not finalized. This "pauses" the
  // upload, as the object remains appendable.
  auto close_status = co_await writer.Close();
  if (!close_status.ok()) throw std::runtime_error(close_status.message());
  std::cout << "Upload paused.\n";

  // To resume the upload we need the object's generation. We can use the
  // regular GCS client to get the latest metadata.
  auto regular_client = gcs::Client();
  auto metadata =
      regular_client.GetObjectMetadata(bucket_name, object_name).value();
  std::cout << "Object generation is " << metadata.generation() << "\n";

  // Now resume the upload. The resumed writer reports how many bytes the
  // service has already persisted via PersistedState().
  std::tie(writer, token) =
      (co_await client.ResumeAppendableObjectUpload(
           gcs::BucketName(bucket_name), object_name, metadata.generation()))
          .value();
  std::cout << "Upload resumed from offset "
            << absl::get<std::int64_t>(writer.PersistedState()) << "\n";

  // Append the rest of the data.
  token = (co_await writer.Write(std::move(token),
                                 gcs::WritePayload("resumed data\n")))
              .value();

  // Finalize the upload and return the object metadata.
  co_return (co_await writer.Finalize(std::move(token))).value();
};

Go

Saiba mais na documentação de referência Go da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir pausa e retoma um objeto anexável:

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// pauseAndResumeAppendableUpload creates a new unfinalized appendable object,
// closes the Writer, then re-opens the object for writing using
// NewWriterFromAppendableObject.
func pauseAndResumeAppendableUpload(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Start an appendable upload and write some data.
	writer := client.Bucket(bucket).Object(object).NewWriter(ctx)

	if _, err := writer.Write([]byte("Some data\n")); err != nil {
		return fmt.Errorf("Writer.Write: %w", err)
	}

	// The writer is closed, but the upload is not finalized. This "pauses" the
	// upload, as the object remains appendable.
	if err := writer.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}
	fmt.Fprintf(w, "Uploaded unfinalized object %v\n", object)

	// To resume the upload we need the object's generation. We can get this
	// from the previous Writer after close.
	gen := writer.Attrs().Generation

	// Now resume the upload. Writer options including finalization can be
	// passed on calling this constructor.
	appendWriter, offset, err := client.Bucket(bucket).Object(object).Generation(gen).NewWriterFromAppendableObject(
		ctx, &storage.AppendableWriterOpts{
			FinalizeOnClose: true,
		},
	)
	if err != nil {
		// Wrap with %w (not %v) so callers can inspect the cause with
		// errors.Is/errors.As, matching the rest of this file.
		return fmt.Errorf("NewWriterFromAppendableObject: %w", err)
	}
	fmt.Fprintf(w, "Resuming upload from offset %v\n", offset)

	// Append the rest of the data and close the Writer to finalize.
	if _, err := appendWriter.Write([]byte("resumed data\n")); err != nil {
		return fmt.Errorf("appendWriter.Write: %w", err)
	}
	if err := appendWriter.Close(); err != nil {
		return fmt.Errorf("appendWriter.Close: %w", err)
	}
	fmt.Fprintf(w, "Uploaded and finalized object %v\n", object)
	return nil
}

Java

Saiba mais na documentação de referência Java da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir pausa e retoma um objeto anexável:


import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageChannelUtils;
import com.google.cloud.storage.StorageOptions;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Locale;

public class PauseAndResumeAppendableObjectUpload {
  /**
   * Demonstrates pausing and resuming an appendable object upload.
   *
   * <p>Step 1 writes an initial string and closes the channel without finalizing ("pause").
   * Step 2 opens a new upload session on the same object, appends the contents of
   * {@code filePath}, and finalizes the object when the channel closes.
   *
   * @param bucketName the ID of the GCS bucket
   * @param objectName the ID of the GCS object
   * @param filePath the path to the local file appended in the resume step
   * @throws Exception if the upload or metadata lookup fails
   */
  public static void pauseAndResumeAppendableObjectUpload(
      String bucketName, String objectName, String filePath) throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The path to the file to upload
    // String filePath = "path/to/your/file";

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

      // --- Step 1: Initial string write (PAUSE) ---
      // Default close action will be CLOSE_WITHOUT_FINALIZING, so closing the
      // channel leaves the object appendable.
      BlobAppendableUploadConfig initialConfig = BlobAppendableUploadConfig.of();
      BlobAppendableUpload initialUploadSession =
          storage.blobAppendableUpload(blobInfo, initialConfig);

      try (AppendableUploadWriteableByteChannel channel = initialUploadSession.open()) {
        String initialData = "Initial data segment.\n";
        ByteBuffer buffer = ByteBuffer.wrap(initialData.getBytes(StandardCharsets.UTF_8));
        long totalBytesWritten = StorageChannelUtils.blockingEmptyTo(buffer, channel);
        // Flush so the bytes are persisted before the channel closes.
        channel.flush();

        System.out.printf(
            Locale.US, "Wrote %d bytes (initial string) in first segment.\n", totalBytesWritten);
      } catch (IOException ex) {
        throw new IOException("Failed initial upload to object " + blobId.toGsUtilUri(), ex);
      }

      // Fetch the current metadata; its size shows how much has been persisted.
      Blob existingBlob = storage.get(blobId);
      long currentObjectSize = existingBlob.getSize();
      System.out.printf(
          Locale.US,
          "Initial upload paused. Currently uploaded size: %d bytes\n",
          currentObjectSize);

      // --- Step 2: Resume upload with file content and finalize ---
      // Use FINALIZE_WHEN_CLOSING to ensure the object is finalized on channel closure.
      BlobAppendableUploadConfig resumeConfig =
          BlobAppendableUploadConfig.of().withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING);
      BlobAppendableUpload resumeUploadSession =
          storage.blobAppendableUpload(existingBlob.toBuilder().build(), resumeConfig);

      try (FileChannel fileChannel = FileChannel.open(Paths.get(filePath));
          AppendableUploadWriteableByteChannel channel = resumeUploadSession.open()) {
        long bytesToAppend = fileChannel.size();
        System.out.printf(
            Locale.US,
            "Appending the entire file (%d bytes) after the initial string.\n",
            bytesToAppend);

        // Copy the whole file into the append channel; closing the
        // try-with-resources block finalizes the object.
        ByteStreams.copy(fileChannel, channel);
      }

      BlobInfo result = storage.get(blobId);
      System.out.printf(
          Locale.US,
          "\nObject %s successfully resumed and finalized. Total size: %d bytes\n",
          result.getBlobId().toGsUtilUriWithGeneration(),
          result.getSize());
    }
  }
}

Python

Saiba mais na documentação de referência Python da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir pausa e retoma um objeto anexável:

async def storage_pause_and_resume_appendable_upload(
    bucket_name, object_name, grpc_client=None
):
    """Demonstrates pausing and resuming an appendable object upload.

    An upload is "paused" simply by closing the writer without finalizing the
    object, and "resumed" by creating a new writer pinned to the object's
    generation.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    # 1. Create a writer, open it, and append the first chunk of data.
    writer1 = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
    )
    await writer1.open()
    await writer1.append(b"First part of the data. ")
    print(f"Appended {writer1.persisted_size} bytes with the first writer.")

    # 2. After appending some data, close the writer to "pause" the upload.
    #  NOTE: you can pause indefinitely and still read the content uploaded so far using MRD.
    await writer1.close()

    print("First writer closed. Upload is 'paused'.")

    # 3. Create a new writer, passing the generation number from the previous
    #    writer. This is a precondition to ensure that the object hasn't been
    #    modified since we last accessed it.
    generation_to_resume = writer1.generation
    print(f"Generation to resume from is: {generation_to_resume}")

    writer2 = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=generation_to_resume,
    )
    # 4. Open the new writer.
    try:
        await writer2.open()

        # 5. Append some more data using the new writer.
        await writer2.append(b"Second part of the data.")
        print(f"Appended more data. Total size is now {writer2.persisted_size} bytes.")
    finally:
        # 6. Finally, close the new writer.
        # NOTE(review): this reads the private `_is_stream_open` attribute;
        # the MRD samples use a public `is_stream_open` — confirm whether the
        # writer exposes a public equivalent.
        if writer2._is_stream_open:
            await writer2.close()
    print("Second writer closed. Full object uploaded.")

Finalizar um objeto

Bibliotecas de cliente

C++

Para mais informações, consulte a documentação de referência da API Cloud Storage C++.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir finaliza um objeto anexável:

namespace gcs = google::cloud::storage;
// Uploads a payload to a new appendable object and finalizes it, returning
// the resulting object metadata.
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name)
    -> google::cloud::future<google::storage::v2::Object> {
  // Begin a new appendable upload for the given bucket and object name.
  auto start = co_await client.StartAppendableObjectUpload(
      gcs::BucketName(std::move(bucket_name)), std::move(object_name));
  auto [writer, token] = std::move(start).value();
  std::cout << "Appendable upload started. \n";

  // Append a payload; each successful write yields a fresh token.
  auto written = co_await writer.Write(
      std::move(token), gcs::WritePayload("some data to finalize\n"));
  token = std::move(written).value();

  // Finalize the upload. This makes the object non-appendable.
  // No more data can be written to this writer.
  auto finalized = (co_await writer.Finalize(std::move(token))).value();
  std::cout << "Upload finalized.\n";
  co_return finalized;
};

Go

Saiba mais na documentação de referência Go da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir finaliza um objeto anexável:

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// finalizeAppendableObject creates, uploads and finalizes a new object in
// a rapid bucket.
func finalizeAppendableObject(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()

	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	// Bound the whole upload with a 10-second deadline.
	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// FinalizeOnClose tells the Writer to finalize the object when it is
	// closed, instead of leaving it appendable.
	objWriter := client.Bucket(bucket).Object(object).NewWriter(ctx)
	objWriter.FinalizeOnClose = true

	if _, err := objWriter.Write([]byte("some data to finalize\n")); err != nil {
		return fmt.Errorf("Writer.Write: %w", err)
	}

	// Closing flushes any buffered bytes and finalizes the upload; once
	// finalized, no more data can be written to this object.
	if err := objWriter.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}

	fmt.Fprintf(w, "Uploaded and finalized object %v\n", object)
	return nil
}

Java

Saiba mais na documentação de referência Java da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir finaliza um objeto anexável:


import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class FinalizeAppendableObjectUpload {
  /**
   * Finalizes an existing unfinalized appendable object without writing any
   * additional data, making it non-appendable.
   */
  public static void finalizeAppendableObjectUpload(String bucketName, String objectName)
      throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS unfinalized appendable object
    // String objectName = "your-object-name";

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      Blob blob = storage.get(blobId);

      // Nothing to finalize if the object does not exist.
      if (blob == null) {
        System.out.println("Object " + objectName + " not found in bucket " + bucketName);
        return;
      }

      // Take over the existing appendable object and configure the session to
      // finalize the object when the channel is closed.
      BlobInfo takeoverInfo = BlobInfo.newBuilder(blob.getBlobId()).build();
      BlobAppendableUploadConfig config =
          BlobAppendableUploadConfig.of()
              .withCloseAction(BlobAppendableUploadConfig.CloseAction.FINALIZE_WHEN_CLOSING);
      BlobAppendableUpload session = storage.blobAppendableUpload(takeoverInfo, config);

      // Open and immediately finalize: the existing content is kept unchanged
      // while the object becomes non-appendable.
      try (BlobAppendableUpload.AppendableUploadWriteableByteChannel channel = session.open()) {
        channel.finalizeAndClose();
      }

      System.out.println(
          "Successfully finalized object " + objectName + " in bucket " + bucketName);
    }
  }
}

Python

Saiba mais na documentação de referência Python da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir finaliza um objeto anexável:

async def storage_finalize_appendable_object_upload(
    bucket_name, object_name, grpc_client=None
):
    """Creates, writes to, and finalizes an appendable object.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """

    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    # Passing generation=0 is a creation precondition: the call raises
    # `FailedPrecondition` if the object already exists.
    appendable_writer = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=0,
    )

    # Opening creates a brand new appendable object of size 0.
    await appendable_writer.open()

    # Append a payload to the object.
    await appendable_writer.append(b"Some data")

    # Finalize the appendable object. Keep in mind:
    # 1. once finalized no more appends can be done to the object.
    # 2. If you don't want to finalize, you can simply call `writer.close`
    # 3. calling `.finalize()` also closes the grpc-bidi stream, calling
    #   `.close` after `.finalize` may lead to undefined behavior.
    object_resource = await appendable_writer.finalize()

    print(f"Appendable object {object_name} created and finalized.")
    print("Object Metadata:")
    print(object_resource)

Ler a parte final de um objeto

Bibliotecas de cliente

C++

Para mais informações, consulte a documentação de referência da API Cloud Storage C++.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir lê a parte final de um objeto anexável:

namespace gcs = google::cloud::storage;
// Simulates `tail -f` on an appendable object: writes a chunk, then reads
// everything past the last-seen offset, for two iterations.
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name) -> google::cloud::future<void> {
  // This coroutine simulates a "tail -f" command on a GCS object. It
  // repeatedly polls an appendable object for new content. In a real
  // application, the object would be written to by a separate process.
  std::cout << "Polling for content from " << object_name << "...\n\n";

  // Start an appendable upload. In a real application this would be done by
  // a separate process, and this application would only know the object name.
  auto [writer, token] = (co_await client.StartAppendableObjectUpload(
                              gcs::BucketName(bucket_name), object_name))
                             .value();

  // Tracks how many bytes this "tail" has already consumed.
  std::int64_t bytes_read = 0;
  for (int i = 0; i != 2; ++i) {
    // In a real application, another process would append data here.
    // We simulate this by writing to the object.
    auto content =
        "More data for tail example, iteration " + std::to_string(i) + "\n";
    token =
        (co_await writer.Write(std::move(token), gcs::WritePayload(content)))
            .value();
    // Flush so the appended bytes become visible to readers; the flush
    // status is deliberately ignored in this sample.
    (void)co_await writer.Flush();

    // Poll for new content by reading from the last known offset.
    // NOTE(review): the -1 length presumably means "read to the current end
    // of the object" — confirm against the ReadObjectRange documentation.
    auto payload =
        (co_await client.ReadObjectRange(gcs::BucketName(bucket_name),
                                         object_name, bytes_read, -1))
            .value();
    for (auto const& buffer : payload.contents()) {
      std::cout << std::string(buffer.begin(), buffer.end());
      bytes_read += buffer.size();
    }
    // In a real application you would wait here, e.g. with a timer. Note
    // that sleep_for blocks the current thread; acceptable for a sample.
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
};

Go

Saiba mais na documentação de referência Go da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir lê a parte final de um objeto anexável:

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// readAppendableObjectTail simulates a "tail -f" command on a GCS object. It
// repeatedly polls an appendable object for new content. In a real
// application, the object would be written to by a separate process.
func readAppendableObjectTail(w io.Writer, bucket, object string) ([]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	// Set a context timeout. When this timeout is reached, the read stream
	// will be closed, so omit this to tail indefinitely.
	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()

	// Create a new appendable object and write some data.
	writer := client.Bucket(bucket).Object(object).If(storage.Conditions{DoesNotExist: true}).NewWriter(ctx)
	if _, err := writer.Write([]byte("Some data\n")); err != nil {
		return nil, fmt.Errorf("Writer.Write: %w", err)
	}
	if err := writer.Close(); err != nil {
		return nil, fmt.Errorf("Writer.Close: %w", err)
	}
	gen := writer.Attrs().Generation

	// Create the MultiRangeDownloader, which opens a read stream to the object.
	mrd, err := client.Bucket(bucket).Object(object).NewMultiRangeDownloader(ctx)
	if err != nil {
		return nil, fmt.Errorf("NewMultiRangeDownloader: %w", err)
	}

	// In a goroutine, poll the object. In this example we continue until all the
	// bytes we expect to see were received, but in a real application this could
	// continue to poll indefinitely until some signal is received.
	var buf bytes.Buffer
	var mrdErr error
	done := make(chan bool)
	go func() {
		var currOff int64
		// Buffered so a callback that fires after the 10s timeout below can
		// still send without blocking forever (there is no receiver then).
		rangeDownloaded := make(chan bool, 1)
		for buf.Len() < 100 {
			// Add the current range and wait for it to be downloaded.
			// Using a length of 0 will read to the current end of the object.
			// The callback will give the actual number of bytes that were
			// read in each iteration.
			mrd.Add(&buf, currOff, 0, func(offset, length int64, err error) {
				// After each range is received, update
				// the starting offset based on how many bytes were received.
				if err != nil {
					mrdErr = err
				}
				currOff += length
				rangeDownloaded <- true
			})
			// Wait for the range download to complete with a timeout of 10s.
			select {
			case <-rangeDownloaded:
			case <-time.After(10 * time.Second):
				mrdErr = mrd.Error()
				if mrdErr == nil {
					mrdErr = errors.New("range request timed out after 10s")
				}
			}

			if mrdErr != nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		// After exiting the loop, close MultiRangeDownloader and signal that
		// all ranges have been read.
		if err := mrd.Close(); err != nil {
			mrdErr = err
		}
		done <- true
	}()

	// Meanwhile, continue to write 10 bytes at a time to the object.
	// This could be done by calling NewWriterFromAppendable object repeatedly
	// (as in the example) or calling Writer.Flush without closing the Writer.
	for range 9 {
		appendWriter, offset, err := client.Bucket(bucket).Object(object).Generation(gen).NewWriterFromAppendableObject(ctx, nil)
		if err != nil {
			return nil, fmt.Errorf("NewWriterFromAppendableObject: %w", err)
		}
		if _, err := appendWriter.Write([]byte("more data\n")); err != nil {
			return nil, fmt.Errorf("appendWriter.Write: %w", err)
		}
		if err := appendWriter.Close(); err != nil {
			return nil, fmt.Errorf("appendWriter.Close: %w", err)
		}
		fmt.Fprintf(w, "Wrote 10 bytes at offset %v", offset)
	}

	// Wait for tailing goroutine to exit.
	<-done
	if mrdErr != nil {
		// Wrap the error recorded by the polling goroutine (mrdErr), not the
		// outer `err`, which is stale (nil) at this point.
		return nil, fmt.Errorf("MultiRangeDownloader: %w", mrdErr)
	}
	fmt.Fprintf(w, "Read %v bytes from object %v", buf.Len(), object)
	return buf.Bytes(), nil
}

Java

Saiba mais na documentação de referência Java da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir lê a parte final de um objeto anexável:


import com.google.api.core.ApiFuture;
import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.FlushPolicy;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageChannelUtils;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

public class ReadAppendableObjectTail {
  /**
   * Simulates a "tail -f" on an appendable GCS object: a background thread
   * appends bytes while this method repeatedly reads the newly persisted data.
   *
   * @param bucketName the ID of the GCS bucket
   * @param objectName the ID of the GCS object
   * @throws Exception if the upload, read, or thread coordination fails
   */
  public static void readAppendableObjectTail(String bucketName, String objectName)
      throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      BlobInfo info = BlobInfo.newBuilder(blobId).build();
      int totalToWrite = 64 * 1000;
      // Define our flush policy to flush small increments.
      // This is useful for demonstration purposes, but you should use more appropriate values for
      // your workload.
      int flushSize = totalToWrite / 8;
      FlushPolicy.MinFlushSizeFlushPolicy flushPolicy =
          FlushPolicy.minFlushSize(flushSize).withMaxPendingBytes(flushSize);
      BlobAppendableUploadConfig appendableUploadConfig =
          BlobAppendableUploadConfig.of().withFlushPolicy(flushPolicy);
      BlobAppendableUpload upload =
          storage.blobAppendableUpload(
              info, appendableUploadConfig, Storage.BlobWriteOption.doesNotExist());
      // Create the object, we'll takeover to write for our example.
      upload.open().closeWithoutFinalizing();
      BlobInfo gen1 = upload.getResult().get();
      BlobAppendableUpload takeover = storage.blobAppendableUpload(gen1, appendableUploadConfig);

      try (AppendableUploadWriteableByteChannel channel = takeover.open()) {
        // Start a background thread to write some data on a periodic basis.
        // In reality, your application would probably be doing this in another scope.
        Thread writeThread = startWriteThread(totalToWrite, channel, flushPolicy);
        try (BlobReadSession readSession =
            storage.blobReadSession(gen1.getBlobId()).get(10, TimeUnit.SECONDS)) {
          // zeroCnt counts consecutive empty reads; it drives the backoff and
          // the exit condition below.
          int zeroCnt = 0;
          long read = 0;
          while (read < totalToWrite) {
            // Give up once many reads came back empty and the writer closed.
            if (zeroCnt >= 30 && !channel.isOpen()) {
              System.out.println("breaking");
              break;
            }
            // Request the next chunk starting at the last offset read so far.
            ApiFuture<byte[]> future =
                readSession.readAs(
                    ReadProjectionConfigs.asFutureBytes()
                        .withRangeSpec(RangeSpec.of(read, flushPolicy.getMinFlushSize())));
            byte[] bytes = future.get(20, TimeUnit.SECONDS);

            read += bytes.length;
            long defaultSleep = 1_500L;
            if (bytes.length == 0) {
              // Nothing new yet: back off linearly before polling again.
              zeroCnt++;
              long millis = defaultSleep * zeroCnt;
              System.out.println("millis = " + millis);
              Thread.sleep(millis);
            } else {
              zeroCnt = 0;
              System.out.println("bytes.length = " + bytes.length + " read = " + read);
              Thread.sleep(defaultSleep);
            }
          }
          assert read == totalToWrite : "not enough bytes";
        }
        writeThread.join();
      }
    }
  }

  /**
   * Starts a background thread that writes {@code totalToWrite} bytes, one byte
   * at a time, flushing (and pausing briefly) every {@code minFlushSize} bytes
   * so the tailer can observe incremental progress.
   */
  private static Thread startWriteThread(
      int totalToWrite,
      AppendableUploadWriteableByteChannel channel,
      FlushPolicy.MinFlushSizeFlushPolicy flushPolicy) {
    Thread writeThread =
        new Thread(
            () -> {
              try {
                for (long written = 0; written < totalToWrite; ) {
                  // Cycle through the letters 'A'..'Z'.
                  byte alphaOffset = (byte) (written % 0x1a);

                  ByteBuffer buf = ByteBuffer.wrap(new byte[] {(byte) (0x41 + alphaOffset)});
                  int w = StorageChannelUtils.blockingEmptyTo(buf, channel);
                  written += w;
                  if (written % flushPolicy.getMinFlushSize() == 0) {
                    channel.flush();
                    Thread.sleep(40);
                  }
                }
                channel.closeWithoutFinalizing();

              } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
              }
            });
    writeThread.start();
    return writeThread;
  }
}

Python

Saiba mais na documentação de referência Python da API Cloud Storage.

Para se autenticar no Cloud Storage, configure o Application Default Credentials. Saiba mais em Configurar a autenticação para bibliotecas de cliente.

O exemplo a seguir lê a parte final de um objeto anexável:

async def appender(writer: AsyncAppendableObjectWriter, duration: int):
    """Repeatedly appends a fixed chunk of bytes to the object for `duration` seconds.

    NOTE(review): the loop sleeps 0.1s between appends, so despite the constant
    names it appends roughly ten times per second — confirm the intended rate.
    """
    print("Appender started.")
    bytes_appended = 0
    start_time = time.monotonic()
    # Run the appender for the specified duration.
    while time.monotonic() - start_time < duration:
        await writer.append(BYTES_TO_APPEND)
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        # Assumes len(BYTES_TO_APPEND) == NUM_BYTES_TO_APPEND_EVERY_SECOND;
        # both constants are defined elsewhere in this module — TODO confirm.
        bytes_appended += NUM_BYTES_TO_APPEND_EVERY_SECOND
        print(
            f"[{now}] Appended {NUM_BYTES_TO_APPEND_EVERY_SECOND} new bytes. Total appended: {bytes_appended} bytes."
        )
        await asyncio.sleep(0.1)
    print("Appender finished.")


async def tailer(
    bucket_name: str, object_name: str, duration: int, client: AsyncGrpcClient
):
    """Tails the object by reading new data as it is appended."""
    print("Tailer started.")
    next_offset = 0
    deadline = time.monotonic() + duration
    mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
    try:
        await mrd.open()
        # Keep polling until the requested duration has elapsed.
        while time.monotonic() < deadline:
            chunk = BytesIO()
            # A range of (start, 0) means "read from `start` to the current end".
            await mrd.download_ranges([(next_offset, 0, chunk)])

            n_new = chunk.getbuffer().nbytes
            if n_new > 0:
                now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                print(f"[{now}] Tailer read {n_new} new bytes: ")
                next_offset += n_new

            # Poll for new data every 0.1 seconds.
            await asyncio.sleep(0.1)
    finally:
        if mrd.is_stream_open:
            await mrd.close()
    print("Tailer finished.")


# read_appendable_object_tail simulates a "tail -f" command on a GCS object. It
# repeatedly polls an appendable object for new content. In a real
# application, the object would be written to by a separate process.
async def read_appendable_object_tail(
    bucket_name: str, object_name: str, duration: int, grpc_client=None
):
    """Main function to create an appendable object and run tasks.

    Creates an empty appendable object, then runs the `appender` and `tailer`
    coroutines against it concurrently for `duration` seconds.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()
    writer = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
    )
    try:
        # 1. Create an empty appendable object.
        await writer.open()
        print(f"Created empty appendable object: {object_name}")

        # 2. Create the appender and tailer coroutines.
        appender_task = asyncio.create_task(appender(writer, duration))
        tailer_task = asyncio.create_task(
            tailer(bucket_name, object_name, duration, grpc_client)
        )

        # 3. Execute the coroutines concurrently.
        await asyncio.gather(appender_task, tailer_task)
    finally:
        # NOTE(review): this reads the private `_is_stream_open` attribute;
        # the MRD samples use a public `is_stream_open` — confirm the writer
        # API before changing.
        if writer._is_stream_open:
            await writer.close()
            print("Writer closed.")
A seguir