"Managed Service for Apache Spark" is the new name for the product formerly known as "Dataproc on Compute Engine" (cluster deployment) and "Google Cloud Serverless for Apache Spark" (serverless deployment).
Google uses AI technology to translate content into your preferred language. AI translations can contain errors.
def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    """Create a Dataproc cluster, run a PySpark job on it, then delete it.

    The job's driver output is read back from Cloud Storage and printed.
    Once the cluster has been created, it is deleted whether or not the
    job (or fetching its output) succeeds.

    Args:
        project_id: Google Cloud project ID that owns the cluster.
        region: Region to create the cluster in. It is also used to build
            the regional Dataproc API endpoint
            (``{region}-dataproc.googleapis.com:443``).
        cluster_name: Name to give the new cluster.
        gcs_bucket: Name of the Cloud Storage bucket holding the PySpark script.
        pyspark_file: Object path of the PySpark script inside ``gcs_bucket``.
            The job runs ``gs://{gcs_bucket}/{pyspark_file}``.

    Raises:
        ValueError: If the job's ``driver_output_resource_uri`` is not of the
            form ``gs://<bucket>/<path>``.
        Exceptions raised by the Dataproc / Cloud Storage clients propagate
        unchanged. If the cluster was created, deletion is still attempted
        first.
    """
    api_endpoint = f"{region}-dataproc.googleapis.com:443"

    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": api_endpoint}
    )

    # Create the cluster config: one master and two workers.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster and block until the long-running operation finishes.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()
    print(f"Cluster created successfully: {result.cluster_name}")

    # The cluster now exists. Delete it in `finally` so that a failed job
    # submission or output download does not leave it running.
    try:
        # Create the job client.
        job_client = dataproc_v1.JobControllerClient(
            client_options={"api_endpoint": api_endpoint}
        )

        # Create the job config. Use the caller-supplied `pyspark_file`.
        # The original referenced an undefined name `spark_filename` here.
        job = {
            "placement": {"cluster_name": cluster_name},
            "pyspark_job": {
                "main_python_file_uri": f"gs://{gcs_bucket}/{pyspark_file}"
            },
        }

        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()

        # Dataproc job output is saved to the Cloud Storage bucket
        # allocated to the job. Split the URI into bucket and blob prefix.
        driver_uri = response.driver_output_resource_uri
        matches = re.match(r"gs://(.*?)/(.*)", driver_uri)
        if matches is None:
            raise ValueError(
                f"Unexpected driver output URI (expected gs://...): {driver_uri!r}"
            )

        # Only the blob with suffix ".000000000" is downloaded. If the
        # driver output spans several such blobs, later ones are not read.
        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            .blob(f"{matches.group(2)}.000000000")
            .download_as_bytes()
            .decode("utf-8")
        )

        print(f"Job finished successfully: {output}\r\n")
    finally:
        # Delete the cluster once the job has terminated (or failed).
        operation = cluster_client.delete_cluster(
            request={
                "project_id": project_id,
                "region": region,
                "cluster_name": cluster_name,
            }
        )
        operation.result()
        print(f"Cluster {cluster_name} successfully deleted.")