创建数据产品模块
您需要创建自定义 数据产品 模块,才能定义自己的业务逻辑和分析模型,以便对基础表执行计算并将其打包到可部署的数据集中。
前提条件
创建自定义数据产品模块时,我们强烈建议您使用专用的自定义命名空间对其进行打包。此外,请确保您计划使用的源表存在于数据基础数据集内。
创建数据产品模块
定义数据产品模块需要执行以下步骤:
- 通过使用条目扩展
data.modules.products列表,在config/config.yaml文件中注册数据产品模块:
[...]
data:
[...]
# Configuration for data foundation and product modules.
modules:
# List of foundation modules.
foundation:
[...]
# List of data product modules.
product:
[...]
- moduleId: product_module_id
type: custom_namespace.flight_usd
dependsOn:
sapModule: erp
sapModuleCustNS: foundation_module_id
dataTargetId: product_target
enabled: true
tableSettings: "table_settings.yaml"
# Optional, references file in `config/custom_namespace_path/data_product/product_module_id/`
# If omitted, defaults to src/data_modules/custom_namespace_path/data_product/table_settings.default.yaml.
[...]
- 创建
tableSettings文件(例如config/custom_namespace_path/data_product/product_module_id/table_settings.yaml)。此 YAML 控制表配置,例如具体化和 BigQuery 优化详细信息:
common:
custom_sales_summary:
materialization_type: "table"
tags: ["custom", "sales", "reporting"]
partition_details:
column: "created_date"
partition_type: "date"
time_grain: "day"
cluster_details:
columns:
- "customer_id"
- 创建注解文件
系统会为每个数据产品输出工件(表、视图)创建注解文件 <tablename>.yaml,并以 YAML 格式描述列和字段。在编译期间,构建器会自动搜索产品 annotations/ 文件夹(例如 annotations/custom_sales_summary.yaml)中的注释,并将这些字符串直接合并到输出 Dataform 架构定义中,以便将它们保留在 BigQuery 表元数据中。
注解 config/custom_namespace_path/data_product/product_module_id/annotations/'tablename'.yaml 文件的格式如下:
description: "Description of the table or view purpose"
fields:
- name: "customer_id" # column name
description: "Customer identifier" # column description
- name: "column2"
description: "Description of Column 2"
- name: "column3"
description: "Description of Column 3"
- 在数据产品文件夹
config/custom_namespace_path/data_product/product_module_id/中创建manifest.yaml文件,以维护类型、表和模块依赖项。清单文件遵循以下格式:
type: sales_performance
builder: sap_product # Automatically resolves to the global SapProductBuilder fallback
dependencies:
sapModule:
type: sap
supportedVersions:
- ecc
- s4
数据产品模块示例
对于航班示例,我们将创建 src/data_modules/custom_namespace_path/data_product/product_module_id/manifest.yaml,其内容如下
type: product_module_id
dependencies:
sapModule:
type: cortex.sap
supported_versions:
- ecc
- s4
tables:
common:
- tcurr
sapModuleCustNS:
type: custom_namespace .sap
supported_versions:
- ecc
- s4
tables:
common:
- sflight
builder: sap_product
- 在下一步中,扩展数据产品表的引用表设置文件。
在使用的示例中,创建 config/custom_namespace_path/data_product/product_module_id/table_settings.yaml,其内容如下:
ecc:
flights_usd:
materializationType: incremental
tags: [sap, dataproduct, masterdata]
s4:
flights_usd:
materializationType: incremental
tags: [sap, dataproduct, masterdata]
- 为数据产品表创建注释,以使用说明丰富存储架构。
在使用的示例中,创建文件 src/data_modules/custom_namespace_path/data_product/product_module_id/annotations/flights_usd.yaml,其内容如下:
description: "Flight scheduling and pricing information, including currency conversion to USD."
fields:
- name: "client_mandt"
description: "Client (Mandant), PK"
- name: "airline_code_carrid"
description: "Airline Carrier ID, PK"
- name: "flight_connection_number_connid"
description: "Flight Number, PK"
- name: "flight_date_fldate"
description: "Flight Date"
- name: "price_usd"
description: "Price in USD"
- name: "price"
description: "Price in local currency"
- name: "currency"
description: "Local currency"
- 数据产品的业务逻辑存储在
js或sqlx文件中。
在给定示例中,创建 src/data_modules/custom_namespace_path/data_product/product_module_id/definitions/flights_usd.js 文件,其内容如下:
// ___MODULE_CONTEXT___
// ___TABLE_CONFIG___
const moduleConfig = config.product[moduleContext.moduleId];
const sapModuleConfigDatasetId = moduleConfig.sources.sapModule.datasetId;
const sapModuleCustNSConfigDatasetId = moduleConfig.sources.sapModuleCustNS.datasetId;
const materializationType = tableConfig.materializationType || "incremental";
const incremental = require("includes/cortex/incremental.js");
const publish_config = require("includes/cortex/publish_config.js");
const publishConfig = publish_config.getPublishConfig(
materializationType,
tableConfig,
moduleConfig,
[
"client_mandt",
"airline_code_carrid",
"flight_connection_number_connid",
"flight_date_fldate"
]
);
publish("flight_usd", publishConfig).query(
(ctx) => `
WITH flight_base AS (
SELECT
mandt,
carrid,
connid,
fldate,
price,
currency,
-- Convert flight date string (YYYYMMDD) to an integer to calculate SAP's inverted date key
CAST(99999999 - CAST(fldate AS INT64) AS STRING) AS inverted_fldate
FROM ${ctx.ref(sapModuleCustNSConfigDatasetId, 'sflight')} AS flight
),
ranked_exchange_rates AS (
SELECT
f.mandt,
f.carrid,
f.connid,
f.fldate,
f.price,
f.currency,
t.ukurs,
-- Window function to grab the closest historical exchange rate
ROW_NUMBER() OVER (
PARTITION BY f.mandt, f.carrid, f.connid, f.fldate
ORDER BY t.gdatu ASC
) AS latest_rate_rank
FROM flight_base f
LEFT JOIN ${ctx.ref(sapModuleConfigDatasetId, 'tcurr')} AS t
ON f.mandt = t.mandt
AND t.kurst = 'M' -- 'M' is the standard SAP default for average exchange rates
AND t.fcurr = f.currency
AND t.tcurr = 'USD'
-- Chronological (rate_date <= flight_date) translates to (t.gdatu >= inverted_fldate)
AND t.gdatu >= f.inverted_fldate
)
SELECT
client_mandt,
airline_code_carrid,
flight_connection_number_connid,
flight_date_fldate,
price,
currency,
price_usd,
CURRENT_TIMESTAMP() AS bq_loaded_at
FROM (
SELECT
mandt AS client_mandt,
carrid AS airline_code_carrid,
connid AS flight_connection_number_connid,
PARSE_TIMESTAMP('%Y%m%d', fldate) AS flight_date_fldate,
price AS price,
currency AS currency,
-- Currency Conversion Logic
CASE
WHEN currency = 'USD' THEN price
WHEN ukurs IS NULL THEN NULL -- Handles cases where no exchange rate is found
-- If UKURS is negative, it's an indirect quotation (1 USD = X Local) -> Divide
WHEN ukurs < 0 THEN ROUND(price / ABS(ukurs), 2)
-- If UKURS is positive, it's a direct quotation (1 Local = X USD) -> Multiply
ELSE ROUND(price * ukurs, 2)
END AS price_usd
FROM ranked_exchange_rates
WHERE latest_rate_rank = 1
)
${incremental.getWhere(ctx, ["flight_date_fldate"])}
`
);
验证自定义命名空间扩展
如需验证是否已使用命名空间、数据基础或数据产品模块成功扩展 Google Cloud Cortex Framework,请按以下步骤操作:
如需部署数据产品模块,请运行
uv run targets build、deploy或build-and-deploy,如部署页面中所述。在 BigQuery 控制台中打开 Dataform 界面,然后前往代码库和工作区。
在 Dataform 界面中,确保控制台中未显示任何编译错误。
验证准备好的扩展是否已部署到路径
definitions/data_foundation/custom_namespace_path/和definitions/data_product/product_module_id/。按照说明执行 Dataform 流水线。
在 BigQuery 中验证产品数据集是否包含数据产品表,以及该表是否已填充数据。