Read and append to objects in zonal buckets

This page describes how to read and append to objects stored in zonal buckets, which use the Rapid storage class.

This page shows you how to perform the following operations:

  • Create and write to an appendable object.

  • Read appendable objects.

  • Pause, resume, and finalize appendable objects.

  • Read the tail end of appendable objects.

Before you use this page, you might want to read the following resources:

Make appendable writes to objects

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample uploads an appendable object:

namespace gcs = google::cloud::storage;
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name)
    -> google::cloud::future<google::storage::v2::Object> {
  // Keep a copy of the name for logging: `object_name` is moved into the
  // StartAppendableObjectUpload() call below and must not be used after
  // that (reading a moved-from std::string gives an unspecified value).
  auto const name = object_name;
  auto [writer, token] =
      (co_await client.StartAppendableObjectUpload(
           gcs::BucketName(std::move(bucket_name)), std::move(object_name)))
          .value();
  std::cout << "Appendable upload started for object " << name << "\n";

  // Write() consumes the token and returns a fresh one on success.
  token = (co_await writer.Write(std::move(token),
                                 gcs::WritePayload("Some data\n")))
              .value();
  std::cout << "Wrote initial data.\n";

  // Flush the buffered data to the service. This is not a terminal
  // operation. The writer can be used after the flush completes.
  // After a flush, the data is visible to readers.
  auto flush_status = co_await writer.Flush();
  if (!flush_status.ok()) throw std::runtime_error(flush_status.message());

  std::cout << "Flush completed. Persisted size is now "
            << absl::get<std::int64_t>(writer.PersistedState()) << "\n";

  // The writer is still open. We can write more data.
  token = (co_await writer.Write(std::move(token),
                                 gcs::WritePayload("Some more data\n")))
              .value();
  std::cout << "Wrote more data.\n";

  // Finalize the upload to make it a regular object.
  co_return (co_await writer.Finalize(std::move(token))).value();
};

Go

For more information, see the Cloud Storage Go API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample uploads an appendable object:

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// createAndWriteAppendableObject creates and uploads a new appendable object in
// a rapid bucket. The object will not be finalized.
func createAndWriteAppendableObject(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Create a Writer in append mode and write some data. Append must be
	// set for the object to be created as an appendable object and for
	// Writer.Flush to be supported. FinalizeOnClose is set explicitly so
	// the object remains unfinalized when the Writer is closed.
	writer := client.Bucket(bucket).Object(object).NewWriter(ctx)
	writer.Append = true
	writer.FinalizeOnClose = false

	if _, err := writer.Write([]byte("Some data\n")); err != nil {
		return fmt.Errorf("Writer.Write: %w", err)
	}

	// Flush the buffered data to the service. This is not a terminal
	// operation. The Writer can be used after the flush completes.
	// After a flush, the data is visible to readers.
	size, err := writer.Flush()
	if err != nil {
		return fmt.Errorf("Writer.Flush: %w", err)
	}
	fmt.Fprintf(w, "Flush completed. Persisted size is now %d\n", size)

	// The Writer is still open. We can write more data.
	if _, err := writer.Write([]byte("Some more data\n")); err != nil {
		return fmt.Errorf("Writer.Write: %w", err)
	}

	// Close the Writer to flush any remaining buffered data.
	// The object will be unfinalized, which means another writer can
	// later append to the object.
	if err := writer.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}
	fmt.Fprintf(w, "Uploaded object %v\n", object)

	return nil
}

Java

For more information, see the Cloud Storage Java API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample uploads an appendable object:


import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.FlushPolicy;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Paths;
import java.util.Locale;

public class CreateAndWriteAppendableObject {
  /**
   * Creates an appendable object and uploads the contents of a local file.
   * Closing the upload channel finalizes the object.
   */
  public static void createAndWriteAppendableObject(
      String bucketName, String objectName, String filePath) throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The path to the file to upload
    // String filePath = "path/to/your/file";

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

      // Flush buffered data to the service every 64,000 bytes.
      int flushSize = 64 * 1000;
      FlushPolicy.MaxFlushSizeFlushPolicy flushPolicy = FlushPolicy.maxFlushSize(flushSize);
      BlobAppendableUploadConfig config =
          BlobAppendableUploadConfig.of()
              .withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING)
              .withFlushPolicy(flushPolicy);
      BlobAppendableUpload uploadSession = storage.blobAppendableUpload(blobInfo, config);
      try (AppendableUploadWriteableByteChannel channel = uploadSession.open();
          ReadableByteChannel readableByteChannel = FileChannel.open(Paths.get(filePath))) {
        ByteStreams.copy(readableByteChannel, channel);
        // Since the channel is in a try-with-resources block, channel.close()
        // will be implicitly called here, which triggers the finalization.
      } catch (IOException ex) {
        throw new IOException("Failed to upload to object " + blobId.toGsUtilUri(), ex);
      }
      BlobInfo result = storage.get(blobId);
      // %n terminates the line, matching the output of the other samples.
      System.out.printf(
          Locale.US,
          "Object %s successfully uploaded%n",
          result.getBlobId().toGsUtilUriWithGeneration());
    }
  }
}

Python

For more information, see the Cloud Storage Python API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample uploads an appendable object:



async def storage_create_and_write_appendable_object(
    bucket_name, object_name, grpc_client=None
):
    """Creates an appendable object in a zonal bucket and appends data to it.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """

    client = AsyncGrpcClient() if grpc_client is None else grpc_client
    # generation=0 requests creation of a new object; the service raises
    # `FailedPrecondition` if the object already exists.
    writer = AsyncAppendableObjectWriter(
        client=client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=0,
    )
    # Creates a new appendable object of size 0 and opens it for appending.
    await writer.open()

    # Append bytes to the object. `.append` may be called as many times as
    # needed; each call adds data at the current end of the object.
    await writer.append(b"Some data")

    # Once all appends are done, close the gRPC bidirectional stream.
    await writer.close()

    print(
        f"Appended object {object_name} created of size {writer.persisted_size} bytes."
    )

Read objects

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample performs a ranged read on a single object:

namespace gcs = google::cloud::storage;

// Counts the '\n' characters across all data produced by an AsyncReader.
// Drains the reader until its token becomes invalid (end of the data).
auto count_newlines =
    [](gcs::AsyncReader reader,
       gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
  std::uint64_t newlines = 0;
  for (; token.valid();) {
    auto [payload, next] = (co_await reader.Read(std::move(token))).value();
    token = std::move(next);
    for (auto const& chunk : payload.contents()) {
      newlines += std::count(chunk.begin(), chunk.end(), '\n');
    }
  }
  co_return newlines;
};

