Este documento descreve como escrever dados do Dataflow para o BigQuery.
Vista geral
Para a maioria dos exemplos de utilização, considere usar a opção Managed I/O para escrever no BigQuery. A E/S gerida oferece funcionalidades como atualizações automáticas e uma API de configuração consistente. Quando escreve no BigQuery, a E/S gerida escolhe automaticamente o melhor método de escrita para tarefas em lote ou de streaming.
Se precisar de uma otimização do desempenho mais avançada, considere usar o conector BigQueryIO. Para mais informações, consulte a secção
Use o conector BigQueryIO neste documento.
Desempenho
A tabela seguinte mostra as métricas de desempenho para várias cargas de trabalho. Estas cargas de trabalho foram executadas num e2-standard2 worker, usando o SDK do Apache Beam 2.49.0 para Java. Não usaram o Runner v2.
| 100 M de registos | 1 kB | 1 coluna | Débito (bytes) | Débito (elementos) |
|---|---|---|
| Escrita de armazenamento | 55 MBps | 54 000 elementos por segundo |
| Carregamento Avro | 78 MBps | 77 000 elementos por segundo |
| Json Load | 54 MBps | 53 000 elementos por segundo |
Estas métricas baseiam-se em pipelines de processamento em lote simples. Destinam-se a comparar o desempenho entre conectores de E/S e não são necessariamente representativos de pipelines do mundo real. O desempenho do pipeline do Dataflow é complexo e é uma função do tipo de VM, dos dados que estão a ser processados, do desempenho das origens e dos destinos externos, e do código do utilizador. As métricas baseiam-se na execução do SDK Java e não são representativas das características de desempenho de outros SDKs de idiomas. Para mais informações, consulte o artigo Desempenho do Beam IO.
Use o conetor BigQueryIO
O conetor BigQuery I/O suporta os seguintes métodos para escrever no BigQuery:
STORAGE_WRITE_API. Neste modo, o conector faz escritas diretas no armazenamento do BigQuery, usando a API BigQuery Storage Write. A API Storage Write combina o carregamento de streaming e o carregamento em lote numa única API de alto desempenho. Este modo garante a semântica de exatamente uma vez.STORAGE_API_AT_LEAST_ONCE. Este modo também usa a API Storage Write, mas oferece semântica pelo menos uma vez. Este modo resulta numa latência mais baixa para a maioria dos pipelines. No entanto, é possível fazer escritas duplicadas.FILE_LOADS. Neste modo, o conetor escreve os dados de entrada em ficheiros de preparação no Cloud Storage. Em seguida, executa uma tarefa de carregamento do BigQuery para carregar os dados para o BigQuery. O modo é a predefinição para os conjuntos de dados delimitadosPCollections, que são mais comuns em pipelines de processamento em lote.STREAMING_INSERTS. Neste modo, o conetor usa a API Legacy Streaming. Este modo é o predefinido paraPCollectionsilimitados, mas não é recomendado para novos projetos.
Ao escolher um método de escrita, considere os seguintes pontos:
- Para tarefas de streaming, considere usar
STORAGE_WRITE_APIouSTORAGE_API_AT_LEAST_ONCE, porque estes modos escrevem diretamente no armazenamento do BigQuery, sem usar ficheiros de preparação intermédios. - Se executar o pipeline usando o modo de streaming at-least-once, defina o modo de escrita como
STORAGE_API_AT_LEAST_ONCE. Esta definição é mais eficiente e corresponde à semântica do modo de streaming, pelo menos, uma vez. - O carregamento de ficheiros e a API Storage Write têm quotas e limites diferentes.
- As tarefas de carregamento usam o conjunto de slots do BigQuery partilhado ou slots reservados. Para usar ranhuras reservadas, execute a tarefa de carregamento num projeto com uma atribuição de reserva do tipo
PIPELINE. As tarefas de carregamento são gratuitas se usar o conjunto de slots do BigQuery partilhado. No entanto, o BigQuery não faz garantias sobre a capacidade disponível do conjunto partilhado. Para mais informações, consulte o artigo Introdução às reservas.
Paralelismo
Para o
FILE_LOADSe oSTORAGE_WRITE_APIem pipelines de streaming, o conetor fragmenta os dados num número de ficheiros ou streams. Em geral, recomendamos que chamewithAutoShardingpara ativar a divisão automática.Para
FILE_LOADSem pipelines em lote, o conetor escreve dados em ficheiros particionados, que são, em seguida, carregados em paralelo para o BigQuery.Para
STORAGE_WRITE_APIem pipelines de processamento em lote, cada trabalhador cria um ou mais streams para escrever no BigQuery, determinado pelo número total de fragmentos.Para
STORAGE_API_AT_LEAST_ONCE, existe uma única stream de gravação predefinida. Vários trabalhadores são anexados a esta stream.
Práticas recomendadas
A API Storage Write tem limites de quota. O conector processa estes limites para a maioria dos pipelines. No entanto, alguns cenários podem esgotar as streams da API Storage Write disponíveis. Por exemplo, este problema pode ocorrer num pipeline que usa a divisão automática e a escala automática com um grande número de destinos, especialmente em tarefas de longa duração com cargas de trabalho altamente variáveis. Se este problema ocorrer, considere usar
STORAGE_WRITE_API_AT_LEAST_ONCE, o que evita o problema.Use as métricas da Google Cloud Platform para monitorizar a utilização da quota da API Storage Write.
Quando usa carregamentos de ficheiros, o Avro tem normalmente um desempenho superior ao JSON. Para usar o Avro, ligue para
withAvroFormatFunction.Por predefinição, as tarefas de carregamento são executadas no mesmo projeto que a tarefa do Dataflow. Para especificar um projeto diferente, chame
withLoadJobProjectId.Quando usar o SDK Java, considere criar uma classe que represente o esquema da tabela do BigQuery. Em seguida, chame
useBeamSchemano seu pipeline para converter automaticamente entre os tiposRowdo Apache Beam eTableRowdo BigQuery. Para ver um exemplo de uma classe de esquema, consulteExampleModel.java.Se carregar tabelas com esquemas complexos que contenham milhares de campos, considere chamar
withMaxBytesPerPartitionpara definir um tamanho máximo mais pequeno para cada tarefa de carregamento.Por predefinição, o
BigQueryIOusa definições da API Storage Write que são razoáveis para a maioria dos pipelines. No entanto, se tiver problemas de desempenho, pode definir opções de pipeline para ajustar estas definições. Para mais informações, consulte o artigo Ajuste a API Storage Write na documentação do Apache Beam.
Pipelines de streaming
As seguintes recomendações aplicam-se a pipelines de streaming.
Para pipelines de streaming, recomendamos que use a API Storage Write (
STORAGE_WRITE_APIouSTORAGE_API_AT_LEAST_ONCE).Um pipeline de streaming pode usar carregamentos de ficheiros, mas esta abordagem tem desvantagens:
- Requer janelas para escrever os ficheiros. Não pode usar a janela global.
- O BigQuery carrega ficheiros com base no melhor esforço quando usa o conjunto de capacidade de processamento partilhado. Pode haver um atraso significativo entre o momento em que um registo é escrito e o momento em que fica disponível no BigQuery.
- Se uma tarefa de carregamento falhar, por exemplo, devido a dados incorretos ou a uma incompatibilidade de esquema, toda a pipeline falha.
Considere usar
STORAGE_WRITE_API_AT_LEAST_ONCEsempre que possível. Pode resultar na escrita de registos duplicados no BigQuery, mas é menos dispensioso e mais escalável do queSTORAGE_WRITE_API.Em geral, evite usar
STREAMING_INSERTS. As inserções de streaming são mais caras do que a API Storage Write e não têm um desempenho tão bom.A divisão de dados pode melhorar o desempenho nos pipelines de streaming. Para a maioria dos pipelines, a divisão automática é um bom ponto de partida. No entanto, pode ajustar a divisão da seguinte forma:
- Para
STORAGE_WRITE_API, chamewithNumStorageWriteApiStreamspara definir o número de streams de escrita. - Para
FILE_LOADS, ligue parawithNumFileShardspara definir o número de fragmentos de ficheiros.
- Para
Se usar inserções de streaming, recomendamos que defina
retryTransientErrorscomo a política de repetição.
Pipelines em lote
As seguintes recomendações aplicam-se a pipelines em lote.
Para a maioria dos pipelines de processamento em lote grandes, recomendamos que experimente primeiro o
FILE_LOADS. Um pipeline de lotes pode usarSTORAGE_WRITE_API, mas é provável que exceda os limites de quota em grande escala (mais de 1000 vCPUs) ou se estiverem a ser executados pipelines concorrentes. O Apache Beam não limita o número máximo de streams de escrita para tarefas em lote, pelo que a tarefa acaba por atingir os limites da API BigQuery Storage.STORAGE_WRITE_APIQuando usa o
FILE_LOADS, pode esgotar o conjunto de slots do BigQuery partilhado ou o seu conjunto de slots reservados. Se encontrar este tipo de falha, experimente as seguintes abordagens:- Reduza o número máximo de trabalhadores ou o tamanho dos trabalhadores para a tarefa.
- Compre mais horários reservados.
- Considere usar
STORAGE_WRITE_API.
Os pipelines pequenos a médios (<1000 vCPUs) podem beneficiar da utilização de
STORAGE_WRITE_API. Para estas tarefas mais pequenas, pondere usar oSTORAGE_WRITE_APIse quiser uma fila de mensagens rejeitadas ou quando o conjunto de slots partilhadosFILE_LOADSnão for suficiente.Se puder tolerar dados duplicados, considere usar o elemento
STORAGE_WRITE_API_AT_LEAST_ONCE. Este modo pode resultar na gravação de registos duplicados no BigQuery, mas pode ser menos dispendioso do que a opçãoSTORAGE_WRITE_API.Os diferentes modos de escrita podem ter um desempenho diferente com base nas características do seu pipeline. Faça experiências para encontrar o melhor modo de gravação para a sua carga de trabalho.
Processe erros ao nível da linha
Esta secção descreve como processar erros que podem ocorrer ao nível da linha, por exemplo, devido a dados de entrada mal formados ou incompatibilidades de esquemas.
Para a API Storage Write, todas as linhas que não podem ser escritas são colocadas
numa tabela PCollection separada. Para obter esta coleção, chame getFailedStorageApiInserts no objeto WriteResult. Para ver um exemplo desta abordagem, consulte o artigo
Fazer stream de dados para o BigQuery.
É uma boa prática
enviar os erros para uma fila ou uma tabela de mensagens não entregues para processamento posterior. Para mais
informações sobre este padrão, consulte o
BigQueryIO padrão de mensagens não entregues.
Para o FILE_LOADS, se ocorrer um erro ao carregar os dados, a tarefa de carregamento falha e o pipeline gera uma exceção de tempo de execução. Pode ver o erro nos registos do Dataflow ou consultar o histórico de tarefas do BigQuery.
O conector de E/S não devolve informações sobre linhas individuais com falhas.
Para mais informações sobre a resolução de problemas de erros, consulte o artigo Erros do conetor do BigQuery.
Exemplos
Os exemplos seguintes mostram como usar o Dataflow para escrever no
BigQuery. Estes exemplos usam o conetor BigQueryIO.
Escrever numa tabela existente
O exemplo seguinte cria um pipeline em lote que escreve um PCollection<MyData> no BigQuery, onde MyData é um tipo de dados personalizado.
O método BigQueryIO.write() devolve um tipo BigQueryIO.Write<T>, que é usado para configurar a operação de escrita. Para mais informações, consulte o artigo
Escrever numa tabela
na documentação do Apache Beam. Este exemplo de código escreve numa tabela existente (CREATE_NEVER) e anexa as novas linhas à tabela (WRITE_APPEND).
Java
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Escrever numa tabela nova ou existente
O exemplo seguinte cria uma nova tabela se a tabela de destino não existir, definindo a disposição de criação como CREATE_IF_NEEDED. Quando usa esta opção, tem de fornecer um esquema da tabela. O conetor usa este esquema se criar uma nova tabela.
Java
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Transmita dados para o BigQuery
O exemplo seguinte mostra como fazer streaming de dados usando a semântica exatamente uma vez, definindo o modo de escrita como STORAGE_WRITE_API
Nem todos os pipelines de streaming requerem semântica exatamente uma vez. Por exemplo, pode remover manualmente duplicados da tabela de destino. Se a possibilidade de registos duplicados for aceitável para o seu cenário, considere usar a semântica pelo menos uma vez definindo o método de escrita como STORAGE_API_AT_LEAST_ONCE. Geralmente, este método é mais eficiente e resulta numa latência inferior para a maioria dos pipelines.
Java
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
O que se segue?
- Saiba mais sobre a E/S gerida.
- Saiba mais sobre as práticas recomendadas do Pub/Sub para o BigQuery.