Connect to data sources

Dataproc integrates with databases, data warehouses, and streaming services to process data from multiple systems. This document shows you how to connect Dataproc clusters to data sources inside and outside of Google Cloud.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Create a Dataproc cluster.
  11. Create a Cloud Storage bucket.
  12. Make sure that you have the required permissions to access data sources.

Required roles

Certain IAM roles are required to run the examples on this page. Depending on organization policies, these roles may have already been granted. To check role grants, see Do you need to grant roles?.

For more information about granting roles, see Manage access to projects,folders, and organizations.

User roles

To get the permissions that you need to create a Dataproc cluster, ask your administrator to grant you the following IAM roles :

Service account role

To ensure that the Compute Engine default service account has the necessary permissions to create a Dataproc cluster, ask your administrator to grant the Dataproc Worker (roles/dataproc.worker) IAM role to the Compute Engine default service account on the project.

Connect to Google Cloud services

This section shows you how to connect Dataproc to Google Cloud and external data sources.

Connect to Cloud Storage

Dataproc uses Cloud Storage as its default distributed file system. To read and write data from and to Cloud Storage buckets, use the gs:// URI prefix. No additional configuration is required.

The following example shows how to read a CSV file from a Cloud Storage bucket and then write a Parquet file to the bucket.

# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("gs://BUCKET_NAME/path/to/data.csv")

df.show()

# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
    .mode("overwrite") \
    .save("gs://BUCKET_NAME/path/to/output.parquet")

Connect to BigQuery

The BigQuery connector is pre-installed on Dataproc 2.1 and later image versions. The connector enables large-scale data transfer between Dataproc and BigQuery.

The following example shows how to read a BigQuery table into a Spark DataFrame, and then perform a query.

# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")

# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.shakespeare") \
    .load()

df.createOrReplaceTempView("shakespeare")

# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()

Connect to external databases using JDBC

You can connect Dataproc to a database that provides a Java Database Connectivity (JDBC) driver, such as PostgreSQL or MySQL, using the Spark built-in JDBC data source.

Make the JDBC driver available

To connect to a database with JDBC, make the driver's JAR file available on the Spark classpath.

Per-job

Store the driver JAR in a Cloud Storage bucket, and reference it with the --jars flag when you submit a job. Spark distributes the JAR file to the necessary nodes for the job. This is the recommended approach for job-specific dependencies.

Cluster-wide

If all jobs on a cluster require a specific driver, use an initialization action when you create the cluster to copy the driver JAR from Cloud Storage into the Spark jars directory on every node.

  1. Create an initialization action script.

    #!/bin/bash
    set -e -x
    gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/
    
  2. Create the cluster and reference the script.

    gcloud dataproc clusters create my-jdbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
    

Read from a PostgreSQL database

This example uses the per-job method to connect to a PostgreSQL database.

  1. Download the PostgreSQL JDBC driver JAR, and upload it to a Cloud Storage bucket.

  2. Submit your PySpark job and reference the driver's Cloud Storage path with the --jars flag.

    gcloud dataproc jobs submit pyspark my_job.py \
        --cluster=MY_CLUSTER \
        --region=REGION \
        --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar
    
  3. Use the following code in your job.py file to read from and write to the database.

    from pyspark.sql import SparkSession
    
    def main():
        # WARNING: Do not hardcode credentials. Use a service like Secret Manager
        # to handle sensitive credentials.
        db_properties = {
            "user": "USERNAME",
            "password": "PASSWORD",
            "driver": "org.postgresql.Driver"
        }
    
        jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE"
    
        spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate()
    
        # Read data from a PostgreSQL table
        df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties)
    
        df.printSchema()
        df.show()
    
        # Write data to a new PostgreSQL table
        (df.write
            .jdbc(url=jdbc_url,
                  table="public.new_table",
                  mode="overwrite",
                  properties=db_properties))
    
    if __name__ == "__main__":
        main()
    

Connect to external databases using ODBC

To connect to data sources where a JDBC driver is unavailable, use an Open Database Connectivity (ODBC) driver. Use an initialization action to install the driver and its dependencies on each node when you create the cluster.

This example demonstrates how to connect to a Microsoft SQL Server instance.

  1. Create the initialization action script.

    1. Create a script that installs the Microsoft SQL Server ODBC driver for Debian.
        #!/bin/bash
        # Initialization action for installing MS SQL ODBC driver on the cluster.
    
        set -e -x
    
        # Install dependencies for the driver.
        apt-get update
        apt-get install -y --no-install-recommends curl gnupg unixodbc-dev
    
        # Add Microsoft's official repository.
        curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
        curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
    
        # Install the driver.
        apt-get update
        ACCEPT_EULA=Y apt-get install -y msodbcsql18
    
        # Install the pyodbc library for Python.
        pip install pyodbc
        ```
    
    1.  Upload the script to a Cloud Storage bucket.
    
    ```shell
        gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/
        ```
    
  2. Create a Dataproc cluster that runs the initialization action.

    When you create your cluster, point to the initialization action script in Cloud Storage.

    gcloud dataproc clusters create my-odbc-cluster \
        --region=REGION \
        --image-version=2.2-debian11 \
        --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.sh
    
  3. Connect and query using PySpark

    After the cluster is running, the ODBC driver and pyodbc library are available. The following code uses pyodbc on the driver node to fetch data into a Pandas DataFrame, and then converts it into a Spark DataFrame for distributed processing.

    import pyodbc
    import pandas as pd
    from pyspark.sql import SparkSession
    
    def get_sql_data(connection_string, query):
        """
        Connects to the database using pyodbc, executes a query,
        and returns the result as a Pandas DataFrame.
        """
        cnxn = pyodbc.connect(connection_string)
        pdf = pd.read_sql(query, cnxn)
        cnxn.close()
        return pdf
    
    def main():
        spark = SparkSession.builder.appName("ODBC Example").getOrCreate()
    
        # WARNING: Do not hardcode credentials. Use Secret Manager.
        server = 'SERVER.database.windows.net'
        database = 'DATABASE'
        username = 'USERNAME'
        password = 'PASSWORD'
        driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver.
    
        # Create the connection string.
        connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'
    
        sql_query = "SELECT * FROM Sales.Customer"
    
        # Fetch data as a Pandas DataFrame.
        pandas_df = get_sql_data(connection_string, sql_query)
    
        # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing.
        spark_df = spark.createDataFrame(pandas_df)
    
        print("Successfully read data from SQL Server:")
        spark_df.printSchema()
        spark_df.show(5)
    
        # You can now perform distributed operations on the Spark DataFrame.
        print(f"Total number of customers: {spark_df.count()}")
    
    if __name__ == "__main__":
        main()
    

Best practices

This section describes best practices for managing credentials and dependencies.

Credential management

Don't hardcode passwords or other secrets in your code. Use Secret Manager to store credentials securely and access them from your Dataproc jobs.

Dependency management

Use initialization actions to install drivers and libraries when you create a cluster. For complex environments, consider building a custom Dataproc image with all dependencies pre-installed. A custom image reduces cluster startup time and ensures consistency.

Copy initialization action scripts from public sources to your own versioned Cloud Storage bucket before you use them in production. Don't reference public scripts directly, as they can change without notice.

What's next