在 Python 中使用使用者定義函式

Python 使用者定義函式 (UDF) 可讓您在 Python 中實作純量函式,並在 SQL 查詢中使用。Python UDF 與 SQL 和 JavaScript UDF 類似,但具備額外功能。您可以使用 Python UDF 從 Python Package Index (PyPI) 安裝第三方程式庫,並透過 Cloud 資源連線存取外部服務。

Python UDF 會在 BigQuery 管理的資源上建構及執行。

限制

  • 目前僅支援 python-3.11 執行階段。
  • 您無法建立暫時性 Python UDF。
  • 您無法搭配具體化檢視表使用 Python UDF。
  • 呼叫 Python UDF 的查詢結果不會快取,因為系統一律會假設 Python UDF 的傳回值不具決定性。
  • 不支援虛擬私有雲網路
  • 不支援 Assured Workloads
  • 不支援的資料類型:JSONRANGEINTERVALGEOGRAPHY
  • 執行 Python UDF 的容器最多只能設定 4 個 vCPU 和 16 GiB
  • 不支援客戶自行管理的加密金鑰 (CMEK)

必要的角色

所需 IAM 角色取決於您是 Python UDF 擁有者還是使用者。

UDF 擁有者

Python UDF 擁有者通常會建立或更新 UDF。如果您建立的 Python UDF 參照 Cloud 資源連結,也需要其他角色。只有在 UDF 使用 WITH CONNECTION 子句存取外部服務時,才需要這個連線。

如要取得建立或更新 Python UDF 所需的權限,請要求系統管理員授予您下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這些預先定義的角色具備建立或更新 Python UDF 所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要建立或更新 Python UDF,您必須具備下列權限:

  • 使用 CREATE FUNCTION 陳述式建立 Python UDF: bigquery.routines.create 在資料集上
  • 使用 CREATE FUNCTION 陳述式更新 Python UDF: bigquery.routines.update 在資料集上
  • 執行 CREATE FUNCTION 陳述式查詢作業: bigquery.jobs.create 在專案中
  • 建立新的 Cloud 資源連線 bigquery.connections.create 在專案中
  • CREATE FUNCTION 陳述式中使用連線: bigquery.connections.delegate on the connection

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解 BigQuery 中的角色,請參閱「預先定義的 IAM 角色」。

UDF 使用者

Python UDF 使用者會叫用他人建立的 UDF。如果叫用參照 Cloud 資源連線的 Python UDF,也需要其他角色。

如要取得叫用他人建立的 Python UDF 所需的權限,請要求系統管理員授予您下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這些預先定義的角色具備叫用他人建立的 Python UDF 所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要叫用他人建立的 Python UDF,您必須具備下列權限:

  • 如要執行參照 Python UDF 的查詢作業: bigquery.jobs.create 在專案中
  • 如要叫用其他人建立的 Python UDF: bigquery.routines.get 在資料集上
  • 如要執行參照 Cloud 資源連結的 Python UDF,請按照下列步驟操作: bigquery.connections.use on the connection

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解 BigQuery 中的角色,請參閱「預先定義的 IAM 角色」。

呼叫 Python UDF

如果您有權叫用 Python UDF,就可以像呼叫任何其他函式一樣呼叫它。如要使用在其他專案中定義的函式,請使用該函式的完整名稱。舉例來說,如要呼叫定義為 bigquery-utils 社群 UDF 的 cw_xml_extract Python UDF,請按照下列步驟操作:

控制台

  1. 前往「BigQuery」頁面。

    前往「BigQuery」

  2. 在查詢編輯器中輸入下列範例:

    SELECT
      `bqutil`.`fn`.`cw_xml_extract`(xml, '//title/text()') AS `title`
    FROM UNNEST([
      STRUCT('''<book id="1">
        <title>The Great Gatsby</title>
        <author>F. Scott Fitzgerald</author>
      </book>''' AS xml),
      STRUCT('''<book id="2">
        <title>1984</title>
        <author>George Orwell</author>
      </book>''' AS xml),
      STRUCT('''<book id="3">
        <title>Brave New World</title>
        <author>Aldous Huxley</author>
      </book>''' AS xml)
    ])
    
  3. 按一下「Run」(執行)

    這個範例會產生下列輸出內容:

    +--------------------------+
    | title                    |
    +--------------------------+
    | The Great Gatsby         |
    | 1984                     |
    | Brave New World          |
    +--------------------------+
    

BigQuery DataFrames

下列範例使用 BigQuery DataFrames sql_scalarread_gbq_functionapply 方法呼叫 Python UDF:

import textwrap
from typing import Tuple

import bigframes.pandas as bpd
import pandas as pd
import pyarrow as pa


# Using partial ordering mode enables more efficient query optimizations.
bpd.options.bigquery.ordering_mode = "partial"


def call_python_udf(
    project_id: str, location: str,
) -> Tuple[pd.Series, bpd.Series]:
    # Set the billing project to use for queries. This step is optional, as the
    # project can be inferred from your environment in many cases.
    bpd.options.bigquery.project = project_id  # "your-project-id"

    # Since this example works with local data, set a processing location.
    bpd.options.bigquery.location = location  # "US"

    # Create a sample series.
    xml_series = pd.Series(
        [
            textwrap.dedent(
                """
                <book id="1">
                    <title>The Great Gatsby</title>
                    <author>F. Scott Fitzgerald</author>
                </book>
                """
            ),
            textwrap.dedent(
                """
                <book id="2">
                    <title>1984</title>
                    <author>George Orwell</author>
                </book>
                """
            ),
            textwrap.dedent(
                """
                <book id="3">
                    <title>Brave New World</title>
                    <author>Aldous Huxley</author>
                </book>
                """
            ),
        ],
        dtype=pd.ArrowDtype(pa.string()),
    )
    df = pd.DataFrame({"xml": xml_series})

    # Use the BigQuery Accessor, which is automatically registered on pandas
    # DataFrames when you import bigframes.  This example uses a function that
    # has been deployed to bigquery-utils for demonstration purposes. To use in
    # production, deploy the function at
    # https://github.com/GoogleCloudPlatform/bigquery-utils/blob/master/udfs/community/cw_xml_extract.sqlx
    # to your own project.
    titles_pandas = df.bigquery.sql_scalar(
        "`bqutil`.`fn`.cw_xml_extract({xml}, '//title/text()')",
    )

    # Alternatively, call read_gbq_function to get a pointer to the function
    # that can be applied on BigQuery DataFrames objects.
    cw_xml_extract = bpd.read_gbq_function("bqutil.fn.cw_xml_extract")
    xml_bigframes = bpd.read_pandas(xml_series)

    xpath_query = "//title/text()"
    titles_bigframes = xml_bigframes.apply(cw_xml_extract, args=(xpath_query,))
    return titles_pandas, titles_bigframes

建立永久 Python UDF

建立 Python UDF 時,請遵守下列規則:

  • Python UDF 的主體必須是代表 Python 程式碼的引號字串常值。如要進一步瞭解以引號括住的字串文字,請參閱以引號括住的字串文字格式

  • Python UDF 的主體必須包含 Python 函式,該函式會用於 Python UDF 選項清單中的 entry_point 引數。

  • 您需要在runtime_version選項中指定 Python 執行階段版本。目前僅支援 Python 執行階段版本 python-3.11。如需可用選項的完整清單,請參閱 CREATE FUNCTION 陳述式的函式選項清單

如要建立永久 Python UDF,請使用CREATE FUNCTION陳述式,但不要使用 TEMPTEMPORARY 關鍵字。如要刪除永久性 Python UDF,請使用 DROP FUNCTION 陳述式。

使用 CREATE FUNCTION 陳述式建立 Python UDF 時,BigQuery 會根據基礎映像檔建立或更新容器映像檔。容器會使用您的程式碼和任何指定的套件依附元件,以基本映像檔為基礎建構而成。建立容器是長時間執行的程序。執行 CREATE FUNCTION 陳述式後的第一個查詢可能會自動等待圖片完成。通常不需要任何外部依附元件,容器映像檔應可在不到一分鐘內建立完成。

範例

如要查看建立永久性 Python UDF 的範例,請選擇下列任一選項:

控制台

下列範例會建立名為 multiplyInputs 的永久 Python UDF,並從 SELECT 陳述式中呼叫該 UDF:

  1. 前往「BigQuery」頁面

    前往「BigQuery」

  2. 在查詢編輯器中,輸入下列 CREATE FUNCTION 陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.multiplyInputs(x FLOAT64, y FLOAT64)
    RETURNS FLOAT64
    LANGUAGE python
    OPTIONS(runtime_version="python-3.11", entry_point="multiply")
    AS r'''
    
    def multiply(x, y):
        return x * y
    
    ''';
    
    -- Call the Python UDF.
    WITH numbers AS
        (SELECT 1 AS x, 5 as y
        UNION ALL
        SELECT 2 AS x, 10 as y
        UNION ALL
        SELECT 3 as x, 15 as y)
    SELECT x, y,
    `PROJECT_ID.DATASET_ID`.multiplyInputs(x, y) AS product
    FROM numbers;

    取代 PROJECT_ID。將 DATASET_ID 替換為專案 ID 和資料集 ID。

  3. 按一下「執行」

    這個範例會產生下列輸出內容:

    +-----+-----+--------------+
    | x   | y   | product      |
    +-----+-----+--------------+
    | 1   | 5   |  5.0         |
    | 2   | 10  | 20.0         |
    | 3   | 15  | 45.0         |
    +-----+-----+--------------+
    

BigQuery DataFrames

下列範例使用 BigQuery DataFrames,將自訂函式轉換為 Python UDF:

import bigframes.pandas as bpd

# Set BigQuery DataFrames options
bpd.options.bigquery.project = your_gcp_project_id
bpd.options.bigquery.location = "US"

# BigQuery DataFrames gives you the ability to turn your custom functions
# into a BigQuery Python UDF. One can find more details about the usage and
# the requirements via `help` command.
help(bpd.udf)

# Read a table and inspect the column of interest.
df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
df["body_mass_g"].peek(10)

# Define a custom function, and specify the intent to turn it into a
# BigQuery Python UDF. Let's try a `pandas`-like use case in which we want
# to apply a user defined function to every value in a `Series`, more
# specifically bucketize the `body_mass_g` value of the penguins, which is a
# real number, into a category, which is a string.
@bpd.udf(
    dataset=your_bq_dataset_id,
    name=your_bq_routine_id,
)
def get_bucket(num: float) -> str:
    if not num:
        return "NA"
    boundary = 4000
    return "at_or_above_4000" if num >= boundary else "below_4000"

# Then we can apply the udf on the `Series` of interest via
# `apply` API and store the result in a new column in the DataFrame.
df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket))

# This will add a new column `body_mass_bucket` in the DataFrame. You can
# preview the original value and the bucketized value side by side.
df[["body_mass_g", "body_mass_bucket"]].peek(10)

# The above operation was possible by doing all the computation on the
# cloud through an underlying BigQuery Python UDF that was created to
# support the user's operations in the Python code.

