סטרימינג של אירועים שנשלחים מהשרת

הדף הזה רלוונטי ל-Apigee ול-Apigee Hybrid.

לעיון במסמכי התיעוד של Apigee Edge

‫Apigee תומך בסטרימינג רציף של תשובות מנקודות קצה של אירועים שנשלחים מהשרת (SSE) ללקוחות בזמן אמת. התכונה SSE ב-Apigee שימושית לטיפול בממשקי API של מודלים גדולים של שפה (LLM) שפועלים בצורה הכי יעילה על ידי סטרימינג של התשובות שלהם בחזרה ללקוח. סטרימינג של SSE מפחית את זמן האחזור, והלקוחות יכולים לקבל נתוני תשובה ברגע שהם נוצרים על ידי LLM. התכונה הזו תומכת בשימוש בסוכני AI שפועלים בסביבות בזמן אמת, כמו בוטים לשירות לקוחות או כלי תזמור של תהליכי עבודה.

כדי להשתמש ב-SSE עם Apigee, פשוט מפנים proxy ל-API ליעד עם SSE או לנקודת קצה של proxy. כדי להשיג שליטה מדויקת יותר בתגובת ה-SSE, ‏ Apigee מספק זרימת נקודות קצה מיוחדת שנקראת EventFlow. במסגרת EventFlow, אפשר להוסיף קבוצה מוגבלת של כללי מדיניות כדי לבצע פעולות בתגובת ה-SSE, כמו סינון, שינוי או טיפול בשגיאות. מידע נוסף על תהליכי proxy זמין במאמר שליטה בשרתי proxy של API באמצעות תהליכים.

יצירת proxy ל-API ל-SSE

ממשק המשתמש של Apigee מספק תבנית ליצירת שרת proxy חדש שכולל EventFlow.

בצע את השלבים הבאים כדי ליצור proxy ל-API עם תבנית EventFlow באמצעות ממשק המשתמש של Apigee:

  1. במסוף Google Cloud , נכנסים לדף Apigee > Proxy Development > API Proxies.

    מעבר אל API Proxies

  2. בחלונית API Proxies (שרתי proxy של API), לוחצים על + Create (יצירה).
  3. בחלונית Create a proxy (יצירת שרת proxy), בקטע Proxy template (תבנית שרת proxy), בוחרים באפשרות Proxy with Server-Sent Events (SSE) (שרת proxy עם אירועים שנשלחים מהשרת).
  4. בקטע פרטי שרת proxy, מזינים את הפרטים הבאים:
    • שם ה-Proxy: מזינים שם ל-Proxy, למשל myproxy.
    • נתיב בסיס: מוגדר אוטומטית לערך שמזינים עבור שם ה-Proxy. נתיב הבסיס הוא חלק מכתובת ה-URL שמשמשת לשליחת בקשות ל-API. מערכת Apigee משתמשת בכתובת ה-URL כדי להתאים בקשות נכנסות ולנתב אותן ל-proxy ל-API המתאים.
    • תיאור (אופציונלי): מזינים תיאור ל-proxy ל-API החדש, למשל "בדיקת Apigee באמצעות proxy פשוט".
    • יעד (API קיים): מזינים את כתובת ה-URL של יעד ה-SSE עבור ה-proxy ל-API. לדוגמה: https://mocktarget.apigee.net/sse-events/5
    • לוחצים על הבא.
  5. פריסה (אופציונלי):
    • סביבות פריסה: אופציונלי. משתמשים בתיבות הסימון כדי לבחור סביבה אחת או יותר שבהן רוצים לפרוס את ה-proxy. אם אתם לא רוצים לפרוס את ה-proxy בשלב הזה, אתם יכולים להשאיר את השדה סביבות פריסה ריק. תמיד אפשר לפרוס את ה-proxy מאוחר יותר.
    • חשבון שירות: אופציונלי. חשבון שירות עבור ה-proxy. חשבון השירות מייצג את הזהות של ה-proxy שנפרס, וקובע אילו הרשאות יש לו. זו תכונה מתקדמת, ולצורך המדריך הזה אפשר להתעלם ממנה.

    שרתי proxy ל-API שנפרסו עם הגדרת EventFlow יחויבו כשרתי proxy ניתנים להרחבה.

  6. לוחצים על יצירה.