auto coro =
    [&count_newlines](
        gcs::AsyncClient& client, std::string bucket_name,
        std::string object_name) -> google::cloud::future<std::uint64_t> {
  // Open a descriptor for the object; reads are issued through it.
  auto descriptor =
      (co_await client.Open(gcs::BucketName(std::move(bucket_name)),
                            std::move(object_name)))
          .value();

  // Request a single range: the first 1024 bytes of the object.
  auto [reader, token] = descriptor.Read(0, 1024);

  co_return co_await count_newlines(std::move(reader), std::move(token));
};

The following sample performs a full read on a single object:

namespace gcs = google::cloud::storage;

// Counts the '\n' characters across all data produced by an AsyncReader.
// Drains the reader until its token becomes invalid (end of the data).
auto count_newlines =
    [](gcs::AsyncReader reader,
       gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
  std::uint64_t newlines = 0;
  for (; token.valid();) {
    auto [payload, next] = (co_await reader.Read(std::move(token))).value();
    token = std::move(next);
    for (auto const& chunk : payload.contents()) {
      newlines += std::count(chunk.begin(), chunk.end(), '\n');
    }
  }
  co_return newlines;
};

auto coro =
    [&count_newlines](
        gcs::AsyncClient& client, std::string bucket_name,
        std::string object_name) -> google::cloud::future<std::uint64_t> {
  // Open a descriptor for the object; reads are issued through it.
  auto descriptor =
      (co_await client.Open(gcs::BucketName(std::move(bucket_name)),
                            std::move(object_name)))
          .value();

  // Read the full object, starting at offset 0 and running to the end.
  auto [reader, token] = descriptor.ReadFromOffset(0);

  co_return co_await count_newlines(std::move(reader), std::move(token));
};

The following sample performs ranged reads on a single object:

namespace gcs = google::cloud::storage;

// Counts the '\n' characters across all data produced by an AsyncReader.
auto count_newlines =
    [](gcs::AsyncReader reader,
       gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
  std::uint64_t newlines = 0;
  for (; token.valid();) {
    auto [payload, next] = (co_await reader.Read(std::move(token))).value();
    token = std::move(next);
    for (auto const& chunk : payload.contents()) {
      newlines += std::count(chunk.begin(), chunk.end(), '\n');
    }
  }
  co_return newlines;
};

auto coro =
    [&count_newlines](
        gcs::AsyncClient& client, std::string bucket_name,
        std::string object_name) -> google::cloud::future<std::uint64_t> {
  // A single descriptor can serve many concurrent ranged reads.
  auto descriptor =
      (co_await client.Open(gcs::BucketName(std::move(bucket_name)),
                            std::move(object_name)))
          .value();

  // Issue three ranged reads over the same descriptor: the first two
  // deliberately cover the same 1 KiB, the third reads the next 1 KiB.
  auto [r1, t1] = descriptor.Read(0, 1024);
  auto [r2, t2] = descriptor.Read(0, 1024);
  auto [r3, t3] = descriptor.Read(1024, 1024);

  // Drain each stream and sum the newline counts from all three.
  auto c1 = count_newlines(std::move(r1), std::move(t1));
  auto c2 = count_newlines(std::move(r2), std::move(t2));
  auto c3 = count_newlines(std::move(r3), std::move(t3));
  co_return (co_await std::move(c1)) + (co_await std::move(c2)) +
      (co_await std::move(c3));
};

The following sample performs ranged reads on multiple objects (a single read per object):

namespace gcs = google::cloud::storage;

// Helper coroutine to count newlines returned by an AsyncReader.
auto count_newlines =
    [](gcs::AsyncReader reader,
       gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
  std::uint64_t count = 0;
  // The token becomes invalid once the read stream is exhausted.
  while (token.valid()) {
    auto [payload, t] = (co_await reader.Read(std::move(token))).value();
    token = std::move(t);
    for (auto const& buffer : payload.contents()) {
      count += std::count(buffer.begin(), buffer.end(), '\n');
    }
  }
  co_return count;
};

auto coro = [&count_newlines](
                gcs::AsyncClient& client, std::string bucket_name,
                std::string object_name1, std::string object_name2,
                std::string object_name3) -> google::cloud::future<void> {
  // List of object names to read (passed as arguments)
  std::vector<std::string> object_names = {object_name1, object_name2,
                                           object_name3};
  std::vector<google::cloud::future<std::uint64_t>> futures;

  // Start ranged reads for all objects and collect futures
  // This example opens multiple objects, not one object multiple times.
  for (auto const& name : object_names) {
    auto descriptor =
        (co_await client.Open(gcs::BucketName(bucket_name), name)).value();
    auto [reader, token] = descriptor.Read(0, 1024);
    futures.push_back(count_newlines(std::move(reader), std::move(token)));
  }

  // Process futures as they become ready and print results
  // NOTE(review): `futures` and `object_names` must stay index-aligned;
  // the loop erases the same index from both and then breaks out to
  // restart the scan, so elements are never visited past an erase.
  while (!futures.empty()) {
    bool progress_made = false;
    for (std::size_t i = 0; i < futures.size(); ++i) {
      if (futures[i].is_ready()) {  // Check if the future is ready
        auto count = futures[i].get();
        std::cout << "Object " << object_names[i] << " read returned "
                  << count << " newlines\n";
        futures.erase(futures.begin() + i);
        object_names.erase(object_names.begin() + i);
        progress_made = true;
        break;  // Restart the loop after modifying the vectors
      }
    }
    if (!progress_made) {
      // Blocking sleep keeps this sample simple; it trades latency for
      // not busy-spinning while no future is ready.
      std::this_thread::sleep_for(
          std::chrono::milliseconds(10));  // Avoid busy spin
    }
  }
};

Go

For more information, see the Cloud Storage Go API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample performs a ranged read on a single object:

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// openObjectSingleRangedRead reads a single range from an object in a
// rapid bucket.
func openObjectSingleRangedRead(w io.Writer, bucket, object string) ([]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Read the first KiB of the file and copy into a buffer.
	r, err := client.Bucket(bucket).Object(object).NewRangeReader(ctx, 0, 1024)
	if err != nil {
		return nil, fmt.Errorf("NewRangeReader: %w", err)
	}
	defer r.Close()
	buf := new(bytes.Buffer)
	if _, err := io.Copy(buf, r); err != nil {
		// Wrap with %w so callers can unwrap the underlying error.
		return nil, fmt.Errorf("copying data: %w", err)
	}

	fmt.Fprintf(w, "Read the first 1024 bytes of %v into a buffer\n", object)

	return buf.Bytes(), nil
}

The following sample performs a full read on a single object:

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// openObjectReadFullObject reads a full object's data from a
// rapid bucket.
func openObjectReadFullObject(w io.Writer, bucket, object string) ([]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Read the entire object and copy its data into a buffer.
	r, err := client.Bucket(bucket).Object(object).NewReader(ctx)
	if err != nil {
		return nil, fmt.Errorf("NewReader: %w", err)
	}
	defer r.Close()
	buf := new(bytes.Buffer)
	if _, err := io.Copy(buf, r); err != nil {
		// Wrap with %w so callers can unwrap the underlying error.
		return nil, fmt.Errorf("copying data: %w", err)
	}

	fmt.Fprintf(w, "Read the data of %v into a buffer\n", object)

	return buf.Bytes(), nil
}

The following sample performs ranged reads on a single object:

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// openObjectMultipleRangedRead opens a single object using
// MultiRangeDownloader to download multiple ranges.
func openObjectMultipleRangedRead(w io.Writer, bucket, object string) ([][]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Create the MultiRangeDownloader, which opens a stream to the object.
	mrd, err := client.Bucket(bucket).Object(object).NewMultiRangeDownloader(ctx)
	if err != nil {
		return nil, fmt.Errorf("NewMultiRangeDownloader: %w", err)
	}

	// Add some 1 KiB ranges to download. This call is non-blocking. The
	// provided callback is invoked when the range download is complete.
	// NOTE(review): errs is appended to from the callbacks and read only
	// after mrd.Wait() returns; confirm callbacks are not invoked
	// concurrently before relying on this pattern in production code.
	startOffsets := []int64{0, 1024, 2048}
	var dataBufs [3]bytes.Buffer
	var errs []error
	for i, off := range startOffsets {
		mrd.Add(&dataBufs[i], off, 1024, func(off, length int64, err error) {
			if err != nil {
				errs = append(errs, err)
			} else {
				// Terminate each message so successive callbacks don't
				// run together on one line.
				fmt.Fprintf(w, "downloaded range at offset %v\n", off)
			}
		})
	}

	// Wait for all downloads to complete.
	mrd.Wait()
	if len(errs) > 0 {
		// Best-effort close so the stream is not leaked on the error path;
		// the download errors are the primary failure to report.
		mrd.Close()
		return nil, fmt.Errorf("one or more downloads failed; errors: %v", errs)
	}
	if err := mrd.Close(); err != nil {
		return nil, fmt.Errorf("MultiRangeDownloader.Close: %w", err)
	}

	fmt.Fprintf(w, "Read the ranges of %v into memory\n", object)

	// Collect the byte slices
	var byteSlices [][]byte
	for _, buf := range dataBufs {
		byteSlices = append(byteSlices, buf.Bytes())
	}

	return byteSlices, nil
}

Java

For more information, see the Cloud Storage Java API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample performs a ranged read on a single object:


import com.google.api.core.ApiFuture;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.concurrent.TimeUnit;

public class OpenObjectSingleRangedRead {
  public static void openObjectSingleRangedRead(
      String bucketName, String objectName, long offset, int length) throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The beginning of the range
    // long offset = 0

    // The maximum number of bytes to read from the object.
    // int length = 64;

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      // Opening a read session is asynchronous; allow up to 10 seconds.
      ApiFuture<BlobReadSession> sessionFuture = storage.blobReadSession(blobId);

      try (BlobReadSession session = sessionFuture.get(10, TimeUnit.SECONDS)) {
        // Describe the byte range to read, then project it as bytes.
        RangeSpec range = RangeSpec.of(offset, length);
        ApiFuture<byte[]> bytesFuture =
            session.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(range));

        // Block until the ranged read finishes.
        byte[] data = bytesFuture.get();

        System.out.println(
            "Successfully read "
                + data.length
                + " bytes from object "
                + objectName
                + " in bucket "
                + bucketName);
      }
    }
  }
}

The following sample performs a full read on a single object:


import com.google.api.core.ApiFuture;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.ReadAsChannel;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ScatteringByteChannel;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

public class OpenObjectReadFullObject {
  public static void openObjectReadFullObject(String bucketName, String objectName)
      throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object to read
    // String objectName = "your-object-name";

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      ApiFuture<BlobReadSession> sessionFuture = storage.blobReadSession(blobId);

      try (BlobReadSession session = sessionFuture.get(10, TimeUnit.SECONDS)) {

        // Project the entire object as a channel and drain it in 64 KiB
        // chunks, keeping a running byte count.
        ReadAsChannel channelConfig = ReadProjectionConfigs.asChannel();
        try (ScatteringByteChannel channel = session.readAs(channelConfig)) {
          ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
          long total = 0;
          for (int n = channel.read(chunk); n != -1; n = channel.read(chunk)) {
            total += n;
            chunk.clear();  // reuse the buffer for the next read
          }

          System.out.printf(
              Locale.US,
              "Successfully read a total of %d bytes from object %s%n",
              total,
              blobId.toGsUtilUri());
        }
      }
    }
  }
}

The following sample performs ranged reads on a single object:


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class OpenObjectMultipleRangedRead {
  public static void openObjectMultipleRangedRead(
      String bucketName, String objectName, long offset1, int length1, long offset2, int length2)
      throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The beginning of the range 1
    // long offset = 0

    // The maximum number of bytes to read in range 1
    // int length = 16;

    // The beginning of the range 2
    // long offset = 16

    // The maximum number of bytes to read in range 2
    // int length = 32;

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      ApiFuture<BlobReadSession> sessionFuture = storage.blobReadSession(blobId);
      // Describe both byte ranges up front.
      RangeSpec firstRange = RangeSpec.of(offset1, length1);
      RangeSpec secondRange = RangeSpec.of(offset2, length2);

      try (BlobReadSession session = sessionFuture.get(10, TimeUnit.SECONDS)) {
        // Issue both ranged reads concurrently over the same session.
        ApiFuture<byte[]> firstRead =
            session.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(firstRange));
        ApiFuture<byte[]> secondRead =
            session.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(secondRange));

        // Block until both reads have settled.
        List<byte[]> results = ApiFutures.allAsList(ImmutableList.of(firstRead, secondRead)).get();

        System.out.println(
            "Successfully read "
                + results.get(0).length
                + " bytes from range 1 and "
                + results.get(1).length
                + " bytes from range 2.");
      }
    }
  }
}

