הפעלת צינורות עיבוד נתונים של Dataflow באמצעות Managed Airflow

Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)

בדף הזה מוסבר איך להשתמש ב-DataflowTemplateOperator כדי להפעיל צינורות של Dataflow מ-Managed Airflow. הצינור Cloud Storage Text to BigQuery הוא צינור אצווה שמאפשר להעלות קובצי טקסט שמאוחסנים ב-Cloud Storage, להמיר אותם באמצעות פונקציה מוגדרת על ידי המשתמש (UDF) ב-JavaScript שאתם מספקים, ולפלט את התוצאות ל-BigQuery.

פונקציה שהוגדרה על ידי המשתמש, קובץ קלט וסכימת JSON יועלו לקטגוריה של Cloud Storage. גרף DAG שמפנה לקבצים האלה יפעיל צינור Dataflow batch, שיחיל את הפונקציה בהגדרת המשתמש ואת קובץ סכימת ה-JSON על קובץ הקלט. לאחר מכן, התוכן הזה יועלה לטבלה ב-BigQuery

סקירה כללית

  • לפני שמתחילים את תהליך העבודה, צריך ליצור את הישויות הבאות:

    • טבלת BigQuery ריקה ממערך נתונים ריק, שתכיל את עמודות המידע הבאות: location,‏ average_temperature, ‏ month, ואופציונלית, inches_of_rain, ‏ is_current ו-latest_measurement.

    • קובץ JSON שינרמל את הנתונים מהקובץ לפורמט הנכון עבור הסכימה של טבלת BigQuery..txt אובייקט ה-JSON יכלול מערך של BigQuery Schema, שבו כל אובייקט יכיל שם עמודה, סוג קלט וציון אם זה שדה חובה.

    • קובץ קלט .txt שיכיל את הנתונים שיועלו בשיטת batch לטבלה ב-BigQuery.

    • פונקציה שמוגדרת על ידי המשתמש ונכתבת ב-JavaScript, שתמיר כל שורה בקובץ .txt למשתנים הרלוונטיים לטבלה שלנו.

    • קובץ Airflow DAG שיפנה למיקום של הקבצים האלה.

  • בשלב הבא, מעלים את הקובץ .txt, את קובץ ה-UDF‏ .js ואת קובץ הסכימה .json לקטגוריה של Cloud Storage. תצטרכו גם להעלות את ה-DAG לסביבת Managed Airflow.

  • אחרי שה-DAG מועלה, Airflow מריץ משימה ממנו. המשימה הזו תפעיל צינור Dataflow שיחיל את הפונקציה שהוגדרה על ידי המשתמש על הקובץ .txt ויעצב אותו בהתאם לסכימת ה-JSON.

  • לבסוף, הנתונים יועלו לטבלה ב-BigQuery שיצרתם קודם.

לפני שמתחילים

  • כדי לכתוב את הפונקציה המוגדרת על ידי המשתמש, צריך להכיר את JavaScript.
  • במדריך הזה, אנחנו יוצאים מנקודת הנחה שכבר יש לכם סביבת Managed Airflow. כדי ליצור סביבה, אפשר לעיין במאמר בנושא יצירת סביבה. אפשר להשתמש בכל גרסה של Managed Airflow עם המדריך הזה.
  • מפעילים את ממשקי ה-API של Managed Service for Apache Airflow,‏ Dataflow,‏ Cloud Storage ו-BigQuery.

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    הפעלת ממשקי ה-API

  • חשוב לוודא שיש לכם את ההרשאות הבאות:

  • צריך לוודא שלחשבון השירות של הסביבה יש הרשאות ליצור משימות Dataflow, לגשת לקטגוריית Cloud Storage ולקרוא ולעדכן נתונים בטבלה ב-BigQuery.

יצירת טבלה ריקה ב-BigQuery עם הגדרת סכימה

יוצרים טבלה ב-BigQuery עם הגדרת סכימה. בהמשך המדריך הזה תשתמשו בהגדרת הסכימה הזו. הטבלה הזו ב-BigQuery תכיל את התוצאות של ההעלאה באצווה.

כדי ליצור טבלה ריקה עם הגדרת סכימה:

המסוף

  1. במסוף Google Cloud , עוברים לדף BigQuery:

    כניסה ל-BigQuery

  2. בחלונית הניווט, בקטע Resources, מרחיבים את הפרויקט.

  3. בחלונית הפרטים, לוחצים על יצירת מערך נתונים.

    לוחצים על הלחצן ליצירת מערך נתונים.

  4. בדף 'יצירת מערך נתונים', בקטע מזהה מערך הנתונים, נותנים שם למערך הנתונים average_weather. בכל שאר השדות משאירים את ערכי ברירת המחדל.

    ממלאים את מזהה מערך הנתונים בשם average_weather

  5. לוחצים על יצירת מערך נתונים.

  6. חוזרים לחלונית הניווט, בקטע Resources (משאבים), מרחיבים את הפרויקט. לאחר מכן, לוחצים על מערך הנתונים average_weather.

  7. בחלונית הפרטים, לוחצים על יצירת טבלה.

    לוחצים על 'יצירת טבלה'.

  8. בדף יצירת טבלה, בקטע מקור, בוחרים באפשרות טבלה ריקה.

  9. בדף יצירת טבלה, בקטע יעד:

    • בשדה Dataset name (שם מערך הנתונים), בוחרים את מערך הנתונים average_weather.

      בוחרים באפשרות Dataset (מערך נתונים) עבור מערך הנתונים average_weather

    • בשדה Table name (שם הטבלה), מזינים את השם average_weather.

    • מוודאים שההגדרה של Table type היא Native table.

  10. בקטע Schema (סכימה), מזינים את הגדרת הסכימה. אפשר להשתמש באחת מהגישות הבאות:

    • כדי להזין פרטי סכימה באופן ידני, מפעילים את האפשרות עריכה כטקסט ומזינים את סכימת הטבלה כמערך JSON. מקלידים את השדות הבאים:

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • משתמשים באפשרות הוספת שדה כדי להזין את הסכימה באופן ידני:

      לוחצים על 'הוספת שדה' כדי להזין את השדות

  11. בקטע הגדרות של מחיצה ושל אשכול, משאירים את ערך ברירת המחדל, No partitioning.

  12. בקטע אפשרויות מתקדמות, בשדה הצפנה, משאירים את ערך ברירת המחדל, Google-owned and managed key.

  13. לוחצים על יצירת טבלה.

BQ

משתמשים בפקודה bq mk כדי ליצור מערך נתונים ריק וטבלה במערך הנתונים הזה.

מריצים את הפקודה הבאה כדי ליצור מערך נתונים של מזג האוויר הממוצע בעולם:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

מחליפים את מה שכתוב בשדות הבאים:

מריצים את הפקודה הבאה כדי ליצור טבלה ריקה במערך הנתונים הזה עם הגדרת הסכימה:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

אחרי שיוצרים את הטבלה, אפשר לעדכן את תאריך התפוגה, התיאור והתוויות שלה. אפשר גם לשנות את הגדרת הסכימה.

Python

שומרים את הקוד הזה בשם dataflowtemplateoperator_create_dataset_and_table_helper.py מעדכנים את המשתנים בקוד כך שישקפו את הפרויקט והמיקום, ואז מפעילים פתרונות חכמים באמצעות הפקודה הבאה:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

כדי לבצע אימות ב-Managed Airflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"


def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

יצירת קטגוריה של Cloud Storage

יוצרים מאגר (bucket) שיכיל את כל הקבצים שנדרשים לתהליך העבודה. ה-DAG שתיצרו בהמשך המדריך הזה יפנה לקבצים שתעלו לקטגוריית האחסון הזו. כדי ליצור קטגוריית אחסון חדשה:

המסוף

  1. פותחים את Cloud Storage במסוף Google Cloud .

    כניסה ל-Cloud Storage

  2. לוחצים על Create Bucket (יצירת קטגוריה) כדי לפתוח את הטופס ליצירת קטגוריה.

    1. מזינים את פרטי הקטגוריה ולוחצים על Continue כדי להשלים כל שלב:

      • מזינים Name (שם) ייחודי בהיקף גלובלי לקטגוריה. במדריך הזה נשתמש ב-bucketName כדוגמה.

      • בוחרים באפשרות Region בשביל סוג המיקום. לאחר מכן בוחרים Location שבו יישמרו נתוני הקטגוריה.

      • בוחרים באפשרות Standard כסוג האחסון (storage class) שמוגדר כברירת מחדל לנתונים.

      • בוחרים באפשרות Uniform בקרת גישה כדי לגשת לאובייקטים.

    2. לוחצים על סיום.

gcloud

משתמשים בפקודה gcloud storage buckets create:

gcloud storage buckets create gs://bucketName/

מחליפים את מה שכתוב בשדות הבאים:

  • bucketName: השם של הקטגוריה שיצרתם קודם במדריך הזה.

דוגמאות קוד

C#

כדי לבצע אימות ב-Managed Airflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

כדי לבצע אימות ב-Managed Airflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

כדי לבצע אימות ב-Managed Airflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

כדי לבצע אימות ב-Managed Airflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

from google.cloud import storage


def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

כדי לבצע אימות ב-Managed Airflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

יצירת סכימת BigQuery בפורמט JSON לטבלת הפלט

יוצרים קובץ סכימה בפורמט JSON של BigQuery שתואם לטבלת הפלט שיצרתם קודם. חשוב לשים לב ששמות השדות, הסוגים והמצבים צריכים להיות זהים לאלה שהוגדרו קודם בסכימת הטבלה ב-BigQuery. הקובץ הזה ינרמל את הנתונים מהקובץ .txt לפורמט שתואם לסכימה של BigQuery. מה שם הקובץ הזה? jsonSchema.json

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

יצירת קובץ JavaScript לעיצוב הנתונים

בקובץ הזה מגדירים את הפונקציה המוגדרת על ידי המשתמש (UDF) שמספקת את הלוגיקה להמרת שורות הטקסט בקובץ הקלט. שימו לב שהפונקציה הזו מקבלת כל שורת טקסט בקובץ הקלט כארגומנט נפרד, ולכן הפונקציה תפעל פעם אחת לכל שורה בקובץ הקלט. מה שם הקובץ הזה? transformCSVtoJSON.js


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

יצירת קובץ קלט

הקובץ הזה יכיל את המידע שאתם רוצים להעלות לטבלה ב-BigQuery. מעתיקים את הקובץ הזה באופן מקומי ונותנים לו את השם inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

העלאת הקבצים לקטגוריה

מעלים את הקבצים הבאים לקטגוריה של Cloud Storage שיצרתם קודם:

  • סכימת BigQuery בפורמט JSON‏ (.json)
  • פונקציה מוגדרת על ידי המשתמש ב-JavaScript‏ (transformCSVtoJSON.js)
  • קובץ הקלט של הטקסט שרוצים לעבד (.txt)

המסוף

  1. במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.

    כניסה לדף Buckets

  2. ברשימת הקטגוריות, לוחצים על הקטגוריה הרצויה.

  3. בכרטיסייה Objects של הקטגוריה, מבצעים אחת מהפעולות הבאות:

    • גוררים את הקבצים הרצויים משולחן העבודה או ממנהל הקבצים ומשחררים אותם בחלונית הראשית של Google Cloud המסוף.

    • לוחצים על הלחצן העלאת קבצים, בוחרים את הקבצים שרוצים להעלות בתיבת הדו-שיח שמופיעה ולוחצים על פתיחה.

gcloud

מריצים את הפקודה gcloud storage cp:

gcloud storage cp OBJECT_LOCATION gs://bucketName

מחליפים את מה שכתוב בשדות הבאים:

  • bucketName: השם של הקטגוריה שיצרתם קודם במדריך הזה.
  • OBJECT_LOCATION: הנתיב המקומי לאובייקט. לדוגמה, Desktop/transformCSVtoJSON.js.

דוגמאות קוד

Python

כדי לבצע אימות ב-Managed Airflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

from google.cloud import storage


def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

כדי לבצע אימות ב-Managed Airflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

הגדרה של DataflowTemplateOperator

לפני שמריצים את ה-DAG, צריך להגדיר את משתני Airflow הבאים.

משתנה Airflow ערך
project_id מזהה הפרויקט. דוגמה: example-project.
gce_zone התחום (zone) ב-Compute Engine שבו צריך ליצור את אשכול Dataflow. דוגמה: us-central1-a מידע נוסף על אזורים חוקיים זמין במאמר אזורים ותחומים.
bucket_path המיקום של קטגוריה של Cloud Storage שיצרתם קודם. לדוגמה: gs://example-bucket.

עכשיו תשתמשו בקבצים שיצרתם קודם כדי ליצור DAG שמפעיל את תהליך העבודה של Dataflow. מעתיקים את ה-DAG הזה ושומרים אותו באופן מקומי בשם composer-dataflow-dag.py.



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"


default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

העלאת ה-DAG ל-Cloud Storage

מעלים את ה-DAG לתיקייה /dags בקטגוריה של הסביבה. אחרי שההעלאה מסתיימת בהצלחה, אפשר לראות אותה בלחיצה על הקישור DAGs Folder בדף Managed Airflow Environments.

תיקיית ה-DAG בסביבה שלכם מכילה את ה-DAG

צפייה בסטטוס של המשימה

  1. עוברים אל ממשק האינטרנט של Airflow.
  2. בדף DAGs, לוחצים על שם ה-DAG (למשל composerDataflowDAG).
  3. בדף הפרטים של DAG, לוחצים על תצוגת גרף.
  4. בדיקת הסטטוס:

    • Failed: המשימה מוקפת בתיבה אדומה. אפשר גם להעביר את מצביע העכבר מעל המשימה ולחפש את הסטטוס: נכשל.

    • Success: המשימה מוקפת בתיבה ירוקה. אפשר גם להעביר את מצביע העכבר מעל המשימה ולבדוק אם מופיע State: Success.

אחרי כמה דקות, אפשר לבדוק את התוצאות ב-Dataflow וב-BigQuery.

הצגת העבודה ב-Dataflow

  1. נכנסים לדף Dataflow במסוף Google Cloud .

    מעבר אל Dataflow

  2. שם העבודה הוא dataflow_operator_transform_csv_to_bq עם מזהה ייחודי שמצורף לסוף השם עם מקף, כך:

    למשימת Dataflow יש מזהה ייחודי

  3. כדי לראות את פרטי המשרה, לוחצים על השם.

    לראות את כל פרטי המשרה

הצגת התוצאות ב-BigQuery

  1. במסוף Google Cloud , עוברים לדף BigQuery.

    כניסה ל-BigQuery

  2. אפשר להגיש שאילתות באמצעות SQL סטנדרטי. כדי לראות את השורות שנוספו לטבלה, משתמשים בשאילתה הבאה:

    SELECT * FROM projectId.average_weather.average_weather
    

המאמרים הבאים