您可以從區域值區讀取資料,並將資料附加至區域值區中的物件。區域值區可透過 Rapid Bucket 取得,並使用 Rapid 儲存空間類別。Rapid 儲存空間類別是高效能的儲存空間類別,專為 I/O 密集型工作負載最佳化。可附加的物件可讓您逐步將資料新增至物件,不必重寫整個物件,有助於更有效率地管理持續流動的資料。
本頁說明如何讀取及附加儲存在區域值區中的物件,並示範如何執行下列作業:
建立可附加的物件並寫入。
讀取可附加的物件。
暫停、繼續及完成可附加的物件。
讀取可附加物件的尾端。
使用這個頁面之前,建議先閱讀下列資源:
瞭解如何建立區域值區。
對物件進行可附加的寫入作業
用戶端程式庫
C++
詳情請參閱「Cloud Storage C++ API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
以下範例會上傳可附加的物件:
namespace gcs = google::cloud::storage;
// Creates an appendable object, writes and flushes a first chunk, appends a
// second chunk, and finalizes the object.
//
//   client      - asynchronous client used for the upload.
//   bucket_name - bucket that will contain the new object.
//   object_name - name of the object to create; also used in log output.
//
// Returns the metadata of the finalized object. Every service result is
// unwrapped with `.value()`, and a failed flush throws `std::runtime_error`.
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name)
    -> google::cloud::future<google::storage::v2::Object> {
  // `object_name` is copied (not moved) into the request because it is
  // printed right below; a moved-from std::string has an unspecified value.
  auto [writer, token] =
      (co_await client.StartAppendableObjectUpload(
           gcs::BucketName(std::move(bucket_name)), object_name))
          .value();
  std::cout << "Appendable upload started for object " << object_name << "\n";
  token = (co_await writer.Write(std::move(token),
                                 gcs::WritePayload("Some data\n")))
              .value();
  std::cout << "Wrote initial data.\n";
  // Flush the buffered data to the service. This is not a terminal
  // operation. The writer can be used after the flush completes.
  // After a flush, the data is visible to readers.
  auto flush_status = co_await writer.Flush();
  if (!flush_status.ok()) throw std::runtime_error(flush_status.message());
  std::cout << "Flush completed. Persisted size is now "
            << absl::get<std::int64_t>(writer.PersistedState()) << "\n";
  // The writer is still open. We can write more data.
  token = (co_await writer.Write(std::move(token),
                                 gcs::WritePayload("Some more data\n")))
              .value();
  std::cout << "Wrote more data.\n";
  // Finalize the upload to make it a regular object.
  co_return (co_await writer.Finalize(std::move(token))).value();
};
Go
詳情請參閱「Cloud Storage Go API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
以下範例會上傳可附加的物件:
import (
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/storage"
"cloud.google.com/go/storage/experimental"
)
// createAndWriteAppendableObject creates and uploads a new appendable object in
// a rapid bucket. The object will not be finalized.
//
// Two chunks are written with a Flush between them. Flush returns the size
// persisted so far, which is reported to w.
func createAndWriteAppendableObject(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	handle := client.Bucket(bucket).Object(object)
	ow := handle.NewWriter(ctx)

	// First chunk.
	if _, werr := ow.Write([]byte("Some data\n")); werr != nil {
		return fmt.Errorf("Writer.Write: %w", werr)
	}

	// Flushing is not terminal: the Writer stays usable afterwards, and the
	// flushed data becomes visible to readers.
	persisted, ferr := ow.Flush()
	if ferr != nil {
		return fmt.Errorf("Writer.Flush: %w", ferr)
	}
	fmt.Fprintf(w, "Flush completed. Persisted size is now %d\n", persisted)

	// Second chunk, written through the same still-open Writer.
	if _, werr := ow.Write([]byte("Some more data\n")); werr != nil {
		return fmt.Errorf("Writer.Write: %w", werr)
	}

	// Closing sends any remaining buffered data. The object is left
	// unfinalized, so another writer may append to it later.
	if cerr := ow.Close(); cerr != nil {
		return fmt.Errorf("Writer.Close: %w", cerr)
	}
	fmt.Fprintf(w, "Uploaded object %v\n", object)
	return nil
}
Java
詳情請參閱「Cloud Storage Java API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
以下範例會上傳可附加的物件:
import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.FlushPolicy;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Paths;
import java.util.Locale;
public class CreateAndWriteAppendableObject {
  /**
   * Uploads the contents of a local file as an appendable object. The upload is configured with
   * {@code FINALIZE_WHEN_CLOSING}, so closing the channel finalizes the object.
   *
   * @param bucketName the bucket that will contain the object
   * @param objectName the name of the object to create
   * @param filePath path of the local file whose bytes are uploaded
   * @throws Exception if opening the session, copying the file, or reading back the metadata fails
   */
  public static void createAndWriteAppendableObject(
      String bucketName, String objectName, String filePath) throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";
    // The ID of your GCS object
    // String objectName = "your-object-name";
    // The path to the file to upload
    // String filePath = "path/to/your/file";
    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
      // Flush policy for the upload; see FlushPolicy.maxFlushSize for its exact semantics.
      int flushSize = 64 * 1000;
      FlushPolicy.MaxFlushSizeFlushPolicy flushPolicy = FlushPolicy.maxFlushSize(flushSize);
      BlobAppendableUploadConfig config =
          BlobAppendableUploadConfig.of()
              .withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING)
              .withFlushPolicy(flushPolicy);
      BlobAppendableUpload uploadSession = storage.blobAppendableUpload(blobInfo, config);
      try (AppendableUploadWriteableByteChannel channel = uploadSession.open();
          ReadableByteChannel readableByteChannel = FileChannel.open(Paths.get(filePath))) {
        ByteStreams.copy(readableByteChannel, channel);
        // Since the channel is in a try-with-resources block, channel.close()
        // will be implicitly called here, which triggers the finalization.
      } catch (IOException ex) {
        throw new IOException("Failed to upload to object " + blobId.toGsUtilUri(), ex);
      }
      BlobInfo result = storage.get(blobId);
      // Terminate the line with %n, as the other samples do.
      System.out.printf(
          Locale.US,
          "Object %s successfully uploaded%n",
          result.getBlobId().toGsUtilUriWithGeneration());
    }
  }
}
Python
詳情請參閱「Cloud Storage Python API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
以下範例會上傳可附加的物件:
async def storage_create_and_write_appendable_object(
    bucket_name, object_name, grpc_client=None
):
    """Uploads an appendable object to zonal bucket.

    Creates a new appendable object (generation 0), appends one chunk of data
    and then closes the bidirectional stream.

    Args:
        bucket_name: name of the zonal bucket that will hold the object.
        object_name: name of the object to create.
        grpc_client: an existing grpc_client to use, this is only for testing.
    """
    client = AsyncGrpcClient() if grpc_client is None else grpc_client
    appender = AsyncAppendableObjectWriter(
        client=client,
        bucket_name=bucket_name,
        object_name=object_name,
        # generation=0 means "create"; throws `FailedPrecondition` if the
        # object already exists.
        generation=0,
    )
    # Creates a new appendable object of size 0 and opens it for appending.
    await appender.open()
    # `.append` may be called as many times as needed; each call adds data to
    # the end of the object.
    await appender.append(b"Some data")
    # All appends are done: close the gRPC bidirectional stream.
    await appender.close()
    print(
        f"Appended object {object_name} created of size {appender.persisted_size} bytes."
    )
