创建数据产品模块

如需定义自己的业务逻辑和分析模型,请创建自定义 数据产品 模块。这样,您就可以对基础表或上游数据产品运行计算,并将结果打包到可部署的数据集中。

前提条件

我们建议在专用的 自定义命名空间 中创建自定义数据产品模块,以便更好地进行生命周期管理。此外,请确保您计划使用的源表存在于数据基础数据集内。

创建数据产品模块

定义数据产品模块需要执行以下步骤:

  • 通过使用条目扩展 data.modules.products 列表,在 config/config.yaml 文件中注册数据产品模块:
data:
  # Configuration for data foundation and product modules.
  modules:
    # List of data product modules.
    product:
        # Recommended naming for product_module_id:
        # custom_namespace_data_product_module_type
      - moduleId:  product_module_id
        # Type of the data product (namespaced).
        type:  custom_namespace.data_product_module_type
        # Map of module dependencies.
        dependsOn:
          sapModule: erp
          sapModuleCustNS:  foundation_module_id
        # Reference to the target dataset ID.
        dataTargetId: product_target
        # Whether the module is enabled.
        # enabled: true
        # Whether the foundation is external (does not create target dataset).
        # external: false
        # Custom table settings file, relative to 'config/' file directory
        # Recommended path: '{custom_namespace}/data_product/{data_product_module_type}/table_settings.yaml'
        # If omitted, defaults to '../src/data_modules/{custom_namespace}/data_product/{data_product_module_type}/table_settings.default.yaml'
        # tableSettings: "{custom_namespace}/data_product/{data_product_module_type}/table_settings.yaml"
        
  • 创建默认 tableSettings 文件(例如 src/data_modules/custom_namespace/data_product/data_product_module_type/table_settings.default.yaml)。

此 YAML 控制 表配置,例如具体化和 BigQuery 优化详细信息:

common:
  custom_sales_summary:
    materialization_type: "table"
    tags: ["custom", "sales", "reporting"]
    partition_details:
      column: "created_date"
      partition_type: "date"
      time_grain: "day"
    cluster_details:
      columns:
        - "customer_id"
  • 创建注解文件

系统会为每个数据产品输出工件(表、视图)创建注解文件 tablename.yaml,并以 YAML 格式描述列和字段。在编译期间,构建器会自动在产品的 annotations/ 文件夹(例如 src/data_modules/custom_namespace/data_product/data_product_module_type/annotations/custom_sales_summary.yaml)中搜索注释,直接将这些字符串合并到输出 Dataform 架构定义中,以便将它们保留在 BigQuery 表元数据中。

注解 src/data_modules/custom_namespace/data_product/data_product_module_type/annotations/tablename.yaml 文件的格式如下:

description: "Description of the table or view purpose"
fields:
  - name: "customer_id"                     # column name
    description: "Customer identifier"      # column description
  - name: "column2"
    description: "Description of Column 2"
  - name: "column3"
    description: "Description of Column 3"
  • 在数据产品文件夹 src/data_modules/custom_namespace/data_product/data_product_module_type/ 中创建一个 manifest.yaml 文件,以维护类型、表和模块依赖项。清单文件采用以下格式:

type: sales_performance
builder: sap_product     # Automatically resolves to the global SapProductBuilder fallback
dependencies:
  sapModule:
    type: sap
    supportedVersions:
      - ecc
      - s4

数据产品模块示例

航班示例 的命名空间 sap_bookingdatamodel 中实现 flights_usd 数据产品的步骤如下:

  • 通过使用条目扩展 data.modules.products 列表,在 config/config.yaml 文件中注册数据产品模块:
data:
  modules:
    product:
      - moduleId: sap_bookingdatamodel_flights_usd
        type: sap_bookingdatamodel.flights_usd
        dependsOn:
          sapModule: erp
          sapModuleCustNS: sap_bookingdatamodel
        dataTargetId: product_target
  • 接下来,使用以下内容创建 src/data_modules/custom_namespace/data_product/data_product_module_type/manifest.yaml
type: flights_usd
dependencies:
  sapModule:
    type: cortex.sap
    supported_versions:
      - ecc
      - s4
    tables:
      common:
        - tcurr
  sapModuleCustNS:
    # Type of the dependent Module.
    # use cortex.sap if you followed "Configure multiple instances of a data foundation module"
    # https://docs.cloud.google.com/cortex/docs/deployment-configuration#multiple-data-foundation-instances
    type: cortex.sap
    # use sap_bookingdatamodel.sap if you are connecting to custom-data foundation module:
    # https://docs.cloud.google.com/cortex/docs/extensibility-guide-data-foundation
    #type: sap_bookingdatamodel.sap
    supported_versions:
      - ecc
      - s4
    tables:
      common:
        - sflight
builder: sap_product
  • 在下一步中,创建引用的 表设置 文件,以配置 BigQuery 中输出表或视图的架构和元数据。

