ייבוא מטא-נתונים ממקור מותאם אישית באמצעות Workflows

במאמר הזה מוסבר איך לייבא מטא-נתונים ממקורות של צד שלישי אל Dataplex Universal Catalog. כדי לעשות את זה, צריך להגדיר ולהפעיל צינור קישוריות מנוהל ב-Workflows. בצינור הזה מתבצעת פעולת חילוץ של מטא-נתונים ממקור הנתונים המותאם אישית שלכם, וייבוא שלהם אל Dataplex Universal Catalog, וכך נוצרות קבוצות של רשומות שנדרשות.

מידע נוסף על קישוריות מנוהלת זמין במאמר סקירה כללית על קישוריות מנוהלת.

לפני שמתחילים

לפני שמייבאים מטא-נתונים, צריך להשלים את המשימות שבקטע הזה.

יצירת מחבר

מחבר מחלץ את המטא-נתונים ממקור הנתונים ויוצר קובץ ייבוא של מטא-נתונים שאפשר לייבא באמצעות Dataplex Universal Catalog. המחבר הוא תמונה של Artifact Registry שאפשר להפעיל ב-Google Cloud Serverless for Apache Spark.

הגדרת Google Cloud משאבים

  1. Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

    אם אתם לא מתכננים להריץ את צינור הנתונים לפי לוח זמנים, אתם לא צריכים להפעיל את Cloud Scheduler API.

  2. יוצרים סודות ב-Secret Manager כדי לאחסן את פרטי הכניסה למקור הנתונים של הצד השלישי.

  3. הגדרת רשת של ענן וירטואלי פרטי (VPC) להרצת עומסי עבודה של Serverless for Apache Spark.

  4. יוצרים קטגוריה של Cloud Storage לאחסון קובצי ייבוא המטא-נתונים.

  5. יוצרים את המשאבים הבאים של Dataplex Universal Catalog:

    1. יוצרים סוגים מותאמים אישית של היבטים לרשומות שרוצים לייבא.

    2. יוצרים סוגים מותאמים אישית של רשומות לרשומות שרוצים לייבא.

התפקידים הנדרשים

חשבון שירות מייצג את הזהות של תהליך עבודה, וקובע אילו הרשאות יש לתהליך העבודה ולאילו משאבים הוא יכול לגשת. Google Cloud אתם צריכים חשבון שירות ל-Workflows (כדי להפעיל את צינור הנתונים) ול-Serverless for Apache Spark (כדי להפעיל את המחבר).

אתם יכולים להשתמש בחשבון השירות של Compute Engine שמוגדר כברירת מחדל (PROJECT_NUMBER-compute@developer.gserviceaccount.com), או ליצור חשבון שירות משלכם (או חשבונות) כדי להריץ את צינור הקישוריות המנוהל.

המסוף

  1. נכנסים לדף IAM במסוף Google Cloud .

    כניסה לדף IAM

  2. בוחרים את הפרויקט שאליו רוצים לייבא את המטא-נתונים.

  3. לוחצים על Grant Access (הענקת גישה) ומזינים את כתובת האימייל של חשבון השירות.

  4. מקצים לחשבון השירות את התפקידים הבאים:

    • Logs Writer
    • Dataplex Entry Group Owner
    • הבעלים של משימת המטא-נתונים של Dataplex
    • Dataplex Catalog Editor
    • עורך של Dataproc
    • Dataproc Worker
    • Secret Manager Secret Accessor – בהרשאה הסודית שמאחסנת את פרטי הכניסה למקור הנתונים
    • משתמש באובייקט אחסון – בקטגוריה של Cloud Storage
    • Artifact Registry Reader – במאגר Artifact Registry שמכיל את תמונת המחבר
    • משתמש בחשבון שירות – אם משתמשים בחשבונות שירות שונים, צריך להעניק לחשבון השירות שמריץ את Workflows את התפקיד הזה בחשבון השירות שמריץ את משימות האצווה של Serverless for Apache Spark.
    • Workflows Invoker – אם רוצים לתזמן את הפייפליין
  5. שומרים את השינויים.