אפשר גם לעיין במאמר יצירת proxy ל-API פשוט.

הגדרת EventFlow

כדי להשיג שליטה מדויקת יותר בתגובת ה-SSE,‏ Apigee מספקת זרימת נקודות קצה מיוחדת שנקראת EventFlow. בהקשר של EventFlow, אפשר להוסיף קבוצה מוגבלת של מדיניות, שמפורטת בהמשך, כדי לשנות את תגובת ה-SSE לפני שהיא מועברת חזרה ללקוח. מידע נוסף על זרימות של שרתי proxy זמין במאמר שליטה בשרתי proxy של API באמצעות זרימות.

מיקום של EventFlow

ל-EventFlow יש שני מאפיינים:

  • name: שם לזיהוי רצף הפעולות.
  • content-type: הערך של המאפיין הזה חייב להיות text/event-stream.

אפשר לעיין גם במאמר הפניה להגדרת תהליך.

אפשר להציב EventFlow בתוך הגדרה של TargetEndpoint או ProxyEndpoint, כמו בדוגמאות הקוד הבאות:

<ProxyEndpoint>

<ProxyEndpoint name="default">
  <Description/>
  <FaultRules/>
  <PreFlow name="PreFlow">
    <Request/>
    <Response/>
  </PreFlow>
  <PostFlow name="PostFlow">
    <Request/>
    <Response/>
  </PostFlow>
  <Flows/>
  <EventFlow name="EventFlow" content-type="text/event-stream">
    <Response/>
  </EventFlow>
  <HTTPProxyConnection>
    <Properties/>
    <URL>https://httpbin.org/sse</URL>
  </HTTPProxyConnection>
</ProxyEndpoint>

<TargetEndpoint>

<TargetEndpoint name="default">
  <Description/>
  <FaultRules/>
  <PreFlow name="PreFlow">
    <Request/>
    <Response/>
  </PreFlow>
  <PostFlow name="PostFlow">
    <Request/>
    <Response/>
  </PostFlow>
  <Flows/>
  <EventFlow name="EventFlow" content-type="text/event-stream">
    <Response/>
  </EventFlow>
  <HTTPTargetConnection>
    <Properties/>
    <URL>https://httpbin.org/sse</URL>
  </HTTPTargetConnection>
</TargetEndpoint>

חשוב גם לציין שאף על פי שאפשר להוסיף EventFlow ל-TargetEndpoint, ל-ProxyEndpoint או לשניהם, רק EventFlow אחד מבוצע.

בטבלה הבאה מוצגות הפעולות של קטעי EventFlow על סמך מיקום נקודת הקצה:

ProxyEndpoint TargetEndpoint השימוש ב-EventFlow
‫EventFlow ב-ProxyEndpoint ‫EventFlow ב-TargetEndpoint ‫EventFlow ב-TargetEndpoint
לא EventFlow ‫EventFlow ב-TargetEndpoint ‫EventFlow ב-TargetEndpoint
‫EventFlow ב-ProxyEndpoint לא EventFlow ‫EventFlow ב-ProxyEndpoint

הוספת מדיניות ל-EventFlow

אפשר להוסיף עד ארבעה כללי מדיניות לרכיב Response של EventFlow. כמו בכל התהליכים, כללי המדיניות מופעלים לפי הסדר שבו הם נוספו, ואפשר להוסיף שלבים מותנים כדי לשלוט בהפעלה שלהם. חשוב לציין שסוגי כללי המדיניות שאפשר להוסיף ל-EventFlow מוגבלים לסוגים הבאים. אסור להוסיף ל-EventFlow סוגים אחרים של כללי מדיניות:

אפשר לעיין גם במאמרים בנושא צירוף והגדרה של מדיניות בממשק המשתמש וצירוף והגדרה של מדיניות בקובצי XML.

בדוגמאות הבאות מוצג EventFlow עם שלב מדיניות RaiseFault מותנה שנוסף:

<ProxyEndpoint>

<ProxyEndpoint name="default">
  <EventFlow content-type="text/event-stream">
    <Response>
      <Step>
        <Name>Raise-Fault-Cred-Invalid</Name>
        <Condition>fault.name equals "invalid_access_token"</Condition>
      </Step>
    </Response>
  </EventFlow>
  <HTTPProxyConnection>