# The BigQuery Python UDF created to support the BigQuery DataFrames
# udf can be located via a property `bigframes_bigquery_function`
# set in the udf object.
print(f"Created BQ Python UDF: {get_bucket.bigframes_bigquery_function}")

# If you have already defined a custom function in BigQuery, either via the
# BigQuery Google Cloud Console or with the `udf` decorator,
# or otherwise, you may use it with BigQuery DataFrames with the
# `read_gbq_function` method. More details are available via the `help`
# command.
help(bpd.read_gbq_function)

existing_get_bucket_bq_udf = get_bucket.bigframes_bigquery_function

# Here is an example of using `read_gbq_function` to load an existing
# BigQuery Python UDF.
df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
get_bucket_function = bpd.read_gbq_function(existing_get_bucket_bq_udf)

df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket_function))
df.peek(10)

# Let's continue trying other potential use cases of udf. Let's say we
# consider the `species`, `island` and `sex` of the penguins sensitive
# information and want to redact that by replacing with their hash code
# instead. Let's define another scalar custom function and decorate it
# as a udf. The custom function in this example has external package
# dependency, which can be specified via `packages` parameter.
@bpd.udf(
    dataset=your_bq_dataset_id,
    name=your_bq_routine_id,
    packages=["cryptography"],
)
def get_hash(input: str) -> str:
    from cryptography.fernet import Fernet

    # handle missing value
    if input is None:
        input = ""

    key = Fernet.generate_key()
    f = Fernet(key)
    return f.encrypt(input.encode()).decode()

# We can use this udf in another `pandas`-like API `map` that
# can be applied on a DataFrame
df_redacted = df[["species", "island", "sex"]].map(get_hash)
df_redacted.peek(10)

# If the BigQuery routine is no longer needed, we can clean it up
# to free up any cloud quota
session = bpd.get_global_session()
session.bqclient.delete_routine(f"{your_bq_dataset_id}.{your_bq_routine_id}")

建立向量化 Python UDF

您可以實作 Python UDF,使用向量化處理一批資料列,而非單一資料列。向量化可提高查詢效能。您可以使用 Pandas 或 Apache Arrow 建立向量化 UDF。

如要控管批次處理行為,請在CREATE OR REPLACE FUNCTION選項清單中使用 max_batching_rows 選項,指定每個批次中的資料列數上限。如果您指定 max_batching_rows,BigQuery 會判斷批次中的資料列數量,最多為 max_batching_rows 限制。如未指定 max_batching_rows,系統會自動判斷要批次處理的資料列數量。

使用 Pandas

向量化 Python UDF 具有單一 pandas.DataFrame 引數,且必須加上註解。pandas.DataFrame 引數的欄數與 CREATE FUNCTION 陳述式中定義的 Python UDF 參數相同。pandas.DataFrame 引數中的資料欄名稱與 UDF 的參數名稱相同。

函式必須傳回 pandas.Series 或單欄 pandas.DataFrame,且列數與輸入內容相同。

下列範例會建立名為 multiplyInputs 的向量化 Python UDF,並使用兩個參數:xy

  1. 前往「BigQuery」頁面

    前往「BigQuery」

  2. 在查詢編輯器中,輸入下列 CREATE FUNCTION 陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.multiplyVectorized(x FLOAT64, y FLOAT64)
    RETURNS FLOAT64
    LANGUAGE python
    OPTIONS(runtime_version="python-3.11", entry_point="vectorized_multiply")
    AS r'''
    import pandas as pd
    
    def vectorized_multiply(df: pd.DataFrame):
      return df['x'] * df['y']
    
    ''';

    取代 PROJECT_ID。將 DATASET_ID 替換為專案 ID 和資料集 ID。

    呼叫 UDF 的方式與上一個範例相同。

  3. 按一下「Run」(執行)

使用 Apache Arrow

以下範例使用 Apache Arrow RecordBatch介面。使用 RecordBatch 介面時,函式會將等長資料欄的資料列批次傳遞至進入點。下列範例使用 Apache Arrow 建立名為 multiplyVectorizedArrow 的向量化 Python UDF。

  1. 前往「BigQuery」頁面

    前往「BigQuery」

  2. 在查詢編輯器中,輸入下列 CREATE FUNCTION 陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.multiplyVectorizedArrow(x FLOAT64, y FLOAT64)
    RETURNS FLOAT64
    LANGUAGE python
    OPTIONS(
      runtime_version="python-3.11",
      entry_point="vectorized_multiply_arrow"
    )
    AS r'''
    import pyarrow as pa
    import pyarrow.compute as pc
    
    def vectorized_multiply_arrow(batch: pa.RecordBatch):
        # Access columns directly from the Arrow RecordBatch
        x = batch.column('x')
        y = batch.column('y')
    
        # Use pyarrow.compute for vectorized operations
        return pc.multiply(x, y)
    ''';

    取代 PROJECT_ID。將 DATASET_ID 替換為專案 ID 和資料集 ID。

    呼叫 UDF 的方式與先前的範例相同。

  3. 按一下「Run」(執行)

支援的 Python UDF 資料類型

下表定義 BigQuery 資料類型、Python 資料類型和 Pandas 資料類型之間的對應關係:

BigQuery 資料類型 標準 UDF 使用的 Python 內建資料型別 向量化 UDF 使用的 Pandas 資料類型 用於向量化 UDF 中 ARRAY 和 STRUCT 的 PyArrow 資料類型
BOOL bool BooleanDtype DataType(bool)
INT64 int Int64Dtype DataType(int64)
FLOAT64 float FloatDtype DataType(double)
STRING str StringDtype DataType(string)
BYTES bytes binary[pyarrow] DataType(binary)
TIMESTAMP

函式參數:datetime.datetime (已設定世界標準時間時區)

函式傳回值:datetime.datetime (已設定任何時區)

函式參數:timestamp[us, tz=UTC][pyarrow]

函式傳回值:timestamp[us, tz=*][pyarrow]\(any timezone\)

TimestampType(timestamp[us]) (含時區)
DATE datetime.date date32[pyarrow] DataType(date32[day])
TIME datetime.time time64[pyarrow] Time64Type(time64[us])
DATETIME datetime.datetime (不含時區) timestamp[us][pyarrow] TimestampType(timestamp[us]),不含時區
ARRAY list list<...>[pyarrow],其中元素資料類型為 pandas.ArrowDtype ListType
STRUCT dict struct<...>[pyarrow],其中欄位資料類型為 pandas.ArrowDtype StructType

支援的執行階段版本

BigQuery Python UDF 支援 python-3.11 執行階段。這個 Python 版本包含一些額外預先安裝的套件。如果是系統程式庫,請檢查執行階段基本映像檔。

執行階段版本 Python 版本 收錄
python-3.11 Python 3.11 numpy 1.26.3
pyarrow 14.0.2
pandas 2.1.4
python-dateutil 2.8.2
absl-py 2.0.0
pytz 2023.3.post1
tzdata 2023.4
six 1.16.0
grpcio 1.76.0
grpcio-protobuf 6.33.5tools 1.76.0
typing-extensions 4.15.0

使用第三方套件

您可以使用CREATE FUNCTION 選項清單,使用 Python 標準程式庫和預先安裝的套件以外的模組。您可以從 Python Package Index (PyPI) 安裝套件,也可以從 Cloud Storage 匯入 Python 檔案。

從 Python Package Index 安裝套件

安裝套件時,您必須提供套件名稱,並可選擇使用 Python 套件版本指定符提供套件版本。

如果套件位於執行階段,系統會使用該套件,除非 CREATE FUNCTION 選項清單中指定了特定版本。如果未指定套件版本,且套件不在執行階段中,系統會使用最新版本。僅支援車輪二進位格式的套件。

以下範例說明如何建立 Python UDF,使用 CREATE OR REPLACE FUNCTION 選項清單安裝 scipy 套件:

  1. 前往「BigQuery」頁面

    前往「BigQuery」

  2. 在查詢編輯器中,輸入下列 CREATE FUNCTION 陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.area(radius FLOAT64)
    RETURNS FLOAT64 LANGUAGE python
    OPTIONS (entry_point='area_handler', runtime_version='python-3.11', packages=['scipy==1.15.3'])
    AS r"""
    import scipy
    
    def area_handler(radius):
      return scipy.constants.pi*radius*radius
    """;
    
    SELECT `PROJECT_ID.DATASET_ID`.area(4.5);

    取代 PROJECT_ID。將 DATASET_ID 替換為專案 ID 和資料集 ID。

  3. 按一下「Run」(執行)

將其他 Python 檔案匯入為程式庫

您可以從 Cloud Storage 匯入 Python 檔案,使用「函式選項」清單擴充 Python UDF。

在 UDF 的 Python 程式碼中,您可以使用 import 陳述式,然後加上 Cloud Storage 物件的路徑,將 Cloud Storage 中的 Python 檔案匯入為模組。舉例來說,如果您要匯入 gs://BUCKET_NAME/path/to/lib1.py,匯入陳述式就會是 import path.to.lib1

Python 檔案名稱必須是 Python ID。物件名稱中的每個 folder 名稱 (/ 後方) 都應是有效的 Python 識別碼。在 ASCII 範圍 (U+0001..U+007F) 內,以下字元可用於 ID:

  • 大寫和大小寫字母 A 到 Z。
  • 底線。
  • 數字 0 到 9,但 ID 的第一個字元不得為數字。

以下範例說明如何建立 Python UDF,從名為 my_bucket 的 Cloud Storage bucket 匯入 lib1.py 用戶端程式庫套件:

  1. 前往「BigQuery」頁面

    前往「BigQuery」

  2. 在查詢編輯器中,輸入下列 CREATE FUNCTION 陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.myFunc(a FLOAT64, b STRING)
    RETURNS STRING LANGUAGE python
    OPTIONS (
    entry_point='compute', runtime_version='python-3.11',
    library=['gs://my_bucket/path/to/lib1.py'])
    AS r"""
    import path.to.lib1 as lib1
    
    def compute(a, b):
      # doInterestingStuff is a function defined in
      # gs://my_bucket/path/to/lib1.py
      return lib1.doInterestingStuff(a, b);
    
    """;

    取代 PROJECT_ID。將 DATASET_ID 替換為專案 ID 和資料集 ID。

  3. 按一下「Run」(執行)

設定 Python UDF 的容器限制

您可以透過 CREATE FUNCTION 選項清單,為執行 Python UDF 的容器指定 CPU、記憶體和容器要求並行限制。

根據預設,容器會分配到下列資源:

  • 分配的記憶體為 512Mi
  • 分配的 CPU 為 1.0 個 vCPU。
  • 容器要求並行限制為 80

