Spark Spanner 커넥터 사용

이 페이지에서는 Spark Spanner 커넥터를 사용하여 Apache SparkSpanner에서 데이터를 읽고 쓰는 Managed Service for Apache Spark 클러스터를 만드는 방법을 보여줍니다.

Spanner 커넥터는 Spark와 함께 Spanner Java 라이브러리를 사용하여 Spanner 데이터베이스에서 데이터를 읽고 씁니다. Spanner 커넥터는 Spanner 테이블그래프를 Spark DataFramesGraphFrames로 읽고 DataFrame 데이터를 Spanner 테이블에 쓸 수 있도록 지원합니다.

비용

이 문서에서는 비용이 청구될 수 있는 Google Cloud구성요소를 사용합니다.

  • Managed Service for Apache Spark
  • Spanner
  • Cloud Storage

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요.

Google Cloud 신규 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Spanner, Managed Service for Apache Spark, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. 필수 역할 부여
  9. Managed Service for Apache Spark 클러스터를 설정합니다.
  10. Singers 데이터베이스 테이블이 있는 Spanner 인스턴스를 설정합니다.

필수 역할 부여

이 페이지의 예시를 실행하려면 특정 IAM 역할이 필요합니다. 조직 정책에 따라 이러한 역할이 이미 부여되었을 수 있습니다. 역할 부여를 확인하려면 역할을 부여해야 하나요?를 참고하세요.

역할 부여에 대한 상세 설명은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

Compute Engine 기본 서비스 계정에 Apache Spark용 관리 서비스를 만드는 데 필요한 권한이 있는지 확인하려면 관리자에게 프로젝트의 Compute Engine 기본 서비스 계정에 다음 IAM 역할을 부여해 달라고 요청하세요.

Managed Service for Apache Spark 클러스터 설정

Managed Service for Apache Spark 클러스터를 만들거나 2.1 이상 Managed Service for Apache Spark 이미지로 만든 기존 Managed Service for Apache Spark 클러스터를 사용합니다. 클러스터를 2.0 이하 이미지로 만든 경우 scope 속성이 cloud-platform 범위로 설정되어 있어야 합니다.

Singers 데이터베이스 테이블이 있는 Spanner 인스턴스 설정

Singers 테이블을 포함하는 데이터베이스가 있는 Spanner 인스턴스를 만듭니다. Spanner 인스턴스 ID와 데이터베이스 ID를 기록해 둡니다.

Spark에서 Spanner 커넥터 사용

Spanner 커넥터는 Spark 버전 3.1+에서 사용 가능합니다. Managed Service for Apache Spark 클러스터에 작업을 제출할 때 Cloud Storage 커넥터 JAR 파일 사양에 따라 커넥터 버전을 지정합니다.

예: Spanner 커넥터를 사용하는 gcloud CLI Spark 작업 제출

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

다음을 바꿉니다.

CONNECTOR_VERSION: Spanner 커넥터 버전. GitHub GoogleCloudDataproc/spark-spanner-connector 저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.

Spanner 테이블 읽기

Python 또는 Scala를 사용하여 Spark 데이터 소스 API를 통해 Spanner 테이블 데이터를 Spark Dataframe으로 읽어올 수 있습니다.

PySpark

Managed Service for Apache Spark에 작업을 제출하거나 클러스터 마스터 노드의 spark-submit REPL에서 작업을 실행하여 클러스터에서 이 섹션의 PySpark 코드 예시를 실행할 수 있습니다.