</ProxyEndpoint></pre>

<TargetEndpoint>

<TargetEndpoint name="default">
  <EventFlow content-type="text/event-stream">
    <Response>
      <Step>
        <Name>Raise-Fault-Cred-Invalid</Name>
        <Condition>fault.name equals "invalid_access_token"</Condition>
      </Step>
    </Response>
  </EventFlow>
  <HTTPTargetConnection>
</TargetEndpoint></pre>

EventFlow עוד דוגמאות לקוד זמינות בקטע תרחישים לדוגמה ודוגמאות לשימוש ב-EventFlow.

משתנים בתהליך

EventFlow מאכלס שלושה משתני זרימת תשובות. שימו לב שאפשר להשתמש במשתנים האלה רק בהקשר של האירוע הנוכחי שעובר עיבוד ב-EventFlow. לגישה למשתנים האלה או להגדרתם מחוץ להיקף EventFlow אין השפעה. הם רלוונטיים רק בהקשר של EventFlow.

  • response.event.current.content: מחרוזת שמכילה את התגובה המלאה של האירוע הנוכחי. מערכת Apigee לא מנתחת את המחרוזת בשום צורה. הוא מכיל את התשובה המלאה ללא שינוי, כולל כל שדות הנתונים.
  • response.event.current.data: מחרוזת שמכילה את מטען הנתונים של האירוע הנוכחי. אפשר לשנות את המשתנה הזה ב-EventFlow כדי לשנות את מטען הנתונים שנשלח ללקוח.
  • response.event.current.count: סופר באופן מצטבר את מספר אירועי התגובה שנשלחו. הערך הזה מתעדכן לכל אירוע שמתקבל. הערך יהיה 1 לאירוע הראשון, והוא יגדל לאירועים הבאים.

אפשר גם לעיין במאמר הפניה למשתנה של זרימת נתונים.

תרחישי שימוש ודוגמאות ל-EventFlow

בדוגמאות הבאות מוצגות דרכים להטמיע תרחישי שימוש נפוצים בפרוקסי של SSE:

שינוי תשובה של SSE

בדוגמה הזו מוצג איך להסיר נתונים מתגובת SSE EventFlow לפני שמחזירים אותה ללקוח. התוכן של תגובת ה-SSE מאוחסן במשתנה של תהליך בשם response.event.current.content. במקרה הזה, אנחנו משתמשים במדיניות JavaScript כדי לאחזר את הערך של משתנה הזרימה, לנתח אותו ולשנות אותו. מידע נוסף זמין במאמר בנושא משתני זרימה.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות JavaScript חדשה עם ההגדרה הבאה. בדוגמה הזו, קוד ה-JavaScript נכלל ישירות במדיניות. אפשרות נוספת להגדרת המדיניות היא להוסיף את קוד ה-JavaScript לקובץ משאבים.
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-update-resp">
      <DisplayName>js-update-resp</DisplayName>
      <Properties/>
      <Source>
        var event = JSON.parse(context.getVariable("response.event.current.content"));
        event.modelVersion = null;
        context.setVariable("response.event.current.content",JSON.stringify(event));
      </Source>
    </Javascript>
  4. מוסיפים את מדיניות JavaScript לתג EventFlow של ה-proxy. ה-EventFlow מצורף ל-TargetEndpoint או ל-ProxyEndpoint שמוגדרים כברירת מחדל. בדוגמה הזו נעשה שימוש ב-Gemini API ב-Vertex AI כדי ליצור תוכן.

    <ProxyEndpoint>

    <ProxyEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-update-resp</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPProxyConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
      </HTTPProxyConnection>
    </ProxyEndpoint>
    

    <TargetEndpoint>

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-update-resp</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>
    
  5. שומרים את ה-proxy ומפעילים אותו.
  6. מתקשרים לשרת ה-proxy שפרסתם:
    curl -X POST -H 'Content-Type: application/json'  \
      "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
      -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

    הצגת דוגמה לתשובה

    זוהי דוגמה לתשובה בלי הפעלת סינון. שימו לב שהתשובה כוללת את המאפיין modelVersion": "gemini-2.5-flash".

    data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-2.5-flash"
      }

    זו דוגמה נוספת לתשובה עם מדיניות JavaScript. המאפיין modelVersion הוסר.

    data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": " the fantastical creatures of her imagination.  The quiet beauty of a simple life was a magic all its own.\n"
                }
              ],
              "role": "model"
            },
            "finishReason": "STOP"
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "candidatesTokenCount": 601,
          "totalTokenCount": 609,
          "promptTokensDetails": [
            {
              "modality": "TEXT",
              "tokenCount": 8
            }
          ],
          "candidatesTokensDetails": [
            {
              "modality": "TEXT",
              "tokenCount": 601
            }
          ]
        }
      }

