示例 SQL 查询

本文档包含对存储在已升级为使用 Log Analytics 的日志存储桶中的日志条目的查询示例。在这些存储桶中,您可以通过 Google Cloud 控制台的Log Analytics页面运行 SQL 查询。如需查看更多示例,请参阅 logging-analytics-samplessecurity-analytics GitHub 代码库。

本文档未介绍 SQL,也未介绍如何路由和存储日志条目。如需了解这些主题,请参阅后续步骤部分。

本页面上的示例查询的是日志视图。如需查询分析视图,请使用以下路径格式:`analytics_view.PROJECT_ID.LOCATION.ANALYTICS_VIEW_ID`。 在上述表达式中,PROJECT_ID 是您的项目 ID,LOCATIONANALYTICS_VIEW_ID 分别是您的分析视图的位置和名称。

SQL 语言支持

Log Analytics 页面中使用的查询支持 GoogleSQL 函数,但有一些例外情况。

通过 Log Analytics 页面发出的 SQL 查询不支持以下 SQL 命令:

  • DDL 和 DML 命令
  • JavaScript 用户定义的函数
  • BigQuery ML 函数
  • SQL 变量

仅当您使用 BigQuery StudioLooker Studio 页面或 bq 命令行工具查询关联的数据集时,系统才支持以下各项:

  • JavaScript 用户定义的函数
  • BigQuery ML 函数
  • SQL 变量

最佳做法

如需设置查询的时间范围,我们建议您使用时间范围选择器。例如,如需查看过去一周的数据,请从时间范围选择器中选择过去 7 天。您还可以使用时间范围选择器指定开始时间和结束时间、指定要查看的时间范围,以及更改时区。

如果您在 WHERE 子句中添加 timestamp 字段,则不会使用时间范围选择器设置。以下示例展示了如何按时间戳过滤:

-- Matches log entries whose timestamp is within the most recent 1 hour.
WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)

如需详细了解如何按时间过滤,请参阅时间函数时间戳函数

准备工作

本部分介绍了在使用 Log Analytics 之前必须完成的步骤。

配置日志存储桶

确保您的日志存储桶已升级为使用 Log Analytics:

  1. 在 Google Cloud 控制台中,前往日志存储页面:

    前往日志存储

    如果您使用搜索栏查找此页面,请选择子标题为 Logging 的结果。

  2. 对于包含您要查询的日志视图的每个日志存储桶,请确保 Log Analytics 可用列显示打开。如果显示升级,请点击升级并完成对话框。

配置 IAM 角色和权限

本部分介绍了使用 Log Analytics 所需的 IAM 角色或权限:

  • 如需获得使用 Log Analytics 和查询日志视图所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:

    • 查询 _Required_Default 日志存储桶:Logs Viewer (roles/logging.viewer)
    • 查询项目中的所有日志视图:Logs View Accessor (roles/logging.viewAccessor)

    您可以通过以下方式限制主账号只能查看特定日志视图:向项目级别授予的 Logs View Accessor 角色添加 IAM 条件,或者向日志视图的政策文件添加 IAM 绑定。如需了解详情,请参阅控制对日志视图的访问权限

    这些权限与您在 Logs Explorer 页面上查看日志条目时需要的权限相同。如需了解您需要哪些额外角色才能查询用户定义的存储桶的视图,或者查询 _Default 日志存储桶的 _AllLogs 视图,请参阅 Cloud Logging 角色

  • 如需获得查询分析视图所需的权限,请让您的管理员为您授予项目的 Observability Analytics User (roles/observability.analyticsUser) IAM 角色。

如何使用本页面上的查询

  1. 在 Google Cloud 控制台中,前往 Log Analytics 页面:

    转到 Log Analytics

    如果您使用搜索栏查找此页面,请选择子标题为 Logging 的结果。

  2. 查询窗格中,点击  SQL,然后将查询复制并粘贴到 SQL 查询窗格中。

    在复制查询之前,请在 FROM 子句中替换以下字段

    • PROJECT_ID:项目的标识符。
    • LOCATION:日志视图或分析视图的位置。
    • BUCKET_ID:日志存储桶的名称或 ID。
    • LOG_VIEW_ID:日志视图的标识符,长度不得超过 100 个字符,并且只能包含字母、数字、下划线和连字符。

    以下内容展示了日志视图FROM 子句的格式:

    FROM `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID`
    

    本页上的日志示例会查询日志视图。如需查询分析视图,请使用以下路径格式:`analytics_view.PROJECT_ID.LOCATION.ANALYTICS_VIEW_ID`。 在上述表达式中,PROJECT_ID 是您的项目 ID,LOCATIONANALYTICS_VIEW_ID 分别是您的分析视图的位置和名称。

