创建数据产品模块
如需定义自己的业务逻辑和分析模型,请创建自定义 数据产品 模块。这样,您就可以对基础表或上游数据产品运行计算,并将结果打包到可部署的数据集中。
前提条件
我们建议在专用的 自定义命名空间 中创建自定义数据产品模块,以便更好地进行生命周期管理。此外,请确保您计划使用的源表存在于数据基础数据集内。
创建数据产品模块
定义数据产品模块需要执行以下步骤:
- 通过使用条目扩展
data.modules.products列表,在config/config.yaml文件中注册数据产品模块:
data:
# Configuration for data foundation and product modules.
modules:
# List of data product modules.
product:
# Recommended naming for product_module_id:
# custom_namespace_data_product_module_type
- moduleId: product_module_id
# Type of the data product (namespaced).
type: custom_namespace.data_product_module_type
# Map of module dependencies.
dependsOn:
sapModule: erp
sapModuleCustNS: foundation_module_id
# Reference to the target dataset ID.
dataTargetId: product_target
# Whether the module is enabled.
# enabled: true
# Whether the foundation is external (does not create target dataset).
# external: false
# Custom table settings file, relative to 'config/' file directory
# Recommended path: '{custom_namespace}/data_product/{data_product_module_type}/table_settings.yaml'
# If omitted, defaults to '../src/data_modules/{custom_namespace}/data_product/{data_product_module_type}/table_settings.default.yaml'
# tableSettings: "{custom_namespace}/data_product/{data_product_module_type}/table_settings.yaml"
- 创建默认
tableSettings文件(例如src/data_modules/custom_namespace/data_product/data_product_module_type/table_settings.default.yaml)。
此 YAML 控制 表配置,例如具体化和 BigQuery 优化详细信息:
common:
custom_sales_summary:
materialization_type: "table"
tags: ["custom", "sales", "reporting"]
partition_details:
column: "created_date"
partition_type: "date"
time_grain: "day"
cluster_details:
columns:
- "customer_id"
- 创建注解文件
系统会为每个数据产品输出工件(表、视图)创建注解文件 tablename.yaml,并以 YAML 格式描述列和字段。在编译期间,构建器会自动在产品的 annotations/ 文件夹(例如 src/data_modules/custom_namespace/data_product/data_product_module_type/annotations/custom_sales_summary.yaml)中搜索注释,直接将这些字符串合并到输出 Dataform 架构定义中,以便将它们保留在 BigQuery 表元数据中。
注解 src/data_modules/custom_namespace/data_product/data_product_module_type/annotations/tablename.yaml 文件的格式如下:
description: "Description of the table or view purpose"
fields:
- name: "customer_id" # column name
description: "Customer identifier" # column description
- name: "column2"
description: "Description of Column 2"
- name: "column3"
description: "Description of Column 3"
- 在数据产品文件夹
src/data_modules/custom_namespace/data_product/data_product_module_type/中创建一个manifest.yaml文件,以维护类型、表和模块依赖项。清单文件采用以下格式:
type: sales_performance
builder: sap_product # Automatically resolves to the global SapProductBuilder fallback
dependencies:
sapModule:
type: sap
supportedVersions:
- ecc
- s4
数据产品模块示例
在 航班示例 的命名空间 sap_bookingdatamodel 中实现 flights_usd 数据产品的步骤如下:
- 通过使用条目扩展
data.modules.products列表,在config/config.yaml文件中注册数据产品模块:
data:
modules:
product:
- moduleId: sap_bookingdatamodel_flights_usd
type: sap_bookingdatamodel.flights_usd
dependsOn:
sapModule: erp
sapModuleCustNS: sap_bookingdatamodel
dataTargetId: product_target
- 接下来,使用以下内容创建
src/data_modules/custom_namespace/data_product/data_product_module_type/manifest.yaml
type: flights_usd
dependencies:
sapModule:
type: cortex.sap
supported_versions:
- ecc
- s4
tables:
common:
- tcurr
sapModuleCustNS:
# Type of the dependent Module.
# use cortex.sap if you followed "Configure multiple instances of a data foundation module"
# https://docs.cloud.google.com/cortex/docs/deployment-configuration#multiple-data-foundation-instances
type: cortex.sap
# use sap_bookingdatamodel.sap if you are connecting to custom-data foundation module:
# https://docs.cloud.google.com/cortex/docs/extensibility-guide-data-foundation
#type: sap_bookingdatamodel.sap
supported_versions:
- ecc
- s4
tables:
common:
- sflight
builder: sap_product
- 在下一步中,创建引用的 表设置 文件,以配置 BigQuery 中输出表或视图的架构和元数据。
在使用的示例中,使用以下内容创建 src/data_modules/custom_namespace/data_product/data_product_module_type/table_settings.default.yaml:
ecc:
flights_usd:
materializationType: incremental
tags: [sap, dataproduct, masterdata]
s4:
flights_usd:
materializationType: incremental
tags: [sap, dataproduct, masterdata]
- 为数据产品表创建注释,以使用说明丰富存储架构。
在使用的示例中,使用以下内容创建文件 src/data_modules/custom_namespace/data_product/data_product_module_type/annotations/flights_usd.yaml:
description: "Flight scheduling and pricing information, including currency conversion to USD."
fields:
- name: "client_mandt"
description: "Client (Mandant), PK"
- name: "airline_code_carrid"
description: "Airline Carrier ID, PK"
- name: "flight_connection_number_connid"
description: "Flight Number, PK"
- name: "flight_date_fldate"
description: "Flight Date"
- name: "price_usd"
description: "Price in USD"
- name: "price"
description: "Price in local currency"
- name: "currency"
description: "Local currency"
- 数据产品的业务逻辑存储在
js或sqlx文件中。
在给定的示例中,使用以下内容创建 src/data_modules/custom_namespace/data_product/data_product_module_type/definitions/flights_usd.js 文件:
// ___MODULE_CONTEXT___
// ___TABLE_CONFIG___
const moduleConfig = config.product[moduleContext.moduleId];
const sapModuleConfigDatasetId = moduleConfig.sources.sapModule.datasetId;
const sapModuleCustNSConfigDatasetId = moduleConfig.sources.sapModuleCustNS.datasetId;
const materializationType = tableConfig.materializationType || "incremental";
const incremental = require("includes/cortex/incremental.js");
const publish_config = require("includes/cortex/publish_config.js");
const publishConfig = publish_config.getPublishConfig(
materializationType,
tableConfig,
moduleConfig,
[
"client_mandt",
"airline_code_carrid",
"flight_connection_number_connid",
"flight_date_fldate"
]
);
publish("flight_usd", publishConfig).query(
(ctx) => `
WITH flight_base AS (
SELECT
mandt,
carrid,
connid,
fldate,
price,
currency,
-- Convert flight date string (YYYYMMDD) to an integer to calculate SAP's inverted date key
CAST(99999999 - CAST(fldate AS INT64) AS STRING) AS inverted_fldate
FROM ${ctx.ref(sapModuleCustNSConfigDatasetId, 'sflight')} AS flight
),
ranked_exchange_rates AS (
SELECT
f.mandt,
f.carrid,
f.connid,
f.fldate,
f.price,
f.currency,
t.ukurs,
-- Window function to grab the closest historical exchange rate
ROW_NUMBER() OVER (
PARTITION BY f.mandt, f.carrid, f.connid, f.fldate
ORDER BY t.gdatu ASC
) AS latest_rate_rank
FROM flight_base f
LEFT JOIN ${ctx.ref(sapModuleConfigDatasetId, 'tcurr')} AS t
ON f.mandt = t.mandt
AND t.kurst = 'M' -- 'M' is the standard SAP default for average exchange rates
AND t.fcurr = f.currency
AND t.tcurr = 'USD'
-- Chronological (rate_date <= flight_date) translates to (t.gdatu >= inverted_fldate)
AND t.gdatu >= f.inverted_fldate
)
SELECT
client_mandt,
airline_code_carrid,
flight_connection_number_connid,
flight_date_fldate,
price,
currency,
price_usd,
CURRENT_TIMESTAMP() AS bq_loaded_at
FROM (
SELECT
mandt AS client_mandt,
carrid AS airline_code_carrid,
connid AS flight_connection_number_connid,
PARSE_TIMESTAMP('%Y%m%d', fldate) AS flight_date_fldate,
price AS price,
currency AS currency,
-- Currency Conversion Logic
CASE
WHEN currency = 'USD' THEN price
WHEN ukurs IS NULL THEN NULL -- Handles cases where no exchange rate is found
-- If UKURS is negative, it's an indirect quotation (1 USD = X Local) -> Divide
WHEN ukurs < 0 THEN ROUND(price / ABS(ukurs), 2)
-- If UKURS is positive, it's a direct quotation (1 Local = X USD) -> Multiply
ELSE ROUND(price * ukurs, 2)
END AS price_usd
FROM ranked_exchange_rates
WHERE latest_rate_rank = 1
)
${incremental.getWhere(ctx, ["flight_date_fldate"])}
`
);
验证自定义命名空间扩展
如需验证 Google Cloud Cortex Framework 数据产品模块是否已成功创建,请按以下步骤操作:
- 执行 Cortex Framework 部署,引用更新后的
config.yaml文件。 - 按照部署后步骤执行 Dataform 操作,并在 BigQuery 中验证结果