ביצוע שלבים בתהליך העבודה במקביל

שלבים מקבילים יכולים לקצר את זמן הביצוע הכולל של תהליך עבודה, כי הם מאפשרים לבצע כמה קריאות חסימה בו-זמנית.

יכול להיות שייקח זמן לחסימת שיחות כמו sleep,‏ HTTP calls ו-callbacks, החל ממילישניות ועד ימים. השלבים המקבילים נועדו לעזור בביצוע פעולות כאלה שפועלות במקביל לאורך זמן. אם בתהליך העבודה צריך לבצע כמה קריאות חוסמות שאינן תלויות זו בזו, אפשר להשתמש בענפים מקבילים כדי לקצר את זמן הביצוע הכולל. לשם כך, מתחילים את הקריאות בו-זמנית ומחכים עד שכולן יסתיימו.

לדוגמה, אם תהליך העבודה שלכם צריך לאחזר נתוני לקוחות מכמה מערכות עצמאיות לפני שהוא ממשיך, ענפים מקבילים מאפשרים לבצע בקשות API בו-זמנית. אם יש חמש מערכות וכל אחת מהן מגיבה תוך שתי שניות, ביצוע השלבים ברצף בתהליך עבודה יכול להימשך לפחות 10 שניות, אבל ביצוע השלבים במקביל יכול להימשך שתי שניות בלבד.

יצירת שלב מקביל

יוצרים parallel שלב כדי להגדיר חלק בתהליך העבודה שבו שני שלבים או יותר יכולים להתבצע בו-זמנית.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

מחליפים את מה שכתוב בשדות הבאים:

  • PARALLEL_STEP_NAME: השם של השלב המקביל.
  • POLICY (אופציונלי): קובע את הפעולה שתתבצע בענפים אחרים אם מתרחשת חריגה שלא טופלה. מדיניות ברירת המחדל, continueAll, לא תגרום לפעולה נוספת, וכל הענפים האחרים ינסו לפעול. הערה: continueAll היא המדיניות היחידה שנתמכת כרגע.
  • VARIABLE_A, VARIABLE_B וכו': רשימה של משתנים שאפשר לכתוב בהם עם היקף הורה שמאפשר הקצאות בשלב המקביל. מידע נוסף זמין במאמר בנושא משתנים משותפים.
  • CONCURRENCY_LIMIT (אופציונלי): המספר המקסימלי של הסתעפויות וחזרות שיכולות להתבצע בו-זמנית בהרצת תהליך עבודה יחיד לפני שהסתעפויות וחזרות נוספות יוכנסו לתור להמתנה. ההגדרה הזו חלה רק על שלב parallel אחד ולא על שלבים נוספים. הערך חייב להיות מספר שלם חיובי, והוא יכול להיות ערך מילולי או ביטוי. פרטים נוספים זמינים במאמר בנושא מגבלות על פעולות בו-זמניות.
  • BRANCHES_OR_FOR: משתמשים בערך branches או for כדי לציין אחת מהאפשרויות הבאות:
    • ענפים שאפשר להריץ בו-זמנית.
    • לולאה שבה איטרציות יכולות לפעול בו-זמנית.

שימו לב לנקודות הבאות:

  • ענפים מקבילים ואיטרציות יכולים לפעול בכל סדר, ויכול להיות שהסדר יהיה שונה בכל הרצה.
  • שלבים מקבילים יכולים לכלול שלבים מקבילים אחרים, מוטמעים, עד למגבלת העומק. מידע נוסף על מכסות ומגבלות
  • פרטים נוספים זמינים בדף העזרה בנושא תחביר של שלבים מקבילים.

החלפת פונקציה ניסיונית בשלב מקביל

אם אתם משתמשים ב-experimental.executions.map כדי לתמוך בעבודה מקבילה, אתם יכולים להעביר את תהליך העבודה לשימוש בשלבים מקבילים במקום זאת, ולהפעיל לולאות for רגילות במקביל. דוגמאות מופיעות במאמר החלפת פונקציה ניסיונית בשלב מקביל.

דוגמאות

בדוגמאות האלה אפשר לראות את התחביר.

ביצוע פעולות במקביל (באמצעות ענפים)

אם בתהליך העבודה יש כמה קבוצות שונות של שלבים שאפשר לבצע בו-זמנית, הצבתן בענפים מקבילים יכולה לקצר את הזמן הכולל שנדרש להשלמת השלבים האלה.

בדוגמה הבאה, מועבר User ID כארגומנט לתהליך העבודה, והנתונים מאוחזרים במקביל משני שירותים שונים. משתנים משותפים מאפשרים לכתוב ערכים בענפים ולקרוא אותם אחרי שהענפים מסיימים:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

עיבוד פריטים במקביל (באמצעות לולאה מקבילה)

אם אתם צריכים לבצע את אותה פעולה לכל פריט ברשימה, תוכלו להשלים את ההרצה מהר יותר באמצעות לולאה מקבילה. לולאה מקבילה מאפשרת לבצע כמה איטרציות של לולאה במקביל. שימו לב: בניגוד ללולאות for רגילות, אפשר לבצע איטרציות בכל סדר.

בדוגמה הבאה, קבוצה של התראות למשתמשים מעובדת בלולאה מקבילה for:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

צבירת נתונים (באמצעות לולאה מקבילה)

אפשר לעבד קבוצה של פריטים תוך כדי איסוף נתונים מהפעולות שבוצעו בכל פריט. לדוגמה, יכול להיות שתרצו לעקוב אחרי המזהים של פריטים שנוצרו או לשמור רשימה של פריטים עם שגיאות.

בדוגמה הבאה, כל אחת מ-10 שאילתות נפרדות למערך נתונים ציבורי ב-BigQuery מחזירה את מספר המילים במסמך או בקבוצת מסמכים. משתנה משותף מאפשר לצבור את ספירת המילים ולקרוא אותה אחרי שכל האיטרציות מסתיימות. אחרי חישוב מספר המילים בכל המסמכים, תהליך העבודה מחזיר את המספר הכולל.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

המאמרים הבאים