借助 Cloud Storage 批处理源插件,您可以从 Cloud Storage 存储分区中读取数据,并将其引入 Cloud Data Fusion 以进行进一步处理和转换。它支持您加载多种文件格式的数据,包括:
- 结构化:CSV、Avro、Parquet、ORC
- 半结构化:JSON、XML
- 其他:文本、二进制
准备工作
Cloud Data Fusion 通常有两个服务账号:
- 设计时服务账号:Cloud Data Fusion API Service Agent
- 执行时服务账号:Compute Engine 服务账号
在使用 Cloud Storage 批量来源插件之前,请向每个服务账号授予以下角色或权限。
Cloud Data Fusion API Service Agent
此服务账号已拥有所有必需的权限,您无需添加其他权限。
Compute Engine 服务账号
在您的 Google Cloud 项目中,向 Compute Engine 服务账号授予以下 IAM 角色或权限:
- Storage Legacy Bucket Reader (
roles/storage.legacyBucketReader)。此预定义角色包含所需的storage.buckets.get权限。 Storage Object Viewer (
roles/storage.legacyBucketReader)。此预定义角色包含以下必需权限:storage.objects.getstorage.objects.list
配置插件
- 前往 Cloud Data Fusion 网页界面,然后点击 Studio。
- 检查是否已选择数据流水线 - 批量(而非实时)。
- 在来源菜单中,点击 GCS。Cloud Storage 节点会显示在流水线中。
- 如需配置来源,请前往 Cloud Storage 节点,然后点击属性。
输入以下属性。如需查看完整列表,请参阅属性。
- 为 Cloud Storage 节点输入标签,例如
Cloud Storage tables。 输入连接详细信息。您可以设置新的临时连接,也可以设置现有的可重复使用的连接。
新增关联项
如需添加与 Cloud Storage 的一次性连接,请按以下步骤操作:
- 保持使用连接处于关闭状态。
- 在项目 ID 字段中,将值保留为自动检测。
在服务账号类型字段中,将值保留为文件路径,并将服务账号文件路径保留为自动检测。
可重复使用的连接
如需重复使用现有连接,请按以下步骤操作:
- 开启使用连接。
- 点击浏览连接。
点击连接名称,例如 Cloud Storage Default。
可选:如果不存在连接,并且您想创建新的可重复使用的连接,请点击添加连接,然后参阅本页新连接标签页中的步骤。
在参考名称字段中,输入要用于谱系的名称,例如
data-fusion-gcs-campaign。在路径字段中,输入要读取的路径,例如
gs://BUCKET_PATH。在格式字段中,为正在读取的数据选择以下文件格式之一:
- avro
- blob(blob 格式需要包含名为 body 的字节类型字段的架构)
- csv
- delimited
- json
- parquet
- 文本(文本格式需要包含名为“正文”的字符串类型字段的架构)
- tsv
- 您在环境中部署的任何格式插件的名称
可选:如需测试连接,请点击获取架构。
可选:在样本大小字段中,输入要检查所选数据类型的最大行数,例如
1000。可选:在替换字段中,输入要跳过的列名称及其相应的数据类型。
可选:输入高级属性,例如最小拆分大小或正则表达式路径过滤条件(请参阅属性)。
可选:在临时存储桶名称字段中,输入 Cloud Storage 存储桶的名称。
- 为 Cloud Storage 节点输入标签,例如
可选:点击验证,并解决发现的所有错误。
点击关闭。 系统会保存这些属性,您可以继续在 Cloud Data Fusion Studio 中构建数据流水线。
属性
| 属性 | 已启用宏 | 必需属性 | 说明 |
|---|---|---|---|
| 标签 | 否 | 是 | 数据流水线中节点的名称。 |
| 使用连接 | 否 | 否 | 浏览可重复使用的来源连接。如需详细了解如何添加、导入和修改浏览连接时显示的连接,请参阅管理连接。 |
| 连接 | 是 | 是 | 如果使用连接处于开启状态,此字段中会显示您选择的可重复使用的连接的名称。 |
| 项目 ID | 是 | 否 | 仅在关闭使用连接时使用。项目的全局唯一标识符。 默认值为 auto-detect。 |
| 服务账号类型 | 是 | 否 | 选择以下某个选项:
|
| 服务账号文件路径 | 是 | 否 | 仅当服务账号类型值为文件路径时使用。用于授权的服务账号密钥在本地文件系统中的路径。如果作业在 Managed Service for Apache Spark 集群上运行,请将该值设置为自动检测。如果作业在其他类型的集群上运行,则该文件必须存在于集群中的每个节点上。 默认值为 auto-detect。 |
| 服务账号 JSON | 是 | 否 | 仅当服务账号类型值为 JSON 时使用。 服务账号的 JSON 文件内容。 |
| 参考名称 | 否 | 是 | 用于唯一标识此来源以供其他服务(例如沿袭和注释元数据)使用的名称。 |
| 路径 | 是 | 是 | 要读取的文件的路径。如果指定了目录,请以反斜杠 (/) 结束路径。例如,gs://bucket/path/to/directory/。如需匹配文件名模式,您可以使用星号 (*) 作为通配符。如果未找到或未匹配任何文件,流水线将失败。 |
| 格式 | 否 | 是 | 要读取的数据的格式。格式必须是以下格式之一:
|
| 样本规模 | 是 | 否 | 用于自动检测数据类型的调查行数上限。默认值为 1000。 |
| 替换 | 是 | 否 | 一个列列表,其中包含相应的数据,系统会跳过对这些数据的自动数据类型检测。 |
| 定界符 | 是 | 否 | 当格式为定界时要使用的定界符。对于其他格式,系统会忽略此属性。 |
| 启用引用值 | 是 | 否 | 是否将英文引号之间的内容视为值。此属性仅用于 csv、tsv 或分隔格式。例如,如果此属性设置为 true,则以下内容会输出两个字段:1, "a, b, c"。
第一个字段的值为 1。第二个具有 a, b, c。引号字符会被截掉。换行符定界符不能位于英文引号内。插件会假定英文引号已正确闭合,例如 "a, b, c"。不关闭引号 ("a,b,c,) 会导致错误。默认值为 False。 |
| 将第一行用作标题 | 是 | 否 | 是否将每个文件的第一行用作列标题。支持的格式包括文本、CSV、TSV 和分隔。 默认值为 False。 |
| 最小拆分规模 | 是 | 否 | 每个输入分区的最小大小(以字节为单位)。较小的分区可以提高并行级别,但需要更多资源和开销。
如果格式值为 blob,则无法拆分数据。 |
| 拆分大小上限 | 是 | 否 | 每个输入分区的最大大小(以字节为单位)。较小的分区可以提高并行级别,但需要更多资源和开销。
如果 Format 值为 blob,则无法拆分数据。默认值为 128 MB。 |
| 正则表达式路径过滤条件 | 是 | 否 | 文件路径必须匹配的正则表达式,才能包含在输入中。系统会比较完整路径,而不仅仅是文件名。如果未指定任何文件,则不会进行文件过滤。如需详细了解正则表达式语法,请参阅模式。 |
| 路径字段 | 是 | 否 | 用于放置记录读取来源文件的路径的输出字段。如果未指定,则输出记录中不包含路径。如果指定了此值,则该字段必须以字符串形式存在于输出架构中。 |
| 仅限路径文件名 | 是 | 否 | 如果设置了 Path 字段属性,则仅使用文件名,而不使用路径的 URI。 默认值为 False。 |
| 以递归方式读取文件 | 是 | 否 | 是否要从相应路径以递归方式读取文件。 默认值为 False。 |
| 允许输入为空 | 是 | 否 | 是否允许包含无数据的输入路径。如果设置为 False,当没有数据可供读取时,插件会出错。如果设置为 True,则不会抛出任何错误,并且不会读取任何记录。 默认值为 False。 |
| 数据文件已加密 | 是 | 否 | 文件是否已加密。如需了解详情,请参阅数据文件加密。 默认值为 False。 |
| 加密元数据文件后缀名 | 是 | 否 | 加密元数据文件的文件后缀名。 默认值为 metadata。 |
| 文件系统属性 | 是 | 否 | 读取数据时要与 InputFormat 搭配使用的其他属性。 |
| 文件编码 | 是 | 否 | 要读取的文件的字符编码。 默认值为 UTF-8。 |
| 输出架构 | 是 | 否 | 如果设置了 Path 字段属性,则该属性必须以字符串形式存在于架构中。 |
数据文件加密
本部分介绍了数据文件加密属性。如果将其设置为 true,则会使用 Tink 库提供的流式 AEAD 来解密文件。每个数据文件都必须附带一个包含加密信息的元数据文件。例如,如果加密数据文件位于 gs://BUCKET/PATH_TO_DIRECTORY/file1.csv.enc
,则其元数据文件必须位于 gs://BUCKET/
PATH_TO_DIRECTORY/file1.csv.enc.metadata。元数据文件包含一个具有以下属性的 JSON 对象:
| 属性 | 说明 |
|---|---|
kms |
用于加密数据加密密钥的 Cloud Key Management Service URI。 |
aad |
加密中使用的 Base64 编码的经过身份验证的额外数据。 |
key set |
一个 JSON 对象,表示来自 Tink 库的序列化密钥集信息。 |
示例
/* Counting example */ { "kms": "gcp-kms://projects/my-key-project/locations/us-west1/keyRings/my-key-ring/cryptoKeys/mykey", "aad": "73iT4SUJBM24umXecCCf3A==", "keyset": { "keysetInfo": { "primaryKeyId": 602257784, "keyInfo": [{ "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmHkdfStreamingKey", "outputPrefixType": "RAW", "keyId": 602257784, "status": "ENABLED" }] }, "encryptedKeyset": "CiQAz5HH+nUA0Zuqnz4LCnBEVTHS72s/zwjpcnAMIPGpW6kxLggSrAEAcJKHmXeg8kfJ3GD4GuFeWDZzgGn3tfolk6Yf5d7rxKxDEChIMWJWGhWlDHbBW5B9HqWfKx2nQWSC+zjM8FLefVtPYrdJ8n6Eg8ksAnSyXmhN5LoIj6az3XBugtXvCCotQHrBuyoDY+j5ZH9J4tm/bzrLEjCdWAc+oAlhsUAV77jZhowJr6EBiyVuRVfcwLwiscWkQ9J7jjHc7ih9HKfnqAZmQ6iWP36OMrEn" } }
版本说明
后续步骤
- 详细了解 Cloud Data Fusion 中的插件。