在使用的示例中,使用以下内容创建 src/data_modules/custom_namespace/data_product/data_product_module_type/table_settings.default.yaml

ecc:
  flights_usd:
    materializationType: incremental
    tags: [sap, dataproduct, masterdata]
s4:
  flights_usd:
    materializationType: incremental
    tags: [sap, dataproduct, masterdata]

  • 为数据产品表创建注释,以使用说明丰富存储架构。

在使用的示例中,使用以下内容创建文件 src/data_modules/custom_namespace/data_product/data_product_module_type/annotations/flights_usd.yaml

description: "Flight scheduling and pricing information, including currency conversion to USD."
fields:
  - name: "client_mandt"
    description: "Client (Mandant), PK"
  - name: "airline_code_carrid"
    description: "Airline Carrier ID, PK"
  - name: "flight_connection_number_connid"
    description: "Flight Number, PK"
  - name: "flight_date_fldate"
    description: "Flight Date"
  - name: "price_usd"
    description: "Price in USD"
  - name: "price"
    description: "Price in local currency"
  - name: "currency"
    description: "Local currency"
  • 数据产品的业务逻辑存储在 jssqlx 文件中。

在给定的示例中,使用以下内容创建 src/data_modules/custom_namespace/data_product/data_product_module_type/definitions/flights_usd.js 文件:

// ___MODULE_CONTEXT___
// ___TABLE_CONFIG___

const moduleConfig = config.product[moduleContext.moduleId];
const sapModuleConfigDatasetId = moduleConfig.sources.sapModule.datasetId;
const sapModuleCustNSConfigDatasetId = moduleConfig.sources.sapModuleCustNS.datasetId;

const materializationType = tableConfig.materializationType || "incremental";

const incremental = require("includes/cortex/incremental.js");
const publish_config = require("includes/cortex/publish_config.js");

const publishConfig = publish_config.getPublishConfig(
   materializationType,
   tableConfig,
   moduleConfig,
   [
       "client_mandt",
       "airline_code_carrid",
       "flight_connection_number_connid",
       "flight_date_fldate"
   ]
);

publish("flight_usd", publishConfig).query(
   (ctx) => `
WITH flight_base AS (
   SELECT
       mandt,
       carrid,
       connid,
       fldate,
       price,
       currency,
       -- Convert flight date string (YYYYMMDD) to an integer to calculate SAP's inverted date key
       CAST(99999999 - CAST(fldate AS INT64) AS STRING) AS inverted_fldate
   FROM   ${ctx.ref(sapModuleCustNSConfigDatasetId, 'sflight')} AS flight
),
ranked_exchange_rates AS (
   SELECT
       f.mandt,
       f.carrid,
       f.connid,
       f.fldate,
       f.price,
       f.currency,
       t.ukurs,
       -- Window function to grab the closest historical exchange rate
       ROW_NUMBER() OVER (
           PARTITION BY f.mandt, f.carrid, f.connid, f.fldate
           ORDER BY t.gdatu ASC
       ) AS latest_rate_rank
   FROM flight_base f
   LEFT JOIN ${ctx.ref(sapModuleConfigDatasetId, 'tcurr')} AS t
     ON f.mandt = t.mandt
    AND t.kurst = 'M'       -- 'M' is the standard SAP default for average exchange rates
    AND t.fcurr = f.currency
    AND t.tcurr = 'USD'
    -- Chronological (rate_date <= flight_date) translates to (t.gdatu >= inverted_fldate)
    AND t.gdatu >= f.inverted_fldate
)

SELECT
   client_mandt,
   airline_code_carrid,
   flight_connection_number_connid,
   flight_date_fldate,
   price,
   currency,
   price_usd,
   CURRENT_TIMESTAMP() AS bq_loaded_at
FROM (
  SELECT
    mandt              AS client_mandt,
    carrid             AS airline_code_carrid,
    connid             AS flight_connection_number_connid,
    PARSE_TIMESTAMP('%Y%m%d', fldate) AS flight_date_fldate,
    price              AS price,
    currency           AS currency,
    -- Currency Conversion Logic
    CASE
       WHEN currency = 'USD' THEN price
       WHEN ukurs IS NULL   THEN NULL -- Handles cases where no exchange rate is found
       -- If UKURS is negative, it's an indirect quotation (1 USD = X Local) -> Divide
       WHEN ukurs < 0       THEN ROUND(price / ABS(ukurs), 2)
       -- If UKURS is positive, it's a direct quotation (1 Local = X USD) -> Multiply
       ELSE ROUND(price * ukurs, 2)
     END AS price_usd
  FROM ranked_exchange_rates
  WHERE latest_rate_rank = 1
)
${incremental.getWhere(ctx, ["flight_date_fldate"])}
`
);

验证自定义命名空间扩展

如需验证 Google Cloud Cortex Framework 数据产品模块是否已成功创建,请按以下步骤操作: