Job de MapReduce do Hadoop com o Bigtable

Neste exemplo, usamos o Hadoop para executar um job de MapReduce simples que conta o número de vezes que uma palavra aparece em um arquivo de texto. O job de MapReduce usa o Bigtable para armazenar os resultados da operação de mapeamento. O código deste exemplo está no repositório do GitHub GoogleCloudPlatform/cloud-bigtable-examples, no diretório java/dataproc-wordcount.

Configurar a autenticação

Para usar os exemplos do Java nesta página em um ambiente de desenvolvimento local, instale e inicialize a CLI gcloud e configure o Application Default Credentials com suas credenciais de usuário.

  1. Instale a CLI do Google Cloud.

  2. Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.

  3. Se você estiver usando um shell local, crie credenciais de autenticação local para sua conta de usuário:

    gcloud auth application-default login

    Não é necessário fazer isso se você estiver usando o Cloud Shell.

    Se um erro de autenticação for retornado e você estiver usando um provedor de identidade (IdP) externo, confirme se você fez login na CLI gcloud com sua identidade federada.

Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

Visão geral do exemplo de código

O exemplo de código oferece uma interface da linha de comando simples que usa um ou mais arquivos de texto e um nome de tabela como entrada, localiza todas as palavras que aparecem no arquivo e conta quantas vezes cada palavra aparece. A lógica do MapReduce aparece na classe WordCountHBase.

Primeiro, um mapeador tokeniza o conteúdo do arquivo de texto e gera pares de chave-valor, em que a chave é uma palavra do arquivo de texto e o valor é 1:

public static class TokenizerMapper extends
    Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(Object key, Text value, Context context) throws IOException,
      InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    ImmutableBytesWritable word = new ImmutableBytesWritable();
    while (itr.hasMoreTokens()) {
      word.set(Bytes.toBytes(itr.nextToken()));
      context.write(word, one);
    }
  }
}

Com um redutor, são somados os valores para cada chave e gravados os resultados em uma tabela do Cloud Bigtable que você especificou. Cada chave de linha é uma palavra do arquivo de texto. Cada linha contém uma coluna cf:count, que contém o número de vezes que a chave de linha aparece no arquivo de texto.

public static class MyTableReducer extends
    TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = sum(values);
    Put put = new Put(key.get());
    put.addColumn(COLUMN_FAMILY, COUNT_COLUMN_NAME, Bytes.toBytes(sum));
    context.write(null, put);
  }

  public int sum(Iterable<IntWritable> values) {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    return i;
  }
}