Wenn Sie Dataflow für die ML-Inferenz verwenden, empfehlen wir die RunInference-Transformation. Die Verwendung dieser Transformation bietet eine Reihe von Vorteilen:
- Intelligente Verwaltung des Modellarbeitsspeichers, die für einen Dataflow-Worker bei der lokalen Inferenz optimiert ist.
- Dynamische Batchverarbeitung, bei der Pipeline-Merkmale und benutzerdefinierte Einschränkungen zur Leistungsoptimierung verwendet werden.
- ML-fähige Dataflow-Backend-Funktionen, die für einen besseren Durchsatz und eine geringere Latenz sorgen können.
- Intelligente Backoff- und Autoscaling-Mechanismen bei der Überschreitung von Kontingenten für die Remote-Inferenz.
- Produktionsreife Messwerte und operative Funktionen.
Bei der Verwendung von RunInference sind einige Dinge zu beachten:
Speicherverwaltung
Wenn Sie ein mittelgroßes oder großes ML-Modell laden, kann es sein, dass auf Ihrem Computer nicht genügend Arbeitsspeicher vorhanden ist. Dataflow bietet Tools, mit denen Sie OOM-Fehler („Out Of Memory“) beim Laden von ML-Modellen vermeiden können. Anhand der folgenden Tabelle können Sie den geeigneten Ansatz für Ihr Szenario ermitteln.
| Szenario | Lösung |
|---|---|
| Die Modelle sind klein genug, um in den Arbeitsspeicher zu passen. |
Verwenden Sie die RunInference-Transformation ohne zusätzliche Konfigurationen. Die RunInference-Transformation gibt die Modelle für Threads frei. Wenn auf Ihrem Computer ein Modell pro CPU-Kern ausgeführt werden kann, kann Ihre Pipeline die Standardkonfiguration verwenden.
|
| Mehrere unterschiedlich trainierte Modelle führen dieselbe Aufgabe aus. | Verwenden Sie Schlüssel pro Modell. Weitere Informationen finden Sie unter ML-Inferenz mit mehreren unterschiedlich trainierten Modellen ausführen. |
| Ein Modell wird in den Arbeitsspeicher geladen und von allen Prozessen gemeinsam verwendet. |
Verwenden Sie den Parameter Wenn Sie einen benutzerdefinierten Modell-Handler erstellen, überschreiben Sie den Parameter |
| Sie müssen die genaue Anzahl der Modelle konfigurieren, die auf Ihren Computer geladen werden. |
Mit dem Parameter Wenn Sie einen benutzerdefinierten Modell-Handler erstellen, überschreiben Sie den Parameter |
Weitere Informationen zur Speicherverwaltung mit Dataflow finden Sie unter Fehlerbehebung bei Dataflow-Fehlern aufgrund von fehlerhaftem Arbeitsspeicher.
Batching
Es gibt viele Möglichkeiten, Batching in Beam zu implementieren. Wenn Sie jedoch Inferenz durchführen, empfehlen wir, das Batching von der RunInference-Transformation verarbeiten zu lassen. Wenn Ihr Modell mit einer bestimmten Batchgröße die beste Leistung erzielt, sollten Sie die Parameter für die Ziel-Batchgröße von RunInference einschränken. Die meisten Modell-Handler machen die maximale und die minimale Batchgröße als Parameter verfügbar. Wenn Sie beispielsweise die Batchgröße steuern möchten, die in eine HuggingFace-Pipeline eingegeben wird, können Sie den folgenden Modell-Handler definieren:
mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16)
Bei der RunInference-Transformation wird die maximale Batchgröße immer berücksichtigt. Die Mindestbatchgröße ist ein Ziel, wird aber nicht garantiert in allen Fällen eingehalten. Ein Beispiel finden Sie im folgenden Abschnitt unter Batch-Verarbeitung auf Grundlage von Bündeln.
Bundle-basiertes Batching
Dataflow übergibt Daten in Bundles an Transformationen. Die Größe dieser Bundles kann je nach den von Dataflow definierten Heuristiken variieren. Normalerweise sind Bundles in Batchpipelines recht groß (O(100) Elemente), während sie für Streamingpipelines recht klein sein können (einschließlich Größe 1).
Standardmäßig werden mit RunInference Batches aus jedem Bundle generiert. Es werden keine Batches aus mehreren Bundles erstellt. Wenn Sie beispielsweise eine Mindestbatchgröße von 8 haben, aber nur noch 3 Elemente in Ihrem Bundle vorhanden sind, verwendet RunInference eine Batchgröße von 3. Die meisten Modell-Handler stellen einen max_batch_duration_secs-Parameter zur Verfügung, mit dem Sie dieses Verhalten überschreiben können. Wenn max_batch_duration_secs festgelegt ist, werden RunInference-Batches über Bundles hinweg erstellt. Wenn die Transformation die Zielbatchgröße mit einem einzelnen Bundle nicht erreichen kann, wartet sie maximal max_batch_duration_secs, bevor ein Batch ausgegeben wird. Wenn Sie beispielsweise die Batchverarbeitung über mehrere Bundles hinweg aktivieren möchten, wenn Sie eine HuggingFace-Pipeline verwenden, können Sie den folgenden Modell-Handler definieren:
mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16, max_batch_duration_secs=3)
Dieses Feature kann hilfreich sein, wenn Sie in Ihrer Pipeline sehr kleine Batchgrößen haben. Andernfalls lohnt sich die Synchronisierungskosten für das Batching über Bundles hinweg in der Regel nicht, da sie zu einem teuren Shuffle führen kann.
Fehlerbehandlung
Die Fehlerbehandlung ist ein wichtiger Bestandteil jeder Produktionspipeline. Dataflow verarbeitet Elemente in beliebigen Gruppierungen. Sollte für eines der Elemente in der Gruppierung ein Fehler ausgegeben werden, wird die gesamte Gruppierung noch einmal verarbeitet. Wenn Sie keine zusätzliche Fehlerbehandlung anwenden, wiederholt Dataflow die Verarbeitung von Gruppierungen mit einem fehlerhaften Element im Batchmodus viermal. Wenn eine Gruppierung viermal fehlschlägt, fällt die gesamte Pipeline aus. Im Streamingmodus wird die Verarbeitung einer Gruppierung mit einem fehlerhaften Element unendlich oft wiederholt. Dies kann zur permanenten Blockierung der Pipeline führen.
RunInference bietet mit der with_exception_handling-Funktion einen integrierten Mechanismus zur Fehlerbehandlung.
Wenn Sie diese Funktion anwenden, werden alle Fehler mit ihren Fehlermeldungen an einen separaten FehlerPCollection weitergeleitet. So können Sie sie noch einmal verarbeiten. Wenn Sie Vor- oder Nachbearbeitungsvorgänge mit Ihrem Modell-Handler verknüpfen, leitet RunInference diese ebenfalls an die Fehlererfassung weiter. Wenn Sie beispielsweise alle Fehler eines Modellhandlers mit Vorverarbeitungs- und Nachbearbeitungsvorgängen erfassen möchten, verwenden Sie die folgende Logik:
main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
# handles failed preprocess operations, indexed in the order in which they were applied
other.failed_preprocessing[0] | beam.Map(logging.info)
# handles failed inferences
other.failed_inferences | beam.Map(logging.info)
# handles failed postprocess operations, indexed in the order in which they were applied
other.failed_postprocessing[0] | beam.Map(logging.info)
Zeitlimits
Wenn Sie die with_exception_handling-Funktion von RunInference verwenden, können Sie auch ein Zeitlimit für jeden Vorgang festlegen, das pro Batch gezählt wird. So können Sie vermeiden, dass eine einzelne hängende Inferenz die gesamte Pipeline zum Stillstand bringt. Tritt ein Zeitlimitüberschreitung auf, wird der entsprechende Datensatz an den Fehlerknoten PCollection weitergeleitet, der gesamte Modellstatus wird bereinigt und neu erstellt und die normale Ausführung wird fortgesetzt.
# Timeout execution after 60 seconds
main, other = pcoll | RunInference(model_handler).with_exception_handling(timeout=60)
Ab Beam 2.68.0 können Sie auch ein Zeitlimit mit der Pipelineoption --element_processing_timeout_minutes angeben. In diesem Fall führt ein Zeitlimit dazu, dass ein fehlgeschlagenes Arbeitselement wiederholt wird, bis es erfolgreich ist, anstatt die fehlgeschlagene Inferenz an eine Warteschlange für unzustellbare Nachrichten weiterzuleiten.
Mit Beschleunigern arbeiten
Bei der Verwendung von Beschleunigern haben viele Modell-Handler beschleunigerspezifische Konfigurationen, die Sie aktivieren können. Wenn Sie beispielsweise eine GPU und Hugging Face-Pipelines verwenden, empfehlen wir, den Parameter device auf „GPU“ zu setzen:
mh = HuggingFacePipelineModelHandler('text-classification', device='GPU')
Wir empfehlen außerdem, mit einer einzelnen VM-Instanz zu beginnen und die Pipeline dort lokal auszuführen. Folgen Sie dazu der Anleitung im Leitfaden zur Fehlerbehebung bei GPUs. Dadurch kann die Zeit, die zum Ausführen einer Pipeline erforderlich ist, erheblich verkürzt werden. So können Sie auch die Leistung Ihres Jobs besser nachvollziehen.
Weitere Informationen zur Verwendung von Beschleunigern in Dataflow finden Sie in der Dataflow-Dokumentation zu GPUs und TPUs.
Abhängigkeitsverwaltung
ML-Pipelines enthalten oft große und wichtige Abhängigkeiten wie PyTorch oder TensorFlow. Zum Verwalten dieser Abhängigkeiten empfehlen wir, benutzerdefinierte Container zu verwenden, wenn Sie Ihren Job in der Produktionsumgebung bereitstellen. So wird sichergestellt, dass Ihr Job in einer stabilen Umgebung ausgeführt wird, und das Debugging wird vereinfacht.
Weitere Informationen zur Abhängigkeitsverwaltung finden Sie auf der Beam-Seite Python Dependency Management.
Nächste Schritte
- Praktische Beispiele finden Sie in den Dataflow ML-Notebooks.
- Ausführliche Informationen zur Verwendung von ML mit Apache Beam finden Sie in der Dokumentation zu KI/ML-Pipelines.
- Weitere Informationen zur
RunInferenceAPI - Messwerte, mit denen Sie die
RunInference-Transformation überwachen können. - Zurück zur Seite „Informationen zu Dataflow ML“