如需在 BigQuery Studio 页面上使用本文档中显示的查询,或使用 bq 命令行工具,请修改 FROM 子句并输入关联数据集的路径。例如,如需查询项目 myproject 中名为 mydataset 的关联数据集上的 _AllLogs 视图,路径为 myproject.mydataset._AllLogs

常见使用场景

本部分列出了几个常见用例,可能有助于您创建自定义查询。

显示默认日志存储桶中的日志条目

如需查询 _Default 存储桶,请运行以下查询:

SELECT
  timestamp, severity, resource.type, log_name, text_payload, proto_payload, json_payload
FROM
  `PROJECT_ID.LOCATION._Default._AllLogs`
-- Limit to 1000 entries
LIMIT 1000

按正则表达式提取字段值

如需使用正则表达式从字符串中提取值,请使用函数 REGEXP_EXTRACT

SELECT
  -- Display the timestamp, and the part of the name that begins with test.
  timestamp, REGEXP_EXTRACT(JSON_VALUE(json_payload.jobName), r".*(test.*)$") AS name,
FROM
  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID`
WHERE
  -- Get the value of jobName, which is a subfield in a JSON structure.
  JSON_VALUE(json_payload.jobName) IS NOT NULL
ORDER BY timestamp DESC
LIMIT 20

如需了解详情,请参阅 REGEXP_EXTRACT 文档

对于子字符串匹配(如上一个查询),使用 CONTAINS_SUBSTR 函数可提高查询效率。

过滤日志条目

如需对查询应用过滤条件,请添加 WHERE 子句。您在此子句中使用的语法取决于字段的数据类型。本部分提供了多种不同数据类型的示例。

按载荷类型过滤日志条目

日志条目可以具有以下三种载荷类型之一。如需按载荷类型过滤日志条目,请使用以下子句之一:

  • 文本载荷

    -- Matches log entries that have a text payload.
    WHERE text_payload IS NOT NULL
    
  • JSON 载荷

    -- Matches log entries that have a JSON payload.
    WHERE json_payload IS NOT NULL
    
  • Proto 载荷

    -- Matches log entries that have a proto payload.
    -- Because proto_payload has a data type of RECORD, this statement tests
    -- whether a mandatory subfield exits.
    WHERE proto_payload.type IS NOT NULL
    

在查询结果中,json_payloadproto_payload 字段均以 JSON 格式呈现,您可以浏览这些字段。

按时间戳过滤日志数据

如需按时间戳过滤日志条目,建议您使用时间范围选择器。不过,您也可以在 WHERE 子句中指定 timestamp

-- Matches log entries whose timestamp is within the most recent hour
WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)

如需详细了解如何按时间过滤,请参阅时间函数时间戳函数

按资源过滤

如需按资源过滤日志数据,请向 WHERE 子句添加 resource.type 语句:

-- Matches log entries whose resource type is gce_instance
WHERE resource.type = "gce_instance"

按严重程度过滤

如需按严重程度过滤日志数据,请向 WHERE 子句添加 severity 语句:

-- Matches log entries whose severity is INFO or ERROR
WHERE severity IS NOT NULL AND severity IN ('INFO', 'ERROR')

您还可以按 severity_number(整数)过滤日志条目。例如,以下子句会匹配严重级别至少为 NOTICE 的所有日志条目:

-- Matches log entries whose severity level is at least NOTICE
WHERE severity_number IS NOT NULL AND severity_number > 200

如需了解枚举值,请参阅 LogSeverity

按日志名称过滤

如需按日志名称过滤日志数据,请向 WHERE 子句添加 log_namelog_id 语句:

  • 日志名称指定资源路径:

    -- Matches log entries that have the following log ID.
    WHERE log_name="projects/cloud-logs-test-project/logs/cloudaudit.googleapis.com%2Factivity"
    
  • 日志 ID 省略了资源路径:

    -- Matches log entries that have the following log id.
    WHERE log_id = "cloudaudit.googleapis.com/data_access"
    

按资源标签过滤日志条目

资源标签以 JSON 结构的形式存储。如需按 JSON 结构中某个字段的值进行过滤,请使用函数 JSON_VALUE

SELECT
  timestamp, JSON_VALUE(resource.labels.zone) AS zone, json_payload, resource, labels
FROM
  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID`