下列範例會使用 CREATE FUNCTION 選項清單建立 Python UDF,指定容器限制:

  1. 前往「BigQuery」頁面

    前往「BigQuery」

  2. 在查詢編輯器中,輸入下列 CREATE FUNCTION 陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.resizeImage(image BYTES)
    RETURNS BYTES LANGUAGE python
    OPTIONS (entry_point='resize_image', runtime_version='python-3.11',
    packages=['Pillow==11.2.1'], container_memory='CONTAINER_MEMORY', container_cpu=CONTAINER_CPU,
    container_request_concurrency=CONTAINER_REQUEST_CONCURRENCY)
    AS r"""
    import io
    from PIL import Image
    
    def resize_image(image_bytes):
      img = Image.open(io.BytesIO(image_bytes))
    
      resized_img = img.resize((256, 256), Image.Resampling.LANCZOS)
      output_stream = io.BytesIO()
      resized_img.convert('RGB').save(output_stream, format='JPEG')
      return output_stream.getvalue()
    """;

    更改下列內容:

    • PROJECT_IDDATASET_ID: 您的專案 ID 和資料集 ID。
    • CONTAINER_MEMORY:記憶體值,格式如下:<integer_number><unit>。單位必須是下列其中一個值:Mi (MiB)、M (MB)、Gi (GiB) 或 G (GB)。例如:2Gi
    • CONTAINER_CPU:CPU 值。Python UDF 支援介於 0.331.0 之間的分數 CPU 值,以及 124 的非分數 CPU 值。
    • CONTAINER_REQUEST_CONCURRENCY:每個 Python UDF 容器執行個體的並行要求數量上限。值必須是介於 11000 之間的整數。
  3. 按一下「Run」(執行)

支援的 CPU 值

Python UDF 支援介於 0.331.0 之間的分數 CPU 值,以及 124 的非分數 CPU 值。執行 Python UDF 的容器最多可設定 4 個 vCPU。預設值為 1.0。系統會先將輸入值的小數部分四捨五入至小數點後兩位,再套用至容器。

支援的記憶體值

Python UDF 容器支援下列格式的記憶體值: <integer_number><unit>。單位必須是下列其中一個值:MiMGiG。可設定的記憶體容量下限為 256Mi。可設定的記憶體容量上限為 16Gi

根據您選擇的記憶體值,您也必須指定適當的 CPU 量。下表顯示每個記憶體值的 CPU 最小值和最大值:

記憶體 最低 CPU 最高 CPU
256Mi512Mi 0.33 2
大於 512Mi 且小於或等於 1Gi 0.5 2
大於 1Gi 且小於 2Gi 1 2
2Gi4Gi 1 4
大於 4Gi 且小於或等於 8Gi 2 4
大於 8Gi 且小於或等於 16Gi 4 4

或者,如果您已決定要分配的 CPU 數量,可以使用下表判斷適當的記憶體範圍:

CPU 最低記憶體 記憶體上限
小於 0.5 256Mi 512Mi
0.5 至低於 1 256Mi 1Gi
1 256Mi 4Gi
2 256Mi 8Gi
4 2Gi 16Gi

在 Python 程式碼中呼叫 Google Cloud 或線上服務

Python UDF 會使用雲端資源連結服務帳戶,存取 Google Cloud 服務或外部服務。必須將存取服務的權限授予連線的服務帳戶。所需權限會因存取的服務和從 Python 程式碼呼叫的 API 而異。

如果您建立 Python UDF 時未使用 Cloud 資源連結,系統會在封鎖網路存取的環境中執行函式。如果 UDF 會存取線上服務,您必須使用 Cloud 資源連線建立 UDF。否則,UDF 會遭到封鎖,無法存取網路,直到達到內部連線逾時為止。

以下範例說明如何從 Python UDF 存取 Cloud Translation 服務。這個範例有兩個專案:一個是名為 my_query_project 的專案,您可以在其中建立 UDF 和 Cloud 資源連結;另一個是名為 my_translate_project 的專案,您可以在其中執行 Cloud Translation。

建立 Cloud 資源連結

首先,您要在 my_query_project 中建立 Cloud 資源連結。如要建立 Cloud 資源連結,請按照下列步驟操作。

選取下列選項之一:

控制台

  1. 前往「BigQuery」頁面。

    前往「BigQuery」

  2. 點選左側窗格中的 「Explorer」

    醒目顯示的「Explorer」窗格按鈕。

    如果沒有看到左側窗格,請按一下「展開左側窗格」圖示 開啟窗格。

  3. 在「Explorer」窗格中展開專案名稱,然後按一下「Connections」

  4. 在「Connections」(連線) 頁面中,按一下「Create connection」(建立連線)

  5. 在「連線類型」中,選擇「Vertex AI 遠端模型、遠端函式、BigLake 和 Spanner (Cloud 資源)」

  6. 在「連線 ID」欄位中,輸入連線名稱。

  7. 在「位置類型」部分,選取連線位置。連線應與資料集等其他資源位於同一位置。

  8. 點選「建立連線」

  9. 點選「前往連線」

  10. 在「連線資訊」窗格中,複製服務帳戶 ID,以便在後續步驟中使用。

SQL

使用 CREATE CONNECTION 陳述式

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往「BigQuery」

  2. 在查詢編輯器中輸入下列陳述式:

    CREATE CONNECTION [IF NOT EXISTS] `CONNECTION_NAME`
    OPTIONS (
      connection_type = "CLOUD_RESOURCE",
      friendly_name = "FRIENDLY_NAME",
      description = "DESCRIPTION"
      );

    請替換下列項目:

    • CONNECTION_NAME:連線名稱,格式為 PROJECT_ID.LOCATION.CONNECTION_IDLOCATION.CONNECTION_IDCONNECTION_ID。如果省略專案或位置,系統會從執行陳述式的專案和位置推斷。
    • FRIENDLY_NAME (選用):連線的描述性名稱。
    • DESCRIPTION (選用):連線說明。

  3. 按一下「執行」

