JavaScript ユーザー定義関数(UDF)は、単一メッセージ変換(SMT)の一種です。UDF は、BigQuery JavaScript UDF と同様に、Pub/Sub 内でカスタム変換ロジックを実装する柔軟な方法を提供します。
UDF は入力として単一のメッセージを受け取り、入力に対して定義されたアクションを実行し、プロセスの結果を返します。
UDF には次の主なプロパティがあります。
コード: 変換ロジックを定義する JavaScript コード。
関数名: Pub/Sub がメッセージに適用する、指定されたコード内の JavaScript 関数の名前。
JavaScript 関数を作成する
UDF コードには、次のシグネチャを持つ関数が含まれている必要があります。
/**
* Transforms a Pub/Sub message.
* @return {(Object<string, (string | Object<string, string>)>|* null)} - To
* filter a message, return `null`. To transform a message, return a map with
* the following keys:
* - (required) 'data' : {string}
* - (optional) 'attributes' : {Object<string, string>}
* Returning empty `attributes` will remove all attributes from the message.
*
* @param {(Object<string, (string | Object<string, string>)>} - Pub/Sub
* message. Keys:
* - (required) 'data' : {string}
* - (required) 'attributes' : {Object<string, string>}
*
* @param {Object<string, any>} metadata - Pub/Sub message metadata.
* Keys:
* - (optional) 'message_id' : {string}
* - (optional) 'publish_time': {string} YYYY-MM-DDTHH:MM:SSZ format
* - (optional) 'ordering_key': {string}
*/
function <function_name>(message, metadata) {
// Perform custom transformation logic
return message; // to filter a message instead, return `null`
}
入力
この関数は次の入力を受け取ります。
message引数: Pub/Sub メッセージを表す JavaScript オブジェクト。次のプロパティが含まれます。data:(String、必須)メッセージ ペイロード。attributes:(Object<String, String>、省略可)メッセージ属性を表すキーと値のペアのマップ。
metadata引数: Pub/Sub メッセージに関する不変のメタデータを含む JavaScript オブジェクト。
出力
この関数は、次のいずれかを返す必要があります。
メッセージを変換するには、
message.dataとmessage.attributesの内容を編集し、変更されたmessageオブジェクトを返します。メッセージをフィルタするには、
nullを返します。
入出力の要件
- UDF がメッセージ ペイロードを変換する場合、ペイロードの入力と出力は UTF-8 でエンコードされた文字列である必要があります。
- UDF がメッセージ ペイロードを変換しない場合、ペイロードは任意のエンコードを使用できます。
- 属性の Key-Value ペアは UTF-8 でエンコードされた文字列にする必要があります。
UDF がメッセージを変換する方法
メッセージに対して UDF を実行した結果は、次のいずれかになります。
UDF はメッセージを変換します。
UDF は
nullを返します。トピック SMT: Pub/Sub はパブリッシャーに成功を返し、フィルタされたメッセージのレスポンスにメッセージ ID を含めます。Pub/Sub はメッセージを保存せず、サブスクライバーに送信しません。
サブスクリプション SMT: Pub/Sub は、メッセージをサブスクライバーに送信せずにメッセージ配信を確認応答します。
UDF がエラーをスローします。
トピック SMT: Pub/Sub はパブリッシャーにエラーを返し、メッセージをパブリッシュしません。
サブスクリプション SMT: Pub/Sub がメッセージを否定応答します。
UDF SMT を作成する
SMT は、Pub/Sub トピックまたはサブスクリプションで構成できます。
- トピック SMT は、Pub/Sub がメッセージを保存する前に実行され、結果はすべてのサブスクライバーが利用できます。
サブスクリプション SMT はメッセージが配信される前に実行され、結果はそのサブスクリプションでのみ使用できます。
コンソール
Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。
トピックまたはサブスクリプションのいずれかを作成します。
トピックを作成するには、[トピックを作成] をクリックします。[トピックを作成] ページが開きます。
サブスクリプションを作成するには:
サブスクリプションを作成するトピックの名前をクリックします。
[サブスクリプションを作成] をクリックします。[サブスクリプションをトピックに追加] ページが開きます。
[変換] で、[変換を追加] をクリックします。
[変換タイプ] で [JavaScript UDF] を選択します。
[関数名] フィールドに、SMT が呼び出す JavaScript 関数の名前を入力します。例:
redactSSNテキスト領域に、UDF のコードを入力します。例:
function redactSSN(message, metadata) { const data = JSON.parse(message.data); delete data['ssn']; message.data = JSON.stringify(data); return message; }コードには、[関数名] フィールドと一致する名前の関数が含まれている必要があります。
SMT をすぐに有効にしない場合は、[変換を無効にする] を選択します。このオプションを選択すると、SMT はトピックとともに作成されますが、受信メッセージに対して実行されません。トピックを作成したら、トピックを編集して SMT を有効にできます。
トピックまたはサブスクリプションを作成するには、[作成] をクリックします。
gcloud
定義ファイルを作成する
UDF SMT を定義する YAML ファイルまたは JSON ファイルを作成します。
YAML
- javascriptUdf:
code: { FUNCTION_CODE }
functionName: FUNCTION_NAME
JSON
{
"javascriptUdf": {
"code": {
FUNCTION_CODE
}
"functionName": FUNCTION_NAME
}
}
次のように置き換えます。
FUNCTION_CODE: UDF の JavaScript コード。コードには、名前が
functionNameフィールドと一致する関数が含まれている必要があります。例:function redactSSN(message, metadata) { const data = JSON.parse(message.data); delete data['ssn']; message.data = JSON.stringify(data); return message; }FUNCTION_NAME: SMT が呼び出す JavaScript 関数の名前。例:
redactSSN
トピックまたはサブスクリプションを作成する
トピックを作成するには、gcloud pubsub topics create コマンドを実行します。
gcloud pubsub topics create TOPIC_ID \
--message-transforms-file=TRANSFORMS_FILE
次のように置き換えます。
- TOPIC_ID: 作成するトピックの ID または名前。
- TRANSFORMS_FILE: 定義ファイルへのパス。
サブスクリプションを作成するには、gcloud pubsub subscriptions create コマンドを実行します。
gcloud pubsub subscriptions create SUBSCRIPTION_ID \
--topic=projects/PROJECT_ID/topics/TOPIC_ID \
--message-transforms-file=TRANSFORMS_FILE
次のように置き換えます。
SUBSCRIPTION_ID: 作成するサブスクリプションの ID または名前。
PROJECT_ID: トピックを含むプロジェクトの ID。
TOPIC_ID: 登録するトピックの ID。
TRANSFORMS_FILE: 定義ファイルへのパス。
必要に応じて、SMT を作成する前に検証してテストできます。詳しくは、以下のページをご覧ください。
制限事項
Pub/Sub は、効率的な変換オペレーションを確保するために、UDF にリソースの上限を適用します。制限事項は次のとおりです。
- UDF あたり最大 20 KB のコード
- メッセージあたりの最大実行時間 500 ミリ秒
- ECMAScript 標準組み込みのみをサポート
- 外部 API への呼び出しなし
- 外部ライブラリのインポートなし
サンプル UDF
パブリッシュとサブスクライブの UDF の例を次に示します。その他のサンプルは、UDF ライブラリで確認できます。
関数: 曜日の整数を対応する文字列に変換します。
次の UDF をトピックまたはサブスクリプションに追加すると、メッセージのパブリッシュまたは配信時に次の変更が行われます。
Pub/Sub はメッセージに関数を適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。
UDF は
dayOfWeekというフィールドを探し、このフィールドの値が 0 ~ 6 の数値である場合は、Mondayなどの対応する曜日に変換します。フィールドが存在しない場合、または数値が 0 ~ 6 の範囲にない場合、コードはdayOfWeekフィールドをUnknownに設定します。UDF は、変更されたペイロードをメッセージにシリアル化して戻します。
Pub/Sub は、更新されたメッセージをパイプラインの次のステップに渡します。
function intToString(message, metadata) {
const data = JSON.parse(message.data);
switch(`data["dayOfWeek"]`) {
case 0:
data["dayOfWeek"] = "Sunday";
break;
case 1:
data["dayOfWeek"] = "Monday";
break;
case 2:
data["dayOfWeek"] = "Tuesday";
break;
case 3:
data["dayOfWeek"] = "Wednesday";
break;
case 4:
data["dayOfWeek"] = "Thursday";
break;
case 5:
data["dayOfWeek"] = "Friday";
break;
case 6:
data["dayOfWeek"] = "Saturday";
break;
default:
data["dayOfWeek"] = "Unknown";
}
message.data = JSON.stringify(data);
return message;
}
機能: 社会保障番号を秘匿化する
次の UDF をトピックまたはサブスクリプションに追加すると、メッセージのパブリッシュまたは配信時に次の変更が行われます。
Pub/Sub はメッセージに関数を適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。
UDF は、メッセージ ペイロードからフィールド
ssnを削除します(存在する場合)。UDF は、変更されたペイロードをメッセージにシリアル化して戻します。
Pub/Sub は、更新されたメッセージをパイプラインの次のステップに渡します。
function redactSSN(message, metadata) {
const data = JSON.parse(message.data);
delete data['ssn'];
message.data = JSON.stringify(data);
return message;
}
機能: 特定のメッセージをフィルタして自動的に確認応答する
次の UDF をトピックまたはサブスクリプションに追加すると、メッセージのパブリッシュまたは配信時に次の変更が行われます。
Pub/Sub はメッセージに関数を適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。
UDF は、ペイロードに
regionというフィールドが含まれているかどうかを確認します。regionフィールドの値がUSでない場合、関数は null を返し、Pub/Sub はメッセージをフィルタします。regionフィールドの値がUSの場合、Pub/Sub は元のメッセージをパイプラインの次のステップに渡します。
function filterForUSRegion(message, metadata) {
const data = JSON.parse(message.data);
if (data["region"] !== "US") {
return null;
}
return message;
}
関数: メッセージの内容を検証して、金額が 100 を超えていないことを確認します
次の UDF をトピックまたはサブスクリプションに追加すると、メッセージのパブリッシュまたは配信時に次の変更が行われます。
Pub/Sub はメッセージに関数を適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。
UDF は、メッセージに
amountというフィールドが含まれているかどうかを確認します。amountフィールドの値が100より大きい場合、関数はエラーをスローします。amountフィールドの値が100より大きくない場合、関数は元のメッセージを返します。Pub/Sub は、メッセージを失敗としてマークするか、元のメッセージをパイプラインの次のステップに渡します。
function validateAmount(message, metadata) {
const data = JSON.parse(message.data);
if (data["amount"] > 100) {
throw new Error("Amount is invalid");
}
return message;
}
次のステップ
その他のサンプルについては、UDF ライブラリをご覧ください。