WHERE
  -- Matches log entries whose resource type is gce_instance and whose zone is
  -- us-central1-f. Because resource has data type JSON, you must use JSON_VALUE
  -- to get the value for subfields, like zone.
  resource.type = "gce_instance" AND
  JSON_VALUE(resource.labels.zone) = "us-central1-f"
ORDER BY timestamp ASC

上一个查询依赖于资源标签在日志条目中的存储格式。以下是资源字段的示例:

{
   type: "gce_instance"
   labels: {
      instance_id: "1234512345123451"
      project_id: "my-project"
      zone: "us-central1-f"
   }
}

如需了解可检索和转换 JSON 数据的所有函数,请参阅 JSON 函数

按 HTTP 请求过滤

如需仅查询包含 HTTP 请求字段的日志条目,请使用以下子句:

-- Matches log entries that have a HTTP request_method field.
-- Don't compare http_request to NULL. This field has a data type of RECORD.
WHERE http_request.request_method IS NOT NULL

您还可以使用 IN 语句:

-- Matches log entries whose HTTP request_method is GET or POST.
WHERE http_request.request_method IN ('GET', 'POST')

按 HTTP 状态过滤

如需仅查询具有 HTTP 状态的日志条目,请使用以下子句:

-- Matches log entries that have an http_request.status field.
WHERE http_request.status IS NOT NULL

按 JSON 数据类型中的字段进行过滤

如需仅在具有 JSON 数据类型的字段的子字段具有特定值时查询日志条目,请使用函数 JSON_VALUE 提取该值:

-- Compare the value of the status field to NULL.
WHERE JSON_VALUE(json_payload.status) IS NOT NULL

上述子句与以下子句略有不同:

-- Compare the status field to NULL.
WHERE json_payload.status IS NOT NULL

第一个子句用于测试状态字段的值是否为 NULL。第二个子句用于测试是否存在状态字段。假设某个日志视图包含两个日志条目。对于一个日志条目,json_payload 字段采用以下格式:

{
    status: {
        measureTime: "1661517845"
    }
}

对于另一个日志条目,json_payload 字段的结构有所不同:

{
    @type: "type.googleapis.com/google.cloud.scheduler.logging.AttemptFinished"
    jobName: "projects/my-project/locations/us-central1/jobs/test1"
    relativeUrl: "/food=cake"
    status: "NOT_FOUND"
    targetType: "APP_ENGINE_HTTP"
}

子句 WHERE json_payload.status IS NOT NULL 可匹配这两个日志条目。不过,子句 WHERE JSON_VALUE(json_payload.status) IS NOT NULL 仅与第二个日志条目匹配。

对日志条目进行分组和汇总

本部分以之前的示例为基础,说明如何对日志条目进行分组和汇总。如果您未指定分组,但指定了聚合,则系统会输出一个结果,因为 SQL 会将满足 WHERE 子句的所有行视为一个组。

每个 SELECT 表达式都必须包含在组字段中或进行汇总。

按时间戳对日志条目进行分组

如需按时间戳对数据进行分组,请使用 TIMESTAMP_TRUNC 函数,该函数会将时间戳截断为指定的粒度,例如 HOUR

SELECT
  -- Truncate the timestamp by hour.
  TIMESTAMP_TRUNC(timestamp, HOUR) AS hour,
  JSON_VALUE(json_payload.status) AS status,
  -- Count the number log entries in each group.
  COUNT(*) AS count
FROM
  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID`
WHERE
  -- Matches log entries that have a status field whose value isn't NULL.
  json_payload IS NOT NULL AND JSON_VALUE(json_payload.status) IS NOT NULL
GROUP BY
  -- Group by hour and status
  hour,status
ORDER BY hour ASC

如需了解详情,请参阅 TIMESTAMP_TRUNC 文档日期时间函数

按资源对日志条目进行分组

以下查询展示了如何按资源类型对日志条目进行分组,然后统计每个组中的日志条目数:

SELECT
   -- Count the number of log entries for each resource type
   resource.type, COUNT(*) AS count
FROM
  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID`
