Utilizzo delle librerie client di Cloud per Python

Questo tutorial include una procedura dettagliata di Cloud Shell che utilizza le librerie client di Google Cloud per Python per chiamare in modo programmatico le API gRPC di Managed Service for Apache Spark per creare un cluster e inviare un job al cluster.

Le sezioni seguenti spiegano il funzionamento del codice della procedura dettagliata contenuto nel repository GitHub GoogleCloudPlatform/python-dataproc.

Esegui la procedura dettagliata di Cloud Shell

Fai clic su Apri in Cloud Shell per eseguire la procedura dettagliata.

Apri in Cloud Shell

comprendi il codice

Credenziali predefinite dell'applicazione

La procedura dettagliata di Cloud Shell in questo tutorial fornisce l'autenticazione utilizzando le credenziali del tuo Google Cloud progetto. Quando esegui il codice in locale, la prassi consigliata è utilizzare le credenziali dell'account di servizio per autenticare il codice.

Crea un cluster Managed Service for Apache Spark

Vengono impostati i seguenti valori per creare il cluster:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • Il nome del cluster
  • La configurazione del cluster, che specifica un nodo master e due nodi worker primari

Per le impostazioni rimanenti del cluster vengono utilizzate le impostazioni di configurazione predefinite. Puoi sostituire le impostazioni di configurazione predefinite del cluster. Ad esempio, puoi aggiungere VM secondarie (valore predefinito = 0) o specificare una rete VPC non predefinita per il cluster. Per saperne di più, vedi CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Invia un job

Vengono impostati i seguenti valori per inviare il job:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • La configurazione del job, che specifica il nome del cluster e il percorso del file (URI) di Cloud Storage del job PySpark

Per saperne di più, vedi SubmitJob.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Elimina il cluster

Vengono impostati i seguenti valori per eliminare il cluster:

  • Il progetto in cui verrà creato il cluster
  • La regione in cui verrà creato il cluster
  • Il nome del cluster

Per saperne di più, vedi DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")