סטרימינג של אירועים שנשלחים מהשרת

הדף הזה רלוונטי ל-Apigee ול-Apigee Hybrid.

לעיון במסמכי התיעוד של Apigee Edge

‫Apigee תומך בסטרימינג רציף של תשובות מנקודות קצה של אירועים שנשלחים מהשרת (SSE) ללקוחות בזמן אמת. התכונה SSE ב-Apigee שימושית לטיפול בממשקי API של מודלים גדולים של שפה (LLM) שפועלים בצורה הכי יעילה על ידי סטרימינג של התשובות שלהם בחזרה ללקוח. סטרימינג של SSE מפחית את זמן האחזור, והלקוחות יכולים לקבל נתוני תשובה ברגע שהם נוצרים על ידי LLM. התכונה הזו תומכת בשימוש בסוכני AI שפועלים בסביבות בזמן אמת, כמו בוטים לשירות לקוחות או כלי תזמור של תהליכי עבודה.

כדי להשתמש ב-SSE עם Apigee, פשוט מפנים proxy ל-API ליעד עם SSE או לנקודת קצה של proxy. כדי להשיג שליטה מדויקת יותר בתגובת ה-SSE, ‏ Apigee מספק זרימת נקודת קצה מיוחדת שנקראת EventFlow. במסגרת EventFlow, אפשר להוסיף קבוצה מוגבלת של כללי מדיניות כדי לבצע פעולות בתגובת ה-SSE, כמו סינון, שינוי או טיפול בשגיאות. מידע נוסף על תהליכי proxy זמין במאמר שליטה בשרתי proxy של API באמצעות תהליכים.

יצירת proxy ל-API ל-SSE

ממשק המשתמש של Apigee מספק תבנית ליצירת שרת proxy חדש שכולל EventFlow.

בצע את השלבים הבאים כדי ליצור proxy ל-API עם תבנית EventFlow באמצעות ממשק המשתמש של Apigee:

  1. במסוף Google Cloud , נכנסים לדף Apigee > Proxy Development > API Proxies.

    מעבר אל API Proxies

  2. בחלונית API Proxies (שרתי proxy של API), לוחצים על + Create (יצירה).
  3. בחלונית Create a proxy (יצירת שרת proxy), בקטע Proxy template (תבנית שרת proxy), בוחרים באפשרות Proxy with Server-Sent Events (SSE) (שרת proxy עם אירועים שנשלחים מהשרת).
  4. בקטע פרטי שרת proxy, מזינים את הפרטים הבאים:
    • שם ה-Proxy: מזינים שם ל-Proxy, למשל myproxy.
    • נתיב בסיס: מוגדר אוטומטית לערך שמזינים עבור שם ה-proxy. נתיב הבסיס הוא חלק מכתובת ה-URL שמשמשת לשליחת בקשות ל-API. מערכת Apigee משתמשת בכתובת ה-URL כדי להתאים בקשות נכנסות ולנתב אותן ל-proxy ל-API המתאים.
    • תיאור (אופציונלי): מזינים תיאור ל-proxy ל-API החדש, למשל "בדיקת Apigee באמצעות proxy פשוט".
    • יעד (API קיים): מזינים את כתובת ה-URL של יעד ה-SSE עבור ה-proxy ל-API. לדוגמה: https://mocktarget.apigee.net/sse-events/5
    • לוחצים על הבא.
  5. פריסה (אופציונלי):
    • סביבות פריסה: אופציונלי. משתמשים בתיבות הסימון כדי לבחור סביבה אחת או יותר שבהן רוצים לפרוס את ה-proxy. אם אתם לא רוצים לפרוס את ה-proxy בשלב הזה, אתם יכולים להשאיר את השדה סביבות פריסה ריק. תמיד אפשר לפרוס את ה-proxy מאוחר יותר.
    • חשבון שירות: אופציונלי. חשבון שירות עבור ה-proxy. חשבון השירות מייצג את הזהות של ה-proxy שנפרס, וקובע אילו הרשאות יש לו. זוהי תכונה מתקדמת, ולצורך המדריך הזה אפשר להתעלם ממנה.

    שרתי proxy ל-API שנפרסו עם הגדרת EventFlow יחויבו כשרתי proxy ניתנים להרחבה.

  6. לוחצים על יצירה.

אפשר גם לעיין במאמר יצירת proxy ל-API פשוט.

הגדרת EventFlow

כדי להשיג שליטה מדויקת יותר בתגובת ה-SSE,‏ Apigee מספקת זרימת נקודות קצה מיוחדת שנקראת EventFlow. בהקשר של EventFlow, אפשר להוסיף קבוצה מוגבלת של מדיניות, שמפורטת בהמשך, כדי לשנות את תגובת ה-SSE לפני שהיא מועברת חזרה ללקוח. מידע נוסף על זרימות של שרתי proxy זמין במאמר שליטה בשרתי proxy של API באמצעות זרימות.

מיקום של EventFlow

ל-EventFlow יש שני מאפיינים:

  • name: שם לזיהוי רצף הפעולות.
  • content-type: הערך של המאפיין הזה חייב להיות text/event-stream.

אפשר לעיין גם במאמר הפניה להגדרת תהליך.

אפשר להציב EventFlow בתוך הגדרה של TargetEndpoint או ProxyEndpoint, כמו בדוגמאות הקוד הבאות:

<ProxyEndpoint>

<ProxyEndpoint name="default">
  <Description/>
  <FaultRules/>
  <PreFlow name="PreFlow">
    <Request/>
    <Response/>
  </PreFlow>
  <PostFlow name="PostFlow">
    <Request/>
    <Response/>
  </PostFlow>
  <Flows/>
  <EventFlow name="EventFlow" content-type="text/event-stream">
    <Response/>
  </EventFlow>
  <HTTPProxyConnection>
    <Properties/>
    <URL>https://httpbin.org/sse</URL>
  </HTTPProxyConnection>
</ProxyEndpoint>

<TargetEndpoint>

<TargetEndpoint name="default">
  <Description/>
  <FaultRules/>
  <PreFlow name="PreFlow">
    <Request/>
    <Response/>
  </PreFlow>
  <PostFlow name="PostFlow">
    <Request/>
    <Response/>
  </PostFlow>
  <Flows/>
  <EventFlow name="EventFlow" content-type="text/event-stream">
    <Response/>
  </EventFlow>
  <HTTPTargetConnection>
    <Properties/>
    <URL>https://httpbin.org/sse</URL>
  </HTTPTargetConnection>
</TargetEndpoint>

חשוב גם לציין שאף על פי שאפשר להוסיף EventFlow ל-TargetEndpoint, ל-ProxyEndpoint או לשניהם, רק EventFlow אחד מבוצע.

בטבלה הבאה מוצגות הפסקאות של EventFlow על סמך מיקום נקודת הקצה:

ProxyEndpoint TargetEndpoint השימוש ב-EventFlow
‫EventFlow ב-ProxyEndpoint ‫EventFlow ב-TargetEndpoint ‫EventFlow ב-TargetEndpoint
לא EventFlow ‫EventFlow ב-TargetEndpoint ‫EventFlow ב-TargetEndpoint
‫EventFlow ב-ProxyEndpoint לא EventFlow ‫EventFlow ב-ProxyEndpoint

הוספת מדיניות ל-EventFlow

אפשר להוסיף עד ארבע מדיניות לרכיב Response של EventFlow. כמו בכל התהליכים, כללי המדיניות מופעלים לפי הסדר שבו הם נוספו, ואפשר להוסיף שלבים מותנים כדי לשלוט בהפעלה שלהם. חשוב לציין שאפשר להוסיף ל-EventFlow רק את סוגי המדיניות הבאים. אסור להשתמש בסוגים אחרים של מדיניות ב-EventFlow:

אפשר לעיין גם במאמרים בנושא צירוף והגדרה של מדיניות בממשק המשתמש וצירוף והגדרה של מדיניות בקובצי XML.

בדוגמאות הבאות מוצג EventFlow עם שלב מדיניות RaiseFault מותנה שנוסף:

<ProxyEndpoint>

<ProxyEndpoint name="default">
  <EventFlow content-type="text/event-stream">
    <Response>
      <Step>
        <Name>Raise-Fault-Cred-Invalid</Name>
        <Condition>fault.name equals "invalid_access_token"</Condition>
      </Step>
    </Response>
  </EventFlow>
  <HTTPProxyConnection>
</ProxyEndpoint></pre>

<TargetEndpoint>

<TargetEndpoint name="default">
  <EventFlow content-type="text/event-stream">
    <Response>
      <Step>
        <Name>Raise-Fault-Cred-Invalid</Name>
        <Condition>fault.name equals "invalid_access_token"</Condition>
      </Step>
    </Response>
  </EventFlow>
  <HTTPTargetConnection>
</TargetEndpoint></pre>

EventFlow עוד דוגמאות לקוד זמינות בקטע תרחישים לדוגמה ודוגמאות לשימוש ב-EventFlow.

משתנים בתהליך

EventFlow מאכלס שלושה משתני זרימת תשובות. שימו לב שאפשר להשתמש במשתנים האלה רק בהקשר של האירוע הנוכחי שעובר עיבוד ב-EventFlow. לגישה למשתנים האלה או להגדרתם מחוץ להיקף EventFlow אין השפעה. הם רלוונטיים רק בהקשר של EventFlow.

  • response.event.current.content: מחרוזת שמכילה את התגובה המלאה של האירוע הנוכחי. מערכת Apigee לא מנתחת את המחרוזת בשום צורה. הוא מכיל את כל התשובה ללא שינוי, כולל כל שדות הנתונים.
  • response.event.current.data: מחרוזת שמכילה את מטען הנתונים של האירוע הנוכחי. אפשר לשנות את המשתנה הזה ב-EventFlow כדי לשנות את מטען הנתונים שנשלח ללקוח.
  • response.event.current.count: סופר באופן מצטבר את מספר אירועי התגובה שנשלחו. הערך הזה מתעדכן לכל אירוע שמתקבל. הערך יהיה 1 לאירוע הראשון, והוא יגדל לאירועים הבאים.

