Monitorar consultas contínuas

É possível monitorar as consultas contínuas do BigQuery usando as seguintes ferramentas do BigQuery:

Devido à natureza de longa duração das consultas contínuas do BigQuery, métricas que geralmente são geradas após a conclusão de uma consulta SQL podem estar ausentes ou imprecisas.

Usar visualizações INFORMATION_SCHEMA

É possível usar várias visualizações INFORMATION_SCHEMA para monitorar consultas contínuas e reservas de consulta contínuas.

Mais detalhes do job

Você pode usar a visualização JOBS para receber metadados dos jobs de consulta contínuos.

A consulta a seguir retorna os metadados de todas as consultas contínuas ativas. Os metadados incluem o carimbo de data/hora da marca d'água de saída, que representa o ponto até o qual a consulta contínua processou dados com sucesso.

  1. No console Google Cloud , acesse a página BigQuery.

    Acessar o BigQuery

  2. No Editor de consultas, execute esta consulta:

    SELECT
      start_time,
      job_id,
      user_email,
      query,
      state,
      reservation_id,
      continuous_query_info.output_watermark
    FROM `PROJECT_ID.region-REGION.INFORMATION_SCHEMA.JOBS`
    WHERE
      creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
      AND continuous IS TRUE
      AND state = "RUNNING"
    ORDER BY
      start_time DESC

    Substitua:

Ver detalhes da atribuição de reserva

Você pode usar as visualizações ASSIGNMENTS e RESERVATIONS para receber detalhes da atribuição de reserva de consulta contínua.

Retorne detalhes de atribuição de reserva para consultas contínuas:

  1. No console Google Cloud , acesse a página BigQuery.

    Acessar o BigQuery

  2. No Editor de consultas, execute esta consulta:

    SELECT
      reservation.reservation_name,
      reservation.slot_capacity
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND job_type = 'CONTINUOUS';

    Substitua:

    • ADMIN_PROJECT_ID: o ID do projeto de administração que contém a reserva.
    • LOCATION: o local da reserva.
    • PROJECT_ID: o ID do projeto atribuído à reserva. Somente informações sobre consultas contínuas em execução neste projeto são retornadas.

Ver informações de consumo de slots

Você pode usar as visualizações ASSIGNMENTS, RESERVATIONS e JOBS_TIMELINE para receber informações de consumo do slot de consulta contínua.

Retorna informações de consumo de slots para consultas contínuas:

  1. No console Google Cloud , acesse a página BigQuery.

    Acessar o BigQuery

  2. No Editor de consultas, execute esta consulta:

    SELECT
      jobs.period_start,
      reservation.reservation_name,
      reservation.slot_capacity,
      SUM(jobs.period_slot_ms) / 1000 AS consumed_total_slots
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    INNER JOIN
      `PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.JOBS_TIMELINE` AS jobs
      ON (
        UPPER(CONCAT('ADMIN_PROJECT_ID:LOCATION.', assignment.reservation_name))
        = UPPER(jobs.reservation_id))
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND assignment.job_type = 'CONTINUOUS'
      AND jobs.period_start
        BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
        AND CURRENT_TIMESTAMP()
    GROUP BY 1, 2, 3
    ORDER BY jobs.period_start DESC;

    Substitua:

    • ADMIN_PROJECT_ID: o ID do projeto de administração que contém a reserva.
    • LOCATION: o local da reserva.
    • PROJECT_ID: o ID do projeto atribuído à reserva. Somente informações sobre consultas contínuas em execução neste projeto são retornadas.

Também é possível monitorar reservas de consulta contínua usando outras ferramentas, como Metrics Explorer e gráficos de recursos administrativos. Para mais informações, consulte Monitore as reservas do BigQuery.

Usar o gráfico de execução de consultas

Você pode usar o gráfico de execução de consulta para receber insights de desempenho e estatísticas gerais para uma consulta contínua. Para mais informações, consulte Visualizar insights de desempenho da consulta.

Visualizar o histórico de tarefas

Você pode conferir os detalhes do job de consulta contínua no seu histórico profissional pessoal ou no histórico de job do projeto. Para mais informações, consulte Ver detalhes do job.