סינון של תשובת SSE

בדוגמה הזו מוצג סינון של נתונים מתגובת SSE לפני החזרתם ללקוח. במקרה הזה, אנחנו מסננים את נתוני האירועים מהתגובה באמצעות מדיניות JavaScript. המדיניות מנתחת את תגובת האירוע ל-JSON, משנה את ה-JSON כדי להסיר את נתוני האירוע, ואז שולחת את נתוני התגובה ששונו חזרה ללקוח.

כמו בדוגמה הקודמת, בדוגמה הזו מאחזרים את הערך של משתנה הזרימה response.event.current.content, מנתחים אותו ל-JSON ואז מחילים לוגיקה כדי להטמיע את הסינון הרצוי.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות JavaScript חדשה עם ההגדרה הבאה. בדוגמה הזו, קוד ה-JavaScript נכלל ישירות במדיניות. אפשרות נוספת להגדרת המדיניות היא להוסיף את קוד ה-JavaScript לקובץ משאבים.
    <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-filter-resp">
      <DisplayName>js-filter-resp</DisplayName>
      <Properties/>
      <Source>
        var event = JSON.parse(context.getVariable("response.event.current.content"));
        if("error" in event){
          // Do not send event to customer
          context.setVariable("response.event.current.content", "");
        }
      </Source>
    </Javascript>
  4. מוסיפים את מדיניות JavaScript לתג EventFlow של ה-proxy. ה-EventFlow מצורף ל-TargetEndpoint או ל-ProxyEndpoint שמוגדרים כברירת מחדל. בדוגמה הזו נעשה שימוש ב-Gemini API ב-Vertex AI כדי ליצור תוכן.

    <ProxyEndpoint>

    <ProxyEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-filter-resp</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPProxyConnection>
    	  <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse	</URL>
      </HTTPProxyConnection>
    </ProxyEndpoint>
    

    <TargetEndpoint>

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-filter-resp</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
    	  <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse	</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>
    
  5. שומרים את ה-proxy ומפעילים אותו.
  6. מתקשרים לשרת ה-proxy שפרסתם:
    curl -X POST -H 'Content-Type: application/json'  \
        "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
        -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

    הצגת דוגמה לתשובה

    זוהי דוגמה לאופן שבו התשובה עשויה להיראות בלי להחיל סינון כלשהו. שימו לב שהוא כולל נתוני שגיאות:

    data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "El"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-2.5-flash"
      }
        data: {
        "error": "Service temporarily unavailable. We are experiencing high traffic.",
        "modelVersion": "gemini-2.5-flash"
        }

    זוהי דוגמה נוספת לתשובה אחרי החלת סינון, כשהודעת השגיאה הוסרה.

    data: {
      "candidates": [
        {
          "content": {
            "parts": [
              {
                "text": "El"
              }
            ],
            "role": "model"
          }
        }
      ],
      "usageMetadata": {
        "promptTokenCount": 8,
        "totalTokenCount": 8
      },
      "modelVersion": "gemini-2.5-flash"
    }
    data: {
      "candidates": [
        {
          "content": {
            "parts": [
              {
                "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
              }
            ],
            "role": "model"
          }
        }
      ],
      "usageMetadata": {
        "promptTokenCount": 8,
        "totalTokenCount": 8
      },
      "modelVersion": "gemini-2.5-flash"
    }

שליחת אירוע SSE למערכת חיצונית

בדוגמה הזו, אנחנו מצרפים את מדיניות PublishMessage של Apigee אל EventFlow כדי לשלוח אירוע SSE אל נושא Pub/Sub.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות PublishMessage חדשה עם ההגדרה הבאה:
    <PublishMessage continueOnError="false" enabled="true" name="PM-record-event">
      <DisplayName>PM-record-event</DisplayName>
      <Source>{response.event.current.content}</Source>
      <CloudPubSub>
        <Topic>projects/<customer_project>/topics/<topic_name></Topic>
      </CloudPubSub>
    </PublishMessage>
  4. מוסיפים את מדיניות PublishMessage כשלב ב-EventFlow של proxy ל-API.

    <ProxyEndpoint>

    <ProxyEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>PM-record-event</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPProxyConnection>
    </ProxyEndpoint>

    <TargetEndpoint>

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>PM-record-event</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
    </TargetEndpoint>
  5. פריסה ובדיקה של ה-proxy ל-API.
  6. אחרי שמוסיפים את התוכן שנוצר לנושא Pub/Sub, אפשר למשל ליצור פונקציית Cloud Run לעיבוד הודעות מהנושא.

שימוש במדיניות הגנה מוגברת על המודל של Apigee ב-EventFlow

אתם יכולים להשתמש במדיניות SanitizeModelResponse כדי לבצע סניטציה של אירועים נכנסים שנשלחים מהשרת ב-EventFlow. המדיניות הזו מגנה על אפליקציות ה-AI שלכם על ידי סניטציה של תשובות ממודלים גדולים של שפה (LLM). למידע על הגנה מוגברת על המודל, אפשר לעיין במאמר סקירה כללית של הגנה מוגברת על המודל. למידע על מדיניות Apigee הגנה מוגברת על המודל, אפשר לעיין במאמר תחילת העבודה עם מדיניות Apigee הגנה מוגברת על המודל.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות חדשה של SanitizeModelResponse עם ההגדרה הבאה:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <SanitizeModelResponse async="false" continueOnError="false" enabled="true" name="SMR-modelresponse">
        <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
        <DisplayName>SMR-modelresponse</DisplayName>
        <ModelArmor>
          <TemplateName>projects/{project}/locations/{location}/templates/{template-name}</TemplateName>
        </ModelArmor>
        <LLMResponseSource>{response_partial}</LLMResponseSource>
        <!-- Use the below settings if you want to call a Model Armor policy on every event -->
        <LLMResponseSource>{response.event.current.content}</LLMResponseSource>
      </SanitizeModelResponse>
  4. (אופציונלי) מוסיפים מדיניות JavaScript לאירועים של קבוצה לפני ששולחים אותם למדיניות הגנה מוגברת על המודל של Apigee.
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <Javascript continueOnError="false" enabled="true" timeLimit="200" name="JS-combine-resp">
      <DisplayName>JS-combine-events</DisplayName>
      <Properties/>
      <Source>
        var eventText = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].content.parts[0].text;
        var finishReason = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].finishReason;
        var idx = context.getVariable("response.event.current.count");
        if(idx%5==0 || finishReason=="STOP") {
          context.setVariable("response_partial", context.getVariable("tmp_buffer_pre"));
          context.setVariable("buff_ready", true);
          context.setVariable("tmp_buffer_pre", "");
        } else {
          context.setVariable("buff_ready", false);
          context.setVariable("response_partial", "");
          var previousBufferVal = context.getVariable("tmp_buffer_pre");
          if(previousBufferVal) {
            context.setVariable("tmp_buffer_pre", previousBufferVal+eventText);
          } else {
            context.setVariable("tmp_buffer_pre", eventText);
          }
        }
      </Source>
    </Javascript>
  5. מוסיפים את מדיניות JavaScript ו-ModelArmor לשלב בEventFlow של ה-proxy:
    <EventFlow name="EventFlow" content-type="text/event-stream">
      <Request/>
      <Response>
        <Step>
          <Name>JS-combine-resp</Name>
        </Step>
        <Step>
          <!-- Remove below Condition if you want to call model armor policy on every event -->
          <Condition> buff_ready = true </Condition>
          <Name>SMR-modelresponse</Name>
        </Step>
      </Response>
    </EventFlow>
  6. פריסה ובדיקה של ה-proxy ל-API.