אפשר גם לעיין במאמר הפניה למשתנה של זרימת נתונים.

תרחישי שימוש ודוגמאות ל-EventFlow

בדוגמאות הבאות מוצגות דרכים להטמיע תרחישי שימוש נפוצים בפרוקסי של SSE:

שינוי תשובה של SSE

בדוגמה הזו מוצג איך להסיר נתונים מתגובת SSE EventFlow לפני שמחזירים אותה ללקוח. התוכן של תגובת ה-SSE מאוחסן במשתנה של תהליך בשם response.event.current.content. במקרה הזה, אנחנו משתמשים במדיניות JavaScript כדי לאחזר את הערך של משתנה הזרימה, לנתח אותו ולשנות אותו. מידע נוסף זמין במאמר בנושא משתני זרימה.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות JavaScript חדשה עם ההגדרה הבאה. בדוגמה הזו, קוד ה-JavaScript נכלל ישירות במדיניות. אפשרות נוספת להגדרת המדיניות היא להוסיף את קוד ה-JavaScript לקובץ משאבים.
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-update-resp">
      <DisplayName>js-update-resp</DisplayName>
      <Properties/>
      <Source>
        var event = JSON.parse(context.getVariable("response.event.current.content"));
        event.modelVersion = null;
        context.setVariable("response.event.current.content",JSON.stringify(event));
      </Source>
    </Javascript>
  4. מוסיפים את מדיניות JavaScript לתג EventFlow של ה-proxy. ה-EventFlow מצורף ל-TargetEndpoint או ל-ProxyEndpoint שמוגדרים כברירת מחדל. בדוגמה הזו נעשה שימוש ב-Gemini API ב-Vertex AI כדי ליצור תוכן.

    <ProxyEndpoint>

    <ProxyEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-update-resp</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPProxyConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
      </HTTPProxyConnection>
    </ProxyEndpoint>
    

    <TargetEndpoint>

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-update-resp</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>
    
  5. שומרים את ה-proxy ומפעילים אותו.
  6. מתקשרים לשרת ה-proxy שפרסתם:
    curl -X POST -H 'Content-Type: application/json'  \
      "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
      -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

    הצגת דוגמה לתשובה

    זוהי דוגמה לתשובה בלי הפעלת סינון. שימו לב שהתשובה כוללת את המאפיין modelVersion": "gemini-2.5-flash".

    data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-2.5-flash"
      }

    זו דוגמה נוספת לתשובה עם מדיניות JavaScript. המאפיין modelVersion הוסר.

    data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": " the fantastical creatures of her imagination.  The quiet beauty of a simple life was a magic all its own.\n"
                }
              ],
              "role": "model"
            },
            "finishReason": "STOP"
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "candidatesTokenCount": 601,
          "totalTokenCount": 609,
          "promptTokensDetails": [
            {
              "modality": "TEXT",
              "tokenCount": 8
            }
          ],
          "candidatesTokensDetails": [
            {
              "modality": "TEXT",
              "tokenCount": 601
            }
          ]
        }
      }

סינון של תשובת SSE

בדוגמה הזו מוצג סינון של נתונים מתגובת SSE לפני החזרתם ללקוח. במקרה הזה, אנחנו מסננים את נתוני האירועים מהתגובה באמצעות מדיניות JavaScript. המדיניות מנתחת את תגובת האירוע ל-JSON, משנה את ה-JSON כדי להסיר את נתוני האירוע, ואז שולחת את נתוני התגובה ששונו חזרה ללקוח.

כמו בדוגמה הקודמת, בדוגמה הזו מאחזרים את הערך של משתנה הזרימה response.event.current.content, מנתחים אותו ל-JSON ואז מחילים לוגיקה כדי להטמיע את הסינון הרצוי.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות JavaScript חדשה עם ההגדרה הבאה. בדוגמה הזו, קוד ה-JavaScript נכלל ישירות במדיניות. אפשרות נוספת להגדרת המדיניות היא להוסיף את קוד ה-JavaScript לקובץ משאבים.
    <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-filter-resp">
      <DisplayName>js-filter-resp</DisplayName>
      <Properties/>
      <Source>
        var event = JSON.parse(context.getVariable("response.event.current.content"));
        if("error" in event){
          // Do not send event to customer
          context.setVariable("response.event.current.content", "");
        }
      </Source>
    </Javascript>
  4. מוסיפים את מדיניות JavaScript לתג EventFlow של ה-proxy. ה-EventFlow מצורף ל-TargetEndpoint או ל-ProxyEndpoint שמוגדרים כברירת מחדל. בדוגמה הזו נעשה שימוש ב-Gemini API ב-Vertex AI כדי ליצור תוכן.

    <ProxyEndpoint>

    <ProxyEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-filter-resp</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPProxyConnection>
    	  <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse	</URL>
      </HTTPProxyConnection>
    </ProxyEndpoint>
    

    <TargetEndpoint>

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-filter-resp</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
    	  <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse	</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>
    
  5. שומרים את ה-proxy ומפעילים אותו.
  6. מתקשרים לשרת ה-proxy שפרסתם:
    curl -X POST -H 'Content-Type: application/json'  \
        "https://YOUR_APIGEE_ENVIRONMENT_GROUP_HOSTNAME/YOUR_API_PATH" \
        -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

    הצגת דוגמה לתשובה

    זוהי דוגמה לאופן שבו התשובה עשויה להיראות בלי להחיל סינון כלשהו. שימו לב שהוא כולל נתוני שגיאות:

    data: {
        "candidates": [
          {
            "content": {
              "parts": [
                {
                  "text": "El"
                }
              ],
              "role": "model"
            }
          }
        ],
        "usageMetadata": {
          "promptTokenCount": 8,
          "totalTokenCount": 8
        },
        "modelVersion": "gemini-2.5-flash"
      }
        data: {
        "error": "Service temporarily unavailable. We are experiencing high traffic.",
        "modelVersion": "gemini-2.5-flash"
        }

    זוהי דוגמה נוספת לתשובה אחרי החלת סינון, כשהודעת השגיאה הוסרה.

    data: {
      "candidates": [
        {
          "content": {
            "parts": [
              {
                "text": "El"
              }
            ],
            "role": "model"
          }
        }
      ],
      "usageMetadata": {
        "promptTokenCount": 8,
        "totalTokenCount": 8
      },
      "modelVersion": "gemini-2.5-flash"
    }
    data: {
      "candidates": [
        {
          "content": {
            "parts": [
              {
                "text": "ara found the pen tucked away in a dusty antique shop, nestled amongst chipped tea"
              }
            ],
            "role": "model"
          }
        }
      ],
      "usageMetadata": {
        "promptTokenCount": 8,
        "totalTokenCount": 8
      },
      "modelVersion": "gemini-2.5-flash"
    }

שליחת אירוע SSE למערכת חיצונית

בדוגמה הזו, אנחנו מצרפים את מדיניות PublishMessage של Apigee אל EventFlow כדי לשלוח אירוע SSE אל נושא Pub/Sub.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות PublishMessage חדשה עם ההגדרה הבאה:
    <PublishMessage continueOnError="false" enabled="true" name="PM-record-event">
      <DisplayName>PM-record-event</DisplayName>
      <Source>{response.event.current.content}</Source>
      <CloudPubSub>
        <Topic>projects/<customer_project>/topics/<topic_name></Topic>
      </CloudPubSub>
    </PublishMessage>
  4. מוסיפים את מדיניות PublishMessage כשלב ב-EventFlow של proxy ל-API.

    <ProxyEndpoint>

    <ProxyEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>PM-record-event</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPProxyConnection>
    </ProxyEndpoint>

    <TargetEndpoint>

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>PM-record-event</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
    </TargetEndpoint>
  5. פריסה ובדיקה של ה-proxy ל-API.
  6. אחרי שמוסיפים את התוכן שנוצר לנושא Pub/Sub, אפשר למשל ליצור פונקציית Cloud Run לעיבוד הודעות מהנושא.

שימוש במדיניות הגנה מוגברת על המודל של Apigee ב-EventFlow

אתם יכולים להשתמש במדיניות SanitizeModelResponse כדי לבצע סניטציה של אירועים נכנסים שנשלחים מהשרת ב-EventFlow. המדיניות הזו מגנה על אפליקציות ה-AI שלכם על ידי סניטציה של תשובות ממודלים גדולים של שפה (LLM). למידע על הגנה מוגברת על המודל, אפשר לעיין במאמר סקירה כללית של הגנה מוגברת על המודל. למידע על מדיניות Apigee הגנה מוגברת על המודל, אפשר לעיין במאמר תחילת העבודה עם מדיניות Apigee הגנה מוגברת על המודל.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות חדשה של SanitizeModelResponse עם ההגדרה הבאה:
      <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <SanitizeModelResponse async="false" continueOnError="false" enabled="true" name="SMR-modelresponse">
        <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
        <DisplayName>SMR-modelresponse</DisplayName>
        <ModelArmor>
          <TemplateName>projects/{project}/locations/{location}/templates/{template-name}</TemplateName>
        </ModelArmor>
        <LLMResponseSource>{response_partial}</LLMResponseSource>
        <!-- Use the below settings if you want to call a Model Armor policy on every event -->
        <LLMResponseSource>{response.event.current.content}</LLMResponseSource>
      </SanitizeModelResponse>
  4. (אופציונלי) מוסיפים מדיניות JavaScript לאירועים של קבוצה לפני ששולחים אותם למדיניות הגנה מוגברת על המודל של Apigee.
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <Javascript continueOnError="false" enabled="true" timeLimit="200" name="JS-combine-resp">
      <DisplayName>JS-combine-events</DisplayName>
      <Properties/>
      <Source>
        var eventText = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].content.parts[0].text;
        var finishReason = JSON.parse(context.getVariable("response.event.current.content").substring(5)).candidates[0].finishReason;
        var idx = context.getVariable("response.event.current.count");
        if(idx%5==0 || finishReason=="STOP") {
          context.setVariable("response_partial", context.getVariable("tmp_buffer_pre"));
          context.setVariable("buff_ready", true);
          context.setVariable("tmp_buffer_pre", "");
        } else {
          context.setVariable("buff_ready", false);
          context.setVariable("response_partial", "");
          var previousBufferVal = context.getVariable("tmp_buffer_pre");
          if(previousBufferVal) {
            context.setVariable("tmp_buffer_pre", previousBufferVal+eventText);
          } else {
            context.setVariable("tmp_buffer_pre", eventText);
          }
        }
      </Source>
    </Javascript>
  5. מוסיפים את מדיניות JavaScript ו-ModelArmor לשלב בEventFlow של ה-proxy:
    <EventFlow name="EventFlow" content-type="text/event-stream">
      <Request/>
      <Response>
        <Step>
          <Name>JS-combine-resp</Name>
        </Step>
        <Step>
          <!-- Remove below Condition if you want to call model armor policy on every event -->
          <Condition> buff_ready = true </Condition>
          <Name>SMR-modelresponse</Name>
        </Step>
      </Response>
    </EventFlow>
  6. פריסה ובדיקה של ה-proxy ל-API.

