使用 BigQuery 数据通过 Gemini 执行批量预测

使用 BigQuery 数据源作为输入,通过 Gemini 执行批量文本预测。

深入探索

如需查看包含此代码示例的详细文档,请参阅以下内容:

代码示例

Java

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Java 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Java API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭证。 如需了解详情,请参阅为本地开发环境设置身份验证


import static com.google.genai.types.JobState.Known.JOB_STATE_CANCELLED;
import static com.google.genai.types.JobState.Known.JOB_STATE_FAILED;
import static com.google.genai.types.JobState.Known.JOB_STATE_PAUSED;
import static com.google.genai.types.JobState.Known.JOB_STATE_SUCCEEDED;

import com.google.genai.Client;
import com.google.genai.types.BatchJob;
import com.google.genai.types.BatchJobDestination;
import com.google.genai.types.BatchJobSource;
import com.google.genai.types.CreateBatchJobConfig;
import com.google.genai.types.GetBatchJobConfig;
import com.google.genai.types.HttpOptions;
import com.google.genai.types.JobState;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class BatchPredictionWithBq {

  public static void main(String[] args) throws InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    // To use a tuned model, set the model param to your tuned model using the following format:
    // modelId = "projects/{PROJECT_ID}/locations/{LOCATION}/models/{MODEL_ID}
    String modelId = "gemini-2.5-flash";
    String outputUri = "bq://your-project.your_dataset.your_table";
    createBatchJob(modelId, outputUri);
  }

  // Creates a batch prediction job with Google BigQuery.
  public static JobState createBatchJob(String modelId, String outputUri)
      throws InterruptedException {
    // Client Initialization. Once created, it can be reused for multiple requests.
    try (Client client =
        Client.builder()
            .location("us-central1")
            .vertexAI(true)
            .httpOptions(HttpOptions.builder().apiVersion("v1").build())
            .build()) {

      // See the documentation:
      // https://googleapis.github.io/java-genai/javadoc/com/google/genai/Batches.html
      BatchJobSource batchJobSource =
          BatchJobSource.builder()
              .bigqueryUri("bq://storage-samples.generative_ai.batch_requests_for_multimodal_input")
              .format("bigquery")
              .build();

      CreateBatchJobConfig batchJobConfig =
          CreateBatchJobConfig.builder()
              .displayName("your-display-name")
              .dest(BatchJobDestination.builder().bigqueryUri(outputUri).format("bigquery").build())
              .build();

      BatchJob batchJob = client.batches.create(modelId, batchJobSource, batchJobConfig);

      String jobName =
          batchJob.name().orElseThrow(() -> new IllegalStateException("Missing job name"));
      JobState jobState =
          batchJob.state().orElseThrow(() -> new IllegalStateException("Missing job state"));
      System.out.println("Job name: " + jobName);
      System.out.println("Job state: " + jobState);
      // Job name:
      // projects/.../locations/.../batchPredictionJobs/3189981423167602688
      // Job state: JOB_STATE_PENDING

      // See the documentation:
      // https://googleapis.github.io/java-genai/javadoc/com/google/genai/types/BatchJob.html
      Set<JobState.Known> completedStates =
          EnumSet.of(JOB_STATE_SUCCEEDED, JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_PAUSED);

      while (!completedStates.contains(jobState.knownEnum())) {
        TimeUnit.SECONDS.sleep(30);
        batchJob = client.batches.get(jobName, GetBatchJobConfig.builder().build());
        jobState =
            batchJob
                .state()
                .orElseThrow(() -> new IllegalStateException("Missing job state during polling"));
        System.out.println("Job state: " + jobState);
      }
      // Example response:
      // Job state: JOB_STATE_QUEUED
      // Job state: JOB_STATE_RUNNING
      // Job state: JOB_STATE_RUNNING
      // ...
      // Job state: JOB_STATE_SUCCEEDED
      return jobState;
    }
  }
}

Python

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Python 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Python API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭证。 如需了解详情,请参阅为本地开发环境设置身份验证

import time

from google import genai
from google.genai.types import CreateBatchJobConfig, JobState, HttpOptions

client = genai.Client(http_options=HttpOptions(api_version="v1"))

# TODO(developer): Update and un-comment below line
# output_uri = f"bq://your-project.your_dataset.your_table"

job = client.batches.create(
    # To use a tuned model, set the model param to your tuned model using the following format:
    # model="projects/{PROJECT_ID}/locations/{LOCATION}/models/{MODEL_ID}
    model="gemini-2.5-flash",
    src="bq://storage-samples.generative_ai.batch_requests_for_multimodal_input",
    config=CreateBatchJobConfig(dest=output_uri),
)
print(f"Job name: {job.name}")
print(f"Job state: {job.state}")
# Example response:
# Job name: projects/.../locations/.../batchPredictionJobs/9876453210000000000
# Job state: JOB_STATE_PENDING

# See the documentation: https://googleapis.github.io/python-genai/genai.html#genai.types.BatchJob
completed_states = {
    JobState.JOB_STATE_SUCCEEDED,
    JobState.JOB_STATE_FAILED,
    JobState.JOB_STATE_CANCELLED,
    JobState.JOB_STATE_PAUSED,
}

while job.state not in completed_states:
    time.sleep(30)
    job = client.batches.get(name=job.name)
    print(f"Job state: {job.state}")
# Example response:
# Job state: JOB_STATE_PENDING
# Job state: JOB_STATE_RUNNING
# Job state: JOB_STATE_RUNNING
# ...
# Job state: JOB_STATE_SUCCEEDED

后续步骤

如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅Google Cloud 示例浏览器