このページでは、機械学習(ML)モデルのトレーニング用データを準備するために MLTransform 機能を使用する理由と方法について説明します。具体的には、このページでは、MLTransform を使用してエンベディングを生成してデータを処理する方法について説明します。
MLTransform は、複数のデータ処理変換を 1 つのクラスに統合することで、Apache Beam ML データ処理オペレーションをワークフローに適用するプロセスを簡素化しています。

MLTransform を使用します。エンベディングの概要
エンベディングは、最新のセマンティック検索アプリケーションと検索拡張生成(RAG)アプリケーションに不可欠です。エンベディングにより、システムはより深くより概念的なレベルで情報を理解し、操作できます。セマンティック検索では、エンベディングによってクエリとドキュメントがベクトル表現に変換されます。これらの表現は、基盤となる意味と関係性を捉えています。そのため、キーワードが完全に一致しない場合でも、関連性の高い検索結果を見つけることが可能です。これは、標準的なキーワード ベースの検索を大きく上回るものです。エンベディングは、商品のおすすめにも使用できます。これには、画像とテキストを使用するマルチモーダル検索、ログ分析、重複除去などのタスクが含まれます。
RAG では、エンベディングは、ナレッジベースから関連するコンテキストを取得して、大規模言語モデル(LLM)のレスポンスをグラウンディングするうえで重要な役割を果たします。RAG システムは、ユーザーのクエリとナレッジベース内の情報のチャンクの両方をエンベディングすることで、最も意味的に類似した情報を効率的に特定して取得できます。このセマンティック マッチングにより、LLM は正確で有益な回答を生成するために必要な情報にアクセスできます。
エンベディング用のデータを取り込んで処理する