Managed Service for Apache Spark 작업

  1. 로컬 텍스트 편집기를 사용하거나 Cloud Shell에서 미리 설치된 vi, vim, nano 텍스트 편집기를 사용하여 singers.py 파일을 만듭니다.
    1. 자리표시자 변수를 채운 후 다음 코드를 singers.py 파일에 붙여넣습니다. Spanner Data Boost 기능이 사용 설정되어 기본 Spanner 인스턴스에 거의 0에 가까운 영향을 줍니다.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      다음을 바꿉니다.

      1. PROJECT_ID: Google Cloud 프로젝트 ID입니다. 프로젝트 ID는 Google Cloud 콘솔 대시보드프로젝트 정보 섹션에 나열됩니다.
      2. INSTANCE_ID, DATABASE_ID, TABLE_NAME: Singers 데이터베이스 테이블이 있는 Spanner 인스턴스 설정을 참조하세요.
    2. singers.py 파일을 저장합니다.
  2. Google Cloud 콘솔, gcloud CLI, REST API를 사용하여 Managed Service for Apache Spark에 작업을 제출합니다.

    예시: Spanner 커넥터를 사용하는 gcloud CLI 작업 제출

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    다음을 바꿉니다.

    1. CLUSTER_NAME: 새 클러스터의 이름입니다.
    2. REGION: 워크로드를 실행하는 데 사용할 수 있는 Compute Engine 리전입니다.
    3. CONNECTOR_VERSION: Spanner 커넥터 버전. GitHub GoogleCloudDataproc/spark-spanner-connector 저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.

spark-submit 작업

  1. SSH를 사용하여 Managed Service for Apache Spark 클러스터 마스터 노드에 연결합니다.
    1. Google Cloud 콘솔에서 Managed Service for Apache Spark 클러스터 페이지로 이동한 다음 클러스터 이름을 클릭합니다.
    2. 클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 후 클러스터 마스터 노드 이름 오른쪽에 있는 SSH를 클릭합니다.
       Google Cloud 콘솔에서 클러스터 마스터 노드에 연결하는 데 사용되는 SSH 버튼이 표시된 Dataproc 클러스터 세부정보 페이지를 보여주는 스크린샷입니다.

      마스터 노드의 홈 디렉터리에 브라우저 창이 열립니다.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 사전 설치된 vi, vim, nano 텍스트 편집기를 사용하여 마스터 노드에서 singers.py 파일을 만듭니다.
    1. 자리표시자 변수를 singers.py 파일에 채운 후 다음 코드를 singers.py 파일에 붙여넣습니다. Spanner Data Boost 기능이 사용 설정되어 기본 Spanner 인스턴스에 거의 0에 가까운 영향을 줍니다.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      다음을 바꿉니다.

      1. PROJECT_ID: Google Cloud 프로젝트 ID입니다. 프로젝트 ID는 Google Cloud 콘솔 대시보드프로젝트 정보 섹션에 나열됩니다.
      2. INSTANCE_ID, DATABASE_ID, TABLE_NAME: Singers 데이터베이스 테이블이 있는 Spanner 인스턴스 설정을 참조하세요.
    2. singers.py 파일을 저장합니다.
  3. spark-submit을 사용해서 singers.py를 실행하여 Spanner Singers 테이블을 만듭니다.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    다음을 바꿉니다.

    1. CONNECTOR_VERSION: Spanner 커넥터 버전. GitHub GoogleCloudDataproc/spark-spanner-connector 저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.

    출력은 다음과 같습니다.

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