讀取物件
用戶端程式庫
C++
詳情請參閱「Cloud Storage C++ API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何對單一物件執行範圍讀取作業:
namespace gcs = google::cloud::storage;
// Consumes every payload produced by `reader` and returns how many '\n' bytes
// were seen. Reading until the token is invalid drains the read operation.
auto count_newlines =
    [](gcs::AsyncReader reader,
       gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
  std::uint64_t newlines = 0;
  while (token.valid()) {
    auto [data, next] = (co_await reader.Read(std::move(token))).value();
    token = std::move(next);
    for (auto const& chunk : data.contents()) {
      newlines += std::count(chunk.begin(), chunk.end(), '\n');
    }
  }
  co_return newlines;
};
// Opens `object_name` in `bucket_name` and counts the newlines in a single
// ranged read of (up to) the first 1024 bytes.
auto coro =
    [&count_newlines](
        gcs::AsyncClient& client, std::string bucket_name,
        std::string object_name) -> google::cloud::future<std::uint64_t> {
  auto opened = co_await client.Open(gcs::BucketName(std::move(bucket_name)),
                                     std::move(object_name));
  auto descriptor = std::move(opened).value();
  // Read starting at offset 0 with a limit of 1024 bytes.
  auto [reader, token] = descriptor.Read(0, 1024);
  co_return co_await count_newlines(std::move(reader), std::move(token));
};
下列範例會對單一物件執行完整讀取作業:
namespace gcs = google::cloud::storage;
// Helper coroutine to count newlines returned by an AsyncReader.
// This helps consume the data from the read operation.
auto count_newlines =
[](gcs::AsyncReader reader,
gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
std::uint64_t count = 0;
while (token.valid()) {
auto [payload, t] = (co_await reader.Read(std::move(token))).value();
token = std::move(t);
for (auto const& buffer : payload.contents()) {
count += std::count(buffer.begin(), buffer.end(), '\n');
}
}
co_return count;
};
auto coro =
[&count_newlines](
gcs::AsyncClient& client, std::string bucket_name,
std::string object_name) -> google::cloud::future<std::uint64_t> {
auto descriptor =
(co_await client.Open(gcs::BucketName(std::move(bucket_name)),
std::move(object_name)))
.value();
auto [reader, token] = descriptor.ReadFromOffset(0);
co_return co_await count_newlines(std::move(reader), std::move(token));
};下例示範如何對單一物件執行範圍讀取:
namespace gcs = google::cloud::storage;
// Consumes every payload produced by `reader` and returns the number of '\n'
// bytes it contained.
auto count_newlines =
    [](gcs::AsyncReader reader,
       gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
  std::uint64_t count = 0;
  while (token.valid()) {
    auto [payload, t] = (co_await reader.Read(std::move(token))).value();
    token = std::move(t);
    for (auto const& buffer : payload.contents()) {
      count += std::count(buffer.begin(), buffer.end(), '\n');
    }
  }
  co_return count;
};
// Opens one object, issues three ranged reads on the same descriptor, and
// returns the total number of newlines found across the three ranges.
auto coro =
    [&count_newlines](
        gcs::AsyncClient& client, std::string bucket_name,
        std::string object_name) -> google::cloud::future<std::uint64_t> {
  auto descriptor =
      (co_await client.Open(gcs::BucketName(std::move(bucket_name)),
                            std::move(object_name)))
          .value();
  // Three consecutive, non-overlapping 1 KiB ranges starting at offsets 0,
  // 1024 and 2048. Overlapping ranges would make the summed count below
  // include some bytes more than once.
  auto [r1, t1] = descriptor.Read(0, 1024);
  auto [r2, t2] = descriptor.Read(1024, 1024);
  auto [r3, t3] = descriptor.Read(2048, 1024);
  // Start consuming all three ranges before awaiting any of the results.
  auto c1 = count_newlines(std::move(r1), std::move(t1));
  auto c2 = count_newlines(std::move(r2), std::move(t2));
  auto c3 = count_newlines(std::move(r3), std::move(t3));
  co_return (co_await std::move(c1)) + (co_await std::move(c2)) +
      (co_await std::move(c3));
};
下列範例會對多個物件執行範圍讀取作業 (每個物件一次讀取):
namespace gcs = google::cloud::storage;
// Consumes every payload produced by `reader` and returns the number of '\n'
// bytes it contained.
auto count_newlines =
    [](gcs::AsyncReader reader,
       gcs::AsyncToken token) -> google::cloud::future<std::uint64_t> {
  std::uint64_t count = 0;
  while (token.valid()) {
    auto [payload, t] = (co_await reader.Read(std::move(token))).value();
    token = std::move(t);
    for (auto const& buffer : payload.contents()) {
      count += std::count(buffer.begin(), buffer.end(), '\n');
    }
  }
  co_return count;
};
// Reads (up to) the first 1 KiB of each of three objects and prints how many
// newlines each read returned. Results are printed in argument order.
auto coro = [&count_newlines](
                gcs::AsyncClient& client, std::string bucket_name,
                std::string object_name1, std::string object_name2,
                std::string object_name3) -> google::cloud::future<void> {
  // List of object names to read (passed as arguments)
  std::vector<std::string> object_names = {object_name1, object_name2,
                                           object_name3};
  std::vector<google::cloud::future<std::uint64_t>> futures;
  futures.reserve(object_names.size());
  // Start a ranged read on each object and collect the futures.
  // This example opens multiple objects, not one object multiple times.
  for (auto const& name : object_names) {
    auto descriptor =
        (co_await client.Open(gcs::BucketName(bucket_name), name)).value();
    auto [reader, token] = descriptor.Read(0, 1024);
    futures.push_back(count_newlines(std::move(reader), std::move(token)));
  }
  // Collect the results with `co_await`, which suspends this coroutine until
  // each read finishes. Sleeping or polling here would instead block the
  // thread this coroutine runs on, which may be a thread needed to complete
  // the reads. All reads were already started in the loop above.
  for (std::size_t i = 0; i < futures.size(); ++i) {
    auto const count = co_await std::move(futures[i]);
    std::cout << "Object " << object_names[i] << " read returned " << count
              << " newlines\n";
  }
};
Go
詳情請參閱「Cloud Storage Go API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下列範例會對單一物件執行範圍讀取作業:
import (
"bytes"
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/storage"
"cloud.google.com/go/storage/experimental"
)
// openObjectSingleRangedRead reads a single range from an object in a
// rapid bucket.
//
// It requests the first 1024 bytes of the object, copies what is returned
// into memory, reports progress to w, and returns the bytes read.
func openObjectSingleRangedRead(w io.Writer, bucket, object string) ([]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Read the first KiB of the object and copy it into a buffer.
	r, err := client.Bucket(bucket).Object(object).NewRangeReader(ctx, 0, 1024)
	if err != nil {
		return nil, fmt.Errorf("NewRangeReader: %w", err)
	}
	defer r.Close()

	buf := new(bytes.Buffer)
	// Wrap with %w (like every other error path here) so callers can use
	// errors.Is / errors.As on the underlying error.
	if _, err := io.Copy(buf, r); err != nil {
		return nil, fmt.Errorf("copying data: %w", err)
	}
	fmt.Fprintf(w, "Read the first 1024 bytes of %v into a buffer\n", object)
	return buf.Bytes(), nil
}
下列範例會對單一物件執行完整讀取作業:
import (
"bytes"
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/storage"
"cloud.google.com/go/storage/experimental"
)
// openObjectReadFullObject reads a full object's data from a
// rapid bucket.
//
// It downloads the whole object into memory, reports progress to w, and
// returns the object's contents.
func openObjectReadFullObject(w io.Writer, bucket, object string) ([]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Read the entire object and copy it into a buffer.
	r, err := client.Bucket(bucket).Object(object).NewReader(ctx)
	if err != nil {
		return nil, fmt.Errorf("NewReader: %w", err)
	}
	defer r.Close()

	buf := new(bytes.Buffer)
	// Wrap with %w (like every other error path here) so callers can use
	// errors.Is / errors.As on the underlying error.
	if _, err := io.Copy(buf, r); err != nil {
		return nil, fmt.Errorf("copying data: %w", err)
	}
	fmt.Fprintf(w, "Read the data of %v into a buffer\n", object)
	return buf.Bytes(), nil
}
下例示範如何對單一物件執行多個範圍讀取作業:
import (
"bytes"
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/storage"
"cloud.google.com/go/storage/experimental"
)
// openObjectMultipleRangedRead opens a single object using
// MultiRangeDownloader to download multiple ranges.
//
// Three 1 KiB ranges (offsets 0, 1024 and 2048) are requested over one
// stream. The returned slice holds one entry per range, in offset order.
func openObjectMultipleRangedRead(w io.Writer, bucket, object string) ([][]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Create the MultiRangeDownloader, which opens a stream to the object.
	mrd, err := client.Bucket(bucket).Object(object).NewMultiRangeDownloader(ctx)
	if err != nil {
		return nil, fmt.Errorf("NewMultiRangeDownloader: %w", err)
	}

	// Add some 1 KiB ranges to download. This call is non-blocking. The
	// provided callback is invoked when the range download is complete.
	const rangeLen = 1024
	startOffsets := []int64{0, 1024, 2048}
	// One buffer per range, sized from startOffsets so the two cannot drift.
	dataBufs := make([]bytes.Buffer, len(startOffsets))
	var errs []error
	for i, off := range startOffsets {
		mrd.Add(&dataBufs[i], off, rangeLen, func(off, length int64, err error) {
			if err != nil {
				errs = append(errs, err)
			} else {
				fmt.Fprintf(w, "downloaded range at offset %v\n", off)
			}
		})
	}

	// Wait for all downloads to complete, then close the downloader before
	// inspecting the results so the stream is released on every return path.
	mrd.Wait()
	closeErr := mrd.Close()
	if len(errs) > 0 {
		return nil, fmt.Errorf("one or more downloads failed; errors: %v", errs)
	}
	if closeErr != nil {
		return nil, fmt.Errorf("MultiRangeDownloader.Close: %w", closeErr)
	}
	fmt.Fprintf(w, "Read the ranges of %v into memory\n", object)

	// Collect the byte slices. Index into dataBufs rather than ranging by
	// value, which would copy each bytes.Buffer.
	byteSlices := make([][]byte, 0, len(dataBufs))
	for i := range dataBufs {
		byteSlices = append(byteSlices, dataBufs[i].Bytes())
	}
	return byteSlices, nil
}
Java
詳情請參閱「Cloud Storage Java API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何對單一物件執行範圍讀取作業:
import com.google.api.core.ApiFuture;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.concurrent.TimeUnit;
public class OpenObjectSingleRangedRead {
  /**
   * Reads up to {@code length} bytes starting at {@code offset} from an object through a blob
   * read session and prints how many bytes were returned.
   *
   * @param bucketName the bucket that holds the object
   * @param objectName the object to read
   * @param offset the beginning of the range
   * @param length the maximum number of bytes to read
   */
  public static void openObjectSingleRangedRead(
      String bucketName, String objectName, long offset, int length) throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";
    // The ID of your GCS object
    // String objectName = "your-object-name";
    // The beginning of the range
    // long offset = 0;
    // The maximum number of bytes to read from the object.
    // int length = 64;
    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId id = BlobId.of(bucketName, objectName);
      ApiFuture<BlobReadSession> sessionFuture = storage.blobReadSession(id);
      // Wait up to 10 seconds for the read session to open.
      try (BlobReadSession session = sessionFuture.get(10, TimeUnit.SECONDS)) {
        ApiFuture<byte[]> pending =
            session.readAs(
                ReadProjectionConfigs.asFutureBytes()
                    .withRangeSpec(RangeSpec.of(offset, length)));
        // Block until the ranged read has completed.
        byte[] data = pending.get();
        String message =
            "Successfully read "
                + data.length
                + " bytes from object "
                + objectName
                + " in bucket "
                + bucketName;
        System.out.println(message);
      }
    }
  }
}
下列範例會對單一物件執行完整讀取作業:
import com.google.api.core.ApiFuture;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.ReadAsChannel;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ScatteringByteChannel;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
public class OpenObjectReadFullObject {
  /**
   * Streams an entire object through a blob read session and prints the total number of bytes
   * read.
   *
   * @param bucketName the bucket that holds the object
   * @param objectName the object to read
   */
  public static void openObjectReadFullObject(String bucketName, String objectName)
      throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";
    // The ID of your GCS object to read
    // String objectName = "your-object-name";
    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      ApiFuture<BlobReadSession> sessionFuture = storage.blobReadSession(blobId);
      try (BlobReadSession session = sessionFuture.get(10, TimeUnit.SECONDS)) {
        ReadAsChannel channelConfig = ReadProjectionConfigs.asChannel();
        try (ScatteringByteChannel channel = session.readAs(channelConfig)) {
          long total = 0;
          // The buffer is only scratch space: its contents are discarded and
          // only the byte count is kept.
          ByteBuffer scratch = ByteBuffer.allocate(64 * 1024);
          for (int n = channel.read(scratch); n != -1; n = channel.read(scratch)) {
            total += n;
            scratch.clear();
          }
          System.out.printf(
              Locale.US,
              "Successfully read a total of %d bytes from object %s%n",
              total,
              blobId.toGsUtilUri());
        }
      }
    }
  }
}
下例示範如何對單一物件執行多個範圍讀取作業:
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class OpenObjectMultipleRangedRead {
  /**
   * Reads two byte ranges from the same object over a single read session and prints how many
   * bytes each range returned.
   *
   * @param bucketName the bucket that holds the object
   * @param objectName the object to read
   * @param offset1 the beginning of range 1
   * @param length1 the maximum number of bytes to read in range 1
   * @param offset2 the beginning of range 2
   * @param length2 the maximum number of bytes to read in range 2
   */
  public static void openObjectMultipleRangedRead(
      String bucketName, String objectName, long offset1, int length1, long offset2, int length2)
      throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";
    // The ID of your GCS object
    // String objectName = "your-object-name";
    // The beginning of range 1
    // long offset1 = 0;
    // The maximum number of bytes to read in range 1
    // int length1 = 16;
    // The beginning of range 2
    // long offset2 = 16;
    // The maximum number of bytes to read in range 2
    // int length2 = 32;
    try (Storage storage = StorageOptions.grpc().build().getService()) {
      BlobId blobId = BlobId.of(bucketName, objectName);
      ApiFuture<BlobReadSession> sessionFuture = storage.blobReadSession(blobId);
      RangeSpec first = RangeSpec.of(offset1, length1);
      RangeSpec second = RangeSpec.of(offset2, length2);
      try (BlobReadSession session = sessionFuture.get(10, TimeUnit.SECONDS)) {
        // Both reads are issued before waiting on either of them.
        ApiFuture<byte[]> firstRead =
            session.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(first));
        ApiFuture<byte[]> secondRead =
            session.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(second));
        List<byte[]> results =
            ApiFutures.allAsList(ImmutableList.of(firstRead, secondRead)).get();
        System.out.println(
            "Successfully read "
                + results.get(0).length
                + " bytes from range 1 and "
                + results.get(1).length
                + " bytes from range 2.");
      }
    }
  }
}
下列範例會對多個物件執行範圍讀取作業:
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadAsFutureBytes;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class OpenMultipleObjectsRangedRead {
  /**
   * Opens one read session per object and reads the same byte range from each. It then waits up
   * to 30 seconds for every read to finish. Each session is closed once its own read completes.
   *
   * @param bucketName the bucket that holds the objects
   * @param objectNames the objects to read
   * @param startOffset offset of the first byte of the range
   * @param length maximum number of bytes to read from each object
   */
  public static void multipleObjectsSingleRangedRead(
      String bucketName, List<String> objectNames, long startOffset, int length) throws Exception {
    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";
    // The ID of your GCS objects to read
    // List<String> objectNames = Arrays.asList("object-1", "object-2", "object-3");
    ReadAsFutureBytes rangeConfig =
        ReadProjectionConfigs.asFutureBytes().withRangeSpec(RangeSpec.of(startOffset, length));
    try (Storage storage = StorageOptions.grpc().build().getService()) {
      List<ApiFuture<byte[]>> pending = new ArrayList<>();
      System.out.printf(
          "Initiating single ranged read [%d, %d] on %d objects...%n",
          startOffset, startOffset + length - 1, objectNames.size());
      for (String objectName : objectNames) {
        ApiFuture<BlobReadSession> sessionFuture =
            storage.blobReadSession(BlobId.of(bucketName, objectName));
        pending.add(
            ApiFutures.transformAsync(
                sessionFuture,
                (BlobReadSession session) -> readThenClose(session, rangeConfig),
                MoreExecutors.directExecutor()));
      }
      ApiFutures.allAsList(pending).get(30, TimeUnit.SECONDS);
      System.out.println("All concurrent single-ranged read operations are complete.");
    }
  }

  /** Starts a ranged read on {@code session} and closes the session when that read finishes. */
  private static ApiFuture<byte[]> readThenClose(
      BlobReadSession session, ReadAsFutureBytes rangeConfig) {
    ApiFuture<byte[]> readFuture = session.readAs(rangeConfig);
    readFuture.addListener(
        () -> {
          try {
            session.close();
          } catch (java.io.IOException e) {
            // Closing happens in the background; report the failure but do not fail the read.
            System.err.println("WARN: Background error while closing session: " + e.getMessage());
          }
        },
        MoreExecutors.directExecutor());
    return readFuture;
  }
}
Python
詳情請參閱「Cloud Storage Python API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何對單一物件執行範圍讀取作業:
async def storage_open_object_single_ranged_read(
    bucket_name, object_name, start_byte, size, grpc_client=None
):
    """Downloads a range of bytes from an object.

    Args:
        bucket_name: name of the bucket that holds the object.
        object_name: name of the object to read.
        start_byte: offset of the first byte to download.
        size: number of bytes requested.
        grpc_client: an existing grpc_client to use, this is only for testing.
    """
    client = grpc_client if grpc_client is not None else AsyncGrpcClient()
    downloader = AsyncMultiRangeDownloader(client, bucket_name, object_name)
    try:
        # Open the object; the multi-range downloader always opens in read mode.
        await downloader.open()
        # The requested range is written into this buffer. Any buffer or
        # file-like object could be supplied instead.
        sink = BytesIO()
        await downloader.download_ranges([(start_byte, size, sink)])
    finally:
        # Only close a stream that is actually open.
        if downloader.is_stream_open:
            await downloader.close()
    # The downloaded size can be smaller than requested when the object is
    # shorter: the downloader stops at the end of the object.
    print(f"Downloaded {sink.getbuffer().nbytes} bytes from {object_name}")