如要進一步瞭解如何執行查詢,請參閱「執行互動式查詢」。

bq

  1. 在指令列環境中建立連線:

    bq mk --connection --location=REGION --project_id=PROJECT_ID \
        --connection_type=CLOUD_RESOURCE CONNECTION_ID

    --project_id 參數會覆寫預設專案。

    更改下列內容:

    • REGION:您的連線區域
    • PROJECT_ID:您的 Google Cloud 專案 ID
    • CONNECTION_ID:連線的 ID

    建立連線資源時,BigQuery 會建立專屬的系統服務帳戶,並將其與連線建立關聯。

    疑難排解:如果收到下列連線錯誤訊息,請更新 Google Cloud SDK

    Flags parsing error: flag --connection_type=CLOUD_RESOURCE: value should be one of...
    
  2. 擷取並複製服務帳戶 ID,以供後續步驟使用:

    bq show --connection PROJECT_ID.REGION.CONNECTION_ID

    輸出結果會與下列內容相似:

    name                          properties
    1234.REGION.CONNECTION_ID     {"serviceAccountId": "connection-1234-9u56h9@gcp-sa-bigquery-condel.iam.gserviceaccount.com"}
    

Python

在試用這個範例之前,請先按照「使用用戶端程式庫的 BigQuery 快速入門導覽課程」中的 Python 設定說明操作。詳情請參閱 BigQuery Python API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。

import google.api_core.exceptions
from google.cloud import bigquery_connection_v1

client = bigquery_connection_v1.ConnectionServiceClient()


def create_connection(
    project_id: str,
    location: str,
    connection_id: str,
):
    """Creates a BigQuery connection to a Cloud Resource.

    Cloud Resource connection creates a service account which can then be
    granted access to other Google Cloud resources for federated queries.

    Args:
        project_id: The Google Cloud project ID.
        location: The location of the connection (for example, "us-central1").
        connection_id: The ID of the connection to create.
    """

    parent = client.common_location_path(project_id, location)

    connection = bigquery_connection_v1.Connection(
        friendly_name="Example Connection",
        description="A sample connection for a Cloud Resource.",
        cloud_resource=bigquery_connection_v1.CloudResourceProperties(),
    )

    try:
        created_connection = client.create_connection(
            parent=parent, connection_id=connection_id, connection=connection
        )
        print(f"Successfully created connection: {created_connection.name}")
        print(f"Friendly name: {created_connection.friendly_name}")
        print(
            f"Service Account: {created_connection.cloud_resource.service_account_id}"
        )

    except google.api_core.exceptions.AlreadyExists:
        print(f"Connection with ID '{connection_id}' already exists.")
        print("Please use a different connection ID.")
    except Exception as e:
        print(f"An unexpected error occurred while creating the connection: {e}")

Node.js

在試用這個範例之前,請先按照「使用用戶端程式庫的 BigQuery 快速入門導覽課程」中的 Node.js 設定說明操作。詳情請參閱 BigQuery Node.js API 參考說明文件

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。

const {ConnectionServiceClient} =
  require('@google-cloud/bigquery-connection').v1;
const {status} = require('@grpc/grpc-js');

const client = new ConnectionServiceClient();

/**
 * Creates a new BigQuery connection to a Cloud Resource.
 *
 * A Cloud Resource connection creates a service account that can be granted access
 * to other Google Cloud resources.
 *
 * @param {string} projectId The Google Cloud project ID. for example, 'example-project-id'
 * @param {string} location The location of the project to create the connection in. for example, 'us-central1'
 * @param {string} connectionId The ID of the connection to create. for example, 'example-connection-id'
 */
async function createConnection(projectId, location, connectionId) {
  const parent = client.locationPath(projectId, location);

  const connection = {
    friendlyName: 'Example Connection',
    description: 'A sample connection for a Cloud Resource',
    // The service account for this cloudResource will be created by the API.
    // Its ID will be available in the response.
    cloudResource: {},
  };

  const request = {
    parent,
    connectionId,
    connection,
  };

  try {
    const [response] = await client.createConnection(request);

    console.log(`Successfully created connection: ${response.name}`);
    console.log(`Friendly name: ${response.friendlyName}`);

    console.log(`Service Account: ${response.cloudResource.serviceAccountId}`);
  } catch (err) {
    if (err.code === status.ALREADY_EXISTS) {
      console.log(`Connection '${connectionId}' already exists.`);
    } else {
      console.error(`Error creating connection: ${err.message}`);
    }
  }
}

Terraform

請使用 google_bigquery_connection 資源。

如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證作業」。

下列範例會在 US 區域中建立名為 my_cloud_resource_connection 的 Cloud 資源連結:


# This queries the provider for project information.
data "google_project" "default" {}

# This creates a cloud resource connection in the US region named my_cloud_resource_connection.
# Note: The cloud resource nested object has only one output field - serviceAccountId.
resource "google_bigquery_connection" "default" {
  connection_id = "my_cloud_resource_connection"
  project       = data.google_project.default.project_id
  location      = "US"
  cloud_resource {}
}

如要在 Google Cloud 專案中套用 Terraform 設定,請完成下列各節的步驟。

準備 Cloud Shell

  1. 啟動 Cloud Shell
  2. 設定要套用 Terraform 設定的預設 Google Cloud 專案。

    每項專案只需要執行一次這個指令,且可以在任何目錄中執行。

    export GOOGLE_CLOUD_PROJECT=PROJECT_ID

    如果您在 Terraform 設定檔中設定明確值,環境變數就會遭到覆寫。