GROUP BY resource.type
LIMIT 100

按严重程度对日志条目进行分组

以下查询展示了如何按严重程度对日志条目进行分组,然后统计每个组中的日志条目数量:

SELECT
  -- Count the number of log entries for each severity.
  severity, COUNT(*) AS count
FROM
  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID`
WHERE
  severity IS NOT NULL
GROUP BY severity
ORDER BY severity
LIMIT 100

log_id 对日志条目进行分组

以下查询展示了如何按日志 ID 对日志条目进行分组,然后统计每个组中的日志条目数:

SELECT
  -- Count the number of log entries for each log ID.
  log_id, COUNT(*) AS count
FROM
  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID`
GROUP BY log_id
ORDER BY count DESC
LIMIT 100

计算每个网址的 HTTP 请求的平均延迟时间

以下查询展示了如何按 HTTP 请求网址和位置对日志条目进行分组,然后统计每个组中的日志条目数:

SELECT
  -- Compute the average latency for each group. Because the labels field has a
  -- data type of JSON, use JSON_VALUE to get the value of checker_location.
  JSON_VALUE(labels.checker_location) AS location,
  AVG(http_request.latency.seconds) AS secs, http_request.request_url
FROM
  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID`
WHERE
  -- Matches log entries when the request_method field is GET.
  http_request IS NOT NULL AND http_request.request_method IN ('GET')
GROUP BY
  -- Group by request URL and location
  http_request.request_url, location
ORDER BY location
LIMIT 100

计算子网络测试的平均发送字节数

以下查询展示了如何按资源标签中指定的位置对日志条目进行分组,然后计算每个组中的日志条目数:

SELECT
  -- Compute the average number of bytes sent per location. Because labels has
  -- a data type of JSON, use JSON_VALUE to get the value of the location field.
  -- bytes_sent is a string. Must cast to a FLOAT64 before computing average.
  JSON_VALUE(resource.labels.location) AS location,
  AVG(CAST(JSON_VALUE(json_payload.bytes_sent) AS FLOAT64)) AS bytes
FROM
  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID`
WHERE
  resource.type = "gce_subnetwork" AND json_payload IS NOT NULL
GROUP BY
  -- Group by location
  location
LIMIT 100

如需了解详情,请参阅 JSON 函数转换函数

统计具有与某种模式匹配的字段的日志条目数

如需返回与正则表达式匹配的子字符串,请使用函数 REGEXP_EXTRACT

SELECT
  -- Extract the value that begins with test.
  -- Count the number of log entries for each name.
  REGEXP_EXTRACT(JSON_VALUE(json_payload.jobName), r".*(test.*)$") AS name,
  COUNT(*) AS count
FROM
  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID`
WHERE
  json_payload.jobName IS NOT NULL
GROUP BY name
ORDER BY count
LIMIT 20

如需查看更多示例,请参阅 REGEXP_EXTRACT 文档

本部分介绍了可用于搜索所查询视图的多个列的两种方法:

  • 基于词元的搜索:您需要指定搜索位置、搜索查询,然后使用 SEARCH 函数。由于 SEARCH 函数在搜索数据方面有特定规则,因此我们建议您阅读 SEARCH 文档

  • 基于子字符串的搜索:您提供搜索位置、字符串字面量,然后使用函数 CONTAINS_SUBSTR。系统会执行不区分大小写的测试,以确定表达式中是否存在字符串字面量。如果存在字符串字面量,CONTAINS_SUBSTR 函数会返回 TRUE,否则返回 FALSE。搜索值必须是 STRING 字面量,但不能是字面量 NULL

以下查询仅保留字段与“35.193.12.15”完全匹配的行:

SELECT
  timestamp, log_id, proto_payload, severity, resource.type, resource, labels
FROM
  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID` AS t
WHERE
  -- Search data access audit logs for the IP address that matches 35.193.12.15.
  -- The use of backticks prevents the string from being tokenized.
  proto_payload IS NOT NULL AND
  log_id = "cloudaudit.googleapis.com/data_access" AND
  SEARCH(t,"`35.193.12.15`")
ORDER BY timestamp ASC
LIMIT 20

