ייבוא של ערכי תכונות בקבוצות

ייבוא באצ' מאפשר לייבא ערכי מאפיינים בכמות גדולה ממקור נתונים תקין. בבקשת ייבוא באצווה, אפשר לייבא ערכים לעד 100 מאפיינים של סוג ישות. שימו לב: כדי למנוע התנגשויות, אפשר להריץ רק משימת ייבוא אחת של קבוצת נתונים לכל סוג ישות.

בבקשת ייבוא באצווה, מציינים את המיקום של נתוני המקור ואת המיפוי שלהם לתכונות במאגר התכונות. מכיוון שכל בקשה לייבוא באצווה מיועדת לסוג ישות אחד, נתוני המקור צריכים להיות גם הם מסוג ישות אחד.

אחרי שהייבוא מסתיים בהצלחה, ערכי התכונות זמינים לפעולות קריאה עוקבות.

ייבוא נתוני הביצועים של משרות

‫Vertex AI Feature Store (גרסה קודמת) מספק ייבוא עם תפוקה גבוהה, אבל זמן האחזור המינימלי יכול להיות כמה דקות. כל בקשה ל-Vertex AI Feature Store (גרסה קודמת) מפעילה משימה להשלמת העבודה. משימת ייבוא נמשכת כמה דקות, גם אם מייבאים רשומה אחת.

אם רוצים לבצע שינויים בביצועים של משימה, משנים את שני המשתנים הבאים:

  • מספר הצמתים של מילוי בקשה באופן מיידי במאגר התכונות.
  • מספר העובדים שמשמשים למשימת הייבוא. תהליכי Worker מעבדים נתונים וכותבים אותם אל מאגר התכונות.

מספר העובדים המומלץ הוא עובד אחד לכל 10 צמתים של מילוי בקשה באופן מיידי במאגר פיצ'רים. אפשר להגדיר ערך גבוה יותר אם עומס מילוי הבקשה באופן מיידי נמוך. אפשר לציין עד 100 עובדים. לקבלת הנחיות נוספות, אפשר לעיין במאמר בנושא מעקב אחרי משאבים והתאמה שלהם בהתאם כדי לייעל את הייבוא של קבוצות.

אם לא הוקצו מספיק משאבים לאשכול מילוי בקשה באופן מיידי, יכול להיות שעבודת הייבוא תיכשל. במקרה של כשל, צריך לנסות שוב את בקשת הייבוא כשהעומס על מילוי בקשה באופן מיידי נמוך, או להגדיל את מספר הצמתים של מאגר התכונות ואז לנסות שוב את הבקשה.

אם למאגר הפיצ'רים אין מאגר אונליין (אפס צמתים למילוי בקשה באופן מיידי), עבודת הייבוא כותבת רק למאגר האופליין, והביצועים של העבודה תלויים רק במספר העובדים שמבצעים את הייבוא.

עקביות הנתונים

אם נתוני המקור משתנים במהלך הייבוא, יכול להיות שיהיו אי-התאמות. חשוב לוודא שכל השינויים בנתוני המקור הושלמו לפני שמתחילים עבודת ייבוא. בנוסף, ערכים כפולים של מאפיינים עלולים לגרום להצגת ערכים שונים בבקשות אונליין ובבקשות אצווה. מוודאים שיש ערך תכונה אחד לכל מזהה ישות וצמד חותמת זמן.

אם פעולת ייבוא נכשלת, יכול להיות שמאגר פיצ'רים יכיל רק נתונים חלקיים, מה שעלול להוביל להחזרת ערכים לא עקביים בין בקשות להצגת נתונים אונליין לבין בקשות לשליפת נתונים ב-batch. כדי להימנע מחוסר עקביות כזה, צריך לנסות שוב את אותה בקשת יבוא ולחכות עד שהבקשה תושלם בהצלחה.

ערכי Null ומערכים ריקים