準備目錄

每個 Terraform 設定檔都必須有自己的目錄 (也稱為根模組)。

  1. Cloud Shell 中建立目錄,並在該目錄中建立新檔案。檔案名稱的副檔名必須是 .tf,例如 main.tf。在本教學課程中,這個檔案稱為 main.tf
    mkdir DIRECTORY && cd DIRECTORY && touch main.tf
  2. 如果您正在學習教學課程,可以複製每個章節或步驟中的程式碼範例。

    將程式碼範例複製到新建立的 main.tf

    視需要從 GitHub 複製程式碼。如果 Terraform 代码片段是端對端解決方案的一部分,建議您使用這個方法。

  3. 查看並修改範例參數,套用至您的環境。
  4. 儲存變更。
  5. 初始化 Terraform。每個目錄只需執行一次這項操作。
    terraform init

    如要使用最新版 Google 供應商,請加入 -upgrade 選項:

    terraform init -upgrade

套用變更

  1. 檢查設定,確認 Terraform 即將建立或更新的資源符合您的預期:
    terraform plan

    視需要修正設定。

  2. 執行下列指令,並在提示中輸入 yes,套用 Terraform 設定:
    terraform apply

    等待 Terraform 顯示「Apply complete!」訊息。

  3. 開啟 Google Cloud 專案即可查看結果。在 Google Cloud 控制台中,前往 UI 中的資源,確認 Terraform 已建立或更新這些資源。

將存取權授予連線的服務帳戶

設定連線的權限時,您需要先前複製的服務帳戶 ID。建立連線資源時,BigQuery 會建立專屬的系統服務帳戶,並將其與連線建立關聯。

如要授予 Cloud 資源連結服務帳戶專案存取權,請在 my_query_project 中將服務使用情形個人使用者角色 (roles/serviceusage.serviceUsageConsumer) 授予服務帳戶,並在 my_translate_project 中將 Cloud Translation API 使用者角色 (roles/cloudtranslate.user) 授予服務帳戶。

  1. 前往身分與存取權管理頁面。

    前往「IAM」(身分與存取權管理) 頁面

  2. 確認已選取 my_query_project

  3. 按一下 「授予存取權」

  4. 在「新增主體」欄位,輸入先前複製的 Cloud 資源連線服務帳戶 ID。

  5. 在「Select a role」(請選擇角色) 欄位中,依序選取「Service usage」(服務用量) 和「Service usage consumer」(服務用量消費者)

  6. 按一下 [儲存]

  7. 在專案選取器中,選擇 my_translate_project

  8. 前往身分與存取權管理頁面。

    前往「IAM」(身分與存取權管理) 頁面

  9. 按一下 「授予存取權」

  10. 在「新增主體」欄位,輸入先前複製的 Cloud 資源連線服務帳戶 ID。

  11. 在「Select a role」(選取角色) 欄位中,依序選擇「Cloud translation」和「Cloud Translation API user」(Cloud Translation API 使用者)

  12. 按一下 [儲存]

建立呼叫 Cloud Translation 服務的 Python UDF

my_query_project 中,使用 Cloud 資源連結建立 Python UDF,呼叫 Cloud Translation 服務。

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往「BigQuery」

  2. 在查詢編輯器中輸入下列 CREATE FUNCTION 陳述式:

    CREATE FUNCTION `PROJECT_ID.DATASET_ID`.translate_to_es(x STRING)
    RETURNS STRING LANGUAGE python
    WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (entry_point='do_translate', runtime_version='python-3.11', packages=['google-cloud-translate>=3.11', 'google-api-core'])
    AS r"""
    
    from google.api_core.retry import Retry
    from google.cloud import translate
    
    project = "my_translate_project"
    translate_client = translate.TranslationServiceClient()
    
    def do_translate(x : str) -> str:
    
        response = translate_client.translate_text(
            request={
                "parent": f"projects/{project}/locations/us-central1",
                "contents": [x],
                "target_language_code": "es",
                "mime_type": "text/plain",
            },
            retry=Retry(),
        )
        return response.translations[0].translated_text
    
    """;
    
    -- Call the UDF.
    WITH text_table AS
      (SELECT "Hello" AS text
      UNION ALL
      SELECT "Good morning" AS text
      UNION ALL
      SELECT "Goodbye" AS text)
    SELECT text,
    `PROJECT_ID.DATASET_ID`.translate_to_es(text) AS translated_text
    FROM text_table;

    更改下列內容:

    • PROJECT_ID.DATASET_ID: 專案 ID 和資料集 ID
    • REGION.CONNECTION_ID:連線的區域和連線 ID
  3. 按一下「Run」(執行)

    輸出內容應如下所示:

    +--------------------------+-------------------------------+
    | text                     | translated_text               |
    +--------------------------+-------------------------------+
    | Hello                    | Hola                          |
    | Good morning             | Buen dia                      |
    | Goodbye                  | Adios                         |
    +--------------------------+-------------------------------+
    

最佳做法