gcloud

  1. נותנים לחשבון השירות תפקידים. מריצים את הפקודות הבאות:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/logging.logWriter
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.entryGroupOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.metadataJobOwner
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataplex.catalogEditor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.editor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/dataproc.worker
    

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: השם של פרויקט היעד Google Cloudשאליו רוצים לייבא את המטא-נתונים.
    • SERVICE_ACCOUNT_ID: חשבון השירות, למשל my-service-account@my-project.iam.gserviceaccount.com.
  2. מקצים לחשבון השירות את התפקידים הבאים ברמת המשאב:

    gcloud secrets add-iam-policy-binding SECRET_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/secretmanager.secretaccessor
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID" \
        --role=roles/storage.objectUser \
        --condition=resource.name.startsWith('projects/_/buckets/BUCKET_ID')
    gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
        --location=REPOSITORY_LOCATION \
        --member=SERVICE_ACCOUNT_ID} \
        --role=roles/artifactregistry.reader
    

    מחליפים את מה שכתוב בשדות הבאים:

    • SECRET_ID: המזהה של הסוד שבו מאוחסנים פרטי הכניסה למקור הנתונים. הפורמט הוא projects/PROJECT_ID/secrets/SECRET_ID.
    • BUCKET_ID: שם הקטגוריה ב-Cloud Storage.
    • REPOSITORY: מאגר Artifact Registry שמכיל את תמונת המחבר.
    • REPOSITORY_LOCATION: המיקום שבו המאגר מתארח. Google Cloud
  3. מקצים לחשבון השירות שמריץ את Workflows את התפקיד roles/iam.serviceAccountUser בחשבון השירות שמריץ את משימות האצווה של Serverless for Apache Spark. צריך להקצות את התפקיד הזה גם אם משתמשים באותו חשבון שירות גם ב-Workflows וגם ב-Serverless for Apache Spark.

    gcloud iam service-accounts add-iam-policy-binding \
        serviceAccount:SERVICE_ACCOUNT_ID \
        --member='SERVICE_ACCOUNT_ID' \
        --role='roles/iam.serviceAccountUser'
    

    אם משתמשים בחשבונות שירות שונים, הערך של הדגל --member הוא חשבון השירות שמריץ את משימות האצווה של Serverless for Apache Spark.

  4. אם רוצים לתזמן את צינור עיבוד הנתונים, צריך להקצות לחשבון השירות את התפקיד הבא:

    gcloud projects add-iam-policy-binding PROJECT_ID \
     --member="SERVICE_ACCOUNT_ID" \
     --role=roles/workflows.invoker
    

ייבוא מטא-נתונים

כדי לייבא מטא-נתונים, יוצרים ואז מפעילים תהליך עבודה שמריץ את צינור הקישוריות המנוהל. אפשר גם ליצור לוח זמנים להרצת צינור הנתונים.

