Conectar-se às fontes de dados
O Managed Service for Apache Spark se integra a bancos de dados, data warehouses e serviços de streaming para processar dados de vários sistemas. Este documento mostra como conectar clusters do Managed Service for Apache Spark a fontes de dados dentro e fora do Google Cloud.
Antes de começar
- Faça login na sua Google Cloud conta do. Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho dos nossos produtos em situações reais. Clientes novos também recebem US $300 em créditos para executar, testar e implantar cargas de trabalho.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Crie um cluster do Managed Service for Apache Spark.
- Crie um bucket do Cloud Storage.
- Verifique se você tem as permissões necessárias para acessar as fontes de dados.
Funções exigidas
Algumas funções do IAM são necessárias para executar os exemplos nesta página. Dependendo das políticas da organização, essas funções já podem ter sido concedidas. Para verificar as concessões de funções, consulte Você precisa conceder funções?.
Para mais informações sobre a concessão de funções, consulte Gerenciar o acesso a projetos,pastas e organizações.
Papéis do usuário
Para receber as permissões necessárias para criar um cluster do Managed Service for Apache Spark, peça ao administrador para conceder a você as seguintes funções do IAM:
-
Editor do Dataproc (
roles/dataproc.editor) no projeto -
Usuário da conta de serviço (
roles/iam.serviceAccountUser) na conta de serviço padrão do Compute Engine
Papel de conta de serviço
Para garantir que a conta de serviço padrão do Compute Engine tenha as permissões necessárias
para criar um cluster do Managed Service for Apache Spark,
peça ao administrador para conceder a
função do IAM de Worker do Dataproc (roles/dataproc.worker)
à conta de serviço padrão do Compute Engine no projeto.
Conectar-se aos Google Cloud serviços
Esta seção mostra como conectar o Managed Service for Apache Spark ao Google Cloud e a fontes de dados externas.
Conectar-se ao Cloud Storage
O Managed Service for Apache Spark usa o Cloud Storage como sistema de arquivos distribuído padrão. Para ler e gravar dados de e para buckets do Cloud Storage, use o prefixo de URI gs://. Nenhuma configuração adicional é necessária.
O exemplo a seguir mostra como ler um arquivo CSV de um bucket do Cloud Storage e gravar um arquivo Parquet no bucket.
# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("gs://BUCKET_NAME/path/to/data.csv")
df.show()
# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
.mode("overwrite") \
.save("gs://BUCKET_NAME/path/to/output.parquet")
Conecte-se ao BigQuery
O conector do BigQuery é pré-instalado no Managed Service for Apache Spark 2.1 e em versões de imagem mais recentes. O conector permite a transferência de dados em grande escala entre o Managed Service for Apache Spark e o BigQuery.
O exemplo a seguir mostra como ler uma tabela do BigQuery em um DataFrame do Spark e executar uma consulta.
# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")
# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
.option("table", "bigquery-public-data.samples.shakespeare") \
.load()
df.createOrReplaceTempView("shakespeare")
# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()
Conectar-se a bancos de dados externos usando JDBC
É possível conectar o Managed Service for Apache Spark a um banco de dados que fornece um driver Java Database Connectivity (JDBC), como PostgreSQL ou MySQL, usando a fonte de dados JDBC integrada do Spark.
Disponibilizar o driver JDBC
Para se conectar a um banco de dados com JDBC, disponibilize o arquivo JAR do driver no classpath do Spark.
Por job
Armazene o JAR do driver em um bucket do Cloud Storage e referencie-o com a flag --jars ao enviar um job. O Spark distribui o arquivo JAR para os nós necessários para o job. Essa é a abordagem recomendada para dependências específicas do job.
Em todo o cluster
Se todos os jobs em um cluster exigirem um driver específico, use uma ação de inicialização ao criar o cluster para copiar o JAR do driver do Cloud Storage para o diretório de jars do Spark em cada nó.
Crie um script de ação de inicialização.
#!/bin/bash set -e -x gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/Crie o cluster e referencie o script.
gcloud dataproc clusters create my-jdbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
Ler de um banco de dados do PostgreSQL
Este exemplo usa o método por job para se conectar a um banco de dados do PostgreSQL.
Faça o download do JAR do driver JDBC do PostgreSQL e faça o upload dele para um bucket do Cloud Storage.
Envie o job do PySpark e referencie o caminho do Cloud Storage do driver com a flag
--jars.gcloud dataproc jobs submit pyspark my_job.py \ --cluster=MY_CLUSTER \ --region=REGION \ --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jarUse o código a seguir no arquivo
job.pypara ler e gravar no banco de dados.from pyspark.sql import SparkSession def main(): # WARNING: Do not hardcode credentials. Use a service like Secret Manager # to handle sensitive credentials. db_properties = { "user": "USERNAME", "password": "PASSWORD", "driver": "org.postgresql.Driver" } jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE" spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate() # Read data from a PostgreSQL table df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties) df.printSchema() df.show() # Write data to a new PostgreSQL table (df.write .jdbc(url=jdbc_url, table="public.new_table", mode="overwrite", properties=db_properties)) if __name__ == "__main__": main()
Conectar-se a bancos de dados externos usando ODBC
Para se conectar a fontes de dados em que um driver JDBC não está disponível, use um driver Open Database Connectivity (ODBC). Use uma ação de inicialização para instalar o driver e as dependências dele em cada nó ao criar o cluster.
Este exemplo demonstra como se conectar a uma instância do Microsoft SQL Server.
Crie o script de ação de inicialização.
- Crie um script que instala o driver ODBC do Microsoft SQL Server para Debian.
#!/bin/bash # Initialization action for installing MS SQL ODBC driver on the cluster. set -e -x # Install dependencies for the driver. apt-get update apt-get install -y --no-install-recommends curl gnupg unixodbc-dev # Add Microsoft's official repository. curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list # Install the driver. apt-get update ACCEPT_EULA=Y apt-get install -y msodbcsql18 # Install the pyodbc library for Python. pip install pyodbc ``` 1. Upload the script to a Cloud Storage bucket. ```shell gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/ ```Crie um cluster do Managed Service for Apache Spark que execute a ação de inicialização.
Ao criar o cluster, aponte para o script de ação de inicialização no Cloud Storage.
gcloud dataproc clusters create my-odbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.shConectar e consultar usando o PySpark
Depois que o cluster estiver em execução, o driver ODBC e a biblioteca
pyodbcestarão disponíveis. O código a seguir usapyodbcno nó do driver para buscar dados em um DataFrame do pandas e, em seguida, o converte em um DataFrame do Spark para processamento distribuído.import pyodbc import pandas as pd from pyspark.sql import SparkSession def get_sql_data(connection_string, query): """ Connects to the database using pyodbc, executes a query, and returns the result as a Pandas DataFrame. """ cnxn = pyodbc.connect(connection_string) pdf = pd.read_sql(query, cnxn) cnxn.close() return pdf def main(): spark = SparkSession.builder.appName("ODBC Example").getOrCreate() # WARNING: Do not hardcode credentials. Use Secret Manager. server = 'SERVER.database.windows.net' database = 'DATABASE' username = 'USERNAME' password = 'PASSWORD' driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver. # Create the connection string. connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;' sql_query = "SELECT * FROM Sales.Customer" # Fetch data as a Pandas DataFrame. pandas_df = get_sql_data(connection_string, sql_query) # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing. spark_df = spark.createDataFrame(pandas_df) print("Successfully read data from SQL Server:") spark_df.printSchema() spark_df.show(5) # You can now perform distributed operations on the Spark DataFrame. print(f"Total number of customers: {spark_df.count()}") if __name__ == "__main__": main()
Práticas recomendadas
Esta seção descreve as práticas recomendadas para gerenciar credenciais e dependências.
Gerenciamento de credenciais
Não codifique senhas ou outros secrets no código. Use o Secret Manager para armazenar credenciais com segurança e acessá-las nos jobs do Managed Service for Apache Spark.
Gerenciamento de dependências
Use ações de inicialização para instalar drivers e bibliotecas ao criar um cluster. Para ambientes complexos, considere criar uma imagem personalizada do Managed Service for Apache Spark com todas as dependências pré-instaladas. Uma imagem personalizada reduz o tempo de inicialização do cluster e garante a consistência.
Copie scripts de ação de inicialização de fontes públicas para seu próprio bucket do Cloud Storage com controle de versão antes de usá-los na produção. Não referencie scripts públicos diretamente, porque eles podem mudar sem aviso.
A seguir
- Saiba mais sobre o Secret Manager.
- Saiba como criar uma imagem personalizada.
- Saiba mais sobre as ações de inicialização.