Menggunakan Library Klien Cloud untuk Python

Tutorial ini mencakup panduan Cloud Shell yang menggunakan library klien Google Cloud untuk Python guna memanggil Managed Service untuk Apache Spark gRPC API secara terprogram untuk membuat cluster dan mengirimkan tugas ke cluster.

Bagian berikut menjelaskan pengoperasian kode panduan yang terdapat di repositori GitHub GoogleCloudPlatform/python-dataproc.

Menjalankan panduan Cloud Shell

Klik Open in Cloud Shell untuk menjalankan panduan.

Buka di Cloud Shell

Memahami kode

Kredensial Default Aplikasi

Panduan Cloud Shell dalam tutorial ini menyediakan autentikasi menggunakan kredensial your Google Cloud project. Saat menjalankan kode secara lokal, praktik yang direkomendasikan adalah menggunakan kredensial akun layanan untuk mengautentikasi kode Anda.

Membuat cluster Managed Service untuk Apache Spark

Nilai berikut ditetapkan untuk membuat cluster:

  • Project tempat cluster akan dibuat
  • Region tempat cluster akan dibuat
  • Nama cluster
  • Konfigurasi cluster, yang menentukan satu master dan dua worker utama

Setelan konfigurasi default digunakan untuk setelan cluster yang tersisa. Anda dapat mengganti setelan konfigurasi cluster default. Misalnya, Anda dapat menambahkan VM sekunder (default = 0) atau menentukan jaringan VPC non-default untuk cluster. Untuk mengetahui informasi selengkapnya, lihat CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Mengirimkan tugas

Nilai berikut ditetapkan untuk mengirimkan tugas:

  • Project tempat cluster akan dibuat
  • Region tempat cluster akan dibuat
  • Konfigurasi tugas, yang menentukan nama cluster dan jalur file Cloud Storage (URI) tugas PySpark

Lihat SubmitJob untuk mengetahui informasi selengkapnya.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Menghapus cluster

Nilai berikut ditetapkan untuk menghapus cluster:

  • Project tempat cluster akan dibuat
  • Region tempat cluster akan dibuat
  • Nama cluster

Untuk mengetahui informasi selengkapnya, lihat DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")