The BigQuery connector for Hadoop is installed by default on all Managed Service for Apache Spark 1.0-1.2 cluster nodes under /usr/lib/hadoop/lib/.
It is available in both Spark and PySpark environments.
Managed Service for Apache Spark image versions 1.5+: The BigQuery connector is not installed by default on Managed Service for Apache Spark image versions 1.5 and higher. To use it with these versions, do one of the following:
- Install the BigQuery connector using this initialization action.
- Specify the BigQuery connector in the jars parameter when submitting a job: --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar
- Include the BigQuery connector classes in the application's jar-with-dependencies.
To avoid conflicts: If your application uses a connector version that differs from the connector version deployed on your Managed Service for Apache Spark cluster, you must:
Include and relocate the connector classes and connector dependencies for your version into your application jar, so that your connector version does not conflict with the connector version deployed on the Managed Service for Apache Spark cluster (see this example of dependency relocation in Maven).
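When you suspect a version conflict, it can help to check which jar a connector class was actually loaded from. The class below is a hypothetical diagnostic helper (ClassOrigin is not part of the connector); it is a minimal sketch that uses only standard JDK reflection, namely Class.forName and ProtectionDomain.getCodeSource.

```java
import java.net.URL;
import java.security.CodeSource;

/**
 * Hypothetical diagnostic helper (not part of the connector): reports where a
 * class was loaded from, e.g. the cluster's connector jar or your application jar.
 */
final class ClassOrigin {
  private ClassOrigin() {}

  /** Returns the jar/directory URL of the class, "bootstrap", or "not found". */
  static String describe(String className) {
    final Class<?> clazz;
    try {
      // initialize=false loads the class without running its static initializers.
      clazz = Class.forName(className, false, ClassOrigin.class.getClassLoader());
    } catch (ClassNotFoundException e) {
      return "not found";
    }
    CodeSource source = clazz.getProtectionDomain().getCodeSource();
    if (source == null || source.getLocation() == null) {
      // JDK classes loaded by the bootstrap class loader have no code source.
      return "bootstrap";
    }
    URL location = source.getLocation();
    return location.toString();
  }
}
```

For example, calling ClassOrigin.describe("com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat") from your job driver and logging the result shows whether the class came from your relocated application jar or from the copy under /usr/lib/hadoop/lib/.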
GsonBigQueryInputFormat class
GsonBigQueryInputFormat provides Hadoop with BigQuery objects in JsonObject format through the following primary operations:
- Using a user-specified query to select BigQuery objects
- Splitting the query results evenly among Hadoop nodes
- Parsing the splits into Java objects to pass to the Mapper
The Hadoop Mapper class receives a JsonObject representation of each selected BigQuery object.
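To make "splitting the results evenly" concrete, the sketch below divides a number of records into contiguous ranges whose sizes differ by at most one. This is an illustration of the idea only; EvenSplits is a hypothetical class, and the connector's actual split logic is not described on this page and may work differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: even, contiguous [start, end) ranges over totalRecords. */
final class EvenSplits {
  private EvenSplits() {}

  static List<long[]> split(long totalRecords, int numSplits) {
    if (numSplits <= 0) {
      throw new IllegalArgumentException("numSplits must be positive");
    }
    if (totalRecords < 0) {
      throw new IllegalArgumentException("totalRecords must be >= 0");
    }
    List<long[]> ranges = new ArrayList<>();
    long base = totalRecords / numSplits;
    long remainder = totalRecords % numSplits;
    long start = 0;
    for (int i = 0; i < numSplits; i++) {
      // The first `remainder` splits each take one extra record.
      long size = base + (i < remainder ? 1 : 0);
      ranges.add(new long[] {start, start + size});
      start += size;
    }
    return ranges;
  }
}
```

With 10 records and 3 splits, the ranges are [0, 4), [4, 7) and [7, 10).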
The BigQueryInputFormat class provides access to BigQuery records through an extension of the Hadoop InputFormat class. To use the BigQueryInputFormat class:
- Lines must be added to the main Hadoop job to set parameters in the Hadoop configuration.
- The InputFormat class must be set to GsonBigQueryInputFormat.
The sections below show how to meet these requirements.
Input parameters
- QualifiedInputTableId: the BigQuery table to read from, in the form optional-projectId:datasetId.tableId
  Example: publicdata:samples.shakespeare
- projectId: the BigQuery projectId under which all input operations occur.
  Example: my-first-cloud-project
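Because the projectId part of a qualified table ID is optional (the job-level projectId serves as the default project for table IDs that don't specify one), it can be useful to validate table IDs before submitting a job. QualifiedTableId below is a hypothetical helper, not a connector API; it is a sketch that assumes project IDs may be domain-scoped (for example "example.com:my-project") and therefore splits on the last colon.

```java
/**
 * Hypothetical helper (not part of the connector API): splits a table ID of the
 * form [projectId:]datasetId.tableId, falling back to a default project.
 */
final class QualifiedTableId {
  final String projectId;
  final String datasetId;
  final String tableId;

  private QualifiedTableId(String projectId, String datasetId, String tableId) {
    this.projectId = projectId;
    this.datasetId = datasetId;
    this.tableId = tableId;
  }

  static QualifiedTableId parse(String qualifiedId, String defaultProjectId) {
    String project = defaultProjectId;
    String datasetAndTable = qualifiedId;
    // Split on the last ':' so a domain-scoped project ID keeps its own colon.
    int colon = qualifiedId.lastIndexOf(':');
    if (colon >= 0) {
      project = qualifiedId.substring(0, colon);
      datasetAndTable = qualifiedId.substring(colon + 1);
    }
    int dot = datasetAndTable.indexOf('.');
    if (project == null || project.isEmpty() || dot <= 0
        || dot == datasetAndTable.length() - 1) {
      throw new IllegalArgumentException(
          "Expected [projectId:]datasetId.tableId but got: " + qualifiedId);
    }
    return new QualifiedTableId(
        project, datasetAndTable.substring(0, dot), datasetAndTable.substring(dot + 1));
  }

  @Override
  public String toString() {
    return projectId + ":" + datasetId + "." + tableId;
  }
}
```

For example, parsing "test_output_dataset.wordcount_output" with the default project "my-first-cloud-project" yields my-first-cloud-project:test_output_dataset.wordcount_output.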
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
// Configure input parameters.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);
// Set InputFormat.
job.setInputFormatClass(GsonBigQueryInputFormat.class);
Note:
job refers to org.apache.hadoop.mapreduce.Job, the Hadoop job to run. conf refers to org.apache.hadoop.conf.Configuration for the Hadoop job.
Mapper
The GsonBigQueryInputFormat class reads from BigQuery and passes BigQuery objects one at a time as input to the Hadoop Mapper function. Each input is a pair consisting of:
- LongWritable, the record number
- JsonObject, the JSON-formatted BigQuery record
The Mapper accepts the LongWritable and JsonObject pair as input.
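The map step can be sketched without Hadoop to show the shape of the data. In this hypothetical MapStepSketch class, a java.util.Map stands in for the JsonObject record and a Map.Entry stands in for the (Text, LongWritable) output pair; it mirrors the sample Mapper's logic but is not the connector's API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Framework-free sketch of the WordCount map step (Map stands in for JsonObject). */
final class MapStepSketch {
  private MapStepSketch() {}

  /**
   * recordNumber mirrors the LongWritable key; like the sample Mapper, the
   * sketch does not use it. Emits (word, 1) when the field is present.
   */
  static List<Map.Entry<String, Long>> map(
      long recordNumber, Map<String, Object> record, String wordKey) {
    List<Map.Entry<String, Long>> out = new ArrayList<>();
    Object field = record.get(wordKey);
    // Like the `countElement != null` check: records without the field emit nothing.
    if (field != null) {
      out.add(new SimpleEntry<>(field.toString(), 1L));
    }
    return out;
  }
}
```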
Here is a snippet from the Mapper for the sample WordCount job.
// The configuration key used to specify the BigQuery field name
// ("column name").
public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
    "mapred.bq.samples.wordcount.word.key";
// Default value for the configuration entry specified by
// WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
// publicdata:samples.shakespeare or 'repository_name'
// in publicdata:samples.github_timeline.
public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";
/**
 * The mapper function for WordCount.
 */
public static class Map
    extends Mapper<LongWritable, JsonObject, Text, LongWritable> {
  private static final LongWritable ONE = new LongWritable(1);
  private Text word = new Text();
  private String wordKey;
  @Override
  public void setup(Context context)
      throws IOException, InterruptedException {
    // Find the runtime-configured key for the field name we're looking for
    // in the map task.
    Configuration conf = context.getConfiguration();
    wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
  }
  @Override
  public void map(LongWritable key, JsonObject value, Context context)
      throws IOException, InterruptedException {
    JsonElement countElement = value.get(wordKey);
    if (countElement != null) {
      String wordInRecord = countElement.getAsString();
      word.set(wordInRecord);
      // Write out the key, value pair (write out a value of 1, which will be
      // added to the total count for this word in the Reducer).
      context.write(word, ONE);
    }
  }
}
IndirectBigQueryOutputFormat class
IndirectBigQueryOutputFormat gives Hadoop the ability to write JsonObject values to a BigQuery table. It first buffers the output in temporary Cloud Storage files and then copies that data into BigQuery in a single load operation when the job commits. The class provides access to BigQuery records through an extension of the Hadoop OutputFormat class. To use it correctly, several parameters must be set in the Hadoop configuration, and the OutputFormat class must be set to IndirectBigQueryOutputFormat. Below is an example of the parameters to set and the lines of code required to use IndirectBigQueryOutputFormat correctly.
Output parameters
- projectId: the BigQuery projectId under which all output operations occur.
  Example: "my-first-cloud-project"
- QualifiedOutputTableId: the BigQuery table to write the final job results to, in the form optional-projectId:datasetId.tableId. The datasetId must already exist in your project. A dataset named outputDatasetId_hadoop_temporary will be created in BigQuery for temporary results; make sure it does not conflict with an existing dataset.
  Examples:
  test_output_dataset.wordcount_output
  my-first-cloud-project:test_output_dataset.wordcount_output
- outputTableFieldSchema: the schema that defines the output BigQuery table.
- GcsOutputPath: the output path for storing temporary Cloud Storage data (gs://bucket/dir/).
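Two of the requirements above can be checked before a job is submitted: the temporary dataset name must not collide with an existing dataset, and the Cloud Storage path should have the gs://bucket/dir/ shape. OutputParamChecks is a hypothetical pre-flight helper; it assumes the temporary dataset is named by appending "_hadoop_temporary" to the output dataset ID, as described above, and it does not contact BigQuery or Cloud Storage.

```java
import java.util.Set;

/** Hypothetical pre-flight checks for the output parameters (no network calls). */
final class OutputParamChecks {
  private OutputParamChecks() {}

  /** Temporary dataset name, following the outputDatasetId_hadoop_temporary pattern. */
  static String temporaryDatasetId(String outputDatasetId) {
    return outputDatasetId + "_hadoop_temporary";
  }

  /** True if the temporary dataset would collide with an existing dataset. */
  static boolean temporaryDatasetConflicts(String outputDatasetId, Set<String> existingDatasetIds) {
    return existingDatasetIds.contains(temporaryDatasetId(outputDatasetId));
  }

  /** Shape check only: "gs://" followed by a non-empty bucket name. */
  static boolean looksLikeGcsPath(String path) {
    if (path == null || !path.startsWith("gs://")) {
      return false;
    }
    String rest = path.substring("gs://".length());
    int slash = rest.indexOf('/');
    String bucket = slash < 0 ? rest : rest.substring(0, slash);
    return !bucket.isEmpty();
  }
}
```

The list of existing dataset IDs would have to come from your own BigQuery client code; the sketch only compares names.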
// Define the schema we will be using for the output BigQuery table.
List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);
// Create the job and get its configuration
// (parser is the GenericOptionsParser created in main()).
Job job = Job.getInstance(parser.getConfiguration(), "wordcount");
Configuration conf = job.getConfiguration();
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
// Configure input.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);
// Configure output.
BigQueryOutputConfiguration.configure(
    conf,
    outputQualifiedTableId,
    outputSchema,
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    TextOutputFormat.class);
// (Optional) Configure the KMS key used to encrypt the output table.
BigQueryOutputConfiguration.setKmsKeyName(
    conf,
    "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
// Set OutputFormat.
job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);
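For reference, the two-column schema built above can also be written as the JSON array format that the bq command-line tool accepts for schema files. SchemaJsonSketch is a hypothetical illustration, not a connector API; it assumes simple column names (letters, digits, underscores) that need no JSON escaping, and expects a LinkedHashMap so the column order is preserved.

```java
import java.util.Map;

/** Illustration: renders field name -> type pairs as a JSON schema array. */
final class SchemaJsonSketch {
  private SchemaJsonSketch() {}

  /** Pass a LinkedHashMap to keep column order; names are not escaped. */
  static String toJson(Map<String, String> fields) {
    StringBuilder sb = new StringBuilder("[");
    boolean first = true;
    for (Map.Entry<String, String> field : fields.entrySet()) {
      if (!first) {
        sb.append(',');
      }
      first = false;
      sb.append("{\"name\":\"").append(field.getKey())
          .append("\",\"type\":\"").append(field.getValue()).append("\"}");
    }
    return sb.append(']').toString();
  }
}
```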
Reducer
The IndirectBigQueryOutputFormat class writes to BigQuery. It takes key-value pairs as input and writes only the JsonObject to BigQuery; the JsonObject must contain a JSON-formatted BigQuery record. The sample WordCount job pairs each JsonObject with NullWritable (the JsonObject is the key and NullWritable.get() is the value), so only the JSON record is written.
The Reducer for the sample WordCount job is shown below.
/**
 * Reducer function for WordCount.
 */
public static class Reduce
    extends Reducer<Text, LongWritable, JsonObject, NullWritable> {
  @Override
  public void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    // Add up the values to get a total number of occurrences of our word.
    long count = 0;
    for (LongWritable val : values) {
      count = count + val.get();
    }
    JsonObject jsonObject = new JsonObject();
    jsonObject.addProperty("Word", key.toString());
    jsonObject.addProperty("Count", count);
    // Only the JsonObject is written to BigQuery; NullWritable carries no data.
    context.write(jsonObject, NullWritable.get());
  }
}
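Because the output is configured with BigQueryFileFormat.NEWLINE_DELIMITED_JSON, each reduced word ends up as one JSON object per line in the temporary Cloud Storage files. The hypothetical ReduceStepSketch below reproduces the summing and shows one such line using plain strings instead of Gson; its output is illustrative and is not guaranteed to match Gson's serialization byte for byte.

```java
import java.util.List;

/** Framework-free sketch of the WordCount reduce step producing one JSON line. */
final class ReduceStepSketch {
  private ReduceStepSketch() {}

  /** Sums the counts emitted for one word and renders {"Word":...,"Count":...}. */
  static String reduceToJsonLine(String word, List<Long> counts) {
    long count = 0;
    for (long c : counts) {
      count += c;
    }
    return "{\"Word\":\"" + escape(word) + "\",\"Count\":" + count + "}";
  }

  /** Escapes quotes, backslashes and control characters for a JSON string. */
  private static String escape(String s) {
    StringBuilder sb = new StringBuilder();
    for (char ch : s.toCharArray()) {
      switch (ch) {
        case '"': sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        case '\n': sb.append("\\n"); break;
        case '\r': sb.append("\\r"); break;
        case '\t': sb.append("\\t"); break;
        default:
          if (ch < 0x20) {
            sb.append(String.format("\\u%04x", (int) ch));
          } else {
            sb.append(ch);
          }
      }
    }
    return sb.toString();
  }
}
```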
Cleanup
After the job completes, clean up the Cloud Storage export paths.
job.waitForCompletion(true);
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
You can view the word counts in the BigQuery output table in the Google Cloud console.
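In the sample, cleanupJob runs only if waitForCompletion returns normally. One option, which is our suggestion and not part of the original sample, is to run the cleanup in a finally block so the export paths are removed even when the job throws. The sketch below shows the pattern with plain Callable and Runnable stand-ins (RunWithCleanup is a hypothetical name) so it runs without Hadoop; in real code you would wrap job.waitForCompletion(true) in try and call GsonBigQueryInputFormat.cleanupJob(...) in finally.

```java
import java.util.concurrent.Callable;

/** Hypothetical pattern: always run cleanup, whether the job succeeds or throws. */
final class RunWithCleanup {
  private RunWithCleanup() {}

  static <T> T run(Callable<T> job, Runnable cleanup) throws Exception {
    try {
      return job.call();
    } finally {
      // Runs after job.call() returns normally or throws.
      cleanup.run();
    }
  }
}
```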
Complete code for the sample WordCount job
The code below is a simple WordCount job that aggregates word counts from objects in BigQuery.
package com.google.cloud.hadoop.io.bigquery.samples;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Sample program to run the Hadoop Wordcount example over tables in BigQuery.
 */
public class WordCount {
  // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";
  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";
  // Guava might not be available, so define a null / empty helper:
  private static boolean isStringNullOrEmpty(String toTest) {
    return toTest == null || "".equals(toTest);
  }
  /**
   * The mapper function for WordCount. For input, it consumes a LongWritable
   * and JsonObject as the key and value. These correspond to a row identifier
   * and Json representation of the row's values/columns.
   * For output, it produces Text and a LongWritable as the key and value.
   * These correspond to the word and a count for the number of times it has
   * occurred.
   */
  public static class Map
      extends Mapper<LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;
    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for in
      // the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }
    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }
  /**
   * Reducer function for WordCount. For input, it consumes the Text and
   * LongWritable that the mapper produced. For output, it produces a JsonObject
   * and NullWritable. The JsonObject represents the data that will be
   * loaded into BigQuery.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }
      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Only the JsonObject is written to BigQuery; NullWritable carries no data.
      context.write(jsonObject, NullWritable.get());
    }
  }
  /**
   * Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
   * [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
   * [QualifiedOutputTableId] [GcsOutputPath]
   *
   * ProjectId - Project under which to issue the BigQuery
   * operations. Also serves as the default project for table IDs that don't
   * specify a project for the table.
   *
   * QualifiedInputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * InputTableFieldName - Name of the field to count in the
   * input table, e.g., 'word' in publicdata:samples.shakespeare or
   * 'repository_name' in publicdata:samples.github_timeline.
   *
   * QualifiedOutputTableId - Output table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * GcsOutputPath - The output path to store temporary
   * Cloud Storage data, e.g., gs://bucket/dir/
   *
   * @param args a String[] containing ProjectId, QualifiedInputTableId,
   *     InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
   * @throws IOException on IO Error.
   * @throws InterruptedException on Interrupt.
   * @throws ClassNotFoundException if not all classes are present.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {
    // GenericOptionsParser is a utility to parse command line arguments
    // generic to the Hadoop framework. This example doesn't cover the specifics,
    // but recognizes several standard command line arguments, enabling
    // applications to easily specify a NameNode, a ResourceManager, additional
    // configuration resources, etc.
    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();
    // Make sure we have the right parameters.
    if (args.length != 5) {
      System.out.println(
          "Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
              + "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
              + " ProjectId - Project under which to issue the BigQuery operations. Also serves "
              + "as the default project for table IDs that don't explicitly specify a project for "
              + "the table.\n"
              + " QualifiedInputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + " InputTableFieldName - Name of the field to count in the input table, e.g., "
              + "'word' in publicdata:samples.shakespeare or 'repository_name' in "
              + "publicdata:samples.github_timeline.\n"
              + " QualifiedOutputTableId - Output table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + " GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
              + "gs://bucket/dir/");
      System.exit(1);
    }
    // Get the individual parameters from the command line.
    String projectId = args[0];
    String inputQualifiedTableId = args[1];
    String inputTableFieldId = args[2];
    String outputQualifiedTableId = args[3];
    String outputGcsPath = args[4];
    // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);
    // Create the job and get its configuration.
    Job job = Job.getInstance(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();
    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);
    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);
    // (Optional) Configure the KMS key used to encrypt the output table.
    // Replace the example key name with your own key, or remove this call.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
    conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);
    // This helps Hadoop identify the Jar which contains the mapper and reducer
    // by specifying a class in that Jar. This is required if the jar is being
    // passed on the command line to Hadoop.
    job.setJarByClass(WordCount.class);
    // Tell the job what data the mapper will output (because the map output
    // classes are not set separately, these classes are also used for them).
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(GsonBigQueryInputFormat.class);
    // Instead of using BigQueryOutputFormat, we use the newer
    // IndirectBigQueryOutputFormat, which works by first buffering all the data
    // into a Cloud Storage temporary file, and then on commitJob, copies all data from
    // Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
    // since it only requires one BigQuery "load" job per Hadoop/Spark job, as
    // compared to BigQueryOutputFormat, which performs one BigQuery job for each
    // Hadoop/Spark task.
    job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);
    job.waitForCompletion(true);
    // After the job completes, clean up the Cloud Storage export paths.
    GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
    // You can view word counts in the BigQuery output table at
    // https://console.cloud.google.com/.
  }
}
Java version
The BigQuery connector requires Java 8.
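A job driver can check the running JVM before doing any work. JavaVersionCheck is a hypothetical helper that reads the standard java.specification.version system property, which is "1.8" on Java 8 and "9", "11", "17" and so on for later releases. Treating newer JVMs as acceptable is an assumption of this sketch; confirm the supported Java versions for your connector version in its release notes.

```java
/** Hypothetical check of the running JVM's major version. */
final class JavaVersionCheck {
  private JavaVersionCheck() {}

  /** Parses "1.8" -> 8 and "11" -> 11 (java.specification.version formats). */
  static int majorVersion(String specVersion) {
    String v = specVersion.startsWith("1.") ? specVersion.substring(2) : specVersion;
    int dot = v.indexOf('.');
    return Integer.parseInt(dot < 0 ? v : v.substring(0, dot));
  }

  /** True if the current JVM's major version is at least minimumMajor. */
  static boolean meetsMinimum(int minimumMajor) {
    return majorVersion(System.getProperty("java.specification.version")) >= minimumMajor;
  }
}
```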
Apache Maven dependency information
<dependency>
  <groupId>com.google.cloud.bigdataoss</groupId>
  <artifactId>bigquery-connector</artifactId>
  <version>insert "hadoopX-X.X.X" connector version number here</version>
</dependency>
For detailed information, see the BigQuery connector release notes and the Javadoc reference.