建立 Python UDF 時,請遵循下列最佳做法:

  • 最佳化查詢邏輯以進行批次處理。複雜的查詢結構可能會停用批次處理。這會強制執行緩慢的逐列處理作業,大幅增加大型資料集的延遲時間。
  • 避免在條件運算式中使用 UDF。
  • 請避免使用 UDF 直接嵌入 STRUCT 欄位。
  • 在投影中隔離 UDF。如要確保批次處理,請使用一般資料表運算式 (CTE) 或子查詢,在 SELECT 陳述式中執行 UDF。然後在另一個步驟中,對該結果執行篩選或聯結。
  • 最佳化資料酬載。個別資料列的大小可能會影響批次處理功能的效率。
  • 盡量縮小資料列大小。盡可能縮小每個資料列,以便在單一批次中處理最多資料列。
  • 有效率地設定容器限制。擴充性取決於 CPU、記憶體和要求並行。
  • 使用疊代式調整時,請從預設值開始。如果效能不佳,請分析監控指標,找出特定瓶頸。
  • 調度資源。如果監控指標顯示使用率偏高,請增加分配的 CPU 和記憶體。
  • 管理外部依附元件和可靠性。與外部服務互動的 UDF 需要連線和適當權限。
  • 實作 API 逾時。當 Python UDF 存取網際網路時,請為 API 呼叫設定逾時,避免發生非預期行為。例如從 Cloud Storage 值區讀取資料。

查看 Python UDF 指標

Python UDF 會將指標匯出至 Cloud Monitoring。這些指標可協助您監控 UDF 運作狀態和資源消耗量的各個層面,深入瞭解 UDF 執行個體的效能和行為。

受控資源類型

Python UDF 的指標會回報至下列 Cloud Monitoring 資源類型:

  • 「Type」(類型)bigquery.googleapis.com/ManagedRoutineInvocation
  • 顯示名稱:BigQuery 代管處理常式叫用
  • 標籤
    • resource_container:執行查詢作業的專案 ID。
    • location:查詢作業的執行位置。
    • query_job_id:叫用 Python UDF 的查詢作業 ID。
    • routine_project_id:儲存所叫用常式的專案 ID。
    • routine_dataset_id:儲存所叫用常式的資料集 ID。
    • routine_id:叫用常式的 ID。

指標

下列指標適用於 bigquery.googleapis.com/ManagedRoutineInvocation 資源類型:

指標 說明 單位 值類型
bigquery.googleapis.com/managed_routine/python/cpu_utilizations 叫用 Python UDF 時,這項指標會顯示查詢作業中所有 Python UDF 執行個體的 CPU 使用率分布情形。 102.% DISTRIBUTION
bigquery.googleapis.com/managed_routine/python/memory_utilizations 叫用 Python UDF 時,這項指標會顯示查詢工作所有 Python UDF 執行個體的記憶體使用率分布情形。 102.% DISTRIBUTION
bigquery.googleapis.com/managed_routine/python/max_request_concurrencies 這項指標會顯示每個 Python UDF 執行個體處理的並行要求數量上限分布情形。 數量 DISTRIBUTION

查看指標

如要查看 Python UDF 的指標,請選擇下列章節中的其中一個選項。

工作詳細資料

如要查看特定查詢工作的 Python UDF 指標,請按照下列步驟操作:

  1. 前往「BigQuery」頁面

    前往「BigQuery」

  2. 按一下「工作記錄」

  3. 在「Job ID」(工作 ID) 欄中,按一下查詢工作 ID。

  4. 在「查詢工作詳細資料」頁面中,按一下「Cloud Monitoring 資訊主頁」。 這個連結會顯示經過篩選的資訊主頁,只顯示該工作的 Python UDF 指標。

Metrics Explorer

如要在 Metrics Explorer 中查看 Python UDF 指標,請按照下列步驟操作:

  1. 前往 Cloud Monitoring 的「Metrics Explorer」頁面。

    前往「Metrics Explorer」頁面

  2. 按一下「選取指標」,然後在「篩選器」欄位中輸入 BigQuery Managed Routine Invocationbigquery.googleapis.com/ManagedRoutineInvocation

  3. 選擇「Bigquery Managed Routine」>「Managed_routine」

  4. 點選任一可用指標,例如:

    • 執行個體 CPU 使用率
    • 執行個體記憶體使用率
    • 並行要求數量上限
  5. 按一下「套用」

    根據預設,指標會顯示在圖表中。

  6. 您可以使用監控資源類型中定義的標籤,篩選指標並加以分組。如要篩選指標,請按照下列步驟操作:

    1. 在「Filter」(篩選器) 欄位中,選擇資源類型,例如 query_job_idroutine_id

    2. 在「Value」(值) 欄位中,輸入工作 ID 或常式 ID,或從清單中選擇一個。

Cloud Monitoring 資訊主頁

如要使用監控資訊主頁查看 Python UDF 指標,請按照下列步驟操作:

  1. 前往 Cloud Monitoring 的「Dashboards」(資訊主頁) 頁面。

    前往資訊主頁

  2. 按一下「BigQuery Managed Routine Query Monitoring」(BigQuery 代管常式查詢監控) 資訊主頁。

    這個資訊主頁會概略顯示 UDF 的主要指標。

  3. 如要篩選這個資訊主頁,請按照下列步驟操作:

    1. 按一下「篩選器」圖示

    2. 在「依資源篩選」清單中,選擇專案 ID、位置、例行工作 ID 或工作 ID 等選項。

支援的地區

所有 BigQuery 多區域和區域位置都支援 Python UDF。

定價

Python UDF 免費提供,不另外收費。

啟用計費功能後,會發生下列情況:

  • 系統會使用 BigQuery 服務 SKU,收取 Python UDF 費用。
  • 系統會根據叫用 Python UDF 時消耗的運算和記憶體量,按比例收取費用。
  • Python UDF 客戶也須支付建構或重建 UDF 容器映像檔的費用。這項費用與使用客戶程式碼和依附元件建構映像檔所用的資源成正比。
  • 如果 Python UDF 導致外部或網際網路網路輸出,您也會看到 Cloud Networking 的進階級網際網路輸出費用。

配額

請參閱使用者定義函式配額和限制