טיפול בשגיאות ב-EventFlow

כברירת מחדל, סטרימינג האירועים מסתיים כשמתרחשת תקלה. עם זאת, אם רוצים לבצע ניפוי באגים נוסף, אפשר לשלוח מידע על תקלות אל Cloud Logging כמו בדוגמה הזו.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות חדשה של RaiseFault עם ההגדרה הבאה:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <RaiseFault continueOnError="false" enabled="true" name="RF-Empty-Event">
      <DisplayName>RF-Empty-Event</DisplayName>
      <Properties/>
      <FaultResponse>
        <AssignVariable>
          <Name>faultReason</Name>
          <Value>empty-event</Value>
        </AssignVariable>
      </FaultResponse>
      <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
    </RaiseFault>
  4. מצרפים את מדיניות RaiseFault ל-EventFlow של שרת ה-proxy של SSE:
    <EventFlow content-type="text/event-stream">
      <Response>
        <Step>
          <Name>RF-Empty-Event</Name>
          <Condition>response.event.current.content ~ "data: "</Condition>
        </Step>
      </Response>
    </EventFlow>
  5. יוצרים מדיניות MessageLogging כדי לרשום שגיאות ביומן. לדוגמה:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <MessageLogging continueOnError="false" enabled="true" name="ML-log-error">
      <DisplayName>ML-log-error</DisplayName>
      <CloudLogging>
        <LogName>projects/{organization.name}/logs/apigee_errors</LogName>
        <Message contentType="text/plain">Request failed due to {faultReason}.</Message>
        <ResourceType>api</ResourceType>
      </CloudLogging>
      <logLevel>ALERT</logLevel>
    </MessageLogging>
  6. מוסיפים את מדיניות MessageLogging ל-FaultRules של נקודת הקצה של היעד או ה-proxy:

    <TargetEndpoint>

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <TargetEndpoint name="TargetEndpoint-1">
      <Description/>
      <FaultRules>
        <FaultRule name="default-fault">
          <Step>
            <Name>ML-log-error</Name>
          </Step>
        </FaultRule>
      </FaultRules>
      ...
    </TargetEndpoint>

    <ProxyEndpoint>

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <ProxyEndpoint name="ProxyEndpoint-1">
      <Description/>
      <FaultRules>
        <FaultRule name="default-fault">
          <Step>
            <Name>ML-log-error</Name>
          </Step>
        </FaultRule>
      </FaultRules>
      ...
    </ProxyEndpoint>
  7. פריסה ובדיקה של ה-proxy ל-API.

הפצת שגיאות בתהליך EventFlow