במהלך הייבוא, מערכת Vertex AI Feature Store (גרסה קודמת) מתייחסת לערכים סקלריים מסוג null או למערכים ריקים כאל ערכים ריקים. הם כוללים ערכים ריקים בעמודת CSV. מאגר התכונות של Vertex AI (גרסה קודמת) לא תומך בערכי null לא סקלריים, כמו ערך null במערך.

במהלך מילוי בקשה באופן מיידי ושליפת נתונים ב-batch, Vertex AI Feature Store (Legacy) מחזיר את הערך האחרון של התכונה שלא מכיל null או ערך ריק. אם ערך היסטורי של התכונה לא זמין, Vertex AI Feature Store (Legacy) מחזיר null.

ערכי NaN

‫Vertex AI Feature Store (גרסה קודמת) תומך בערכי NaN (Not a Number) במשתנים Double ו-DoubleArray. במהלך הייבוא, אפשר להזין NaN בקובץ ה-CSV של נתוני התצוגה כדי לייצג ערך NaN. במהלך מילוי בקשה באופן מיידי ושליפת נתונים ב-batch, Vertex AI Feature Store (גרסה קודמת) מחזיר NaN לערכי NaN.

ייבוא באצווה

ייבוא ערכים בכמות גדולה למאגר פיצ'רים של תכונה אחת או יותר מסוג ישות יחיד.

ממשק משתמש באינטרנט

  1. בקטע Vertex AI במסוף Google Cloud , עוברים לדף Features.

    כניסה לדף Features

  2. בוחרים אזור מהרשימה הנפתחת אזור.
  3. בטבלת התכונות, מעיינים בעמודה Entity type ומחפשים את סוג הישות שמכיל את התכונות שרוצים לייבא עבורן ערכים.
  4. לוחצים על השם של סוג הישות.
  5. בסרגל הפעולות, לוחצים על הזנת ערכים.
  6. בקטע מקור נתונים, בוחרים באחת מהאפשרויות הבאות:
    • קובץ CSV ב-Cloud Storage: בוחרים באפשרות הזו כדי לייבא נתונים מכמה קובצי CSV מ-Cloud Storage. מציינים את הנתיב והשם של קובץ ה-CSV. כדי לציין קבצים נוספים, לוחצים על הוספת קובץ נוסף.
    • קובץ AVRO ב-Cloud Storage: בוחרים באפשרות הזו כדי לייבא נתונים מקובץ AVRO מ-Cloud Storage. מציינים את הנתיב והשם של קובץ ה-AVRO.
    • טבלה ב-BigQuery: בוחרים באפשרות הזו כדי לייבא נתונים מטבלה ב-BigQuery או מתצוגת BigQuery. מחפשים ובוחרים טבלה או תצוגה לשימוש, בפורמט הבא: PROJECT_ID.DATASET_ID.TABLE_ID
  7. לוחצים על Continue.
  8. בקטע Map column to features (מיפוי עמודה לפיצ'רים), מציינים אילו עמודות בנתוני המקור ממופות לישויות ולפיצ'רים במאגר הפיצ'רים.
    1. מציינים את שם העמודה בנתוני המקור שמכילה את מזהי הישות.
    2. כדי לציין את חותמת הזמן, צריך לציין עמודה של חותמות זמן בנתוני המקור או לציין חותמת זמן יחידה שמשויכת לכל ערכי התכונות שמייבאים.
    3. ברשימת התכונות, מזינים את שם העמודה של נתוני המקור שממופה לכל תכונה. כברירת מחדל, ב-Vertex AI Feature Store (גרסה קודמת) ההנחה היא ששם התכונה ושם העמודה זהים.
  9. לוחצים על העברה.

REST

כדי לייבא ערכי מאפיינים למאפיינים קיימים, שולחים בקשת POST באמצעות השיטה featurestores.entityTypes.importFeatureValues. שימו לב: אם השמות של עמודות נתוני המקור והמזהים של התכונות ביעד שונים, צריך לכלול את הפרמטר sourceField.

לפני שמשתמשים בנתוני הבקשה, צריך להחליף את הנתונים הבאים:

  • LOCATION_ID: האזור שבו נוצר מאגר הפיצ'רים. לדוגמה, us-central1.
  • PROJECT_ID: מזהה הפרויקט.
  • FEATURESTORE_ID: מזהה של מאגר התכונות.
  • ENTITY_TYPE_ID: מזהה סוג הישות.
  • ENTITY_SOURCE_COLUMN_ID: המזהה של עמודת המקור שמכילה את מזהי הישויות.
  • FEATURE_TIME_ID: המזהה של עמודת המקור שמכילה את חותמות הזמן של התכונות עבור ערכי התכונות.
  • FEATURE_ID: מזהה של תכונה קיימת במאגר הפיצ'רים לייבוא ערכים.
  • FEATURE_SOURCE_COLUMN_ID: המזהה של עמודת המקור שמכילה את ערכי התכונות של הישויות.
  • SOURCE_DATA_DETAILS: מיקום נתוני המקור, שמציין גם את הפורמט, למשל: ‫"bigquerySource": { "inputUri": "bq://test.dataset.sourcetable" } לטבלה או לתצוגה ב-BigQuery.
  • WORKER_COUNT: מספר העובדים שישמשו לכתיבת נתונים למאגר התכונות.

ה-method של ה-HTTP וכתובת ה-URL:

POST https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues

גוף בקשת JSON:

{
  "entityIdField": "ENTITY_SOURCE_COLUMN_ID",
  "featureTimeField": "FEATURE_TIME_ID",
  SOURCE_DATA_DETAILS,
  "featureSpecs": [{
    "id": "FEATURE_ID",
    "sourceField": "FEATURE_SOURCE_COLUMN_ID"
  }],
  "workerCount": WORKER_COUNT
}

כדי לשלוח את הבקשה עליכם לבחור אחת מהאפשרויות הבאות:

curl

שומרים את גוף הבקשה בקובץ בשם request.json ומריצים את הפקודה הבאה:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues"

PowerShell

שומרים את גוף הבקשה בקובץ בשם request.json ומריצים את הפקודה הבאה:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues" | Select-Object -Expand Content

הפלט שיוצג אמור להיות דומה לזה שמופיע כאן. אפשר להשתמש ב-OPERATION_ID בתגובה כדי לקבל את הסטטוס של הפעולה.

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata",
    "genericMetadata": {
      "createTime": "2021-03-02T00:04:13.039166Z",
      "updateTime": "2021-03-02T00:04:13.039166Z"
    }
  }
}