클러스터에서 Scala 코드 예시를 실행하려면 다음 단계를 수행합니다.

  1. SSH를 사용하여 Managed Service for Apache Spark 클러스터 마스터 노드에 연결합니다.
    1. Google Cloud 콘솔에서 Managed Service for Apache Spark 클러스터 페이지로 이동한 다음 클러스터 이름을 클릭합니다.
    2. 클러스터 세부정보 페이지에서 VM 인스턴스 탭을 선택합니다. 그런 후 클러스터 마스터 노드 이름 오른쪽에 있는 SSH를 클릭합니다.  Google Cloud 콘솔의 Dataproc 클러스터 세부정보 페이지

      마스터 노드의 홈 디렉터리에 브라우저 창이 열립니다.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 사전 설치된 vi, vim, nano 텍스트 편집기를 사용하여 마스터 노드에서 singers.scala 파일을 만듭니다.
    1. 다음 코드를 singers.scala 파일에 붙여넣습니다. Spanner Data Boost 기능이 사용 설정되어 기본 Spanner 인스턴스에 거의 0에 가까운 영향을 줍니다.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      다음을 바꿉니다.

      1. PROJECT_ID: Google Cloud 프로젝트 ID입니다. 프로젝트 ID는 Google Cloud 콘솔 대시보드프로젝트 정보 섹션에 나열됩니다.
      2. INSTANCE_ID, DATABASE_ID, TABLE_NAME: Singers 데이터베이스 테이블이 있는 Spanner 인스턴스 설정을 참조하세요.
    2. singers.scala 파일을 저장합니다.
  3. spark-shell REPL을 실행합니다.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    다음을 바꿉니다.

    CONNECTOR_VERSION: Spanner 커넥터 버전. GitHub GoogleCloudDataproc/spark-spanner-connector 저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.

  4. :load singers.scala 명령어로 singers.scala를 실행하여 Spanner Singers 테이블을 만듭니다. 출력 목록에는 Singers 출력의 예시가 표시됩니다.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Spanner 그래프 읽기

Spanner 커넥터에서는 GraphFrames로 직접 내보내기는 물론 개별 노드 및 에지 DataFrames로 그래프 내보내기가 지원됩니다.

다음 예시에서는 Spanner를 GraphFrame으로 내보냅니다. 여기에서는 Spanner 커넥터 jar에 포함된 Python SpannerGraphConnector 클래스를 사용하여 Spanner Graph를 읽습니다.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

다음을 바꿉니다.

  • CONNECTOR_VERSION: Spanner 커넥터 버전. GitHub GoogleCloudDataproc/spark-spanner-connector 저장소의 버전 목록에서 Spanner 커넥터 버전을 선택합니다.
  • PROJECT_ID: Google Cloud 프로젝트 ID입니다. 프로젝트 ID는 Google Cloud 콘솔 대시보드프로젝트 정보 섹션에 나열됩니다.
  • INSTANCE_ID, DATABASE_ID, TABLE_NAME: 인스턴스, 데이터베이스, 그래프 ID를 삽입합니다.

GraphFrames 대신 노드 및 에지 DataFrames를 내보내려면 load_dfs를 사용하세요.

df_vertices, df_edges, df_id_map = connector.load_dfs()

Spanner 테이블 쓰기

Spanner 커넥터는 Spark 데이터 소스 API를 사용하여 Spark Dataframe을 Spanner 테이블에 쓸 수 있도록 지원합니다.

DataFrame을 Spanner 테이블에 쓰는 예

코드를 저장하고 실행하기 전에 변수를 채웁니다.

"""Spanner PySpark write example."""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Write App').getOrCreate()

columns = ['id', 'name', 'email']
data = [(1, 'John Doe', 'john.doe@example.com'), (2, 'Jane Doe', 'jane.doe@example.com')]
df = spark.createDataFrame(data, columns)

df.write.format('cloud-spanner') \
    .option("projectId", "PROJECT_ID")
    .option("instanceId", "INSTANCE_ID")
    .option("databaseId", "DATABASE_ID")
    .option("table", "TABLE_NAME")
    .mode("append") \
    .save()

다음을 바꿉니다.

  • PROJECT_ID: Google Cloud 프로젝트 ID입니다. 프로젝트 ID는 Google Cloud 콘솔 대시보드프로젝트 정보 섹션에 나열됩니다.
  • INSTANCE_ID, DATABASE_ID, TABLE_NAME: 인스턴스, 데이터베이스, 테이블 ID를 삽입합니다.

삭제

Google Cloud 계정에 비용이 계속 청구되지 않도록 하려면 Managed Service for Apache Spark 클러스터를 중지하거나 삭제하고 Spanner 인스턴스를 삭제하면 됩니다.

다음 단계