בדוגמה הזו אנחנו מראים לכם איך להשתמש ב-EventFlow כדי להעביר שגיאות של תקלות ללקוח. התהליך נועד להודיע ללקוח על שגיאות באופן מיידי במהלך ביצוע המדיניות.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות חדשה של JavaScript עם ההגדרה הבאה:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <Javascript continueOnError="false" enabled="true" timeLimit="200" name="js-error">
        <DisplayName>js-error</DisplayName>
        <Properties/>
        <Source>
          if(context.getVariable("response.event.current.count")=="2") {
            throw new Error("Internal Error");
          }
          context.setVariable("response.event.current.content", context.getVariable("response.event.current.content"));
        </Source>
      </Javascript>

    המדיניות הזו נועדה להקפיץ הודעת שגיאה (throw) כשמתקיים תנאי מסוים.

  4. מצרפים את מדיניות JavaScript ל-EventFlow של שרת ה-proxy של SSE בהגדרות TargetEndpoint או ProxyEndpoint. בשלב הזה מוודאים שהמדיניות מעובדת על ידי EventFlow במהלך הטיפול בתגובה:

    <TargetEndpoint>

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-error</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>

    <ProxyEndpoint>

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <ProxyEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>js-error</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPProxyConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent</URL>
      </HTTPProxyConnection>
    </ProxyEndpoint>
  5. פורסים את ה-proxy ל-API.
  6. כדי לבדוק את התנהגות ה-proxy, משתמשים בפקודה curl הבאה:
    curl -X POST -H 'Content-Type: application/json'  "https://ENVIRONMENT_GROUP_NAME/llm-api" -d '{ "contents":[{"parts":[{"text": "Write a story about a magic pen."}]}]}'

    מחליפים את ENVIRONMENT_GROUP_NAME בשם של קבוצת הסביבות.

    הפלט אמור להיראות כמו בדוגמה הבאה:

    data: {"candidates": [{"content": {"parts": [{"text": "El"}],"role": "model"}}],"usageMetadata": {"promptTokenCount": 8,"totalTokenCount": 8},"modelVersion": "gemini-2.5-flash"}
    data: {"fault":{"faultstring":"Execution of JS-error failed with error: Exception thrown from JavaScript : Error: Internal Error (Resource_1_js#2)","detail":{"errorcode":"steps.javascript.ScriptExecutionFailed"}}}

    הפלט מציג את מקור הנתונים הראשוני ואחריו הודעה של fault. במקרה של שגיאה, Apigee מתעד את פרטי התקלה ושולח אותם ללקוח כאירוע.

מידע נוסף על טיפול בשגיאות ב-Apigee זמין במאמר בנושא טיפול בשגיאות.

שימוש ב-DataCapture לאיסוף נתוני ספירת טוקנים ב-Analytics

בדוגמה הזו אפשר לראות איך משתמשים במדיניות DataCapture בתוך EventFlow כדי לאסוף ספירות של טוקנים מאירועי SSE לשימוש בניתוח נתונים ב-Apigee.

  1. יוצרים כלים לאיסוף נתונים של ספירת טוקנים. מריצים את הפקודות הבאות כדי ליצור את כלי האיסוף הנדרשים בארגון:
    curl -X POST "https://apigee.googleapis.com/v1/organizations/ORG_NAME/datacollectors" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "dc_tokenCount",
        "description": "Collects token count",
        "type": "INTEGER"
      }'
    curl -X POST "https://apigee.googleapis.com/v1/organizations/ORG_NAME/datacollectors" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "dc_thoughtsTokenCount",
        "description": "Collects thought token count",
        "type": "INTEGER"
      }'
  2. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  3. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  4. יוצרים מדיניות JavaScript חדשה כדי לנתח את ספירת הטוקנים מנתוני האירועים של SSE. בכלי לעריכת שרתי proxy של Apigee, לוחצים על + לצד Policies, בוחרים באפשרות JavaScript כסוג המדיניות ומזינים שם למדיניות (לדוגמה, JS-parse-token). קובץ ה-XML של המדיניות שיתקבל צריך להיראות כך:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <Javascript continueOnError="false" enabled="true" timeLimit="200" name="JS-parse-token">
      <DisplayName>JS-parse-token</DisplayName>
      <Properties/>
      <Source>
        const event=context.getVariable('response.event.current.content')
        try {
          const jsonString = event.replace(/^data:\s*/, '').trim();
          const parsed = JSON.parse(jsonString);
          const usage = parsed.usageMetadata || {};
          context.setVariable("thoughtsToken", usage.thoughtsTokenCount);
          context.setVariable("totalToken", usage.totalTokenCount);
        } catch (e) {
          context.setVariable("js_error_occurred", true);
        }
      </Source>
    </Javascript>
  5. יוצרים מדיניות חדשה של DataCapture כדי לתעד את ספירת הטוקנים. בכלי לעריכת שרתי proxy של Apigee, לוחצים על + לצד Policies (מדיניות), בוחרים באפשרות DataCapture (איסוף נתונים) כסוג המדיניות ומזינים שם למדיניות (לדוגמה, DC-capture-tokencount). קובץ ה-XML של המדיניות שנוצר צריך להיראות כך:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <DataCapture name="DC-capture-tokencount" continueOnError="false" enabled="true">
      <DisplayName>DC-capture-tokencount</DisplayName>
      <IgnoreUnresolvedVariables>false</IgnoreUnresolvedVariables>
      <Capture>
        <Collect ref="thoughtsToken" default="-1"/>
        <DataCollector>dc_thoughtsTokenCount</DataCollector>
      </Capture>
      <Capture>
        <Collect ref="totalToken" default="-1"/>
        <DataCollector>dc_tokenCount</DataCollector>
      </Capture>
    </DataCapture>
  6. מצרפים את מדיניות JavaScript קודם, ואחריה את מדיניות DataCapture, בתוך EventFlow של שרת ה-proxy של SSE בתוך ההגדרה TargetEndpoint או ProxyEndpoint. השלב הזה מבטיח שהמדיניות תעובד על ידי EventFlow במהלך הטיפול בתגובה. קובץ ה-XML שיתקבל צריך להיראות כך:

    <TargetEndpoint>

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>JS-parse-token</Name>
          </Step>
          <Step>
            <Name>DC-capture-tokencount</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>

    <ProxyEndpoint>

    <ProxyEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>JS-parse-token</Name>
          </Step>
          <Step>
            <Name>DC-capture-tokencount</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPProxyConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
      </HTTPProxyConnection>
    </ProxyEndpoint>
  7. שומרים ומפעילים את ה-proxy. פרטים נוספים מופיעים במאמר בנושא פריסת פרוקסי של API.
  8. שליחת בקשות בדיקה לשרת ה-proxy. אחרי שה-proxy מעבד את התגובות של SSE, אפשר לראות את ספירת האסימונים שנאספו בניתוח הנתונים של Apigee באמצעות אוספי הנתונים dc_tokenCount ו-dc_thoughtsTokenCount.