The following sample performs ranged reads on multiple objects:


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadAsFutureBytes;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class OpenMultipleObjectsRangedRead {
  public static void multipleObjectsSingleRangedRead(
      String bucketName, List<String> objectNames, long startOffset, int length) throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS objects to read
    // List<String> objectName = Arrays.asList("object-1", "object-2", "object-3");

    // The same byte range is read from every object.
    RangeSpec singleRange = RangeSpec.of(startOffset, length);
    ReadAsFutureBytes rangeConfig =
        ReadProjectionConfigs.asFutureBytes().withRangeSpec(singleRange);

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      List<ApiFuture<byte[]>> futuresToWaitOn = new ArrayList<>();

      System.out.printf(
          "Initiating single ranged read [%d, %d] on %d objects...%n",
          startOffset, startOffset + length - 1, objectNames.size());

      for (String objectName : objectNames) {
        BlobId blobId = BlobId.of(bucketName, objectName);
        // Opening the read session is itself asynchronous; the read is
        // chained onto the session future below.
        ApiFuture<BlobReadSession> futureReadSession = storage.blobReadSession(blobId);

        ApiFuture<byte[]> readAndCloseFuture =
            ApiFutures.transformAsync(
                futureReadSession,
                (BlobReadSession session) -> {
                  ApiFuture<byte[]> readFuture = session.readAs(rangeConfig);

                  // Close the session as soon as this read settles
                  // (success or failure) so the session is not leaked.
                  readFuture.addListener(
                      () -> {
                        try {
                          session.close();
                        } catch (java.io.IOException e) {
                          System.err.println(
                              "WARN: Background error while closing session: " + e.getMessage());
                        }
                      },
                      MoreExecutors.directExecutor());
                  return readFuture;
                },
                MoreExecutors.directExecutor());

        futuresToWaitOn.add(readAndCloseFuture);
      }
      // Block until every per-object read has completed, with a 30-second
      // overall cap across all objects.
      ApiFutures.allAsList(futuresToWaitOn).get(30, TimeUnit.SECONDS);

      System.out.println("All concurrent single-ranged read operations are complete.");
    }
  }
}

Python

For more information, see the Cloud Storage Python API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample performs a ranged read on a single object:

async def storage_open_object_single_ranged_read(
    bucket_name, object_name, start_byte, size, grpc_client=None
):
    """Downloads a range of bytes from an object.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    downloader = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name)

    try:
        # The downloader always opens the object in read mode.
        await downloader.open()

        # The requested range is written into this buffer; any file-like
        # object supplied by the caller would work as well.
        output_buffer = BytesIO()
        await downloader.download_ranges([(start_byte, size, output_buffer)])
    finally:
        if downloader.is_stream_open:
            await downloader.close()

    # The downloaded size may be smaller than requested: the downloader
    # reads at most up to the end of the object.
    downloaded_size = output_buffer.getbuffer().nbytes
    print(f"Downloaded {downloaded_size} bytes from {object_name}")

The following sample performs a full read on a single object:

async def storage_open_object_read_full_object(
    bucket_name, object_name, grpc_client=None
):
    """Downloads the entire content of an object using a multi-range downloader.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    # mrd = Multi-Range-Downloader
    mrd = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name)

    try:
        # The downloader always opens the object in read mode.
        await mrd.open()

        # Any buffer or file-like object works as the destination.
        output_buffer = BytesIO()
        # The special range (0, 0) means "from the beginning to the end".
        await mrd.download_ranges([(0, 0, output_buffer)])
    finally:
        if mrd.is_stream_open:
            await mrd.close()

    downloaded_bytes = output_buffer.getvalue()
    print(
        f"Downloaded all {len(downloaded_bytes)} bytes from object {object_name} in bucket {bucket_name}."
    )

The following sample performs ranged reads on a single object:

async def storage_open_object_multiple_ranged_read(
    bucket_name, object_name, grpc_client=None
):
    """Downloads multiple ranges of bytes from a single object into different buffers.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    mrd = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name)

    try:
        # Open the object, mrd always opens in read mode.
        await mrd.open()

        # Specify four different buffers to download ranges into.
        buffers = [BytesIO(), BytesIO(), BytesIO(), BytesIO()]

        # Define the ranges to download. Each range is a tuple of (start_byte, size, buffer).
        # All ranges will download 10 bytes from different starting positions.
        # We choose arbitrary start bytes for this example. An object should be large enough.
        # A user can choose any start byte between 0 and `object_size`.
        # If `start_bytes` is greater than `object_size`, mrd will throw an error.
        ranges = [
            (0, 10, buffers[0]),
            (20, 10, buffers[1]),
            (40, 10, buffers[2]),
            (60, 10, buffers[3]),
        ]

        await mrd.download_ranges(ranges)

    finally:
        # Guard the close so a failed open() does not raise a second error
        # here and mask the original exception. This matches the other
        # samples in this file.
        if mrd.is_stream_open:
            await mrd.close()

    # Print the downloaded content from each buffer.
    for i, output_buffer in enumerate(buffers):
        downloaded_size = output_buffer.getbuffer().nbytes
        print(
            f"Downloaded {downloaded_size} bytes into buffer {i + 1} from start byte {ranges[i][0]}: {output_buffer.getvalue()}"
        )

The following sample performs ranged reads on multiple objects:

async def storage_open_multiple_objects_ranged_read(
    bucket_name, object_names, grpc_client=None
):
    """Downloads a range of bytes from multiple objects concurrently.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    # Every object contributes its first 100 bytes.
    first_byte = 0
    num_bytes = 100

    async def _fetch_one(object_name):
        """Helper coroutine: downloads one range from a single object."""
        downloader = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name)

        # The requested range is written into this buffer; any file-like
        # object the caller provides would work as well.
        output_buffer = BytesIO()
        try:
            # The downloader always opens the object in read mode.
            await downloader.open()
            await downloader.download_ranges([(first_byte, num_bytes, output_buffer)])
        finally:
            if downloader.is_stream_open:
                await downloader.close()

        # The downloader reads at most up to the end of the object, so the
        # downloaded size can be smaller than the requested size.
        downloaded_size = output_buffer.getbuffer().nbytes
        print(f"Downloaded {downloaded_size} bytes from {object_name}")

    # Fan out one download per object and wait for all of them.
    await asyncio.gather(*(_fetch_one(name) for name in object_names))

Pause and resume an object

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample pauses and resumes an appendable object:

