מחבר Hadoop BigQuery מותקן כברירת מחדל בכל הצמתים של אשכול Dataproc 1.0-1.2 בתיקייה /usr/lib/hadoop/lib/.
היא זמינה בסביבות Spark ו-PySpark.
גרסאות Dataproc image 1.5 ומעלה: מחבר BigQuery לא מותקן כברירת מחדל ב-Dataproc בגרסאות image 1.5 ומעלה. כדי להשתמש בה עם הגרסאות האלה:
מתקינים את המחבר של BigQuery באמצעות פעולת האתחול הזו.
מציינים את מחבר BigQuery בפרמטר
jarsכששולחים עבודה:--jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jarכוללים את המחלקות של מחבר BigQuery ב-jar-with-dependencies של האפליקציה.
כדי למנוע התנגשויות: אם האפליקציה שלכם משתמשת בגרסת מחבר ששונה מגרסת המחבר שפריסתם באשכול Dataproc, אתם צריכים:
יוצרים אשכול חדש עם פעולת אתחול שמתקינה את גרסת המחבר שבה האפליקציה משתמשת, או
כדי למנוע התנגשות בין גרסת המחבר שלכם לבין גרסת המחבר שפריסתם באשכול Dataproc, צריך לכלול את מחלקות המחבר ואת התלויות של המחבר בגרסה שבה אתם משתמשים בקובץ ה-JAR של האפליקציה שלכם ולהעביר אותן (ראו דוגמה להעברת תלויות ב-Maven).
המחלקות GsonBigQueryInputFormat
GsonBigQueryInputFormat מספק ל-Hadoop את האובייקטים של BigQuery בפורמט JsonObject באמצעות הפעולות העיקריות הבאות:
- שימוש בשאילתה שצוינה על ידי המשתמש כדי לבחור אובייקטים ב-BigQuery
- פיצול התוצאות של השאילתה באופן שווה בין צמתי Hadoop
- ניתוח החלוקות לאובייקטים של Java כדי להעביר אותם ל-Mapper.
המחלקת Hadoop Mapper מקבלת ייצוג
JsonObjectשל כל אובייקט BigQuery שנבחר.
הסיווג BigQueryInputFormat מספק גישה לרשומות BigQuery באמצעות הרחבה של הסיווג InputFormat של Hadoop. כדי להשתמש במחלקה BigQueryInputFormat:
צריך להוסיף שורות למשימת Hadoop הראשית כדי להגדיר פרמטרים בהגדרות של Hadoop.
צריך להגדיר את המחלקה InputFormat בתור
GsonBigQueryInputFormat.
בקטעים הבאים מוסבר איך לעמוד בדרישות האלה.
פרמטרים של קלט
- QualifiedInputTableId
- הטבלה ב-BigQuery שממנה קוראים, בתבנית:
optional-projectId:datasetId.tableId
דוגמה:publicdata:samples.shakespeare - projectId
- מזהה הפרויקט ב-BigQuery שבו מתבצעות כל פעולות הקלט.
לדוגמה:my-first-cloud-project
// Set the job-level projectId. conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId); // Configure input parameters. BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId); // Set InputFormat. job.setInputFormatClass(GsonBigQueryInputFormat.class);
הערות:
-
jobמתייחס ל-org.apache.hadoop.mapreduce.Job, משימת Hadoop להפעלה. -
confמתייחס ל-org.apache.hadoop.Configurationשל משימת Hadoop.
Mapper
המחלקות GsonBigQueryInputFormat קוראות מ-BigQuery ומעבירות אובייקטים של BigQuery אחד בכל פעם כקלט לפונקציה Mapper של Hadoop. הקלט הוא זוג שמורכב מהרכיבים הבאים:
LongWritable, מספר הרשומה-
JsonObject, הרשומה ב-BigQuery בפורמט JSON
הפונקציה Mapper מקבלת את LongWritable ואת JsonObject pair כקלט.
הנה קטע קוד מתוך Mapper של משימת WordCount לדוגמה.
// private static final LongWritable ONE = new LongWritable(1); // The configuration key used to specify the BigQuery field name // ("column name"). public static final String WORDCOUNT_WORD_FIELDNAME_KEY = "mapred.bq.samples.wordcount.word.key"; // Default value for the configuration entry specified by // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in // publicdata:samples.shakespeare or 'repository_name' // in publicdata:samples.github_timeline. public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word"; /** * The mapper function for WordCount. */ public static class Map extends Mapper <LongWritable, JsonObject, Text, LongWritable> { private static final LongWritable ONE = new LongWritable(1); private Text word = new Text(); private String wordKey; @Override public void setup(Context context) throws IOException, InterruptedException { // Find the runtime-configured key for the field name we're looking for // in the map task. Configuration conf = context.getConfiguration(); wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT); } @Override public void map(LongWritable key, JsonObject value, Context context) throws IOException, InterruptedException { JsonElement countElement = value.get(wordKey); if (countElement != null) { String wordInRecord = countElement.getAsString(); word.set(wordInRecord); // Write out the key, value pair (write out a value of 1, which will be // added to the total count for this word in the Reducer). context.write(word, ONE); } } }
IndirectBigQueryOutputFormat class
IndirectBigQueryOutputFormat מאפשר ל-Hadoop לכתוב ערכים של JsonObject ישירות לטבלה ב-BigQuery. המחלקה הזו מספקת גישה לרשומות של BigQuery באמצעות הרחבה של המחלקה OutputFormat של Hadoop. כדי להשתמש בו בצורה נכונה, צריך להגדיר כמה פרמטרים בהגדרות של Hadoop, ולהגדיר את המחלקה OutputFormat ל-IndirectBigQueryOutputFormat. בדוגמה הבאה מוצגים הפרמטרים שצריך להגדיר ושורות הקוד שנדרשות כדי להשתמש ב-IndirectBigQueryOutputFormat בצורה נכונה.
פרמטרים של פלט
- projectId
- מזהה הפרויקט ב-BigQuery שבו מתבצעות כל פעולות הפלט.
דוגמה: "my-first-cloud-project" - QualifiedOutputTableId
- מערך הנתונים ב-BigQuery שאליו ייכתבו התוצאות הסופיות של העבודה, בפורמט optional-projectId:datasetId.tableId.optional-projectId המזהה datasetId צריך להיות קיים כבר בפרויקט.
מערך הנתונים outputDatasetId_hadoop_temporary ייצור ב-BigQuery לתוצאות זמניות. חשוב לוודא שאין התנגשות עם מערך נתונים קיים.
דוגמאות:
test_output_dataset.wordcount_output
my-first-cloud-project:test_output_dataset.wordcount_output - outputTableFieldSchema
- סכימה שמגדירה את הסכימה של טבלת BigQuery של הפלט
- GcsOutputPath
- נתיב הפלט לאחסון נתוני Cloud Storage זמניים (
gs://bucket/dir/)
// Define the schema we will be using for the output BigQuery table. List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>(); outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING")); outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER")); TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema); // Create the job and get its configuration. Job job = new Job(parser.getConfiguration(), "wordcount"); Configuration conf = job.getConfiguration(); // Set the job-level projectId. conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId); // Configure input. BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId); // Configure output. BigQueryOutputConfiguration.configure( conf, outputQualifiedTableId, outputSchema, outputGcsPath, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class); // (Optional) Configure the KMS key used to encrypt the output table. BigQueryOutputConfiguration.setKmsKeyName( conf, "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1"); );
מפחית
הסיווג IndirectBigQueryOutputFormat כותב ל-BigQuery.
הפונקציה מקבלת מפתח וערך JsonObject כקלט, וכותבת רק את הערך של JsonObject ל-BigQuery (המפתח מתעלם). המחרוזת JsonObject צריכה להכיל רשומה ב-BigQuery בפורמט JSON. הפלט של Reducer צריך להיות צמד של מפתח מכל סוג (NullWritable משמש בעבודת WordCount לדוגמה) וערך JsonObject. בהמשך מוצג ה-Reducer של עבודת WordCount לדוגמה.
/** * Reducer function for WordCount. */ public static class Reduce extends Reducer<Text, LongWritable, JsonObject, NullWritable> { @Override public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { // Add up the values to get a total number of occurrences of our word. long count = 0; for (LongWritable val : values) { count = count + val.get(); } JsonObject jsonObject = new JsonObject(); jsonObject.addProperty("Word", key.toString()); jsonObject.addProperty("Count", count); // Key does not matter. context.write(jsonObject, NullWritable.get()); } }
הסרת המשאבים
אחרי שהמשימה מסתיימת, מנקים את נתיבי הייצוא ב-Cloud Storage.
job.waitForCompletion(true); GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
אפשר לראות את ספירת המילים בטבלת הפלט של BigQuery במסוףGoogle Cloud .
קוד מלא לדוגמה של עבודת WordCount
הקוד שבהמשך הוא דוגמה לעבודת WordCount פשוטה שמצטברת מספירות מילים מאובייקטים ב-BigQuery.
package com.google.cloud.hadoop.io.bigquery.samples;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Sample program to run the Hadoop Wordcount example over tables in BigQuery.
*/
public class WordCount {
// The configuration key used to specify the BigQuery field name
// ("column name").
public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
"mapred.bq.samples.wordcount.word.key";
// Default value for the configuration entry specified by
// WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
// publicdata:samples.shakespeare or 'repository_name'
// in publicdata:samples.github_timeline.
public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";
// Guava might not be available, so define a null / empty helper:
private static boolean isStringNullOrEmpty(String toTest) {
return toTest == null || "".equals(toTest);
}
/**
* The mapper function for WordCount. For input, it consumes a LongWritable
* and JsonObject as the key and value. These correspond to a row identifier
* and Json representation of the row's values/columns.
* For output, it produces Text and a LongWritable as the key and value.
* These correspond to the word and a count for the number of times it has
* occurred.
*/
public static class Map
extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
private static final LongWritable ONE = new LongWritable(1);
private Text word = new Text();
private String wordKey;
@Override
public void setup(Context context)
throws IOException, InterruptedException {
// Find the runtime-configured key for the field name we're looking for in
// the map task.
Configuration conf = context.getConfiguration();
wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
}
@Override
public void map(LongWritable key, JsonObject value, Context context)
throws IOException, InterruptedException {
JsonElement countElement = value.get(wordKey);
if (countElement != null) {
String wordInRecord = countElement.getAsString();
word.set(wordInRecord);
// Write out the key, value pair (write out a value of 1, which will be
// added to the total count for this word in the Reducer).
context.write(word, ONE);
}
}
}
/**
* Reducer function for WordCount. For input, it consumes the Text and
* LongWritable that the mapper produced. For output, it produces a JsonObject
* and NullWritable. The JsonObject represents the data that will be
* loaded into BigQuery.
*/
public static class Reduce
extends Reducer<Text, LongWritable, JsonObject, NullWritable> {
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
// Add up the values to get a total number of occurrences of our word.
long count = 0;
for (LongWritable val : values) {
count = count + val.get();
}
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("Word", key.toString());
jsonObject.addProperty("Count", count);
// Key does not matter.
context.write(jsonObject, NullWritable.get());
}
}
/**
* Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
* [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
* [QualifiedOutputTableId] [GcsOutputPath]
*
* ProjectId - Project under which to issue the BigQuery
* operations. Also serves as the default project for table IDs that don't
* specify a project for the table.
*
* QualifiedInputTableId - Input table ID of the form
* (Optional ProjectId):[DatasetId].[TableId]
*
* InputTableFieldName - Name of the field to count in the
* input table, e.g., 'word' in publicdata:samples.shakespeare or
* 'repository_name' in publicdata:samples.github_timeline.
*
* QualifiedOutputTableId - Input table ID of the form
* (Optional ProjectId):[DatasetId].[TableId]
*
* GcsOutputPath - The output path to store temporary
* Cloud Storage data, e.g., gs://bucket/dir/
*
* @param args a String[] containing ProjectId, QualifiedInputTableId,
* InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
* @throws IOException on IO Error.
* @throws InterruptedException on Interrupt.
* @throws ClassNotFoundException if not all classes are present.
*/
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
// GenericOptionsParser is a utility to parse command line arguments
// generic to the Hadoop framework. This example doesn't cover the specifics,
// but recognizes several standard command line arguments, enabling
// applications to easily specify a NameNode, a ResourceManager, additional
// configuration resources, etc.
GenericOptionsParser parser = new GenericOptionsParser(args);
args = parser.getRemainingArgs();
// Make sure we have the right parameters.
if (args.length != 5) {
System.out.println(
"Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
+ "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
+ " ProjectId - Project under which to issue the BigQuery operations. Also serves "
+ "as the default project for table IDs that don't explicitly specify a project for "
+ "the table.\n"
+ " QualifiedInputTableId - Input table ID of the form "
+ "(Optional ProjectId):[DatasetId].[TableId]\n"
+ " InputTableFieldName - Name of the field to count in the input table, e.g., "
+ "'word' in publicdata:samples.shakespeare or 'repository_name' in "
+ "publicdata:samples.github_timeline.\n"
+ " QualifiedOutputTableId - Input table ID of the form "
+ "(Optional ProjectId):[DatasetId].[TableId]\n"
+ " GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
+ "gs://bucket/dir/");
System.exit(1);
}
// Get the individual parameters from the command line.
String projectId = args[0];
String inputQualifiedTableId = args[1];
String inputTableFieldId = args[2];
String outputQualifiedTableId = args[3];
String outputGcsPath = args[4];
// Define the schema we will be using for the output BigQuery table.
List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);
// Create the job and get its configuration.
Job job = new Job(parser.getConfiguration(), "wordcount");
Configuration conf = job.getConfiguration();
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
// Configure input.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);
// Configure output.
BigQueryOutputConfiguration.configure(
conf,
outputQualifiedTableId,
outputSchema,
outputGcsPath,
BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
TextOutputFormat.class);
// (Optional) Configure the KMS key used to encrypt the output table.
BigQueryOutputConfiguration.setKmsKeyName(
conf,
"projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);
// This helps Hadoop identify the Jar which contains the mapper and reducer
// by specifying a class in that Jar. This is required if the jar is being
// passed on the command line to Hadoop.
job.setJarByClass(WordCount.class);
// Tell the job what data the mapper will output.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(GsonBigQueryInputFormat.class);
// Instead of using BigQueryOutputFormat, we use the newer
// IndirectBigQueryOutputFormat, which works by first buffering all the data
// into a Cloud Storage temporary file, and then on commitJob, copies all data from
// Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
// since it only requires one BigQuery "load" job per Hadoop/Spark job, as
// compared to BigQueryOutputFormat, which performs one BigQuery job for each
// Hadoop/Spark task.
job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);
job.waitForCompletion(true);
// After the job completes, clean up the Cloud Storage export paths.
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
// You can view word counts in the BigQuery output table at
// https://console.cloud.google.com/.
}
}גרסת Java
המחבר של BigQuery דורש Java 8.
מידע על תלות ב-Apache Maven
<dependency> <groupId>com.google.cloud.bigdataoss</groupId> <artifactId>bigquery-connector</artifactId> <version>insert "hadoopX-X.X.X" connector version number here</version> </dependency>
מידע מפורט זמין בהערות על הגרסה של מחבר BigQuery ובהפניה ל-Javadoc.