如果查询字符串中省略了反引号,系统会根据 SEARCH 文档中定义的规则拆分查询字符串。例如,运行以下语句时,查询字符串会被拆分为四个token:“35”“193”“12”和“15”:

  SEARCH(t,"35.193.12.15")

当单个字段匹配所有四个token时,前面的 SEARCH 语句会匹配一行。token 的顺序无关紧要。

您可以在一个查询中添加多个 SEARCH 语句。例如,在上面的查询中,您可以将日志 ID 的过滤条件替换为如下语句:

  SEARCH(t,"`cloudaudit.googleapis.com/data_access`")

前一条语句会搜索日志视图中日志条目的每个字段,而原始语句仅搜索日志条目的 log_id 字段。

如需对多个字段执行多次搜索,请使用空格分隔各个字符串。例如,以下语句会匹配字段包含“Hello World”“happy”和“days”的行:

  SEARCH(t,"`Hello World` happy days")

最后,您可以搜索特定字段,而不是搜索整个表。例如,以下语句仅搜索名为 text_payloadjson_payload 的列:

   SEARCH((text_payload, json_payload) ,"`35.222.132.245`")

如需了解 SEARCH 函数的参数如何处理,请参阅 BigQuery 参考页面搜索函数

例如,以下查询会提取具有特定 IP 地址且时间戳位于特定时间范围内的所有数据访问审核日志条目。最后,查询会对结果进行排序,然后显示 20 个最旧的结果:

SELECT
  timestamp, log_id, proto_payload, severity, resource.type, resource, labels
FROM
  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_ID` AS t
WHERE
  -- Search data access audit logs for the IP address that matches 35.193.12.15.
  -- CONTAINS_SUBSTR performs a contains-test.
  proto_payload IS NOT NULL AND
  log_id = "cloudaudit.googleapis.com/data_access" AND
  CONTAINS_SUBSTR(t,"35.193.12.15")
ORDER BY timestamp ASC
LIMIT 20

查询多个视图

查询语句可扫描一个或多个表或表达式,并返回计算的结果行。例如,您可以使用查询语句以各种方式合并不同表或数据集中的 SELECT 语句的结果,然后从合并后的数据中选择列。

查询多个视图时,这些视图必须位于同一位置。例如,如果两个视图位于 us-east1 位置,则一个查询可以同时查询这两个视图。您还可以查询位于 us 多区域中的两个视图。不过,如果视图的位置为 global,则该视图可以位于任何实际位置。因此,如果两个视图的位置为 global,则它们之间的联接可能会失败。

按跟踪记录 ID 联接两个日志视图

如需合并两个表中的信息,请使用以下 join 运算符之一:

SELECT
  -- Do an inner join on two tables by using the span ID and trace ID.
  -- Don't join only by span ID, as this field isn't globally unique.
  -- From the first view, show the timestamp, severity, and JSON payload.
  -- From the second view, show the JSON payload.
  a.timestamp, a.severity, a.json_payload, b.json_payload, a.span_id, a.trace
FROM  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_1` a
JOIN  `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_2` b
ON
  a.span_id = b.span_id AND
  a.trace = b.trace
LIMIT 100

使用并集语句查询两个日志视图

如需合并两个或更多 SELECT 语句的结果并舍弃重复的行,请使用 UNION 运算符。如需保留重复的行,请使用 UNION ALL 运算符:

SELECT
  timestamp, log_name, severity, json_payload, resource, labels
-- Create a union of two log views
FROM(
  SELECT * FROM `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_1`
  UNION ALL
  SELECT * FROM `PROJECT_ID.LOCATION.BUCKET_ID.LOG_VIEW_2`
)
-- Sort the union by timestamp.
ORDER BY timestamp ASC
LIMIT 100

移除重复的日志条目

在运行查询之前,Log Analytics 不会移除重复的日志条目。 这种行为与使用Logs Explorer查询日志条目时不同,后者会通过比较日志名称、时间戳和插入 ID 字段来移除重复条目。

您可以使用行级验证来移除重复的日志条目。

如需了解详情,请参阅问题排查:我的 Log Analytics 结果中存在重复的日志条目

后续步骤

如需了解如何路由和存储日志条目,请参阅以下文档:

如需查看 SQL 参考文档,请参阅以下文档: