שימוש בספריות הלקוח של Cloud ל-Python

במדריך הזה יש הסבר מפורט על Cloud Shell שבו נעשה שימוש בספריות הלקוח של Google Cloud ל-Python כדי לבצע קריאות באופן פרוגרמטי לממשקי gRPC של Managed Service for Apache Spark, ליצור אשכול ולהגיש עבודה לאשכול.

בקטעים הבאים מוסבר על הפעולה של קוד ההדרכה שמופיע במאגר GoogleCloudPlatform/python-dataproc ב-GitHub.

הרצת ההדרכה של Cloud Shell

כדי להריץ את ההדרכה, לוחצים על Open in Cloud Shell (פתיחה ב-Cloud Shell).

פתיחה ב-Cloud Shell

הסבר על הקוד

בקטע הזה מוסבר איך קוד המדריך משתמש בספריות לקוח של Cloud ל-Python כדי לבצע אימות ב- Google Cloud, ליצור אשכול, לשלוח עבודת Spark ולנקות את הסביבה על ידי מחיקת האשכול.

Application Default Credentials

ההדרכה המפורטת לשימוש ב-Cloud Shell במדריך הזה מספקת אימות באמצעות פרטי הכניסה של הפרויקט Google Cloud . כשמריצים קוד באופן מקומי, מומלץ להשתמש בפרטי כניסה של חשבון שירות כדי לאמת את הקוד.

יצירת אשכול Managed Service for Apache Spark

הערכים הבאים מוגדרים כדי ליצור את האשכול:

  • הפרויקט שבו האשכול ייווצר
  • האזור שבו ייווצר האשכול
  • השם של האשכול
  • הגדרת האשכול, שבה מצוין מאסטר אחד ושני עובדים ראשיים

ההגדרות שנותרו באשכול יוגדרו כברירת מחדל. אפשר לשנות את הגדרות ברירת המחדל של האשכול. לדוגמה, אפשר להוסיף מכונות וירטואליות משניות (ברירת מחדל = 0) או לציין רשת VPC לא סטנדרטית לאשכול. מידע נוסף זמין במאמר בנושא CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

שליחת משרה

הערכים הבאים מוגדרים לשליחת העבודה:

  • הפרויקט שבו האשכול ייווצר
  • האזור שבו ייווצר האשכול
  • הגדרת העבודה, שבה מצוין שם האשכול ונתיב הקובץ (URI) של עבודת PySpark ב-Cloud Storage

מידע נוסף זמין במאמר בנושא SubmitJob.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

מחיקת האשכול

הערכים הבאים מוגדרים למחיקת האשכול:

  • הפרויקט שבו האשכול ייווצר
  • האזור שבו ייווצר האשכול
  • השם של האשכול

מידע נוסף זמין במאמר DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")