שימוש במדיניות LLMTokenQuota כדי לאכוף מכסות של טוקנים ב-EventFlow

בדוגמה הזו מוצג שימוש במדיניות LLMTokenQuota בתוך EventFlow כדי לספור ולאכוף מכסות של טוקנים בסטרימינג של SSE. במהלך עיבוד זרם האירועים, ספירת הטוקנים מתבצעת רק אם המטא-נתונים של השימוש בטוקנים מהתגובה של ה-LLM נמצאים באירוע. כשמגלים את המטא-נתונים של השימוש באסימון, הם מחולצים והמדיניות מופעלת. בכל שאר האירועים, המדיניות לא משפיעה על הפעולה.

  1. יוצרים שרת proxy חדש באמצעות תבנית ה-proxy של SSE. מידע נוסף זמין במאמר יצירת proxy ל-API עם אירועים שנשלחים מהשרת (SSE).
  2. פותחים את ה-proxy בכלי לעריכת proxy של Apigee ולוחצים על הכרטיסייה Develop (פיתוח).
  3. יוצרים מדיניות LLMTokenQuota חדשה כדי לספור את האסימונים שנצרכים על ידי זרם ה-SSE. בכלי לעריכת ה-proxy של Apigee, לוחצים על + לצד Policies, בוחרים באפשרות LLMTokenQuota כסוג המדיניות ומזינים שם למדיניות (לדוגמה, LTQ-Count-Only). קובץ ה-XML של המדיניות שמתקבל צריך להיראות כך:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <LLMTokenQuota name="LTQ-Count-Only" type="rollingwindow">
      <SharedName>common-counter</SharedName>
      <CountOnly>true</CountOnly>
      <Allow count="15000"/>
      <Interval>30</Interval>
      <TimeUnit>minute</TimeUnit>
      <Distributed>true</Distributed>
      <LLMTokenUsageSource>{jsonPath('$.usageMetadata.candidatesTokenCount',response.event.current.data,false)}</LLMTokenUsageSource>
      <LLMModelSource>{jsonPath('$.modelVersion',response.event.current.data,false)}</LLMModelSource>
    </LLMTokenQuota>
  4. יוצרים מדיניות LLMTokenQuota שנייה כדי לאכוף את מכסת הטוקנים על בקשות נכנסות. בכלי לעריכת שרתי proxy של Apigee, לוחצים על + לצד Policies, בוחרים באפשרות LLMTokenQuota כסוג המדיניות ומזינים שם למדיניות (לדוגמה, LTQ-Enforce-Only). המדיניות הזו משתמשת באותו <SharedName> כמו מדיניות הספירה, כך ששתי המדיניות חולקות את אותו מונה של מכסת השימוש. קובץ ה-XML של המדיניות שמתקבל אמור להיראות כך:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <LLMTokenQuota name="LTQ-Enforce-Only" type="rollingwindow">
      <SharedName>common-counter</SharedName>
      <EnforceOnly>true</EnforceOnly>
      <Allow count="15000"/>
      <Interval>30</Interval>
      <TimeUnit>minute</TimeUnit>
      <Distributed>true</Distributed>
    </LLMTokenQuota>
  5. מוסיפים את המדיניות LTQ-Enforce-Only כשלב בRequest של PreFlow של ProxyEndpoint. השלב הזה מבטיח שהמכסה תיבדק בכל בקשה נכנסת. קובץ ה-XML שמתקבל אמור להיראות כך:
    <ProxyEndpoint name="default">
      <PreFlow name="PreFlow">
        <Request>
          <Step>
            <Name>LTQ-Enforce-Only</Name>
          </Step>
        </Request>
        <Response/>
      </PreFlow>
    </ProxyEndpoint>
  6. מוסיפים את מדיניות LTQ-Count-Only כשלב בתוך EventFlow של שרת ה-proxy של SSE בהגדרות של TargetEndpoint או ProxyEndpoint. השלב הזה מבטיח ש-EventFlow יעבד את המדיניות במהלך הטיפול בתגובה ויגדיל את מונה המכסה המשותפת. קובץ ה-XML שיתקבל צריך להיראות כך:

    <TargetEndpoint>

    <TargetEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>LTQ-Count-Only</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPTargetConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
      </HTTPTargetConnection>
    </TargetEndpoint>

    <ProxyEndpoint>

    <ProxyEndpoint name="default">
      <EventFlow content-type="text/event-stream">
        <Response>
          <Step>
            <Name>LTQ-Count-Only</Name>
          </Step>
        </Response>
      </EventFlow>
      <HTTPProxyConnection>
        <URL>https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=GEMINI_API_KEY&alt=sse</URL>
      </HTTPProxyConnection>
    </ProxyEndpoint>
  7. שומרים ומפעילים את ה-proxy. פרטים נוספים מופיעים במאמר בנושא פריסת פרוקסי של API.
  8. שליחת בקשות בדיקה לשרת ה-proxy. בכל בקשה נכנסת, המדיניות LTQ-Enforce-Only בודקת את מונה המכסה המשותפת ודוחה את הבקשה אם כבר הושגה המגבלה שהוגדרה. בזמן שה-proxy מעבד את התגובות של SSE, מדיניות LTQ-Count-Only סופרת את הטוקנים שמדווחים במטא-נתונים של השימוש בטוקנים של כל אירוע, ומגדילה את אותו מונה מכסה משותף. כשהמונה מגיע למגבלה שהוגדרה, בקשות צריכת טוקנים עוקבות נדחות עד שהמונה מתאפס.