טיפול בשגיאות ב-EventFlow

כברירת מחדל, מקור האירועים מסתיים כשמתרחשת תקלה. עם זאת, אם רוצים לבצע ניפוי באגים נוסף, אפשר לשלוח מידע על התקלה אל Cloud Logging, כמו בדוגמה הזו.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות חדשה של RaiseFault עם ההגדרה הבאה:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <RaiseFault continueOnError="false" enabled="true" name="RF-Empty-Event">
      <DisplayName>RF-Empty-Event</DisplayName>
      <Properties/>
      <FaultResponse>
        <AssignVariable>
          <Name>faultReason</Name>
          <Value>empty-event</Value>
        </AssignVariable>
      </FaultResponse>
      <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
    </RaiseFault>
  4. מצרפים את מדיניות RaiseFault ל-EventFlow של שרת ה-proxy של SSE:
    <EventFlow content-type="text/event-stream">
      <Response>
        <Step>
          <Name>RF-Empty-Event</Name>
          <Condition>response.event.current.content ~ "data: "</Condition>
        </Step>
      </Response>
    </EventFlow>
  5. יוצרים מדיניות MessageLogging כדי לרשום שגיאות ביומן. לדוגמה:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <MessageLogging continueOnError="false" enabled="true" name="ML-log-error">
      <DisplayName>ML-log-error</DisplayName>
      <CloudLogging>
        <LogName>projects/{organization.name}/logs/apigee_errors</LogName>
        <Message contentType="text/plain">Request failed due to {faultReason}.</Message>
        <ResourceType>api</ResourceType>
      </CloudLogging>
      <logLevel>ALERT</logLevel>
    </MessageLogging>
  6. מוסיפים את מדיניות MessageLogging ל-FaultRules של נקודת הקצה של היעד או ה-proxy:

    <TargetEndpoint>

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <TargetEndpoint name="TargetEndpoint-1">
      <Description/>
      <FaultRules>
        <FaultRule name="default-fault">
          <Step>
            <Name>ML-log-error</Name>
          </Step>
        </FaultRule>
      </FaultRules>
      ...
    </TargetEndpoint>

    <ProxyEndpoint>

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <ProxyEndpoint name="ProxyEndpoint-1">
      <Description/>
      <FaultRules>
        <FaultRule name="default-fault">
          <Step>
            <Name>ML-log-error</Name>
          </Step>
        </FaultRule>
      </FaultRules>
      ...
    </ProxyEndpoint>
  7. פריסה ובדיקה של ה-proxy ל-API.

הפצת שגיאות בתהליך EventFlow

בדוגמה הזו אנחנו מראים לכם איך להשתמש ב-EventFlow כדי להעביר שגיאות של תקלות ללקוח. התהליך נועד להודיע ללקוח על שגיאות באופן מיידי במהלך ביצוע המדיניות.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות חדשה של JavaScript עם ההגדרה הבאה:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-error">
        <DisplayName>js-error</DisplayName>
        <Properties/>
        <Source>
          if(context.getVariable("response.event.current.count")=="2") {
            throw new Error("Internal Error");
          }
          context.setVariable("response.event.current.content", context.getVariable("response.event.current.content"));
        </Source>
      </Javascript>

    המדיניות הזו נועדה להקפיץ הודעת שגיאה (throw) כשמתקיים תנאי מסוים.

  4. מצרפים את מדיניות JavaScript ל-EventFlow של שרת ה-proxy של SSE בהגדרות TargetEndpoint או ProxyEndpoint. בשלב הזה מוודאים שהמדיניות מעובדת על ידי EventFlow במהלך הטיפול בתגובה:

    <TargetEndpoint>

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-error</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>

    <ProxyEndpoint>

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <ProxyEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-error</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPProxyConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent</URL>
      </HTTPProxyConnection>
    </ProxyEndpoint>
  5. פורסים את ה-proxy ל-API.
  6. כדי לבדוק את התנהגות ה-proxy, משתמשים בפקודה curl הבאה:
    curl -X POST -H 'Content-Type: application/json'  "https://ENVIRONMENT_GROUP_NAME/llm-api" -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

    מחליפים את ENVIRONMENT_GROUP_NAME בשם של קבוצת הסביבות.

    הפלט אמור להיראות כמו בדוגמה הבאה:

    data: {"candidates": [{"content": {"parts": [{"text": "El"}],"role": "model"}}],"usageMetadata": {"promptTokenCount": 8,"totalTokenCount": 8},"modelVersion": "gemini-2.5-flash"}
    data: {"fault":{"faultstring":"Execution of JS-error failed with error: Exception thrown from JavaScript : Error: Internal Error (Resource_1_js#2)","detail":{"errorcode":"steps.javascript.ScriptExecutionFailed"}}}

    הפלט מציג את מקור הנתונים הראשוני ואחריו הודעה של fault. במקרה של שגיאה, Apigee מתעד את פרטי התקלה ושולח אותם ללקוח כאירוע.