A lista histórica de jobs é classificada pela hora de início do job. Isso significa que as consultas contínuas que estão em execução há um tempo pode não estar perto do início da lista.

Usar a análise de jobs administrativos

No explorador de jobs administrativos, filtre seus jobs para mostrar consultas contínuas definindo o filtro Categoria do job como Consulta contínua.

Usar o Cloud Monitoring

É possível conferir métricas específicas das consultas contínuas do BigQuery usando o Cloud Monitoring. Para mais informações, consulte Criar painéis, gráficos e alertas e leia sobre as métricas disponíveis para visualização.

Alerta sobre consultas com falha

Em vez de verificar rotineiramente se as consultas contínuas falharam, é útil criar um alerta para notificar você sobre a falha. Uma maneira de fazer isso é criar uma métrica com base em registros do Cloud Logging personalizada com um filtro para seus jobs e uma política de alertas do Cloud Monitoring com base nessa métrica:

  1. Ao criar uma consulta contínua, use um prefixo de ID de job personalizado. Várias consultas contínuas podem compartilhar o mesmo prefixo. Por exemplo, você pode usar o prefixo prod- para indicar uma consulta de produção.
  2. No Google Cloud console, acesse a página Métricas com base em registros.

    Acessar "Métricas com base em registros"

  3. Clique em Criar métrica. O painel Criar métrica de registros é exibido.

  4. Em Tipo de métrica, selecione Contador.

  5. Na seção Detalhes, atribua um nome à métrica. Por exemplo, CUSTOM_JOB_ID_PREFIX-metric.

  6. Na seção Seleção de filtros, insira o seguinte no editor Criar filtro:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Substitua:

  7. Clique em Criar métrica.

  8. No menu de navegação, clique em Métricas com base em registros. A métrica que você acabou de criar aparece na lista de métricas definidas pelo usuário.

  9. Na linha da métrica, clique em Mais ações e em Criar alerta com base na métrica.

  10. Clique em Próxima. Não é necessário mudar as configurações padrão na página Modo de configuração da política.

  11. Clique em Próxima. Não é necessário mudar as configurações padrão na página Configurar acionador de alertas.

  12. Selecione os canais de notificação e insira um nome para a política de alertas.

  13. Clique em Criar política.

Para testar seu alerta, execute uma consulta contínua com o prefixo de ID do job personalizado selecionado e cancele-o. Pode levar alguns minutos para que o alerta chegue ao seu canal de notificação.

Tentar novamente consultas com falha

Tentar novamente uma consulta contínua com falha pode ajudar a evitar situações em que um pipeline contínuo fica inativo por um longo período ou exige intervenção humana para ser reiniciado. Ao tentar de novo uma consulta contínua com falha, considere o seguinte:

  • Se é aceitável reprocessar uma quantidade de dados processados pela consulta anterior antes da falha.
  • Como lidar com a limitação de novas tentativas ou usar a espera exponencial.

Uma abordagem possível para automatizar a repetição de consultas é a seguinte:

  1. Crie um coletor do Cloud Logging com base em um filtro de inclusão que corresponda aos seguintes critérios para encaminhar registros a um tópico do Pub/Sub:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Substitua:

  2. Crie uma função do Cloud Run que seja acionada em resposta ao Pub/Sub que recebe registros correspondentes ao seu filtro.

    A função do Cloud Run pode aceitar a carga útil de dados da mensagem do Pub/Sub e tentar iniciar uma nova consulta contínua usando a mesma sintaxe SQL da consulta com falha, mas no início, logo após a interrupção do job anterior.

Por exemplo, é possível usar uma função semelhante a esta:

Python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido do BigQuery: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API BigQuery em Python.

Para autenticar no BigQuery, configure o Application Default Credentials. Para mais informações, acesse Configurar a autenticação para bibliotecas de cliente.

import base64
import json
import logging
import re
import uuid

import google.auth
import google.auth.transport.requests
import requests


def retry_continuous_query(event, context):
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # Adjust the timestamp in the SQL query
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )

        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request
        response = requests.post(url, headers=headers, json=data)

        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")

A seguir