コア エンベディングのユースケースでは、知識の取り込みと処理の方法が重要な考慮事項となります。この取り込みは、バッチ方式またはストリーミング方式で行えます。この知識のソースは多岐にわたります。たとえば、この情報は Cloud Storage に保存されているファイルから取得することも、Pub/Sub や Google Cloud Managed Service for Apache Kafka などのストリーミング ソースから取得することもできます。
ストリーミング ソースの場合、データ自体は未加工のコンテンツ(例: 書式なしテキスト)またはドキュメントを指す URI である可能性があります。通常、最初のステージでは、ソースに関係なく、情報の前処理が行われます。生テキストの場合、基本的なデータ クレンジングなど、最小限の処理になることがあります。ただし、大きなドキュメントや複雑なコンテンツの場合は、重要なステップはチャンク化です。チャンク化には、ソースデータを管理しやすい小さな単位に分割することが含まれます。最適なチャンク化戦略は標準化されておらず、特定のデータとアプリケーションによって異なります。Dataflow などのプラットフォームには、さまざまなチャンク化のニーズに対応する機能が組み込まれており、この重要な前処理ステージを簡素化できます。
利点
MLTransform クラスには次のようなメリットがあります。
- データをベクトル データベースに push したり、推論を実行するために使用できるエンベディングを生成します。
- 複雑なコードの記述や基盤となるライブラリの管理を行わずに、データを変換できます。
- 複数のタイプの処理オペレーションを 1 つのインターフェースで効率的に連結できます。
サポートと制限事項
MLTransform クラスには次の制限があります。
- Apache Beam Python SDK バージョン 2.53.0 以降を使用するパイプラインで使用できます。
- パイプラインでは、デフォルト ウィンドウを使用する必要があります。
テキスト エンベディング変換:
- Python 3.8、3.9、3.10、3.11、3.12 をサポートします。
- バッチ パイプラインとストリーミング パイプラインの両方をサポートします。
- Vertex AI text-embeddings API と Hugging Face Sentence Transformers モジュールをサポートします。
ユースケース
サンプル ノートブックは、特定のユースケースで MLTransform を使用する方法を示しています。
- Vertex AI を使用して LLM のテキスト エンベディングを生成したい
- Apache Beam
MLTransformクラスと Vertex AI text-embeddings API を使用して、テキスト エンベディングを生成します。テキスト エンベディングは、テキストを数値ベクトルとして表現する方法であり、多くの自然言語処理(NLP)タスクで必要になります。 - Hugging Face を使用して LLM のテキスト エンベディングを生成したい
- Apache Beam
MLTransformクラスと Hugging Face Hub モデルを使用して、テキスト エンベディングを生成します。Hugging FaceSentenceTransformersフレームワークは、Python を使用して文、テキスト、画像のエンベディングを生成します。 - テキスト エンベディングを生成して AlloyDB for PostgreSQL に取り込みたい
- Apache Beam(特に
MLTransformクラスと Hugging Face Hub モデル)を使用して、テキスト エンベディングを生成します。次に、VectorDatabaseWriteTransformを使用して、これらのエンベディングと関連するメタデータを AlloyDB for PostgreSQL に読み込みます。このノートブックでは、AlloyDB for PostgreSQL ベクトル データベースにデータを入力するためのスケーラブルなバッチとストリーミング Beam データ パイプラインの構築方法を示します。これには、Pub/Sub や既存のデータベース テーブルなどのさまざまなソースからのデータの処理、カスタム スキーマの作成、データの更新が含まれます。 - テキスト エンベディングを生成して BigQuery に取り込みたい
- Apache Beam
MLTransformクラスと Hugging Face Hub モデルを使用して、商品カタログなどのアプリケーション データからテキスト エンベディングを生成します。これには Apache BeamHuggingfaceTextEmbeddings変換が使用されます。この変換では、文とテキストのエンベディングを生成するモデルを提供する Hugging Face SentenceTransformers フレームワークを使用します。生成されたエンベディングとそのメタデータは、Apache BeamVectorDatabaseWriteTransformを使用して BigQuery に取り込まれます。このノートブックでは、拡充変換を使用して BigQuery でベクトル類似度検索を行う方法も示します。
使用可能な変換の完全なリストについては、Apache Beam ドキュメントの変換をご覧ください。
エンベディング生成に MLTransform を使用する
MLTransform クラスを使用して情報をチャンク化し、エンベディングを生成するには、次のコードをパイプラインに含めます。
def create_chunk(product: Dict[str, Any]) -> Chunk:
return Chunk(
content=Content(
text=f"{product['name']}: {product['description']}"
),
id=product['id'], # Use product ID as chunk ID
metadata=product, # Store all product info in metadata
)
[...]
with beam.Pipeline() as p:
_ = (
p
| 'Create Products' >> beam.Create(products)
| 'Convert to Chunks' >> beam.Map(create_chunk)
| 'Generate Embeddings' >> MLTransform(
write_artifact_location=tempfile.mkdtemp())
.with_transform(huggingface_embedder)
| 'Write to AlloyDB' >> VectorDatabaseWriteTransform(alloydb_config)
)
上記の例では、要素ごとに 1 つのチャンクが作成されますが、代わりに LangChain を使用してチャンクを作成することもできます。
splitter = CharacterTextSplitter(chunk_size=100, chunk_overlap=20)
provider = beam.ml.rag.chunking.langchain.LangChainChunker(
document_field='content', metadata_fields=[], text_splitter=splitter)
with beam.Pipeline() as p:
_ = (
p
| 'Create Products' >> beam.io.textio.ReadFromText(products)
| 'Convert to Chunks' >> provider.get_ptransform_for_processing()
次のステップ
- ブログ投稿 Dataflow ML でリアルタイムのセマンティック検索や RAG を活用するアプリケーションを実現する方法を読む。
- Apache Beam ドキュメントのデータの前処理で
MLTransformの詳細を確認する。 - Apache Beam 変換カタログのデータ処理用の
MLTransformでその他の例を確認する。 - Colab でインタラクティブ ノートブックを実行する。