下列範例會對單一物件執行完整讀取作業:
async def storage_open_object_read_full_object(
    bucket_name, object_name, grpc_client=None
):
    """Downloads the entire content of an object using a multi-range downloader.

    Args:
        bucket_name: name of the bucket that holds the object.
        object_name: name of the object to read.
        grpc_client: an existing grpc_client to use, this is only for testing.
    """
    client = AsyncGrpcClient() if grpc_client is None else grpc_client
    downloader = AsyncMultiRangeDownloader(client, bucket_name, object_name)
    try:
        # Open the object; the multi-range downloader always opens in read mode.
        await downloader.open()
        # Destination for the data; any buffer or file-like object works.
        sink = BytesIO()
        # The range (0, 0) means "from the beginning to the end of the object".
        whole_object = (0, 0, sink)
        await downloader.download_ranges([whole_object])
    finally:
        if downloader.is_stream_open:
            await downloader.close()
    data = sink.getvalue()
    print(
        f"Downloaded all {len(data)} bytes from object {object_name} in bucket {bucket_name}."
    )
下例示範如何對單一物件執行多個範圍讀取作業:
async def storage_open_object_multiple_ranged_read(
    bucket_name, object_name, grpc_client=None
):
    """Downloads multiple ranges of bytes from a single object into different buffers.

    Four 10-byte ranges (starting at bytes 0, 20, 40 and 60) are downloaded
    over one stream, and the content of each buffer is printed.

    Args:
        bucket_name: name of the bucket that holds the object.
        object_name: name of the object to read.
        grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()
    mrd = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name)
    try:
        # Open the object, mrd always opens in read mode.
        await mrd.open()
        # Specify four different buffers to download ranges into.
        buffers = [BytesIO(), BytesIO(), BytesIO(), BytesIO()]
        # Define the ranges to download. Each range is a tuple of (start_byte, size, buffer).
        # All ranges will download 10 bytes from different starting positions.
        # We choose arbitrary start bytes for this example. An object should be large enough.
        # A user can choose any start byte between 0 and `object_size`.
        # If `start_byte` is greater than `object_size`, mrd will throw an error.
        ranges = [
            (0, 10, buffers[0]),
            (20, 10, buffers[1]),
            (40, 10, buffers[2]),
            (60, 10, buffers[3]),
        ]
        await mrd.download_ranges(ranges)
    finally:
        # Only close a stream that was actually opened (as the other samples
        # do); closing after a failed `open()` could raise and hide the
        # original error.
        if mrd.is_stream_open:
            await mrd.close()
    # Print the downloaded content from each buffer.
    for i, output_buffer in enumerate(buffers):
        downloaded_size = output_buffer.getbuffer().nbytes
        print(
            f"Downloaded {downloaded_size} bytes into buffer {i + 1} from start byte {ranges[i][0]}: {output_buffer.getvalue()}"
        )
下列範例會對多個物件執行範圍讀取作業:
async def storage_open_multiple_objects_ranged_read(
    bucket_name, object_names, grpc_client=None
):
    """Downloads a range of bytes from multiple objects concurrently.

    The first 100 bytes of every object in `object_names` are downloaded; one
    coroutine per object is run with `asyncio.gather`.

    Args:
        bucket_name: name of the bucket that holds the objects.
        object_names: iterable of object names to read.
        grpc_client: an existing grpc_client to use, this is only for testing.
    """
    client = AsyncGrpcClient() if grpc_client is None else grpc_client

    async def _download_range(object_name):
        """Downloads the first 100 bytes of one object and prints the size."""
        downloader = AsyncMultiRangeDownloader(client, bucket_name, object_name)
        try:
            # Open the object; the multi-range downloader always opens in read mode.
            await downloader.open()
            # Destination for the range; any buffer or file-like object works.
            sink = BytesIO()
            await downloader.download_ranges([(0, 100, sink)])
        finally:
            if downloader.is_stream_open:
                await downloader.close()
        # The downloaded size can be smaller than requested when the object is
        # shorter: the downloader stops at the end of the object.
        print(f"Downloaded {sink.getbuffer().nbytes} bytes from {object_name}")

    await asyncio.gather(*[_download_range(name) for name in object_names])
暫停及繼續物件
用戶端程式庫
C++
詳情請參閱「Cloud Storage C++ API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何暫停及繼續附加物件:
namespace gcs = google::cloud::storage;
// Writes to an appendable object, "pauses" by closing the writer without
// finalizing, looks up the object's generation with the synchronous client,
// resumes the upload at that generation, appends more data and finalizes.
// Returns the metadata of the finalized object.
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name)
    -> google::cloud::future<google::storage::v2::Object> {
  // Step 1: start an appendable upload and write the first chunk.
  auto [upload, tok] = (co_await client.StartAppendableObjectUpload(
                            gcs::BucketName(bucket_name), object_name))
                           .value();
  std::cout << "Appendable upload started.\n";
  auto first_write = co_await upload.Write(std::move(tok),
                                           gcs::WritePayload("paused data\n"));
  tok = std::move(first_write).value();

  // Step 2: close the writer without finalizing. The object stays
  // appendable, which is what "pausing" means here.
  auto const close_status = co_await upload.Close();
  if (!close_status.ok()) throw std::runtime_error(close_status.message());
  std::cout << "Upload paused.\n";

  // Step 3: resuming needs the object's generation; fetch the latest
  // metadata with the regular (synchronous) GCS client.
  auto sync_client = gcs::Client();
  auto const metadata =
      sync_client.GetObjectMetadata(bucket_name, object_name).value();
  std::cout << "Object generation is " << metadata.generation() << "\n";

  // Step 4: resume the upload for that generation.
  std::tie(upload, tok) =
      (co_await client.ResumeAppendableObjectUpload(
           gcs::BucketName(bucket_name), object_name, metadata.generation()))
          .value();
  std::cout << "Upload resumed from offset "
            << absl::get<std::int64_t>(upload.PersistedState()) << "\n";

  // Step 5: append the remaining data, then finalize and return the metadata.
  auto second_write = co_await upload.Write(
      std::move(tok), gcs::WritePayload("resumed data\n"));
  tok = std::move(second_write).value();
  auto finalized = co_await upload.Finalize(std::move(tok));
  co_return std::move(finalized).value();
};
Go
詳情請參閱「Cloud Storage Go API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何暫停及繼續附加物件:
import (
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/storage"
"cloud.google.com/go/storage/experimental"
)
// pauseAndResumeAppendableUpload creates a new unfinalized appendable object,
// closes the Writer, then re-opens the object for writing using
// NewWriterFromAppendableObject.
//
// The resumed Writer is created with FinalizeOnClose set, so closing it
// finalizes the object.
func pauseAndResumeAppendableUpload(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	// Start an appendable upload and write some data.
	writer := client.Bucket(bucket).Object(object).NewWriter(ctx)
	if _, err := writer.Write([]byte("Some data\n")); err != nil {
		return fmt.Errorf("Writer.Write: %w", err)
	}
	// The writer is closed, but the upload is not finalized. This "pauses" the
	// upload, as the object remains appendable.
	if err := writer.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}
	fmt.Fprintf(w, "Uploaded unfinalized object %v\n", object)

	// To resume the upload we need the object's generation. We can get this
	// from the previous Writer after close.
	gen := writer.Attrs().Generation

	// Now resume the upload. Writer options including finalization can be
	// passed on calling this constructor.
	appendWriter, offset, err := client.Bucket(bucket).Object(object).Generation(gen).NewWriterFromAppendableObject(
		ctx, &storage.AppendableWriterOpts{
			FinalizeOnClose: true,
		},
	)
	if err != nil {
		// %w keeps the cause inspectable, matching the other error paths.
		return fmt.Errorf("NewWriterFromAppendableObject: %w", err)
	}
	fmt.Fprintf(w, "Resuming upload from offset %v\n", offset)

	// Append the rest of the data and close the Writer to finalize.
	if _, err := appendWriter.Write([]byte("resumed data\n")); err != nil {
		return fmt.Errorf("appendWriter.Write: %w", err)
	}
	// Label this failure distinctly from the first Writer's Close above.
	if err := appendWriter.Close(); err != nil {
		return fmt.Errorf("appendWriter.Close: %w", err)
	}
	fmt.Fprintf(w, "Uploaded and finalized object %v\n", object)
	return nil
}
Java
詳情請參閱「Cloud Storage Java API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何暫停及繼續附加物件:
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageChannelUtils;
import com.google.cloud.storage.StorageOptions;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Locale;
public class PauseAndResumeAppendableObjectUpload {
public static void pauseAndResumeAppendableObjectUpload(
    String bucketName, String objectName, String filePath) throws Exception {
  // The ID of your GCS bucket
  // String bucketName = "your-unique-bucket-name";
  // The ID of your GCS object
  // String objectName = "your-object-name";
  // The path to the file to upload
  // String filePath = "path/to/your/file";
  try (Storage storage = StorageOptions.grpc().build().getService()) {
    BlobId blobId = BlobId.of(bucketName, objectName);
    BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

    // --- Step 1: Initial string write (PAUSE) ---
    // Default close action will be CLOSE_WITHOUT_FINALIZING, so closing the channel at the end of
    // this try-with-resources leaves the object unfinalized (still appendable).
    BlobAppendableUploadConfig initialConfig = BlobAppendableUploadConfig.of();
    BlobAppendableUpload initialUploadSession =
        storage.blobAppendableUpload(blobInfo, initialConfig);
    try (AppendableUploadWriteableByteChannel channel = initialUploadSession.open()) {
      String initialData = "Initial data segment.\n";
      ByteBuffer buffer = ByteBuffer.wrap(initialData.getBytes(StandardCharsets.UTF_8));
      long totalBytesWritten = StorageChannelUtils.blockingEmptyTo(buffer, channel);
      channel.flush();
      System.out.printf(
          Locale.US, "Wrote %d bytes (initial string) in first segment.\n", totalBytesWritten);
    } catch (IOException ex) {
      throw new IOException("Failed initial upload to object " + blobId.toGsUtilUri(), ex);
    }

    // Storage.get returns null when the object is not found. Fail with a clear message instead of
    // a NullPointerException on getSize().
    Blob existingBlob = storage.get(blobId);
    if (existingBlob == null) {
      throw new IllegalStateException(
          "Object " + blobId.toGsUtilUri() + " not found after initial upload");
    }
    long currentObjectSize = existingBlob.getSize();
    System.out.printf(
        Locale.US,
        "Initial upload paused. Currently uploaded size: %d bytes\n",
        currentObjectSize);

    // --- Step 2: Resume upload with file content and finalize ---
    // Use FINALIZE_WHEN_CLOSING to ensure the object is finalized on channel closure.
    BlobAppendableUploadConfig resumeConfig =
        BlobAppendableUploadConfig.of().withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING);
    // Start the resumed session from the existing object's metadata.
    BlobAppendableUpload resumeUploadSession =
        storage.blobAppendableUpload(existingBlob.toBuilder().build(), resumeConfig);
    try (FileChannel fileChannel = FileChannel.open(Paths.get(filePath));
        AppendableUploadWriteableByteChannel channel = resumeUploadSession.open()) {
      long bytesToAppend = fileChannel.size();
      System.out.printf(
          Locale.US,
          "Appending the entire file (%d bytes) after the initial string.\n",
          bytesToAppend);
      ByteStreams.copy(fileChannel, channel);
    }

    BlobInfo result = storage.get(blobId);
    if (result == null) {
      throw new IllegalStateException(
          "Object " + blobId.toGsUtilUri() + " not found after resuming upload");
    }
    System.out.printf(
        Locale.US,
        "\nObject %s successfully resumed and finalized. Total size: %d bytes\n",
        result.getBlobId().toGsUtilUriWithGeneration(),
        result.getSize());
  }
}
}Python
詳情請參閱「Cloud Storage Python API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何暫停及繼續附加物件:
async def storage_pause_and_resume_appendable_upload(
    bucket_name, object_name, grpc_client=None
):
    """Demonstrates pausing and resuming an appendable object upload.

    The upload is "paused" by closing the first writer without finalizing the
    object. It is "resumed" by opening a second writer on the same object,
    pinned to the first writer's generation.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()

    # 1. Create an appendable object with a first writer and append some data.
    writer1 = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
    )
    try:
        await writer1.open()
        await writer1.append(b"First part of the data. ")
        print(f"Appended {writer1.persisted_size} bytes with the first writer.")
    finally:
        # 2. Close the writer to "pause" the upload. Doing this in `finally`
        # also releases the stream if opening or appending failed.
        # NOTE: you can pause indefinitely and still read the content uploaded
        # so far using MRD.
        if writer1._is_stream_open:
            await writer1.close()
    print("First writer closed. Upload is 'paused'.")

    # 3. Create a new writer, passing the generation number from the previous
    # writer. This is a precondition to ensure that the object hasn't been
    # modified since we last accessed it.
    generation_to_resume = writer1.generation
    print(f"Generation to resume from is: {generation_to_resume}")
    writer2 = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=generation_to_resume,
    )
    try:
        # 4. Open the new writer.
        await writer2.open()
        # 5. Append some more data using the new writer.
        await writer2.append(b"Second part of the data.")
        print(f"Appended more data. Total size is now {writer2.persisted_size} bytes.")
    finally:
        # 6. Finally, close the new writer (close() does not finalize the object).
        if writer2._is_stream_open:
            await writer2.close()
    # Only reached when every step above succeeded, so the message is accurate.
    print("Second writer closed. Full object uploaded.")
