Menggunakan Library Klien Cloud untuk Python

Tutorial ini mencakup panduan Cloud Shell yang menggunakan library klien Google Cloud untuk Python untuk memanggil Managed Service untuk Apache Spark gRPC API secara terprogram guna membuat cluster dan mengirimkan tugas ke cluster.

Bagian berikut menjelaskan operasi kode panduan yang ada di repositori GitHub GoogleCloudPlatform/python-dataproc.

Jalankan panduan Cloud Shell

Klik Open in Cloud Shell untuk menjalankan panduan.

Buka di Cloud Shell

Memahami kode

Bagian ini menjelaskan cara kode tutorial menggunakan Library Klien Cloud untuk Python guna melakukan autentikasi dengan Google Cloud, membuat cluster, mengirimkan tugas Spark, dan membersihkan dengan menghapus cluster.

Kredensial Default Aplikasi

Panduan Cloud Shell dalam tutorial ini menyediakan autentikasi dengan menggunakan kredensial project Google Cloud Anda. Saat Anda menjalankan kode secara lokal, praktik yang direkomendasikan adalah menggunakan kredensial akun layanan untuk mengautentikasi kode Anda.

Membuat cluster Managed Service untuk Apache Spark

Nilai berikut ditetapkan untuk membuat cluster:

  • Project tempat cluster akan dibuat
  • Region tempat cluster akan dibuat
  • Nama cluster
  • Konfigurasi cluster, yang menentukan satu master dan dua worker primer

Setelan konfigurasi default digunakan untuk setelan cluster yang tersisa. Anda dapat mengganti setelan konfigurasi cluster default. Misalnya, Anda dapat menambahkan VM sekunder (default = 0) atau menentukan jaringan VPC non-default untuk cluster. Untuk mengetahui informasi selengkapnya, lihat CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Mengirim tugas

Nilai berikut ditetapkan untuk mengirimkan tugas:

  • Project tempat cluster akan dibuat
  • Region tempat cluster akan dibuat
  • Konfigurasi tugas, yang menentukan nama cluster dan jalur file (URI) Cloud Storage tugas PySpark

Lihat SubmitJob untuk mengetahui informasi selengkapnya.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Menghapus cluster

Nilai berikut ditetapkan untuk menghapus cluster:

  • Project tempat cluster akan dibuat
  • Region tempat cluster akan dibuat
  • Nama cluster

Untuk mengetahui informasi selengkapnya, lihat DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")