Se connecter à des sources de données
Managed Service pour Apache Spark s'intègre aux bases de données, aux entrepôts de données et aux services de streaming pour traiter les données de plusieurs systèmes. Ce document explique comment connecter des clusters Managed Service pour Apache Spark à des sources de données à l'intérieur et à l'extérieur de Google Cloud.
Avant de commencer
- Connectez-vous à votre Google Cloud compte. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $de crédits sans frais pour exécuter, tester et déployer des charges de travail.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Créez un cluster Managed Service pour Apache Spark.
- Créez un bucket Cloud Storage.
- Assurez-vous de disposer des autorisations requises pour accéder aux sources de données.
Rôles requis
Certains rôles IAM sont requis pour exécuter les exemples de cette page. En fonction des règles d'administration, ces rôles peuvent déjà avoir été attribués. Pour vérifier les attributions de rôles, consultez la section Devez-vous attribuer des rôles ?.
Pour en savoir plus sur l'attribution de rôles, consultez la section Gérer l'accès aux projets, aux dossiers et aux organisations.
Rôles utilisateur
Pour obtenir les autorisations nécessaires pour créer un cluster Managed Service pour Apache Spark, demandez à votre administrateur de vous accorder les rôles IAM suivants :
-
Éditeur Dataproc (
roles/dataproc.editor) sur le projet -
Utilisateur du compte de service (
roles/iam.serviceAccountUser) sur le compte de service Compute Engine par défaut
Rôle du compte de service
Pour vous assurer que le compte de service Compute Engine par défaut dispose des autorisations nécessaires pour créer un cluster Managed Service pour Apache Spark, demandez à votre administrateur d'accorder le rôle IAM Nœud de calcul Dataproc (roles/dataproc.worker) au compte de service Compute Engine par défaut sur le projet.
Se connecter à des Google Cloud services
Cette section explique comment connecter Managed Service pour Apache Spark à Google Cloud et à des sources de données externes.
Se connecter à Cloud Storage
Managed Service pour Apache Spark utilise Cloud Storage comme système de fichiers distribué par défaut. Pour lire et écrire des données dans des buckets Cloud Storage, utilisez le préfixe d'URI gs://. Aucune configuration supplémentaire n'est requise.
L'exemple suivant montre comment lire un fichier CSV à partir d'un bucket Cloud Storage, puis écrire un fichier Parquet dans le bucket.
# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("gs://BUCKET_NAME/path/to/data.csv")
df.show()
# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
.mode("overwrite") \
.save("gs://BUCKET_NAME/path/to/output.parquet")
Se connecter à BigQuery
Le connecteur BigQuery est préinstallé sur les versions d'image 2.1 et ultérieures de Managed Service pour Apache Spark. Il permet le transfert de données à grande échelle entre Managed Service pour Apache Spark et BigQuery.
L'exemple suivant montre comment lire une table BigQuery dans un DataFrame Spark, puis exécuter une requête.
# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")
# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
.option("table", "bigquery-public-data.samples.shakespeare") \
.load()
df.createOrReplaceTempView("shakespeare")
# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()
Se connecter à des bases de données externes à l'aide de JDBC
Vous pouvez connecter Managed Service pour Apache Spark à une base de données qui fournit un pilote Java Database Connectivity (JDBC), tel que PostgreSQL ou MySQL, à l'aide de la source de données JDBC intégrée à Spark.
Rendre le pilote JDBC disponible
Pour vous connecter à une base de données avec JDBC, mettez le fichier JAR du pilote à disposition sur le chemin de classe Spark.
Par tâche
Stockez le fichier JAR du pilote dans un bucket Cloud Storage et référencez-le avec l'option --jars lorsque vous envoyez une tâche. Spark distribue le fichier JAR aux nœuds nécessaires pour la tâche. Il s'agit de l'approche recommandée pour les dépendances spécifiques à une tâche.
À l'échelle du cluster
Si toutes les tâches d'un cluster nécessitent un pilote spécifique, utilisez une action d'initialisation lorsque vous créez le cluster pour copier le fichier JAR du pilote de Cloud Storage dans le répertoire de fichiers JAR Spark de chaque nœud.
Créez un script d'action d'initialisation.
#!/bin/bash set -e -x gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/Créez le cluster et référencez le script.
gcloud dataproc clusters create my-jdbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
Lire dans une base de données PostgreSQL
Cet exemple utilise la méthode par tâche pour se connecter à une base de données PostgreSQL.
Téléchargez le fichier JAR du pilote JDBC PostgreSQL et importez-le dans un bucket Cloud Storage.
Envoyez votre tâche PySpark et référencez le chemin d'accès Cloud Storage du pilote avec l'option
--jars.gcloud dataproc jobs submit pyspark my_job.py \ --cluster=MY_CLUSTER \ --region=REGION \ --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jarUtilisez le code suivant dans votre fichier
job.pypour lire et écrire dans la base de données.from pyspark.sql import SparkSession def main(): # WARNING: Do not hardcode credentials. Use a service like Secret Manager # to handle sensitive credentials. db_properties = { "user": "USERNAME", "password": "PASSWORD", "driver": "org.postgresql.Driver" } jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE" spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate() # Read data from a PostgreSQL table df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties) df.printSchema() df.show() # Write data to a new PostgreSQL table (df.write .jdbc(url=jdbc_url, table="public.new_table", mode="overwrite", properties=db_properties)) if __name__ == "__main__": main()
Se connecter à des bases de données externes à l'aide d'ODBC
Pour vous connecter à des sources de données où aucun pilote JDBC n'est disponible, utilisez un pilote Open Database Connectivity (ODBC). Utilisez une action d'initialisation pour installer le pilote et ses dépendances sur chaque nœud lorsque vous créez le cluster.
Cet exemple montre comment se connecter à une instance Microsoft SQL Server.
Créez le script d'action d'initialisation.
- Créez un script qui installe le pilote ODBC Microsoft SQL Server pour Debian.
#!/bin/bash # Initialization action for installing MS SQL ODBC driver on the cluster. set -e -x # Install dependencies for the driver. apt-get update apt-get install -y --no-install-recommends curl gnupg unixodbc-dev # Add Microsoft's official repository. curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list # Install the driver. apt-get update ACCEPT_EULA=Y apt-get install -y msodbcsql18 # Install the pyodbc library for Python. pip install pyodbc ``` 1. Upload the script to a Cloud Storage bucket. ```shell gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/ ```Créez un cluster Managed Service pour Apache Spark qui exécute l'action d'initialisation.
Lorsque vous créez votre cluster, pointez vers le script d'action d'initialisation dans Cloud Storage.
gcloud dataproc clusters create my-odbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.shConnectez-vous et interrogez à l'aide de PySpark
Une fois le cluster en cours d'exécution, le pilote ODBC et la bibliothèque
pyodbcsont disponibles. Le code suivant utilisepyodbcsur le nœud du pilote pour extraire des données dans un DataFrame Pandas, puis les convertit en DataFrame Spark pour le traitement distribué.import pyodbc import pandas as pd from pyspark.sql import SparkSession def get_sql_data(connection_string, query): """ Connects to the database using pyodbc, executes a query, and returns the result as a Pandas DataFrame. """ cnxn = pyodbc.connect(connection_string) pdf = pd.read_sql(query, cnxn) cnxn.close() return pdf def main(): spark = SparkSession.builder.appName("ODBC Example").getOrCreate() # WARNING: Do not hardcode credentials. Use Secret Manager. server = 'SERVER.database.windows.net' database = 'DATABASE' username = 'USERNAME' password = 'PASSWORD' driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver. # Create the connection string. connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;' sql_query = "SELECT * FROM Sales.Customer" # Fetch data as a Pandas DataFrame. pandas_df = get_sql_data(connection_string, sql_query) # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing. spark_df = spark.createDataFrame(pandas_df) print("Successfully read data from SQL Server:") spark_df.printSchema() spark_df.show(5) # You can now perform distributed operations on the Spark DataFrame. print(f"Total number of customers: {spark_df.count()}") if __name__ == "__main__": main()
Bonnes pratiques
Cette section décrit les bonnes pratiques pour gérer les identifiants et les dépendances.
Gestion des identifiants
Ne codez pas en dur les mots de passe ni d'autres secrets dans votre code. Utilisez Secret Manager pour stocker les identifiants de manière sécurisée et y accéder à partir de vos tâches Managed Service pour Apache Spark.
Gestion des dépendances
Utilisez des actions d'initialisation pour installer des pilotes et des bibliothèques lorsque vous créez un cluster. Pour les environnements complexes, envisagez de créer une image Managed Service pour Apache Spark personnalisée avec toutes les dépendances préinstallées. Une image personnalisée réduit le temps de démarrage du cluster et garantit la cohérence.
Copiez les scripts d'action d'initialisation à partir de sources publiques dans votre propre bucket Cloud Storage versionné avant de les utiliser en production. Ne référencez pas directement les scripts publics, car ils peuvent être modifiés sans préavis.
Étape suivante
- En savoir plus sur Secret Manager.
- Découvrez comment créer une image personnalisée.
- En savoir plus sur les actions d'initialisation.