מידע נוסף על המדיניות LLMTokenQuota, כולל איך להשתמש בה עם תגובות SSE ואיך להגדיר מוני מכסות משותפים, זמין במאמר בנושא המדיניות LLMTokenQuota.

צפייה בנתוני SSE בניתוח הנתונים של Apigee

הנתונים של שרתי ה-proxy של SSE מופיעים בניתוח הנתונים של Apigee כמו שקורה עם כל proxy ל-API. במסוף Cloud, עוברים אל Analytics > API metrics.

ניפוי באגים בשרתי proxy של SSE

אפשר להשתמש בכלי לניפוי באגים של Apigee כדי לנפות באגים בשרתי proxy של SSE. נתוני ניפוי הבאגים נשמרים עבור EventFlow בדיוק כמו עבור סוגי התהליכים האחרים.

פתרון בעיות

כדי לפתור בעיות שקשורות לתנועה בזמן אמת, כדאי לבדוק את יומני הגישה של Apigee כדי לזהות את הגורם לבעיה.

מגבלות

המגבלות הבאות חלות על שרתי proxy של SSE:

  • אם המדיניות DataCapture מוחלת על כל אירוע, רק הערך האחרון יתועד.
  • אפשר להשאיר חיבור SSE פתוח ללא הגבלת זמן, אבל משך החיבור מוגבל בסופו של דבר על ידי הגדרת הזמן הקצוב לתפוגה של איזון העומסים במורד הזרם של Apigee. כברירת מחדל, הגדרת הזמן הקצוב לתפוגה של מאזן העומסים של Apigee היא 30 שניות. לחיבורים ארוכים יותר, מומלץ להגדיל את הזמן הקצוב לתפוגה או ליצור שירות קצה עורפי נפרד עם ערך גבוה יותר של זמן קצוב לתפוגה כדי לטפל בתנועת הנתונים של SSE.
  • נתוני הניתוח מתועדים אחרי סגירת סשן ה-SSE, ולכן יכול להיות שתבחינו בעיכוב בדיווח של נתוני הניתוח.
  • תקלות בתוך EventFlow גורמות ליציאה מיידית מהסטרים ולשגיאה. למידע על רישום שגיאות כאלה באופן ידני או על שליחתן ללקוח, אפשר לעיין במאמר תרחישי שימוש ב-EventFlow ודוגמאות.
  • לקוח שמקבל תגובות SSE בסטרימינג יקבל את הכותרות HTTP, כולל קודי סטטוס, בתחילת סטרימינג האירועים. כתוצאה מכך, אם זרם האירועים עובר למצב שגיאה, קוד הסטטוס שמתקבל בהתחלה לא ישקף את מצב השגיאה.

    אפשר לראות את המגבלה הזו כשמציגים סשן ניפוי באגים. במהלך הסשן, יכול להיות שתבחינו שHTTP קוד הסטטוס של הסטרימינג שמופיע במצב השגיאה שונה מקודי הסטטוס שנשלחים ללקוח. זה יכול לקרות כי רשומת סשן הניפוי באגים נוצרת אחרי שכל הבקשה עובדה, ולא בתחילת זרם האירועים. יכול להיות שבסשן הניפוי באגים יוצג קוד השגיאה שנוצר, בעוד שהלקוח יראה בהתחלה רק את הסטטוס 2xx שהתקבל בכותרות.