本教程介绍如何连接 BigQuery 表或视图以从 Dataricks 笔记本读取和写入数据。其中使用Google Cloud 控制台和 Databricks 工作区介绍了相应步骤。虽然您也可以使用 gcloud
和 databricks
命令行工具执行这些步骤,但该指南不在本教程的介绍范围内。
Databricks on Google Cloud 是 Google Cloud上托管的 Databricks 环境,它在 Google Kubernetes Engine (GKE) 上运行,并提供与 BigQuery 和其他 Google Cloud 技术的内置集成。如果您刚开始接触 Databricks,请观看 Databricks 统一数据平台简介视频,大致了解 Databricks 湖仓一体平台。
部署 Databricks on Google Cloud
完成以下步骤即可为部署 Databricks on Google Cloud做好准备。
- 如需设置 Databoards 账号,请按照 Databricks 文档设置 Databricks on Google Cloud账号中的说明操作。
- 注册后,详细了解如何管理 Databricks 账号。
创建 Databrick 工作区、集群和笔记本
以下步骤介绍了如何创建 Dataricks 工作区、集群和 Python 笔记本,以编写代码访问 BigQuery。
确认 Datastone 前提条件。
创建您的第一个工作区。在 Datastones 账号控制台上,点击创建工作区。
为工作区名称指定
gcp-bq
,然后选择您的区域。如需确定您的 Google Cloud 项目 ID,请访问 Google Cloud 控制台,然后将值复制到Google Cloud 项目 ID 字段。
点击保存创建 Dataricks 工作区。
如需创建使用 Databricks 运行时 7.6 或更高版本的 Databricks 集群,请在左侧菜单栏中选择集群,然后点击顶部的创建集群。
指定集群名称及其大小,然后点击高级选项并指定 Google Cloud服务账号的邮箱。
点击创建集群。
如需为 Dataricks 创建 Python 笔记本,请按照创建笔记本中的说明操作。
从 Databricks 查询 BigQuery
使用上述配置时,您可以安全地将 Databricks 连接到 BigQuery。Databricks 使用开源 Google Spark 适配器的分支来访问 BigQuery。
Datastones 通过自动下推某些查询谓词(例如,过滤嵌套到 BigQuery 的列),可减少数据转移并加快查询速度。此外,增加了使用 query()
API 在 BigQuery 上首次运行 SQL 查询的功能,可减小生成的数据集的传输大小。
以下步骤介绍了如何访问 BigQuery 中的数据集,以及如何将您自己的数据写入 BigQuery。
访问 BigQuery 上的公共数据集
BigQuery 提供了可用的公共数据集列表。如需查询属于公共数据集的 BigQuery 莎士比亚数据集,请按照以下步骤操作:
如需读取 BigQuery 表,请在 Databricks 笔记本中使用以下代码段。
table = "bigquery-public-data.samples.shakespeare" df = spark.read.format("bigquery").option("table",table).load() df.createOrReplaceTempView("shakespeare")
按
Shift+Return
执行该代码。现在,您可以通过 Spark DataFrame (
df
) 查询 BigQuery 表。例如,使用以下命令可显示数据帧的前三行:df.show(3)
如需查询其他表,请更新
table
变量。Databricks 笔记本的一个重要特征是,您可以在单个笔记本中混合不同语言(如 Scala、Python 和 SQL)的单元。
以下 SQL 查询允许您在运行先前创建临时视图的单元后直观呈现莎士比亚中的字数统计。
%sql SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC LIMIT 12
上面的单元会对 Databricks 集群(而不是 BigQuery)中的数据帧运行 Spark SQL 查询。这种方法的优势在于,数据分析在 Spark 级层进行,系统不会发出进一步的 BigQuery API 调用,因此不会产生额外的 BigQuery 费用。
作为替代方案,您可以使用
query()
API 将 SQL 查询的执行委托给 BigQuery 并进行优化,以减小生成的数据帧的传输大小。与上述示例(在 Spark 中完成处理)不同,如果使用此方法,则价格和查询优化适用于在 BigQuery 上执行查询。以下示例使用 Scala、
query()
API 以及 BigQuery 中的公共莎士比亚数据集来计算莎士比亚作品中五个最常见的字词。在运行代码之前,您必须先在 BigQuery 中创建一个名为mdataset
的空数据集,代码可引用该数据集。如需了解详情,请参阅将数据写入 BigQuery。%scala // public dataset val table = "bigquery-public-data.samples.shakespeare" // existing dataset where the Google Cloud user has table creation permission val tempLocation = "mdataset" // query string val q = s"""SELECT word, SUM(word_count) AS word_count FROM ${table} GROUP BY word ORDER BY word_count DESC LIMIT 10 """ // read the result of a GoogleSQL query into a DataFrame val df2 = spark.read.format("bigquery") .option("query", q) .option("materializationDataset", tempLocation) .load() // show the top 5 common words in Shakespeare df2.show(5)
如需查看更多代码示例,请参阅 Dataricks BigQuery 示例笔记本。
将数据写入 BigQuery
BigQuery 表位于数据集中。 在将数据写入 BigQuery 表之前,您必须在 BigQuery 中创建新数据集。如需为 Databricks Python 笔记本创建数据集,请按以下步骤操作:
前往 Google Cloud 控制台中的 BigQuery 页面。
展开
操作选项,点击创建数据集,然后将其命名为together
。在 Databricks Python 笔记本中,使用以下代码段根据包含三个字符串条目的 Python 列表创建一个简单的 Spark 数据帧。
from pyspark.sql.types import StringType mylist = ["Google", "Databricks", "better together"] df = spark.createDataFrame(mylist, StringType())
在笔记本中添加另一个单元,以便将上一步中的 Spark 数据帧写入数据集
together
中的 BigQuery 表myTable
。系统会创建或覆盖该表。使用您之前指定的存储桶名称。bucket = YOUR_BUCKET_NAME table = "together.myTable" df.write .format("bigquery") .option("temporaryGcsBucket", bucket) .option("table", table) .mode("overwrite").save()
如需验证是否已成功写入数据,请通过 Spark DataFrame (
df
) 查询并显示 BigQuery 表:display(spark.read.format("bigquery").option("table", table).load)