מידע נוסף על טיפול בשגיאות ב-Apigee זמין במאמר בנושא טיפול בשגיאות.

שימוש ב-DataCapture לאיסוף נתוני ספירת טוקנים ב-Analytics

בדוגמה הזו אפשר לראות איך משתמשים במדיניות DataCapture בתוך EventFlow כדי לאסוף ספירות של טוקנים מאירועי SSE לשימוש בניתוח נתונים ב-Apigee.

  1. יוצרים כלים לאיסוף נתונים של ספירת טוקנים. מריצים את הפקודות הבאות כדי ליצור את כלי האיסוף הנדרשים בארגון:
    curl -X POST "https://apigee.googleapis.com/v1/organizations/ORG_NAME/datacollectors" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "dc_tokenCount",
        "description": "Collects token count",
        "type": "INTEGER"
      }'
    curl -X POST "https://apigee.googleapis.com/v1/organizations/ORG_NAME/datacollectors" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "dc_thoughtsTokenCount",
        "description": "Collects thought token count",
        "type": "INTEGER"
      }'
  2. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  3. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  4. יוצרים מדיניות JavaScript חדשה כדי לנתח את ספירת הטוקנים מנתוני האירועים של SSE. בכלי לעריכת שרתי proxy של Apigee, לוחצים על + לצד Policies, בוחרים באפשרות JavaScript כסוג המדיניות ומזינים שם למדיניות (לדוגמה, JS-parse-token). קובץ ה-XML של המדיניות שיתקבל צריך להיראות כך:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <Javascript continueOnError="false" enabled="true" timeLimit="200" name="JS-parse-token">
      <DisplayName>JS-parse-token</DisplayName>
      <Properties/>
      <Source>
        const event=context.getVariable('response.event.current.content')
        try {
          const jsonString = event.replace(/^data:\s*/, '').trim();
          const parsed = JSON.parse(jsonString);
          const usage = parsed.usageMetadata || {};
          context.setVariable("thoughtsToken", usage.thoughtsTokenCount);
          context.setVariable("totalToken", usage.totalTokenCount);
        } catch (e) {
          context.setVariable("js_error_occurred", true);
        }
      </Source>
    </Javascript>
  5. יוצרים מדיניות חדשה של DataCapture כדי לתעד את ספירת הטוקנים. בכלי לעריכת שרתי proxy של Apigee, לוחצים על + לצד Policies, בוחרים באפשרות DataCapture כסוג המדיניות ומזינים שם למדיניות (לדוגמה, DC-capture-tokencount). קובץ ה-XML של המדיניות שנוצר צריך להיראות כך:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <DataCapture name="DC-capture-tokencount" continueOnError="false" enabled="true">
      <DisplayName>DC-capture-tokencount</DisplayName>
      <IgnoreUnresolvedVariables>false</IgnoreUnresolvedVariables>
      <Capture>
        <Collect ref="thoughtsToken" default="-1"/>
        <DataCollector>dc_thoughtsTokenCount</DataCollector>
      </Capture>
      <Capture>
        <Collect ref="totalToken" default="-1"/>
        <DataCollector>dc_tokenCount</DataCollector>
      </Capture>
    </DataCapture>
  6. מצרפים את מדיניות JavaScript קודם, ואחריה את מדיניות DataCapture, בתוך EventFlow של ה-proxy של SSE בתוך ההגדרה TargetEndpoint או ProxyEndpoint. השלב הזה מבטיח ש-EventFlow יעבד את כללי המדיניות במהלך הטיפול בתגובה. קובץ ה-XML שיתקבל צריך להיראות כך:

    <TargetEndpoint>

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>JS-parse-token</Name>
          </Step>
          <Step>
            <Name>DC-capture-tokencount</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>

    <ProxyEndpoint>

    <ProxyEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>JS-parse-token</Name>
          </Step>
          <Step>
            <Name>DC-capture-tokencount</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPProxyConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
      </HTTPProxyConnection>
    </ProxyEndpoint>
  7. שומרים ומפעילים את ה-proxy. פרטים נוספים מופיעים במאמר בנושא פריסת פרוקסי של API.
  8. שליחת בקשות בדיקה לשרת ה-proxy. אחרי שה-proxy מעבד את התגובות של SSE, מספר האסימונים שתועד זמין בניתוח הנתונים של Apigee באמצעות אוספי הנתונים dc_tokenCount ו-dc_thoughtsTokenCount.