המסוף

  1. יוצרים את תהליך העבודה. עליך לספק את הפרטים הבאים:

    • חשבון שירות: חשבון השירות שהגדרתם בקטע תפקידים נדרשים במסמך הזה.
    • הצפנה: בוחרים באפשרות Google-managed encryption key.

    • הגדרת תהליך עבודה: צריך לספק את קובץ ההגדרה הבא:

      main:
        params: [args]
        steps:
          - init:
              assign:
              - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
              - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
              - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
              - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
      
          - check_networking:
              switch:
                - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                  raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
                - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                  steps:
                   - submit_extract_job_with_default_network_uri:
                        assign:
                          - NETWORK_TYPE: "networkUri"
                          - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
                - condition: ${NETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_network_uri:
                        assign:
                          - NETWORKING: ${NETWORK_URI}
                          - NETWORK_TYPE: "networkUri"
                - condition: ${SUBNETWORK_URI != ""}
                  steps:
                    - submit_extract_job_with_subnetwork_uri:
                        assign:
                          - NETWORKING: ${SUBNETWORK_URI}
                          - NETWORK_TYPE: "subnetworkUri"
              next: check_create_target_entry_group
      
          - check_create_target_entry_group:
              switch:
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                  next: create_target_entry_group
                - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                  next: prepare_pyspark_job_body
      
          - create_target_entry_group:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: prepare_pyspark_job_body
      
          - prepare_pyspark_job_body:
              assign:
                - pyspark_batch_body:
                    mainPythonFileUri: file:///main.py
                    args:
                      - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                      - ${"--target_location_id=" + args.CLOUD_REGION}
                      - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                      - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                      - ${"--output_folder=" + WORKFLOW_ID}
                      - ${args.ADDITIONAL_CONNECTOR_ARGS}
              next: add_jar_file_uri_if_present
      
          - add_jar_file_uri_if_present:
              switch:
                - condition: ${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}
                  assign:
                    - pyspark_batch_body.jarFileUris : ${args.JAR_FILE_URI}
              next: generate_extract_job_link
      
          - generate_extract_job_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                  severity: "INFO"
              next: submit_pyspark_extract_job
      
          - submit_pyspark_extract_job:
              call: http.post
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                headers:
                  Content-Type: "application/json"
                query:
                  batchId: ${WORKFLOW_ID}
                body:
                  pysparkBatch: ${pyspark_batch_body}
                  runtimeConfig:
                      containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                  environmentConfig:
                      executionConfig:
                          serviceAccount: ${args.SERVICE_ACCOUNT}
                          stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                          ${NETWORK_TYPE}: ${NETWORKING}
                          networkTags: ${NETWORK_TAGS}
              result: RESPONSE_MESSAGE
              next: check_pyspark_extract_job
      
          - check_pyspark_extract_job:
              call: http.get
              args:
                url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: PYSPARK_EXTRACT_JOB_STATUS
              next: check_pyspark_extract_job_done
      
          - check_pyspark_extract_job_done:
              switch:
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                  next: generate_import_logs_link
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
                - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                  raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              next: pyspark_extract_job_wait
      
          - pyspark_extract_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_pyspark_extract_job
      
          - generate_import_logs_link:
              call: sys.log
              args:
                  data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                  severity: "INFO"
              next: submit_import_job
      
          - submit_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
                body:
                  type: IMPORT
                  import_spec:
                    source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                    entry_sync_mode: FULL
                    aspect_sync_mode: INCREMENTAL
                    log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                    scope:
                      entry_groups: 
                        - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                      entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                      aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
              result: IMPORT_JOB_RESPONSE
              next: get_job_start_time
      
          - get_job_start_time:
              assign:
                - importJobStartTime: ${sys.now()}
              next: import_job_startup_wait
      
          - import_job_startup_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: initial_get_import_job
      
          - initial_get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_status_available
      
          - check_import_job_status_available:
              switch:
                - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                  next: kill_import_job
                - condition: ${"status" in IMPORT_JOB_STATUS.body}
                  next: check_import_job_done
              next: import_job_status_wait
      
          - import_job_status_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: check_import_job_status_available
      
          - check_import_job_done:
              switch:
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                  next: the_end
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                  raise: ${IMPORT_JOB_STATUS}
                - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                  next: kill_import_job
              next: import_job_wait
      
          - get_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: IMPORT_JOB_STATUS
              next: check_import_job_done
      
          - import_job_wait:
              call: sys.sleep
              args:
                seconds: 30
              next: get_import_job
      
          - kill_import_job:
              call: http.post
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              next: get_killed_import_job
      
          - get_killed_import_job:
              call: http.get
              args:
                url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
                auth:
                  type: OAuth2
                  scopes: "https://www.googleapis.com/auth/cloud-platform"
              result: KILLED_IMPORT_JOB_STATUS
              next: killed
      
          - killed:
              raise: ${KILLED_IMPORT_JOB_STATUS}
      
          - the_end:
              return: ${IMPORT_JOB_STATUS}
      
  2. כדי להריץ את צינור עיבוד הנתונים לפי דרישה, צריך להריץ את תהליך העבודה.

    מציינים את הארגומנטים הבאים של זמן הריצה:

    {
        "TARGET_PROJECT_ID": "PROJECT_ID",
        "CLOUD_REGION": "LOCATION_ID",
        "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
        "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
        "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
        "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
        "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
        "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
        "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
        "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
        "IMPORT_JOB_LOG_LEVEL": "INFO",
        "JAR_FILE_URI": "",
        "NETWORK_TAGS": [],
        "NETWORK_URI": "",
        "SUBNETWORK_URI": ""
     }
    

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: השם של פרויקט היעד Google Cloudשאליו רוצים לייבא את המטא-נתונים.
    • LOCATION_ID: מיקום היעד Google Cloud שבו יפעלו המשימות של Serverless for Apache Spark וייבוא המטא-נתונים, ושבו יתבצע ייבוא המטא-נתונים.
    • ENTRY_GROUP_ID: המזהה של קבוצת הרשומות שאליה רוצים לייבא את המטא-נתונים. מזהה קבוצת הרשומות יכול להכיל אותיות קטנות, מספרים ומקפים.

      השם המלא של המשאב של קבוצת הרשומות הזו הוא projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: אם רוצים שצינור הנתונים ייצור את קבוצת הרשומות אם היא עדיין לא קיימת בפרויקט, צריך להגדיר את הערך הזה ל-true.
    • BUCKET_ID: השם של קטגוריית Cloud Storage לאחסון קובץ ייבוא המטא-נתונים שנוצר על ידי המחבר. כל הרצה של תהליך עבודה יוצרת תיקייה חדשה.
    • SERVICE_ACCOUNT_ID: חשבון השירות שהגדרתם בקטע תפקידים נדרשים במאמר הזה. חשבון השירות מפעיל את המחבר ב-Serverless for Apache Spark.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: רשימה של ארגומנטים נוספים להעברה למחבר. דוגמאות מופיעות במאמר פיתוח מחבר בהתאמה אישית לייבוא מטא-נתונים. מקיפים כל ארגומנט במירכאות כפולות ומפרידים בין הארגומנטים באמצעות פסיקים.
    • CONTAINER_IMAGE: קובץ האימג' של הקונטיינר המותאם אישית של המחבר שמתארח ב-Artifact Registry.
    • ENTRY_TYPES: רשימה של סוגי רשומות שנכללות בייבוא, בפורמט projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. המיקום LOCATION_ID צריך להיות זהה למיקוםGoogle Cloud שאליו מייבאים את המטא-נתונים, או global.
    • ASPECT_TYPES: רשימה של סוגי היבטים שנכללים בייבוא, בפורמט projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. המיקום LOCATION_ID צריך להיות זהה למיקוםGoogle Cloud שאליו מייבאים את המטא-נתונים, או global.
    • אופציונלי: עבור הארגומנט NETWORK_TAGS, מספקים רשימה של תגי רשת.
    • אופציונלי: בארגומנט NETWORK_URI, מציינים את ה-URI של רשת ה-VPC שמקושרת למקור הנתונים. אם מציינים רשת, לא מציינים את הארגומנט subnetwork.
    • אופציונלי: בארגומנט SUBNETWORK_URI, מציינים את ה-URI של רשת המשנה שמקושרת למקור הנתונים. אם מציינים רשת משנה, לא צריך לציין את ארגומנט הרשת.

    בהתאם לכמות המטא-נתונים שמייבאים, יכול להיות שייקח כמה דקות או יותר להריץ את צינור עיבוד הנתונים. מידע נוסף על הצגת ההתקדמות זמין במאמר בנושא גישה לתוצאות של הרצת תהליך עבודה.

    אחרי שהצינור סיים לפעול, אפשר לחפש את המטא-נתונים המיובאים ב-Dataplex Universal Catalog.

  3. אופציונלי: אם רוצים להריץ את צינור העיבוד לפי לוח זמנים, יוצרים לוח זמנים באמצעות Cloud Scheduler. עליך לספק את הפרטים הבאים:

    • תדירות: ביטוי unix-cron שמגדיר את לוח הזמנים להרצת צינור העיבוד.
    • ארגומנט של תהליך העבודה: ארגומנטי זמן הריצה של המחבר, כפי שמתואר בשלב הקודם.
    • חשבון שירות: חשבון השירות. חשבון השירות מנהל את הכלי לתזמון.

gcloud

  1. שומרים את הגדרת עומס העבודה הבאה כקובץ YAML:

    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
                steps:
                 - submit_extract_job_with_default_network_uri:
                      assign:
                        - NETWORK_TYPE: "networkUri"
                        - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}  
              - condition: ${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: ${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: ${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: ${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: prepare_pyspark_job_body
    
        - create_target_entry_group:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: prepare_pyspark_job_body
    
        - prepare_pyspark_job_body:
            assign:
              - pyspark_batch_body:
                  mainPythonFileUri: file:///main.py
                  args:
                    - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - ${"--target_location_id=" + args.CLOUD_REGION}
                    - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - ${"--output_folder=" + WORKFLOW_ID}
                    - ${args.ADDITIONAL_CONNECTOR_ARGS}
            next: add_jar_file_uri_if_present
    
        - add_jar_file_uri_if_present:
            switch:
              - condition: ${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}
                assign:
                  - pyspark_batch_body.jarFileUris : ${args.JAR_FILE_URI}
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: ${WORKFLOW_ID}
              body:
                pysparkBatch: ${pyspark_batch_body}
                runtimeConfig:
                    containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: ${args.SERVICE_ACCOUNT}
                        stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                        ${NETWORK_TYPE}: ${NETWORKING}
                        networkTags: ${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: ${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: ${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: ${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: ${IMPORT_JOB_STATUS}
              - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: ${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: ${IMPORT_JOB_STATUS}
    
  2. מגדירים משתני Bash, יוצרים את תהליך העבודה ויוצרים לוח זמנים להפעלת צינור עיבוד הנתונים (אופציונלי):

    # Define Bash variables (replace with your actual values)
    project_id="PROJECT_ID"
    region="LOCATION_ID"
    service_account="SERVICE_ACCOUNT_ID"
    workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    # Create Workflows resource
    gcloud workflows deploy ${workflow_name} \
      --project=${project_id} \
      --location=${region} \
      --source=${workflow_source} \
      --service-account=${service_account}
    
    # Create Cloud Scheduler job
    gcloud scheduler jobs create http ${workflow_name}-scheduler \
      --project=${project_id} \
      --location=${region} \
      --schedule="CRON_SCHEDULE_EXPRESSION" \
      --time-zone="UTC" \
      --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
      --http-method="POST" \
      --oauth-service-account-email=${service_account} \
      --headers="Content-Type=application/json" \
      --message-body='{"argument": ${workflow_args}}'
    

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: השם של פרויקט היעד Google Cloudשאליו רוצים לייבא את המטא-נתונים.
    • LOCATION_ID: מיקום היעד Google Cloud שבו יפעלו המשימות של Serverless for Apache Spark וייבוא המטא-נתונים, ושבו יתבצע ייבוא המטא-נתונים.
    • SERVICE_ACCOUNT_ID: חשבון השירות שהגדרתם בקטע תפקידים נדרשים במאמר הזה.
    • WORKFLOW_DEFINITION_FILE: הנתיב לקובץ ה-YAML של הגדרת תהליך העבודה.
    • WORKFLOW_NAME: השם של תהליך העבודה.
    • WORKFLOW_ARGUMENTS: הארגומנטים של זמן הריצה שיועברו למחבר. הארגומנטים הם בפורמט JSON:

      {
          "TARGET_PROJECT_ID": "PROJECT_ID",
          "CLOUD_REGION": "LOCATION_ID",
          "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
          "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
          "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
          "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
          "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
          "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
          "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
          "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
          "IMPORT_JOB_LOG_LEVEL": "INFO",
          "JAR_FILE_URI": "",
          "NETWORK_TAGS": [],
          "NETWORK_URI": "",
          "SUBNETWORK_URI": ""
       }
      

      ב-Cloud Scheduler, המירכאות הכפולות בתוך המחרוזת המצוטטת מסומנות בתו בריחה (escape) באמצעות קו נטוי הפוך (\). לדוגמה: --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}".

      מחליפים את מה שכתוב בשדות הבאים:

      • ENTRY_GROUP_ID: המזהה של קבוצת הרשומות שאליה רוצים לייבא את המטא-נתונים. מזהה קבוצת הרשומות יכול להכיל אותיות קטנות, מספרים ומקפים.

        השם המלא של המשאב של קבוצת הרשומות הזו הוא projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

      • CREATE_ENTRY_GROUP_BOOLEAN: אם רוצים שצינור הנתונים ייצור את קבוצת הרשומות אם היא עדיין לא קיימת בפרויקט, צריך להגדיר את הערך הזה ל-true.
      • BUCKET_ID: השם של קטגוריית Cloud Storage לאחסון קובץ ייבוא המטא-נתונים שנוצר על ידי המחבר. כל הרצה של תהליך עבודה יוצרת תיקייה חדשה.
      • ADDITIONAL_CONNECTOR_ARGUMENTS: רשימה של ארגומנטים נוספים להעברה למחבר. דוגמאות מופיעות במאמר פיתוח מחבר בהתאמה אישית לייבוא מטא-נתונים.
      • CONTAINER_IMAGE: קובץ האימג' של הקונטיינר המותאם אישית של המחבר שמתארח ב-Artifact Registry.
      • ENTRY_TYPES: רשימה של סוגי רשומות שנכללות בייבוא, בפורמט projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. המיקום LOCATION_ID צריך להיות זהה למיקוםGoogle Cloud שאליו מייבאים את המטא-נתונים, או global.
      • ASPECT_TYPES: רשימה של סוגי היבטים שנכללים בייבוא, בפורמט projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. המיקום LOCATION_ID צריך להיות זהה למיקוםGoogle Cloud שאליו מייבאים את המטא-נתונים, או global.
      • אופציונלי: עבור הארגומנט NETWORK_TAGS, מספקים רשימה של תגי רשת.
      • אופציונלי: בארגומנט NETWORK_URI, מציינים את ה-URI של רשת ה-VPC שמקושרת למקור הנתונים. אם מציינים רשת, לא מציינים את הארגומנט subnetwork.
      • אופציונלי: בארגומנט SUBNETWORK_URI, מציינים את ה-URI של רשת המשנה שמקושרת למקור הנתונים. אם מציינים רשת משנה, לא צריך לציין את ארגומנט הרשת.
    • CRON_SCHEDULE_EXPRESSION: ביטוי cron שמגדיר את לוח הזמנים להרצת צינור העיבוד. לדוגמה, כדי להפעיל את התזמון בחצות בכל יום, משתמשים בביטוי 0 0 * * *.

  3. כדי להריץ את צינור עיבוד הנתונים לפי דרישה, מריצים את תהליך העבודה:

    workflow_name="WORKFLOW_NAME"
    workflow_args='WORKFLOW_ARGUMENTS'
    
    gcloud workflows run "${workflow_name}" --project=${project_id} --location=${location} --data '${workflow_args}'
    

    הארגומנטים של תהליך העבודה הם בפורמט JSON, אבל לא נעשה בהם escape.

    בהתאם לכמות המטא-נתונים שמייבאים, יכול להיות שתהליך העבודה יימשך כמה דקות או יותר. מידע נוסף על הצגת ההתקדמות זמין במאמר גישה לתוצאות של הרצת תהליך עבודה.

    אחרי שהצינור סיים לפעול, אפשר לחפש את המטא-נתונים המיובאים ב-Dataplex Universal Catalog.

Terraform

  1. משכפלים את מאגר cloud-dataplex.

    המאגר כולל את קובצי Terraform הבאים:

    • main.tf: הגדרת המשאבים שרוצים ליצור. Google Cloud
    • variables.tf: מגדיר את המשתנים.
    • byo-connector.tfvars: הגדרת המשתנים של צינור הקישוריות המנוהל.
  2. עורכים את הקובץ .tfvars כדי להחליף את הפלייסהולדרים בפרטים של המחבר.

    project_id                      = "PROJECT_ID"
    region                          = "LOCATION_ID"
    service_account                 = "SERVICE_ACCOUNT_ID"
    cron_schedule                   = "CRON_SCHEDULE_EXPRESSION"
    workflow_args                   = {"TARGET_PROJECT_ID": "PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}
    
    
    workflow_source                 = <<EOF
    main:
      params: [args]
      steps:
        - init:
            assign:
            - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
            - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
            - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
    
        - check_networking:
            switch:
              - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
                raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
              - condition: $${NETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_network_uri:
                      assign:
                        - NETWORKING: $${NETWORK_URI}
                        - NETWORK_TYPE: "networkUri"
              - condition: $${SUBNETWORK_URI != ""}
                steps:
                  - submit_extract_job_with_subnetwork_uri:
                      assign:
                        - NETWORKING: $${SUBNETWORK_URI}
                        - NETWORK_TYPE: "subnetworkUri"
            next: set_default_networking
    
        - set_default_networking:
            assign:
              - NETWORK_TYPE: "networkUri"
              - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
            next: check_create_target_entry_group
    
        - check_create_target_entry_group:
            switch:
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
                next: create_target_entry_group
              - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
                next: generate_extract_job_link
    
        - create_target_entry_group:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: generate_extract_job_link
    
        - generate_extract_job_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
                severity: "INFO"
            next: submit_pyspark_extract_job
    
        - submit_pyspark_extract_job:
            call: http.post
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              headers:
                Content-Type: "application/json"
              query:
                batchId: $${WORKFLOW_ID}
              body:
                pysparkBatch:
                  mainPythonFileUri: file:///main.py
                  args:
                    - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                    - $${"--target_location_id=" + args.CLOUD_REGION}
                    - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                    - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                    - $${"--output_folder=" + WORKFLOW_ID}
                    - $${args.ADDITIONAL_CONNECTOR_ARGS}
                runtimeConfig:
                    containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
                environmentConfig:
                    executionConfig:
                        serviceAccount: $${args.SERVICE_ACCOUNT}
                        stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                        $${NETWORK_TYPE}: $${NETWORKING}
                        networkTags: $${NETWORK_TAGS}
            result: RESPONSE_MESSAGE
            next: check_pyspark_extract_job
    
        - check_pyspark_extract_job:
            call: http.get
            args:
              url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: PYSPARK_EXTRACT_JOB_STATUS
            next: check_pyspark_extract_job_done
    
        - check_pyspark_extract_job_done:
            switch:
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
                next: generate_import_logs_link
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
              - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
                raise: $${PYSPARK_EXTRACT_JOB_STATUS}
            next: pyspark_extract_job_wait
    
        - pyspark_extract_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_pyspark_extract_job
    
        - generate_import_logs_link:
            call: sys.log
            args:
                data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
                severity: "INFO"
            next: submit_import_job
    
        - submit_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
              body:
                type: IMPORT
                import_spec:
                  source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                  entry_sync_mode: FULL
                  aspect_sync_mode: INCREMENTAL
                  log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
                  scope:
                    entry_groups: 
                      - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                    entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                    aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
            result: IMPORT_JOB_RESPONSE
            next: get_job_start_time
    
        - get_job_start_time:
            assign:
              - importJobStartTime: $${sys.now()}
            next: import_job_startup_wait
    
        - import_job_startup_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: initial_get_import_job
    
        - initial_get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_status_available
    
        - check_import_job_status_available:
            switch:
              - condition: $${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
                next: kill_import_job
              - condition: $${"status" in IMPORT_JOB_STATUS.body}
                next: check_import_job_done
            next: import_job_status_wait
    
        - import_job_status_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: check_import_job_status_available
    
        - check_import_job_done:
            switch:
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
                next: the_end
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
                raise: $${IMPORT_JOB_STATUS}
              - condition: $${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
                next: kill_import_job
            next: import_job_wait
    
        - get_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: IMPORT_JOB_STATUS
            next: check_import_job_done
    
        - import_job_wait:
            call: sys.sleep
            args:
              seconds: 30
            next: get_import_job
    
        - kill_import_job:
            call: http.post
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            next: get_killed_import_job
    
        - get_killed_import_job:
            call: http.get
            args:
              url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
              auth:
                type: OAuth2
                scopes: "https://www.googleapis.com/auth/cloud-platform"
            result: KILLED_IMPORT_JOB_STATUS
            next: killed
    
        - killed:
            raise: $${KILLED_IMPORT_JOB_STATUS}
    
        - the_end:
            return: $${IMPORT_JOB_STATUS}
    EOF
    

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: השם של פרויקט היעד Google Cloudשאליו רוצים לייבא את המטא-נתונים.
    • LOCATION_ID: מיקום היעד Google Cloud שבו יפעלו המשימות של Serverless for Apache Spark וייבוא המטא-נתונים, ושבו יתבצע ייבוא המטא-נתונים.
    • SERVICE_ACCOUNT_ID: חשבון השירות שהגדרתם בקטע תפקידים נדרשים במאמר הזה.
    • CRON_SCHEDULE_EXPRESSION: ביטוי cron שמגדיר את לוח הזמנים להרצת צינור העיבוד. לדוגמה, כדי להפעיל את התזמון בחצות בכל יום, משתמשים בביטוי 0 0 * * *.
    • ENTRY_GROUP_ID: המזהה של קבוצת הרשומות שאליה רוצים לייבא את המטא-נתונים. מזהה קבוצת הרשומות יכול להכיל אותיות קטנות, מספרים ומקפים.

      השם המלא של המשאב של קבוצת הרשומות הזו הוא projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.

    • CREATE_ENTRY_GROUP_BOOLEAN: אם רוצים שצינור הנתונים ייצור את קבוצת הרשומות אם היא עדיין לא קיימת בפרויקט, צריך להגדיר את הערך הזה ל-true.
    • BUCKET_ID: השם של קטגוריית Cloud Storage לאחסון קובץ ייבוא המטא-נתונים שנוצר על ידי המחבר. כל הרצה של תהליך עבודה יוצרת תיקייה חדשה.
    • ADDITIONAL_CONNECTOR_ARGUMENTS: רשימה של ארגומנטים נוספים להעברה למחבר. דוגמאות מופיעות במאמר פיתוח מחבר בהתאמה אישית לייבוא מטא-נתונים. מקיפים כל ארגומנט במירכאות כפולות ומפרידים בין הארגומנטים באמצעות פסיקים.
    • CONTAINER_IMAGE: קובץ האימג' של הקונטיינר המותאם אישית של המחבר שמתארח ב-Artifact Registry.
    • ENTRY_TYPES: רשימה של סוגי רשומות שנכללות בייבוא, בפורמט projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. המיקום LOCATION_ID צריך להיות זהה למיקוםGoogle Cloud שאליו מייבאים את המטא-נתונים, או global.
    • ASPECT_TYPES: רשימה של סוגי היבטים שנכללים בייבוא, בפורמט projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. המיקום LOCATION_ID צריך להיות זהה למיקוםGoogle Cloud שאליו מייבאים את המטא-נתונים, או global.
    • אופציונלי: עבור הארגומנט NETWORK_TAGS, מספקים רשימה של תגי רשת.
    • אופציונלי: בארגומנט NETWORK_URI, מציינים את ה-URI של רשת ה-VPC שמקושרת למקור הנתונים. אם מציינים רשת, לא מציינים את הארגומנט subnetwork.
    • אופציונלי: בארגומנט SUBNETWORK_URI, מציינים את ה-URI של רשת המשנה שמקושרת למקור הנתונים. אם מציינים רשת משנה, לא צריך לציין את ארגומנט הרשת.
  3. מאתחלים את Terraform:

    terraform init
    
  4. מאמתים את Terraform באמצעות קובץ .tfvars:

    terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    מחליפים את CONNECTOR_VARIABLES_FILE בשם של קובץ הגדרות המשתנים.

  5. פורסים את Terraform באמצעות הקובץ .tfvars:

    terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
    

    ‫Terraform יוצרת תהליך עבודה ומשימה ב-Cloud Scheduler בפרויקט שצוין. ‫Workflows מריץ את צינור עיבוד הנתונים לפי התזמון שאתם מציינים.

    בהתאם לכמות המטא-נתונים שמייבאים, יכול להיות שתהליך העבודה יימשך כמה דקות או יותר. מידע נוסף על הצגת ההתקדמות זמין במאמר גישה לתוצאות של הרצת תהליך עבודה.

    אחרי שהצינור סיים לפעול, אפשר לחפש את המטא-נתונים המיובאים ב-Dataplex Universal Catalog.

צפייה ביומני המשרות

שימוש ב-Cloud Logging כדי להציג יומנים של צינור קישוריות מנוהל. מטען הייעודי (payload) של היומן כולל קישור ליומנים של משימת האצווה Serverless for Apache Spark ועבודת ייבוא המטא-נתונים, לפי הצורך. מידע נוסף זמין במאמר בנושא הצגת יומני זרימת עבודה.

פתרון בעיות

כדאי לנסות את ההצעות הבאות לפתרון בעיות:

המאמרים הבאים