Python

במאמר התקנת Vertex AI SDK ל-Python מוסבר איך להתקין או לעדכן את Vertex AI SDK ל-Python. מידע נוסף מופיע ב מאמרי העזרה של Python API.

import datetime
from typing import List, Union

from google.cloud import aiplatform


def import_feature_values_sample(
    project: str,
    location: str,
    entity_type_id: str,
    featurestore_id: str,
    feature_ids: List[str],
    feature_time: Union[str, datetime.datetime],
    gcs_source_uris: Union[str, List[str]],
    gcs_source_type: str,
):

    aiplatform.init(project=project, location=location)

    my_entity_type = aiplatform.featurestore.EntityType(
        entity_type_name=entity_type_id, featurestore_id=featurestore_id
    )

    my_entity_type.ingest_from_gcs(
        feature_ids=feature_ids,
        feature_time=feature_time,
        gcs_source_uris=gcs_source_uris,
        gcs_source_type=gcs_source_type,
    )

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי Javaהוראות ההגדרה במאמר Vertex AI quickstart using client libraries. מידע נוסף מופיע במאמרי העזרה של Vertex AI Java API.

כדי לבצע אימות ב-Vertex AI, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.aiplatform.v1.AvroSource;
import com.google.cloud.aiplatform.v1.EntityTypeName;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceClient;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceSettings;
import com.google.cloud.aiplatform.v1.GcsSource;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest.FeatureSpec;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ImportFeatureValuesSample {

  public static void main(String[] args)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    String project = "YOUR_PROJECT_ID";
    String featurestoreId = "YOUR_FEATURESTORE_ID";
    String entityTypeId = "YOUR_ENTITY_TYPE_ID";
    String entityIdField = "YOUR_ENTITY_FIELD_ID";
    String featureTimeField = "YOUR_FEATURE_TIME_FIELD";
    String gcsSourceUri = "YOUR_GCS_SOURCE_URI";
    int workerCount = 2;
    String location = "us-central1";
    String endpoint = "us-central1-aiplatform.googleapis.com:443";
    int timeout = 300;
    importFeatureValuesSample(
        project,
        featurestoreId,
        entityTypeId,
        gcsSourceUri,
        entityIdField,
        featureTimeField,
        workerCount,
        location,
        endpoint,
        timeout);
  }

  static void importFeatureValuesSample(
      String project,
      String featurestoreId,
      String entityTypeId,
      String gcsSourceUri,
      String entityIdField,
      String featureTimeField,
      int workerCount,
      String location,
      String endpoint,
      int timeout)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    FeaturestoreServiceSettings featurestoreServiceSettings =
        FeaturestoreServiceSettings.newBuilder().setEndpoint(endpoint).build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (FeaturestoreServiceClient featurestoreServiceClient =
        FeaturestoreServiceClient.create(featurestoreServiceSettings)) {
      List<FeatureSpec> featureSpecs = new ArrayList<>();

      featureSpecs.add(FeatureSpec.newBuilder().setId("title").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("genres").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("average_rating").build());
      ImportFeatureValuesRequest importFeatureValuesRequest =
          ImportFeatureValuesRequest.newBuilder()
              .setEntityType(
                  EntityTypeName.of(project, location, featurestoreId, entityTypeId).toString())
              .setEntityIdField(entityIdField)
              .setFeatureTimeField(featureTimeField)
              .addAllFeatureSpecs(featureSpecs)
              .setWorkerCount(workerCount)
              .setAvroSource(
                  AvroSource.newBuilder()
                      .setGcsSource(GcsSource.newBuilder().addUris(gcsSourceUri)))
              .build();
      OperationFuture<ImportFeatureValuesResponse, ImportFeatureValuesOperationMetadata>
          importFeatureValuesFuture =
              featurestoreServiceClient.importFeatureValuesAsync(importFeatureValuesRequest);
      System.out.format(
          "Operation name: %s%n", importFeatureValuesFuture.getInitialFuture().get().getName());
      System.out.println("Waiting for operation to finish...");
      ImportFeatureValuesResponse importFeatureValuesResponse =
          importFeatureValuesFuture.get(timeout, TimeUnit.SECONDS);
      System.out.println("Import Feature Values Response");
      System.out.println(importFeatureValuesResponse);
      featurestoreServiceClient.close();
    }
  }
}

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי Node.jsהוראות ההגדרה במאמר Vertex AI quickstart using client libraries. מידע נוסף מופיע במאמרי העזרה של Vertex AI Node.js API.

כדי לבצע אימות ב-Vertex AI, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

/**
 * TODO(developer): Uncomment these variables before running the sample.\
 * (Not necessary if passing values as arguments)
 */

// const project = 'YOUR_PROJECT_ID';
// const featurestoreId = 'YOUR_FEATURESTORE_ID';
// const entityTypeId = 'YOUR_ENTITY_TYPE_ID';
// const avroGcsUri = 'AVRO_FILE_IN_THE_GCS_URI';
// const entityIdField = 'ENTITY_ID_FIELD_IN_AVRO';
// const featureTimeField = 'TIMESTAMP_FIELD_IN_AVRO';
// const workerCount = <NO_OF_WORKERS_FOR_INGESTION_JOB>;
// const location = 'YOUR_PROJECT_LOCATION';
// const apiEndpoint = 'YOUR_API_ENDPOINT';
// const timeout = <TIMEOUT_IN_MILLI_SECONDS>;

// Imports the Google Cloud Featurestore Service Client library
const {FeaturestoreServiceClient} = require('@google-cloud/aiplatform').v1;

// Specifies the location of the api endpoint
const clientOptions = {
  apiEndpoint: apiEndpoint,
};

// Instantiates a client
const featurestoreServiceClient = new FeaturestoreServiceClient(
  clientOptions
);