namespace gcs = google::cloud::storage;
// Pauses and resumes an appendable upload: write some data, close the writer
// without finalizing (the "pause"), then resume at the object's current
// generation and finalize. Returns the finalized object's metadata.
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name)
    -> google::cloud::future<google::storage::v2::Object> {
  // Start an appendable upload and write some data.
  // `token` must be moved into each Write() call and replaced by the token
  // that call returns; a stale token cannot be reused.
  auto [writer, token] = (co_await client.StartAppendableObjectUpload(
                              gcs::BucketName(bucket_name), object_name))
                             .value();
  std::cout << "Appendable upload started.\n";
  token = (co_await writer.Write(std::move(token),
                                 gcs::WritePayload("paused data\n")))
              .value();

  // The writer is closed, but the upload is not finalized. This "pauses" the
  // upload, as the object remains appendable.
  auto close_status = co_await writer.Close();
  if (!close_status.ok()) throw std::runtime_error(close_status.message());
  std::cout << "Upload paused.\n";

  // To resume the upload we need the object's generation. We can use the
  // regular GCS client to get the latest metadata.
  // NOTE(review): GetObjectMetadata is a blocking call inside a coroutine;
  // acceptable for a sample, but production code may prefer an async lookup.
  auto regular_client = gcs::Client();
  auto metadata =
      regular_client.GetObjectMetadata(bucket_name, object_name).value();
  std::cout << "Object generation is " << metadata.generation() << "\n";

  // Now resume the upload. Resuming yields a fresh writer/token pair; the
  // writer's persisted state reflects the bytes already stored.
  std::tie(writer, token) =
      (co_await client.ResumeAppendableObjectUpload(
           gcs::BucketName(bucket_name), object_name, metadata.generation()))
          .value();
  std::cout << "Upload resumed from offset "
            << absl::get<std::int64_t>(writer.PersistedState()) << "\n";

  // Append the rest of the data.
  token = (co_await writer.Write(std::move(token),
                                 gcs::WritePayload("resumed data\n")))
              .value();

  // Finalize the upload and return the object metadata. After Finalize()
  // no more data can be appended to the object.
  co_return (co_await writer.Finalize(std::move(token))).value();
};

Go

For more information, see the Cloud Storage Go API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample pauses and resumes an appendable object:

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// pauseAndResumeAppendableUpload creates a new unfinalized appendable object,
// closes the Writer, then re-opens the object for writing using
// NewWriterFromAppendableObject.
func pauseAndResumeAppendableUpload(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	// Zonal (Rapid storage) bucket operations require the gRPC client with
	// the experimental zonal-bucket option enabled.
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Start an appendable upload and write some data.
	writer := client.Bucket(bucket).Object(object).NewWriter(ctx)

	if _, err := writer.Write([]byte("Some data\n")); err != nil {
		return fmt.Errorf("Writer.Write: %w", err)
	}

	// The writer is closed, but the upload is not finalized. This "pauses" the
	// upload, as the object remains appendable.
	if err := writer.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}
	fmt.Fprintf(w, "Uploaded unfinalized object %v\n", object)

	// To resume the upload we need the object's generation. We can get this
	// from the previous Writer after close.
	gen := writer.Attrs().Generation

	// Now resume the upload. Writer options including finalization can be
	// passed on calling this constructor.
	appendWriter, offset, err := client.Bucket(bucket).Object(object).Generation(gen).NewWriterFromAppendableObject(
		ctx, &storage.AppendableWriterOpts{
			FinalizeOnClose: true,
		},
	)
	if err != nil {
		// Wrap with %w (not %v) so callers can unwrap with errors.Is/As,
		// matching the error style used throughout this file.
		return fmt.Errorf("NewWriterFromAppendableObject: %w", err)
	}
	fmt.Fprintf(w, "Resuming upload from offset %v\n", offset)

	// Append the rest of the data and close the Writer to finalize.
	if _, err := appendWriter.Write([]byte("resumed data\n")); err != nil {
		return fmt.Errorf("appendWriter.Write: %w", err)
	}
	if err := appendWriter.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}
	fmt.Fprintf(w, "Uploaded and finalized object %v\n", object)
	return nil
}

Java

For more information, see the Cloud Storage Java API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample pauses and resumes an appendable object:


import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageChannelUtils;
import com.google.cloud.storage.StorageOptions;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Locale;

public class PauseAndResumeAppendableObjectUpload {
  /**
   * Demonstrates "pausing" an appendable upload by closing its channel without
   * finalizing, then "resuming" it with a second upload session on the same
   * object that finalizes when its channel closes.
   *
   * @param bucketName the ID of your GCS bucket
   * @param objectName the ID of your GCS object
   * @param filePath path of a local file whose content is appended on resume
   * @throws Exception if any storage or file operation fails
   */
  public static void pauseAndResumeAppendableObjectUpload(
      String bucketName, String objectName, String filePath) throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The path to the file to upload
    // String filePath = "path/to/your/file";

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

      // --- Step 1: Initial string write (PAUSE) ---
      // Default close action will be CLOSE_WITHOUT_FINALIZING
      BlobAppendableUploadConfig initialConfig = BlobAppendableUploadConfig.of();
      BlobAppendableUpload initialUploadSession =
          storage.blobAppendableUpload(blobInfo, initialConfig);

      try (AppendableUploadWriteableByteChannel channel = initialUploadSession.open()) {
        String initialData = "Initial data segment.\n";
        ByteBuffer buffer = ByteBuffer.wrap(initialData.getBytes(StandardCharsets.UTF_8));
        // blockingEmptyTo presumably blocks until the buffer is fully
        // drained into the channel -- confirm against StorageChannelUtils docs.
        long totalBytesWritten = StorageChannelUtils.blockingEmptyTo(buffer, channel);
        // Flush so the bytes written so far are persisted.
        channel.flush();

        System.out.printf(
            Locale.US, "Wrote %d bytes (initial string) in first segment.\n", totalBytesWritten);
      } catch (IOException ex) {
        throw new IOException("Failed initial upload to object " + blobId.toGsUtilUri(), ex);
      }

      // Fetch the object's metadata to report how many bytes are persisted.
      Blob existingBlob = storage.get(blobId);
      long currentObjectSize = existingBlob.getSize();
      System.out.printf(
          Locale.US,
          "Initial upload paused. Currently uploaded size: %d bytes\n",
          currentObjectSize);

      // --- Step 2: Resume upload with file content and finalize ---
      // Use FINALIZE_WHEN_CLOSING to ensure the object is finalized on channel closure.
      BlobAppendableUploadConfig resumeConfig =
          BlobAppendableUploadConfig.of().withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING);
      BlobAppendableUpload resumeUploadSession =
          storage.blobAppendableUpload(existingBlob.toBuilder().build(), resumeConfig);

      try (FileChannel fileChannel = FileChannel.open(Paths.get(filePath));
          AppendableUploadWriteableByteChannel channel = resumeUploadSession.open()) {
        long bytesToAppend = fileChannel.size();
        System.out.printf(
            Locale.US,
            "Appending the entire file (%d bytes) after the initial string.\n",
            bytesToAppend);

        // Stream the file into the channel; closing the channel via
        // try-with-resources finalizes the object.
        ByteStreams.copy(fileChannel, channel);
      }

      BlobInfo result = storage.get(blobId);
      System.out.printf(
          Locale.US,
          "\nObject %s successfully resumed and finalized. Total size: %d bytes\n",
          result.getBlobId().toGsUtilUriWithGeneration(),
          result.getSize());
    }
  }
}

