Pub/Sub Topic or Subscription to Cloud Storage Text テンプレートは、Pub/Sub からレコードを読み取り、一連の Cloud Storage ファイルにテキスト形式で保存するストリーミング パイプラインです。このテンプレートを使用すると、Pub/Sub のデータを簡単に保存して後で使用できます。このテンプレートは、デフォルトで 5 分ごとに新しいファイルを生成します。
パイプラインの要件
- 実行前に Pub/Sub トピックまたはサブスクリプションが存在している必要があります。
- トピックに公開するメッセージは、テキスト形式となる必要があります。
- トピックに公開するメッセージは、改行を含んでいない必要があります。出力ファイルでは、1 つの Pub/Sub メッセージが 1 行に保存されます。
テンプレートのパラメータ
必須パラメータ
- outputDirectory: 出力ファイルを書き込むパスとファイル名の接頭辞。この値の末尾はスラッシュにする必要があります。例:
gs://your-bucket/your-path/
オプション パラメータ
- inputTopic: 読み取り元の Pub/Sub トピック。このパラメータを指定する場合は、
inputSubscriptionを使用しないでください。例:projects/<PROJECT_ID>/topics/<TOPIC_NAME> - inputSubscription: 入力の読み取り元になる Pub/Sub サブスクリプション。このパラメータを指定する場合は、
inputTopicを使用しないでください。例:projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_NAME> - userTempLocation: 一時ファイルを出力するユーザー指定のディレクトリ。末尾はスラッシュにする必要があります。
- outputFilenamePrefix: ウィンドウ処理された各ファイルに配置する接頭辞。例:
output-。デフォルトは output です。 - outputFilenameSuffix: ウィンドウ処理されたファイルの接尾辞。通常は、
.txtや.csvなどのファイル拡張子です。例:.txt。デフォルトは空です。 - outputShardTemplate: シャード テンプレートは、ウィンドウ処理されたファイルの動的部分を定義します。デフォルトでは、パイプラインは各ウィンドウ内で 1 つのシャードをファイル システムへの出力に使用します。つまり、ウィンドウごとにすべてのデータが 1 つのファイルに出力されます。
outputShardTemplateのデフォルトはW-P-SS-of-NNです。ここで、Wはウィンドウ期間、Pはペイン情報、Sはシャード番号、Nはシャード数です。ファイルが 1 つの場合、outputShardTemplateのSS-of-NN部分は00-of-01になります。 - numShards: 書き込み時に生成される出力シャードの最大数。シャード数が多いと Cloud Storage への書き込みのスループットが高くなりますが、出力 Cloud Storage ファイルの処理時にシャード全体のデータ集計コストが高くなる可能性があります。デフォルト値は 0 です。
- windowDuration: ウィンドウ期間は、出力ディレクトリにデータが書き込まれる間隔です。パイプラインのスループットに基づいて期間を構成します。たとえば、スループットを向上させるには、データがメモリに収まるようにウィンドウ サイズを小さくする必要があります。デフォルトは
5m(5 分)、最小は1s(1 秒)です。指定できる形式は、[int]s(秒、例:5s)、[int]m(分、例:12m)、[int]h(時間、例:2h)です。例:5m - yearPattern: 年のフォーマット パターン。
yまたはYのいずれかにする必要があります。「年」の値に大文字小文字の区別はありません。必要に応じて、パターンを英数字以外の文字またはディレクトリ文字(/)で囲みます。デフォルトはYYYYです。 - monthPattern: 月のフォーマット パターン。1 つ以上の
M文字にする必要があります。必要に応じて、パターンを英数字以外の文字またはディレクトリ文字(/)で囲みます。デフォルトはMMです。 - dayPattern: 日付のフォーマット パターン。月の日付の場合は
d、年の日付の場合はDにする必要があります。「年」の値に大文字小文字の区別はありません。必要に応じて、パターンを英数字以外の文字またはディレクトリ文字(/)で囲みます。デフォルトはddです。 - hourPattern: 時間のフォーマット パターン。1 つ以上の
H文字にする必要があります。必要に応じて、パターンを英数字以外の文字またはディレクトリ文字(/)で囲みます。デフォルトはHHです。 - minutePattern: 分のフォーマット パターン。1 つ以上の
m文字にする必要があります。必要に応じて、パターンを英数字以外の文字またはディレクトリ文字(/)で囲みます。デフォルトはmmです。
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --project=YOUR_PROJECT_ID \ --region REGION_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
次のように置き換えます。
JOB_NAME: 一意の任意のジョブ名REGION_NAME: Dataflow ジョブをデプロイするリージョン(例:us-central1)VERSION: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名BUCKET_NAME: Cloud Storage バケットの名前
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex", } }
次のように置き換えます。
PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME: 一意の任意のジョブ名LOCATION: Dataflow ジョブをデプロイするリージョン(例:us-central1)VERSION: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
SUBSCRIPTION_NAME: Pub/Sub サブスクリプション名BUCKET_NAME: Cloud Storage バケットの名前
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。