async function importFeatureValues() {
  // Configure the entityType resource
  const entityType = `projects/${project}/locations/${location}/featurestores/${featurestoreId}/entityTypes/${entityTypeId}`;

  const avroSource = {
    gcsSource: {
      uris: [avroGcsUri],
    },
  };

  const featureSpecs = [{id: 'age'}, {id: 'gender'}, {id: 'liked_genres'}];

  const request = {
    entityType: entityType,
    avroSource: avroSource,
    entityIdField: entityIdField,
    featureSpecs: featureSpecs,
    featureTimeField: featureTimeField,
    workerCount: Number(workerCount),
  };

  // Import Feature Values Request
  const [operation] = await featurestoreServiceClient.importFeatureValues(
    request,
    {timeout: Number(timeout)}
  );
  const [response] = await operation.promise();

  console.log('Import feature values response');
  console.log('Raw response:');
  console.log(JSON.stringify(response, null, 2));
}
importFeatureValues();

הצגת משימות ייבוא

אפשר להשתמש במסוף Google Cloud כדי להציג משימות של ייבוא באצווה בGoogle Cloud פרויקט.

ממשק משתמש באינטרנט

  1. בקטע Vertex AI במסוף Google Cloud , עוברים לדף Features.

    כניסה לדף Features

  2. בוחרים אזור מהרשימה הנפתחת אזור.
  3. בסרגל הפעולות, לוחצים על הצגת משימות ההטמעה כדי להציג רשימה של משימות ייבוא לכל מאגרי התכונות.
  4. לוחצים על המזהה של עבודת ייבוא כדי לראות את הפרטים שלה, כמו מקור הנתונים, מספר ישויות הייבוא ומספר ערכי התכונות שייובאו.

החלפת נתונים קיימים במאגר תכונות

אתם יכולים לייבא מחדש ערכים כדי להחליף ערכים קיימים של תכונות אם לשניהם יש חותמות זמן זהות. אין צורך למחוק קודם את ערכי התכונות הקיימים. לדוגמה, יכול להיות שאתם מסתמכים על נתוני מקור בסיסיים שהשתנו לאחרונה. כדי לשמור על עקביות בין מאגר הפיצ'רים לבין נתוני הבסיס, צריך לייבא מחדש את ערכי התכונות. אם יש חוסר התאמה בין חותמות הזמן, המערכת תתייחס לערכים המיובאים כאל ערכים ייחודיים, והערכים הישנים ימשיכו להתקיים (לא יוחלפו).

כדי להבטיח עקביות בין בקשות להצגה אונליין לבין בקשות לשליפת נתונים ב-batch, צריך לחכות עד שמשימת הייבוא תושלם לפני ששולחים בקשות להצגה.

השלמת חוסר בנתונים היסטוריים (data backfill)

אם אתם מבצעים השלמה עם מסמכים קודמים של נתונים, כלומר מייבאים ערכים קודמים של תכונות, צריך להשבית את מילוי הבקשה באופן מיידי של משימת הייבוא. מילוי בקשה באופן מיידי מיועד להצגת הערכים העדכניים ביותר של התכונות בלבד, ולא כולל השלמה עם מסמכים קודמים. השבתת מילוי בקשה באופן מיידי מועילה כי היא מבטלת את העומס על הצמתים של מילוי בקשה באופן מיידי ומגדילה את התפוקה של עבודת הייבוא, מה שיכול לקצר את זמן ההשלמה שלה.

כשמשתמשים ב-API או בספריות הלקוח, אפשר להשבית את מילוי הבקשות באופן מיידי של משימות ייבוא. מידע נוסף זמין בשדה disableOnlineServing של השיטה importFeatureValue.

המאמרים הבאים