צפייה בנתוני SSE בניתוח הנתונים של Apigee

הנתונים של שרתי proxy של SSE מוצגים בניתוח הנתונים של Apigee כצפוי עבור כל proxy ל-API. במסוף Cloud, עוברים אל ניתוח נתונים > מדדי API.

ניפוי באגים בשרתי proxy של SSE

אפשר להשתמש בכלי לניפוי באגים של Apigee כדי לנפות באגים בשרתי proxy של SSE. נתוני ניפוי הבאגים נשמרים עבור EventFlow בדיוק כמו עבור סוגי התהליכים האחרים.

פתרון בעיות

כדי לפתור בעיות שקשורות לתנועה בזמן אמת, כדאי לבדוק את יומני הגישה של Apigee כדי לזהות את הגורם לבעיה.

מגבלות

המגבלות הבאות חלות על שרתי proxy של SSE:

  • אם המדיניות DataCapture מוחלת על כל אירוע, רק הערך האחרון יתועד.
  • אפשר להשאיר חיבור SSE פתוח ללא הגבלת זמן, אבל משך החיבור מוגבל בסופו של דבר על ידי הגדרת הזמן הקצוב לתפוגה של מאזן העומסים במורד הזרם של Apigee. כברירת מחדל, הזמן הקצוב לתפוגה של מאזן העומסים של Apigee מוגדר ל-30 שניות. כדי להאריך את משך החיבור, מומלץ להגדיל את הזמן הקצוב לתפוגה או ליצור שירות לקצה העורפי נפרד עם ערך גבוה יותר של זמן קצוב לתפוגה כדי לטפל בתנועת הנתונים של SSE.
  • נתוני הניתוח נרשמים אחרי סגירת סשן ה-SSE, ולכן יכול להיות שתבחינו בעיכוב מסוים בדיווח של נתוני הניתוח.
  • תקלות בתוך EventFlow גורמות ליציאה מיידית מהסטרים ולשגיאה. למידע על רישום שגיאות כאלה באופן ידני או על שליחתן ללקוח, אפשר לעיין במאמר תרחישי שימוש ב-EventFlow ודוגמאות.
  • לקוח שמקבל תגובות SSE בסטרימינג יקבל את הכותרות HTTP, כולל קודי סטטוס, בתחילת סטרימינג האירועים. כתוצאה מכך, אם זרם האירועים עובר למצב שגיאה, קוד הסטטוס שמתקבל בהתחלה לא ישקף את מצב השגיאה.

    אפשר לראות את המגבלה הזו כשמציגים סשן ניפוי באגים. במהלך הסשן, יכול להיות שתבחינו שHTTP קוד הסטטוס של הסטרימינג שמופיע במצב השגיאה שונה מקודי הסטטוס שנשלחים ללקוח. זה יכול לקרות כי רשומת סשן הניפוי באגים נוצרת אחרי שכל הבקשה עובדה, ולא בתחילת זרם האירועים. יכול להיות שבסשן הניפוי באגים יוצג קוד השגיאה שנוצר, בעוד שהלקוח יראה בהתחלה רק את הסטטוס 2xx שהתקבל בכותרות.