Dataproc Spark Connect クライアントは、Apache Spark Connect クライアントのラッパーです。これにより、アプリケーションは Spark Connect プロトコルを使用して、リモートの Serverless for Apache Spark セッションと通信できます。このドキュメントでは、クライアントをインストール、構成、使用する方法について説明します。
始める前に
インタラクティブ セッションとセッション テンプレートの管理に必要な権限を含む Identity and Access Management ロールがあることを確認します。
Google Cloudの外部でクライアントを実行する場合は、認証情報を指定します。
GOOGLE_APPLICATION_CREDENTIALS環境変数をサービス アカウント キーファイルのパスに設定します。
クライアントをインストールまたはアンインストールする
pip を使用して dataproc-spark-connect パッケージをインストールまたはアンインストールできます。
インストール
クライアントの最新バージョンをインストールするには、次のコマンドを実行します。
pip install -U dataproc-spark-connect
アンインストール
クライアントをアンインストールするには、次のコマンドを実行します。
pip uninstall dataproc-spark-connect
クライアントを構成する
セッションのプロジェクトとリージョンを指定します。これらの値は、環境変数を使用するか、コードでビルダー API を使用して設定できます。
環境変数
GOOGLE_CLOUD_PROJECT と GOOGLE_CLOUD_REGION の環境変数を設定します。
Builder API
.projectId() メソッドと .location() メソッドを使用します。
spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()
Spark セッションを開始する
Spark セッションを開始するには、必要なインポートを PySpark アプリケーションまたはノートブックに追加し、DataprocSparkSession.builder.getOrCreate() API を呼び出します。
DataprocSparkSessionクラスをインポートします。getOrCreate()メソッドを呼び出してセッションを開始します。from google.cloud.dataproc_spark_connect import DataprocSparkSession spark = DataprocSparkSession.builder.getOrCreate()
Spark プロパティを構成する
Spark プロパティを構成するには、1 つ以上の .config() メソッドをビルダーにチェーンします。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()
高度な設定を使用する
高度な構成を行うには、Session クラスを使用して、サブネットワークやランタイム バージョンなどの設定をカスタマイズします。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'
spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()
名前付きセッションを再利用する
名前付きセッションを使用すると、複数のノートブック間で 1 つの Spark セッションを共有し、セッションの起動時間の遅延を繰り返すことを回避できます。
最初のノートブックで、カスタム ID を使用してセッションを作成します。
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(1, 'data')], ['id', 'value']) df.show()別のノートブックで、同じセッション ID を指定してセッションを再利用します。
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(2, 'more-data')], ['id', 'value']) df.show()
セッション ID は 4 ~ 63 文字で、先頭は英小文字にする必要があります。使用できるのは英小文字、数字、ハイフンのみです。ID の末尾をハイフンにすることはできません。ID が TERMINATED 状態のセッションは再利用できません。
Spark SQL マジック コマンドを使用する
このパッケージは、Jupyter ノートブックで Spark SQL クエリを実行するための sparksql-magic ライブラリをサポートしています。マジック コマンドはオプション機能です。
必要な依存関係をインストールします。
pip install IPython sparksql-magicマジック拡張機能を読み込みます。
%load_ext sparksql_magic省略可: デフォルト設定を構成します。
%config SparkSql.limit=20SQL クエリを実行します。
%%sparksql SELECT * FROM your_table
詳細オプションを使用するには、%%sparksql コマンドにフラグを追加します。たとえば、結果をキャッシュに保存してビューを作成するには、次のコマンドを実行します。
%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true
次のオプションが用意されています。
--cacheまたは-c: DataFrame をキャッシュに保存します。--eagerまたは-e: 事前読み込みを行うキャッシュ。--view VIEWまたは-v VIEW: 一時的なビューを作成します。--limit Nまたは-l N: デフォルトの行表示上限をオーバーライドします。variable_name: 結果を変数に保存します。