Python

For more information, see the Cloud Storage Python API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample pauses and resumes an appendable object:

async def storage_pause_and_resume_appendable_upload(
    bucket_name, object_name, grpc_client=None
):
    """Demonstrates pausing and resuming an appendable object upload.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    # 1. Create a writer and append the first chunk of data.
    writer1 = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
    )
    await writer1.open()
    await writer1.append(b"First part of the data. ")
    print(f"Appended {writer1.persisted_size} bytes with the first writer.")

    # 2. After appending some data, close the writer to "pause" the upload.
    #  NOTE: you can pause indefinitely and still read the content uploaded so far using MRD.
    await writer1.close()

    print("First writer closed. Upload is 'paused'.")

    # 3. Create a new writer, passing the generation number from the previous
    #    writer. This is a precondition to ensure that the object hasn't been
    #    modified since we last accessed it.
    generation_to_resume = writer1.generation
    print(f"Generation to resume from is: {generation_to_resume}")

    writer2 = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=generation_to_resume,
    )
    # 4. Open the new writer.
    try:
        await writer2.open()

        # 5. Append some more data using the new writer.
        await writer2.append(b"Second part of the data.")
        print(f"Appended more data. Total size is now {writer2.persisted_size} bytes.")
    finally:
        # 6. Finally, close the new writer.
        # NOTE(review): `_is_stream_open` is a private attribute -- confirm
        # whether a public accessor exists.
        if writer2._is_stream_open:
            await writer2.close()
    print("Second writer closed. Full object uploaded.")

Finalize an object

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample finalizes an appendable object:

namespace gcs = google::cloud::storage;
// Creates an appendable object, writes one payload, and finalizes the upload.
// Finalizing makes the object non-appendable and returns its metadata.
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name)
    -> google::cloud::future<google::storage::v2::Object> {
  // Start an appendable upload.
  // `token` must be moved into each Write()/Finalize() call and replaced by
  // the token Write() returns.
  auto [writer, token] =
      (co_await client.StartAppendableObjectUpload(
           gcs::BucketName(std::move(bucket_name)), std::move(object_name)))
          .value();
  std::cout << "Appendable upload started. \n";

  // Write some data.
  token =
      (co_await writer.Write(std::move(token),
                             gcs::WritePayload("some data to finalize\n")))
          .value();

  // Finalize the upload. This makes the object non-appendable.
  // No more data can be written to this writer.
  auto object = (co_await writer.Finalize(std::move(token))).value();
  std::cout << "Upload finalized.\n";
  co_return object;
};

Go

For more information, see the Cloud Storage Go API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample finalizes an appendable object:

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// finalizeAppendableObject creates, uploads and finalizes a new object in
// a rapid bucket.
func finalizeAppendableObject(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()

	// Zonal-bucket operations require the gRPC-based client with the
	// experimental zonal-bucket option enabled.
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// FinalizeOnClose makes Close finalize the upload after flushing any
	// buffered data. A finalized object is non-appendable: no more data can
	// be written to it.
	objWriter := client.Bucket(bucket).Object(object).NewWriter(ctx)
	objWriter.FinalizeOnClose = true

	if _, err := objWriter.Write([]byte("some data to finalize\n")); err != nil {
		return fmt.Errorf("Writer.Write: %w", err)
	}

	if err := objWriter.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}

	fmt.Fprintf(w, "Uploaded and finalized object %v\n", object)
	return nil
}

Java

For more information, see the Cloud Storage Java API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample finalizes an appendable object:


import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class FinalizeAppendableObjectUpload {
  /**
   * Finalizes an existing unfinalized appendable object, making it
   * non-appendable. Prints a message and returns early when the object does
   * not exist.
   */
  public static void finalizeAppendableObjectUpload(String bucketName, String objectName)
      throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS unfinalized appendable object
    // String objectName = "your-object-name";

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId id = BlobId.of(bucketName, objectName);
      Blob blob = storage.get(id);

      if (blob == null) {
        System.out.println("Object " + objectName + " not found in bucket " + bucketName);
        return;
      }

      // Take over the existing appendable object and configure the session so
      // that closing its channel finalizes the object.
      BlobInfo takeoverInfo = BlobInfo.newBuilder(blob.getBlobId()).build();
      BlobAppendableUploadConfig finalizeConfig =
          BlobAppendableUploadConfig.of()
              .withCloseAction(BlobAppendableUploadConfig.CloseAction.FINALIZE_WHEN_CLOSING);
      BlobAppendableUpload session = storage.blobAppendableUpload(takeoverInfo, finalizeConfig);

      try (BlobAppendableUpload.AppendableUploadWriteableByteChannel channel = session.open()) {
        // No new data is written; finalize and close immediately.
        channel.finalizeAndClose();
      }

      System.out.println(
          "Successfully finalized object " + objectName + " in bucket " + bucketName);
    }
  }
}

Python

For more information, see the Cloud Storage Python API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample finalizes an appendable object:

async def storage_finalize_appendable_object_upload(
    bucket_name, object_name, grpc_client=None
):
    """Creates, writes to, and finalizes an appendable object.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """

    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    # generation=0 is a precondition: the service throws `FailedPrecondition`
    # if the object already exists.
    appendable_writer = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=0,
    )

    # Creates a new appendable object of size 0 and opens it for appending.
    await appendable_writer.open()

    # Append some bytes to the object.
    await appendable_writer.append(b"Some data")

    # Finalize the appendable object.
    # NOTE:
    # 1. Once finalized, no more appends can be done to the object.
    # 2. To keep the object appendable, call `close` instead of `finalize`.
    # 3. `finalize()` also closes the gRPC bidi stream; calling `close`
    #    after `finalize` may lead to undefined behavior.
    object_resource = await appendable_writer.finalize()

    print(f"Appendable object {object_name} created and finalized.")
    print("Object Metadata:")
    print(object_resource)

Read the tail of an object

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample reads the tail of an appendable object:

namespace gcs = google::cloud::storage;
// Simulates `tail -f` on an appendable object: appends data, flushes so it
// becomes visible to readers, then reads everything past the last offset
// already printed.
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name) -> google::cloud::future<void> {
  // This coroutine simulates a "tail -f" command on a GCS object. It
  // repeatedly polls an appendable object for new content. In a real
  // application, the object would be written to by a separate process.
  std::cout << "Polling for content from " << object_name << "...\n\n";

  // Start an appendable upload. In a real application this would be done by
  // a separate process, and this application would only know the object name.
  auto [writer, token] = (co_await client.StartAppendableObjectUpload(
                              gcs::BucketName(bucket_name), object_name))
                             .value();

  // Offset of the first byte not yet printed.
  std::int64_t bytes_read = 0;
  for (int i = 0; i != 2; ++i) {
    // In a real application, another process would append data here.
    // We simulate this by writing to the object.
    auto content =
        "More data for tail example, iteration " + std::to_string(i) + "\n";
    token =
        (co_await writer.Write(std::move(token), gcs::WritePayload(content)))
            .value();
    // Flush so the appended data becomes visible to readers. The status is
    // deliberately ignored in this sample.
    (void)co_await writer.Flush();

    // Poll for new content by reading from the last known offset.
    // NOTE(review): length -1 presumably means "read to the current end of
    // the object" -- confirm against the ReadObjectRange documentation.
    auto payload =
        (co_await client.ReadObjectRange(gcs::BucketName(bucket_name),
                                         object_name, bytes_read, -1))
            .value();
    for (auto const& buffer : payload.contents()) {
      std::cout << std::string(buffer.begin(), buffer.end());
      bytes_read += buffer.size();
    }
    // In a real application you would wait here, e.g. with a timer.
    // NOTE(review): sleep_for blocks the coroutine's thread; acceptable for
    // a sample.
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
  // NOTE(review): the writer is never closed or finalized here, so the
  // object remains appendable when the coroutine completes.
};

Go

For more information, see the Cloud Storage Go API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample reads the tail of an appendable object:

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

// readAppendableObjectTail simulates a "tail -f" command on a GCS object. It
// repeatedly polls an appendable object for new content. In a real
// application, the object would be written to by a separate process.
// It returns the bytes read by the polling goroutine.
func readAppendableObjectTail(w io.Writer, bucket, object string) ([]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	// Set a context timeout. When this timeout is reached, the read stream
	// will be closed, so omit this to tail indefinitely.
	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()

	// Create a new appendable object and write some data.
	writer := client.Bucket(bucket).Object(object).If(storage.Conditions{DoesNotExist: true}).NewWriter(ctx)
	if _, err := writer.Write([]byte("Some data\n")); err != nil {
		return nil, fmt.Errorf("Writer.Write: %w", err)
	}
	if err := writer.Close(); err != nil {
		return nil, fmt.Errorf("Writer.Close: %w", err)
	}
	gen := writer.Attrs().Generation

	// Create the MultiRangeDownloader, which opens a read stream to the object.
	mrd, err := client.Bucket(bucket).Object(object).NewMultiRangeDownloader(ctx)
	if err != nil {
		return nil, fmt.Errorf("NewMultiRangeDownloader: %w", err)
	}

	// In a goroutine, poll the object. In this example we continue until all the
	// bytes we expect to see were received, but in a real application this could
	// continue to poll indefinitely until some signal is received.
	var buf bytes.Buffer
	var mrdErr error
	done := make(chan bool)
	go func() {
		var currOff int64
		rangeDownloaded := make(chan bool)
		for buf.Len() < 100 {
			// Add the current range and wait for it to be downloaded.
			// Using a length of 0 will read to the current end of the object.
			// The callback will give the actual number of bytes that were
			// read in each iteration.
			mrd.Add(&buf, currOff, 0, func(offset, length int64, err error) {
				// After each range is received, update
				// the starting offset based on how many bytes were received.
				if err != nil {
					mrdErr = err
				}
				currOff += length
				rangeDownloaded <- true
			})
			// Wait for the range download to complete with a timeout of 10s.
			select {
			case <-rangeDownloaded:
			case <-time.After(10 * time.Second):
				mrdErr = mrd.Error()
				if mrdErr == nil {
					mrdErr = errors.New("range request timed out after 10s")
				}
			}

			if mrdErr != nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		// After exiting the loop, close MultiRangeDownloader and signal that
		// all ranges have been read.
		if err := mrd.Close(); err != nil {
			mrdErr = err
		}
		done <- true
	}()

	// Meanwhile, continue to write 10 bytes at a time to the object.
	// This could be done by calling NewWriterFromAppendable object repeatedly
	// (as in the example) or calling Writer.Flush without closing the Writer.
	for range 9 {
		appendWriter, offset, err := client.Bucket(bucket).Object(object).Generation(gen).NewWriterFromAppendableObject(ctx, nil)
		if err != nil {
			return nil, fmt.Errorf("NewWriterFromAppendableObject: %w", err)
		}
		if _, err := appendWriter.Write([]byte("more data\n")); err != nil {
			return nil, fmt.Errorf("appendWriter.Write: %w", err)
		}
		if err := appendWriter.Close(); err != nil {
			return nil, fmt.Errorf("appendWriter.Close: %w", err)
		}
		fmt.Fprintf(w, "Wrote 10 bytes at offset %v", offset)
	}

	// Wait for tailing goroutine to exit.
	<-done
	if mrdErr != nil {
		// Wrap mrdErr, the error produced by the polling goroutine. (The
		// outer `err` is nil at this point, so wrapping it would discard the
		// real failure.)
		return nil, fmt.Errorf("MultiRangeDownloader: %w", mrdErr)
	}
	fmt.Fprintf(w, "Read %v bytes from object %v", buf.Len(), object)
	return buf.Bytes(), nil
}

Java

For more information, see the Cloud Storage Java API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample reads the tail of an appendable object:


import com.google.api.core.ApiFuture;
import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.FlushPolicy;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageChannelUtils;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

public class ReadAppendableObjectTail {
  /**
   * Simulates "tail -f" on an appendable object: a background thread appends
   * bytes with periodic flushes while this thread polls a read session for new
   * content until all expected bytes have arrived.
   *
   * @param bucketName the ID of your GCS bucket
   * @param objectName the ID of your GCS object
   * @throws Exception if the upload, read, or thread join fails
   */
  public static void readAppendableObjectTail(String bucketName, String objectName)
      throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      BlobInfo info = BlobInfo.newBuilder(blobId).build();
      int totalToWrite = 64 * 1000;
      // Define our flush policy to flush small increments
      // This is useful for demonstration purposes, but you should use more appropriate values for
      // your workload.
      int flushSize = totalToWrite / 8;
      FlushPolicy.MinFlushSizeFlushPolicy flushPolicy =
          FlushPolicy.minFlushSize(flushSize).withMaxPendingBytes(flushSize);
      BlobAppendableUploadConfig appendableUploadConfig =
          BlobAppendableUploadConfig.of().withFlushPolicy(flushPolicy);
      BlobAppendableUpload upload =
          storage.blobAppendableUpload(
              info, appendableUploadConfig, Storage.BlobWriteOption.doesNotExist());
      // Create the object; we'll take it over to write in this example.
      upload.open().closeWithoutFinalizing();
      BlobInfo gen1 = upload.getResult().get();
      BlobAppendableUpload takeover = storage.blobAppendableUpload(gen1, appendableUploadConfig);

      try (AppendableUploadWriteableByteChannel channel = takeover.open()) {
        // Start a background thread to write some data on a periodic basis.
        // In reality, your application would probably be doing this in another scope.
        Thread writeThread = startWriteThread(totalToWrite, channel, flushPolicy);
        try (BlobReadSession readSession =
            storage.blobReadSession(gen1.getBlobId()).get(10, TimeUnit.SECONDS)) {
          // Count of consecutive empty reads; used to back off and to detect
          // that the writer has finished.
          int zeroCnt = 0;
          long read = 0;
          while (read < totalToWrite) {
            // Stop when the writer's channel is closed and we have seen many
            // consecutive empty reads.
            if (zeroCnt >= 30 && !channel.isOpen()) {
              System.out.println("breaking");
              break;
            }
            // Read the next window starting at the current offset.
            ApiFuture<byte[]> future =
                readSession.readAs(
                    ReadProjectionConfigs.asFutureBytes()
                        .withRangeSpec(RangeSpec.of(read, flushPolicy.getMinFlushSize())));
            byte[] bytes = future.get(20, TimeUnit.SECONDS);

            read += bytes.length;
            long defaultSleep = 1_500L;
            if (bytes.length == 0) {
              // No new data yet: back off linearly with each empty read.
              zeroCnt++;
              long millis = defaultSleep * zeroCnt;
              System.out.println("millis = " + millis);
              Thread.sleep(millis);
            } else {
              zeroCnt = 0;
              System.out.println("bytes.length = " + bytes.length + " read = " + read);
              Thread.sleep(defaultSleep);
            }
          }
          // NOTE(review): plain `assert` is a no-op unless the JVM runs with -ea.
          assert read == totalToWrite : "not enough bytes";
        }
        writeThread.join();
      }
    }
  }

  /**
   * Starts a daemon-style writer thread that appends one byte at a time (a
   * cycling A-Z pattern) and flushes every {@code minFlushSize} bytes, then
   * closes the channel without finalizing.
   */
  private static Thread startWriteThread(
      int totalToWrite,
      AppendableUploadWriteableByteChannel channel,
      FlushPolicy.MinFlushSizeFlushPolicy flushPolicy) {
    Thread writeThread =
        new Thread(
            () -> {
              try {
                for (long written = 0; written < totalToWrite; ) {
                  // Cycle through the 26 uppercase letters.
                  byte alphaOffset = (byte) (written % 0x1a);

                  ByteBuffer buf = ByteBuffer.wrap(new byte[] {(byte) (0x41 + alphaOffset)});
                  int w = StorageChannelUtils.blockingEmptyTo(buf, channel);
                  written += w;
                  // Flush (and briefly pause) at each flush-size boundary so
                  // readers can observe the new bytes.
                  if (written % flushPolicy.getMinFlushSize() == 0) {
                    channel.flush();
                    Thread.sleep(40);
                  }
                }
                channel.closeWithoutFinalizing();

              } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
              }
            });
    writeThread.start();
    return writeThread;
  }
}

Python

For more information, see the Cloud Storage Python API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample reads the tail of an appendable object:

async def appender(writer: AsyncAppendableObjectWriter, duration: int):
    """Appends BYTES_TO_APPEND to the object every 0.1 seconds for `duration` seconds.

    Assumes len(BYTES_TO_APPEND) == NUM_BYTES_TO_APPEND_EVERY_SECOND so the
    running total printed below is accurate -- TODO confirm against the
    module-level constants.
    """
    print("Appender started.")
    bytes_appended = 0
    start_time = time.monotonic()
    # Run the appender for the specified duration.
    while time.monotonic() - start_time < duration:
        await writer.append(BYTES_TO_APPEND)
        # Millisecond-precision timestamp for the log line.
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        bytes_appended += NUM_BYTES_TO_APPEND_EVERY_SECOND
        print(
            f"[{now}] Appended {NUM_BYTES_TO_APPEND_EVERY_SECOND} new bytes. Total appended: {bytes_appended} bytes."
        )
        await asyncio.sleep(0.1)
    print("Appender finished.")


async def tailer(
    bucket_name: str, object_name: str, duration: int, client: AsyncGrpcClient
):
    """Tails the object by reading new data as it is appended."""
    print("Tailer started.")
    downloader = AsyncMultiRangeDownloader(client, bucket_name, object_name)
    next_offset = 0
    deadline = time.monotonic() + duration
    try:
        await downloader.open()
        # Keep polling until the configured duration elapses.
        while time.monotonic() < deadline:
            output_buffer = BytesIO()
            # A range of (start, 0) means "read from `start` to the end".
            await downloader.download_ranges([(next_offset, 0, output_buffer)])

            bytes_downloaded = output_buffer.getbuffer().nbytes
            if bytes_downloaded > 0:
                now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                print(f"[{now}] Tailer read {bytes_downloaded} new bytes: ")
                next_offset += bytes_downloaded

            # Poll for new data every 0.1 seconds.
            await asyncio.sleep(0.1)
    finally:
        if downloader.is_stream_open:
            await downloader.close()
    print("Tailer finished.")


# read_appendable_object_tail simulates a "tail -f" command on a GCS object. It
# repeatedly polls an appendable object for new content. In a real
# application, the object would be written to by a separate process.
async def read_appendable_object_tail(
    bucket_name: str, object_name: str, duration: int, grpc_client=None
):
    """Main function to create an appendable object and run tasks.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()
    writer = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
    )
    try:
        # 1. Create an empty appendable object.
        await writer.open()
        print(f"Created empty appendable object: {object_name}")

        # 2. Create the appender and tailer coroutines.
        appender_task = asyncio.create_task(appender(writer, duration))
        tailer_task = asyncio.create_task(
            tailer(bucket_name, object_name, duration, grpc_client)
        )

        # 3. Execute the coroutines concurrently.
        await asyncio.gather(appender_task, tailer_task)
    finally:
        # 4. Close the writer if the stream is still open.
        # NOTE(review): `_is_stream_open` is a private attribute -- confirm
        # whether a public accessor exists.
        if writer._is_stream_open:
            await writer.close()
            print("Writer closed.")

What's next