完成物件
用戶端程式庫
C++
詳情請參閱「Cloud Storage C++ API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何完成可附加的物件:
namespace gcs = google::cloud::storage;
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name)
    -> google::cloud::future<google::storage::v2::Object> {
  // Begin a new appendable upload. On success the result holds the writer and
  // the token that must accompany the next write.
  auto started = co_await client.StartAppendableObjectUpload(
      gcs::BucketName(std::move(bucket_name)), std::move(object_name));
  auto [writer, token] = std::move(started).value();
  std::cout << "Appendable upload started. \n";

  // Append one payload. Write() yields the token to use for the next call.
  auto written = co_await writer.Write(
      std::move(token), gcs::WritePayload("some data to finalize\n"));
  token = std::move(written).value();

  // Finalizing turns this into a regular, non-appendable object; the writer
  // cannot accept further data afterwards.
  auto finalized = co_await writer.Finalize(std::move(token));
  auto object = std::move(finalized).value();
  std::cout << "Upload finalized.\n";
  co_return object;
};
Go
詳情請參閱「Cloud Storage Go API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何完成可附加的物件:
import (
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/storage"
"cloud.google.com/go/storage/experimental"
)
// finalizeAppendableObject writes a small payload to a new object in a rapid
// bucket. Because FinalizeOnClose is set, closing the Writer also finalizes the
// object, after which no more data can be appended to it.
func finalizeAppendableObject(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()

	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// With FinalizeOnClose, Close flushes any buffered bytes and then
	// finalizes the upload.
	wc := client.Bucket(bucket).Object(object).NewWriter(ctx)
	wc.FinalizeOnClose = true

	payload := []byte("some data to finalize\n")
	if _, err := wc.Write(payload); err != nil {
		return fmt.Errorf("Writer.Write: %w", err)
	}
	if err := wc.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}

	fmt.Fprintf(w, "Uploaded and finalized object %v\n", object)
	return nil
}
Java
詳情請參閱「Cloud Storage Java API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何完成可附加的物件:
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
public class FinalizeAppendableObjectUpload {
public static void finalizeAppendableObjectUpload(String bucketName, String objectName)
    throws Exception {
  // The ID of your GCS bucket
  // String bucketName = "your-unique-bucket-name";
  // The ID of your GCS unfinalized appendable object
  // String objectName = "your-object-name";
  try (Storage storage = StorageOptions.grpc().build().getService()) {
    Blob current = storage.get(BlobId.of(bucketName, objectName));
    // Nothing to finalize when the object is missing.
    if (current == null) {
      System.out.println("Object " + objectName + " not found in bucket " + bucketName);
      return;
    }

    // Take over the existing unfinalized object with a session configured to finalize on close.
    BlobInfo takeoverInfo = BlobInfo.newBuilder(current.getBlobId()).build();
    BlobAppendableUploadConfig finalizeConfig =
        BlobAppendableUploadConfig.of()
            .withCloseAction(BlobAppendableUploadConfig.CloseAction.FINALIZE_WHEN_CLOSING);
    BlobAppendableUpload session = storage.blobAppendableUpload(takeoverInfo, finalizeConfig);

    try (BlobAppendableUpload.AppendableUploadWriteableByteChannel channel = session.open()) {
      // No new bytes are written; the session is opened only to finalize the object.
      channel.finalizeAndClose();
    }
    System.out.println(
        "Successfully finalized object " + objectName + " in bucket " + bucketName);
  }
}
}Python
詳情請參閱「Cloud Storage Python API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何完成可附加的物件:
async def storage_finalize_appendable_object_upload(
    bucket_name, object_name, grpc_client=None
):
    """Creates an appendable object, appends to it, then finalizes it.

    Prints the finalized object's metadata when done.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    client = grpc_client if grpc_client is not None else AsyncGrpcClient()

    # With generation=0 the writer throws `FailedPrecondition` if the object
    # already exists, so this only ever creates a brand-new object.
    appendable = AsyncAppendableObjectWriter(
        client=client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=0,
    )

    # Opening creates a new appendable object of size 0 ready for appends.
    await appendable.open()
    await appendable.append(b"Some data")

    # Finalizing makes the object non-appendable. To keep it appendable,
    # call `close()` instead. `finalize()` already closes the gRPC bidi
    # stream; calling `close()` after it may lead to undefined behavior.
    finalized = await appendable.finalize()
    print(f"Appendable object {object_name} created and finalized.")
    print("Object Metadata:")
    print(finalized)
讀取可附加物件的尾端
用戶端程式庫
C++
詳情請參閱「Cloud Storage C++ API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何讀取可附加物件的尾端:
namespace gcs = google::cloud::storage;
auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
               std::string object_name) -> google::cloud::future<void> {
  // This coroutine simulates a "tail -f" command on a GCS object. It
  // repeatedly polls an appendable object for new content. In a real
  // application, the object would be written to by a separate process.
  std::cout << "Polling for content from " << object_name << "...\n\n";

  // Start an appendable upload. In a real application this would be done by
  // a separate process, and this application would only know the object name.
  auto [writer, token] = (co_await client.StartAppendableObjectUpload(
                              gcs::BucketName(bucket_name), object_name))
                             .value();

  std::int64_t bytes_read = 0;
  for (int i = 0; i != 2; ++i) {
    // In a real application, another process would append data here.
    // We simulate this by writing to the object.
    auto content =
        "More data for tail example, iteration " + std::to_string(i) + "\n";
    token = (co_await writer.Write(std::move(token),
                                   gcs::WritePayload(std::move(content))))
                .value();

    // Flush so the appended bytes become visible to readers. A failed flush
    // would make the read below silently miss the new data, so surface it
    // instead of discarding the status.
    auto flush_status = co_await writer.Flush();
    if (!flush_status.ok()) throw std::runtime_error(flush_status.message());

    // Poll for new content by reading from the last known offset.
    auto payload =
        (co_await client.ReadObjectRange(gcs::BucketName(bucket_name),
                                         object_name, bytes_read, -1))
            .value();
    for (auto const& buffer : payload.contents()) {
      std::cout << std::string(buffer.begin(), buffer.end());
      bytes_read += static_cast<std::int64_t>(buffer.size());
    }

    // In a real application you would wait here, e.g. with a timer. Note that
    // sleep_for blocks the thread that is running this coroutine.
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
};
Go
詳情請參閱「Cloud Storage Go API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何讀取可附加物件的尾端:
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"time"
"cloud.google.com/go/storage"
"cloud.google.com/go/storage/experimental"
)
// readAppendableObjectTail simulates a "tail -f" command on a GCS object. It
// repeatedly polls an appendable object for new content. In a real
// application, the object would be written to by a separate process.
//
// It returns all bytes observed by the tailing goroutine.
func readAppendableObjectTail(w io.Writer, bucket, object string) ([]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewGRPCClient(ctx, experimental.WithZonalBucketAPIs())
	if err != nil {
		return nil, fmt.Errorf("storage.NewGRPCClient: %w", err)
	}
	defer client.Close()

	// Set a context timeout. When this timeout is reached, the read stream
	// will be closed, so omit this to tail indefinitely.
	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()

	// Create a new appendable object and write some data.
	writer := client.Bucket(bucket).Object(object).If(storage.Conditions{DoesNotExist: true}).NewWriter(ctx)
	if _, err := writer.Write([]byte("Some data\n")); err != nil {
		return nil, fmt.Errorf("Writer.Write: %w", err)
	}
	if err := writer.Close(); err != nil {
		return nil, fmt.Errorf("Writer.Close: %w", err)
	}
	gen := writer.Attrs().Generation

	// Create the MultiRangeDownloader, which opens a read stream to the object.
	mrd, err := client.Bucket(bucket).Object(object).NewMultiRangeDownloader(ctx)
	if err != nil {
		return nil, fmt.Errorf("NewMultiRangeDownloader: %w", err)
	}

	// In a goroutine, poll the object. In this example we continue until all the
	// bytes we expect to see were received, but in a real application this could
	// continue to poll indefinitely until some signal is received.
	var buf bytes.Buffer
	// mrdErr is written only by the goroutine below and read only after done is
	// closed, so no extra locking is needed.
	var mrdErr error
	// done is closed rather than sent on, so the goroutine can exit even if this
	// function has already returned early because of a write error.
	done := make(chan struct{})
	go func() {
		defer close(done)
		var currOff int64
		// Buffered so that a callback arriving after we stopped waiting (timeout)
		// can still deliver its result instead of blocking forever. The callback
		// reports its error over the channel rather than writing mrdErr itself,
		// which would race with this goroutine.
		rangeDownloaded := make(chan error, 1)
		for buf.Len() < 100 {
			// Add the current range and wait for it to be downloaded.
			// Using a length of 0 will read to the current end of the object.
			// The callback will give the actual number of bytes that were
			// read in each iteration.
			mrd.Add(&buf, currOff, 0, func(offset, length int64, err error) {
				// After each range is received, update the starting offset
				// based on how many bytes were received.
				currOff += length
				rangeDownloaded <- err
			})
			// Wait for the range download to complete with a timeout of 10s.
			select {
			case err := <-rangeDownloaded:
				mrdErr = err
			case <-time.After(10 * time.Second):
				mrdErr = mrd.Error()
				if mrdErr == nil {
					mrdErr = errors.New("range request timed out after 10s")
				}
			}
			if mrdErr != nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		// After exiting the loop, close MultiRangeDownloader; the deferred
		// close(done) then signals that all ranges have been read.
		if err := mrd.Close(); err != nil {
			mrdErr = err
		}
	}()

	// Meanwhile, continue to write 10 bytes at a time to the object.
	// This could be done by calling NewWriterFromAppendable object repeatedly
	// (as in the example) or calling Writer.Flush without closing the Writer.
	for range 9 {
		appendWriter, offset, err := client.Bucket(bucket).Object(object).Generation(gen).NewWriterFromAppendableObject(ctx, nil)
		if err != nil {
			return nil, fmt.Errorf("NewWriterFromAppendableObject: %w", err)
		}
		if _, err := appendWriter.Write([]byte("more data\n")); err != nil {
			return nil, fmt.Errorf("appendWriter.Write: %w", err)
		}
		if err := appendWriter.Close(); err != nil {
			return nil, fmt.Errorf("appendWriter.Close: %w", err)
		}
		fmt.Fprintf(w, "Wrote 10 bytes at offset %v\n", offset)
	}

	// Wait for tailing goroutine to exit.
	<-done
	if mrdErr != nil {
		// Wrap the downloader's error; the outer err is nil at this point.
		return nil, fmt.Errorf("MultiRangeDownloader: %w", mrdErr)
	}
	fmt.Fprintf(w, "Read %v bytes from object %v\n", buf.Len(), object)
	return buf.Bytes(), nil
}
Java
詳情請參閱「Cloud Storage Java API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何讀取可附加物件的尾端:
import com.google.api.core.ApiFuture;
import com.google.cloud.storage.BlobAppendableUpload;
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
import com.google.cloud.storage.BlobAppendableUploadConfig;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.FlushPolicy;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageChannelUtils;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
public class ReadAppendableObjectTail {
/**
 * Demonstrates tailing an appendable object. A background thread appends bytes to the object
 * while this thread repeatedly reads whatever new bytes have been persisted.
 *
 * @param bucketName the bucket in which the object is created
 * @param objectName the object to create and tail; the initial upload is made with a
 *     doesNotExist() write option, so presumably it fails if the object already exists
 * @throws Exception on storage, I/O, timeout, or interruption failures
 */
public static void readAppendableObjectTail(String bucketName, String objectName)
    throws Exception {
  // The ID of your GCS bucket
  // String bucketName = "your-unique-bucket-name";
  // The ID of your GCS object
  // String objectName = "your-object-name";
  try (Storage storage = StorageOptions.grpc().build().getService()) {
    BlobId blobId = BlobId.of(bucketName, objectName);
    BlobInfo info = BlobInfo.newBuilder(blobId).build();
    int totalToWrite = 64 * 1000;
    // Define our flush policy to flush small increments
    // This is useful for demonstration purposes, but you should use more appropriate values for
    // your workload.
    int flushSize = totalToWrite / 8;
    FlushPolicy.MinFlushSizeFlushPolicy flushPolicy =
        FlushPolicy.minFlushSize(flushSize).withMaxPendingBytes(flushSize);
    BlobAppendableUploadConfig appendableUploadConfig =
        BlobAppendableUploadConfig.of().withFlushPolicy(flushPolicy);
    BlobAppendableUpload upload =
        storage.blobAppendableUpload(
            info, appendableUploadConfig, Storage.BlobWriteOption.doesNotExist());
    // Create the (empty, unfinalized) object, then take it over below to do the actual writing
    // for our example.
    upload.open().closeWithoutFinalizing();
    BlobInfo gen1 = upload.getResult().get();
    BlobAppendableUpload takeover = storage.blobAppendableUpload(gen1, appendableUploadConfig);
    try (AppendableUploadWriteableByteChannel channel = takeover.open()) {
      // Start a background thread to write some data on a periodic basis.
      // In reality, your application would probably be doing this in another scope.
      Thread writeThread = startWriteThread(totalToWrite, channel, flushPolicy);
      try (BlobReadSession readSession =
          storage.blobReadSession(gen1.getBlobId()).get(10, TimeUnit.SECONDS)) {
        // zeroCnt counts consecutive polls that returned no new bytes.
        int zeroCnt = 0;
        // read is both the running byte count and the offset of the next read.
        long read = 0;
        while (read < totalToWrite) {
          // Give up once the writer has closed its channel and 30 polls in a row found nothing.
          if (zeroCnt >= 30 && !channel.isOpen()) {
            System.out.println("breaking");
            break;
          }
          // Request up to one flush-size worth of bytes starting at the current offset
          // (assumes RangeSpec.of(offset, maxLength) semantics -- confirm in the API reference).
          ApiFuture<byte[]> future =
              readSession.readAs(
                  ReadProjectionConfigs.asFutureBytes()
                      .withRangeSpec(RangeSpec.of(read, flushPolicy.getMinFlushSize())));
          byte[] bytes = future.get(20, TimeUnit.SECONDS);
          read += bytes.length;
          long defaultSleep = 1_500L;
          if (bytes.length == 0) {
            // Nothing new yet: back off linearly (1.5s x number of consecutive empty polls).
            zeroCnt++;
            long millis = defaultSleep * zeroCnt;
            System.out.println("millis = " + millis);
            Thread.sleep(millis);
          } else {
            zeroCnt = 0;
            System.out.println("bytes.length = " + bytes.length + " read = " + read);
            Thread.sleep(defaultSleep);
          }
        }
        // NOTE: Java `assert` only runs when assertions are enabled (e.g. `java -ea`);
        // otherwise a short read is not reported here.
        assert read == totalToWrite : "not enough bytes";
      }
      writeThread.join();
    }
  }
}
/**
 * Starts (and returns) a thread that writes {@code totalToWrite} bytes to {@code channel} one byte
 * at a time, cycling through the letters 'A'..'Z'. After every {@code minFlushSize} bytes it
 * flushes the channel and pauses for 40 ms. When done, it closes the channel without finalizing
 * the object. Any IOException or InterruptedException is rethrown as a RuntimeException on the
 * writer thread.
 */
private static Thread startWriteThread(
    int totalToWrite,
    AppendableUploadWriteableByteChannel channel,
    FlushPolicy.MinFlushSizeFlushPolicy flushPolicy) {
  Runnable writeLoop =
      () -> {
        try {
          long written = 0;
          while (written < totalToWrite) {
            // Letter index advances with the number of bytes already written.
            byte letter = (byte) ('A' + written % 26);
            ByteBuffer single = ByteBuffer.wrap(new byte[] {letter});
            written += StorageChannelUtils.blockingEmptyTo(single, channel);
            boolean reachedFlushBoundary = written % flushPolicy.getMinFlushSize() == 0;
            if (reachedFlushBoundary) {
              channel.flush();
              Thread.sleep(40);
            }
          }
          channel.closeWithoutFinalizing();
        } catch (IOException | InterruptedException e) {
          throw new RuntimeException(e);
        }
      };
  Thread worker = new Thread(writeLoop);
  worker.start();
  return worker;
}
}Python
詳情請參閱「Cloud Storage Python API 參考文件」。
如要向 Cloud Storage 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。
下例示範如何讀取可附加物件的尾端:
async def appender(writer: AsyncAppendableObjectWriter, duration: int):
    """Repeatedly appends BYTES_TO_APPEND to the object until `duration` seconds pass.

    After each append it logs a timestamped running total, then sleeps for
    0.1 seconds. The total advances by NUM_BYTES_TO_APPEND_EVERY_SECOND per
    append (presumably equal to len(BYTES_TO_APPEND) -- confirm against the
    module-level constants).
    """
    print("Appender started.")
    total = 0
    began = time.monotonic()
    while time.monotonic() - began < duration:
        await writer.append(BYTES_TO_APPEND)
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        total += NUM_BYTES_TO_APPEND_EVERY_SECOND
        print(
            f"[{stamp}] Appended {NUM_BYTES_TO_APPEND_EVERY_SECOND} new bytes. "
            f"Total appended: {total} bytes."
        )
        await asyncio.sleep(0.1)
    print("Appender finished.")
async def tailer(
    bucket_name: str, object_name: str, duration: int, client: AsyncGrpcClient
):
    """Reads newly appended bytes from the object for `duration` seconds.

    Every 0.1 seconds it downloads from the offset just past the last byte
    read up to the object's current end. The downloader is always closed on
    exit if its stream is still open.
    """
    print("Tailer started.")
    offset = 0
    began = time.monotonic()
    downloader = AsyncMultiRangeDownloader(client, bucket_name, object_name)
    try:
        await downloader.open()
        while time.monotonic() - began < duration:
            chunk = BytesIO()
            # A download range of (offset, 0) reads from `offset` to the end.
            await downloader.download_ranges([(offset, 0, chunk)])
            received = chunk.getbuffer().nbytes
            if received > 0:
                stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                print(f"[{stamp}] Tailer read {received} new bytes: ")
                offset += received
            await asyncio.sleep(0.1)
    finally:
        if downloader.is_stream_open:
            await downloader.close()
    print("Tailer finished.")
# read_appendable_object_tail simulates a "tail -f" command on a GCS object. It
# repeatedly polls an appendable object for new content. In a real
# application, the object would be written to by a separate process.
async def read_appendable_object_tail(
    bucket_name: str, object_name: str, duration: int, grpc_client=None
):
    """Main function to create an appendable object and run tasks.

    Creates an empty appendable object, then runs `appender` (writes to the
    object) and `tailer` (reads new bytes as they appear) concurrently for
    `duration` seconds. The writer is closed on exit if its stream is open.

    grpc_client: an existing grpc_client to use, this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient()
    writer = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
    )
    try:
        # 1. Create an empty appendable object.
        await writer.open()
        print(f"Created empty appendable object: {object_name}")
        # 2. Create the appender and tailer coroutines.
        appender_task = asyncio.create_task(appender(writer, duration))
        tailer_task = asyncio.create_task(
            tailer(bucket_name, object_name, duration, grpc_client)
        )
        tasks = (appender_task, tailer_task)
        # 3. Execute the coroutines concurrently.
        try:
            await asyncio.gather(*tasks)
        except BaseException:
            # gather() does not cancel the other task when one fails, so the
            # survivor would keep appending/reading while the writer is closed
            # below. Cancel both and let them unwind before re-raising.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise
    finally:
        if writer._is_stream_open:
            await writer.close()
            print("Writer closed.")