Stream from Pub/Sub to BigQuery

This tutorial uses the Pub/Sub Subscription to BigQuery template to create and run a Dataflow template job using the Google Cloud console or Google Cloud CLI. The tutorial walks you through a streaming pipeline example that reads JSON-encoded messages from Pub/Sub and writes them to a BigQuery table.

Streaming analytics and data integration pipelines use Pub/Sub to ingest and distribute data. Pub/Sub enables you to create systems of event producers and consumers, called publishers and subscribers. Publishers send events to the Pub/Sub service asynchronously, and Pub/Sub delivers the events to all services that need to react to them.

Dataflow is a fully managed service for transforming and enriching data in stream (real-time) and batch modes. It provides a simplified pipeline development environment that uses the Apache Beam SDK to transform incoming data and then output the transformed data.

The benefit of this workflow is that you can use user-defined functions (UDFs) to transform the message data before it is written to BigQuery.

Before running a Dataflow pipeline for this scenario, consider whether a Pub/Sub BigQuery subscription with a UDF meets your requirements.
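
For example, if your only requirement is to write JSON messages straight into a BigQuery table, you can create a BigQuery subscription with the Google Cloud CLI instead of running a pipeline. The following command is a sketch rather than part of this tutorial: BQ_SUBSCRIPTION_ID, TOPIC_ID, PROJECT_ID, DATASET_NAME, and TABLE_NAME are placeholders, it assumes the topic and table already exist, and the available flags can vary with your Google Cloud CLI version.

gcloud pubsub subscriptions create BQ_SUBSCRIPTION_ID \
    --topic=TOPIC_ID \
    --bigquery-table=PROJECT_ID:DATASET_NAME.TABLE_NAME \
    --use-table-schema

With this approach, Pub/Sub writes the messages to the table for you and no Dataflow job is required. The rest of this tutorial uses the Dataflow template instead.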

Create a Cloud Storage bucket

Begin by creating a Cloud Storage bucket using the Google Cloud console or Google Cloud CLI. The Dataflow pipeline uses this bucket as a temporary storage location.

Console

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click Create.

  3. On the Create a bucket page, for Name your bucket, enter a name that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique. Don't select the other options.

  4. Click Create.

gcloud

Use the gcloud storage buckets create command:

gcloud storage buckets create gs://BUCKET_NAME

Replace BUCKET_NAME with a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
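
Optionally, you can confirm that the bucket exists before continuing. This check is not part of the tutorial's required steps:

# Print the bucket's metadata to confirm that it was created.
gcloud storage buckets describe gs://BUCKET_NAME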

Create a Pub/Sub topic and subscription

Create a Pub/Sub topic and then create a subscription to that topic.

Console

To create a topic, complete the following steps.

  1. In the Google Cloud console, go to the Pub/Sub Topics page.

    Go to Topics

  2. Click Create topic.

  3. In the Topic ID field, enter an ID for your topic. For information about how to name a topic, see Guidelines to name a topic or a subscription.

  4. Retain the option Add a default subscription. Don't select the other options.

  5. Click Create.

  6. On the topic details page, the name of the subscription that was created is listed under Subscription ID. Note this value for later steps.

gcloud

To create a topic, run the gcloud pubsub topics create command. For information about how to name a topic, see Guidelines to name a topic or a subscription.

gcloud pubsub topics create TOPIC_ID

Replace TOPIC_ID with a name for your Pub/Sub topic.

To create a subscription to your topic, run the gcloud pubsub subscriptions create command:

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

Replace SUBSCRIPTION_ID with a name for your Pub/Sub subscription.
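
Optionally, verify that the subscription is attached to the topic you created. This verification step is an addition to the tutorial and assumes that you are authenticated against the same project:

# Print the topic that the subscription receives messages from.
gcloud pubsub subscriptions describe SUBSCRIPTION_ID --format="value(topic)"

The output should be the fully qualified topic name, in the form projects/PROJECT_ID/topics/TOPIC_ID.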

Create a BigQuery table

In this step, you create a BigQuery table with the following schema:

Column name    Data type
name           STRING
customer_id    INTEGER

If you don't already have a BigQuery dataset, first create one. For more information, see Create datasets. Then create a new empty table:

Console

  1. Go to the BigQuery page.

    Go to BigQuery

  2. In the Explorer pane, expand your project, and then select a dataset.

  3. In the Dataset info section, click Create table.

  4. In the Create table from list, select Empty table.

  5. In the Table box, enter the name of the table.

  6. In the Schema section, click Edit as text.

  7. Paste in the following schema definition:

    name:STRING,
    customer_id:INTEGER
    
  8. Click Create table.

gcloud

Use the bq mk command.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Replace the following:

  • PROJECT_ID: your project ID
  • DATASET_NAME: the name of the dataset
  • TABLE_NAME: the name of the table to create
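
Optionally, confirm that the table was created with the expected schema. This check is an addition to the tutorial and uses the same placeholders as the bq mk command:

# Print the table schema as JSON.
bq show --schema --format=prettyjson PROJECT_ID:DATASET_NAME.TABLE_NAME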

Run the pipeline

Run a streaming pipeline using the Google-provided Pub/Sub Subscription to BigQuery template. The pipeline reads incoming messages from the Pub/Sub subscription and writes them to your BigQuery table.

Console

  1. In the Google Cloud console, go to the Dataflow Jobs page.

    Go to Jobs

  2. Click Create job from template.

  3. Enter a Job name for your Dataflow job.

  4. For Regional endpoint, select a region for your Dataflow job.

  5. For Dataflow template, select the Pub/Sub Subscription to BigQuery template.

  6. For BigQuery output table, click Browse, and then select your BigQuery table.

  7. In the Pub/Sub input subscription list, select the Pub/Sub subscription.

  8. For Temporary location, enter the following:

    gs://BUCKET_NAME/temp/
    

    Replace BUCKET_NAME with the name of your Cloud Storage bucket. The temp folder stores temporary files for the Dataflow job.

  9. Click Run job.

gcloud

To run the template in your shell or terminal, use the gcloud dataflow jobs run command.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME

Replace the following variables:

  • JOB_NAME: a name for the job
  • DATAFLOW_REGION: a region for the job
  • PROJECT_ID: your Google Cloud project ID
  • SUBSCRIPTION_ID: the name of your Pub/Sub subscription
  • DATASET_NAME: the name of your BigQuery dataset
  • TABLE_NAME: the name of your BigQuery table
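
To confirm that the job started, you can list the active Dataflow jobs in your region. This check is not part of the original steps:

# The job typically reaches the Running state within a few minutes.
gcloud dataflow jobs list --region=DATAFLOW_REGION --status=active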

Publish messages to Pub/Sub

After the Dataflow job starts, you can publish messages to Pub/Sub, and the pipeline writes them to BigQuery.

Console

  1. In the Google Cloud console, go to the Pub/Sub > Topics page.

    Go to Topics

  2. In the topic list, click the name of your topic.

  3. Click Messages.

  4. Click Publish messages.

  5. For Number of messages, enter 10.

  6. For Message body, enter {"name": "Alice", "customer_id": 1}.

  7. Click Publish.

gcloud

To publish messages to your topic, use the gcloud pubsub topics publish command.

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

Replace TOPIC_ID with the name of your topic.

View your results

View the data written to your BigQuery table. It can take up to a minute for data to start appearing in your table.

Console

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. In the query editor, run the following query:

    SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`
    LIMIT 1000
    

    Replace the following variables:

    • PROJECT_ID: your Google Cloud project ID
    • DATASET_NAME: the name of your BigQuery dataset
    • TABLE_NAME: the name of your BigQuery table

gcloud

Check the results in BigQuery by running the following query:

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'

Replace the following variables:

  • PROJECT_ID: your Google Cloud project ID
  • DATASET_NAME: the name of your BigQuery dataset
  • TABLE_NAME: the name of your BigQuery table
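
If you published the 10 identical messages from the previous step, you can also count the rows to confirm that they all arrived. This query is an optional addition and uses the same placeholders:

# Count the rows written by the pipeline so far.
bq query --use_legacy_sql=false \
  'SELECT COUNT(*) AS message_count FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'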

Use a UDF to transform the data

This tutorial assumes that the Pub/Sub messages are formatted as JSON, and that the BigQuery table schema matches the JSON data.

Optionally, you can provide a JavaScript user-defined function (UDF) that transforms the data before it is written to BigQuery. The UDF can perform additional processing, such as filtering, removing personally identifiable information (PII), or enriching the data with additional fields.

For more information, see Create user-defined functions for Dataflow templates.
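
For example, a minimal pass-through UDF can be written to a JavaScript file, uploaded to your Cloud Storage bucket, and referenced when you run the template. The following sketch is not part of the tutorial's required steps; it assumes the template accepts the javascriptTextTransformGcsPath and javascriptTextTransformFunctionName parameters, and the function name process is a placeholder:

# Write a minimal UDF that parses each message and returns it unchanged.
cat > process.js <<'EOF'
function process(inJson) {
  var row = JSON.parse(inJson);
  // Add transformation logic here, for example removing or renaming fields.
  return JSON.stringify(row);
}
EOF

# Upload the UDF to the Cloud Storage bucket that you created earlier.
gcloud storage cp process.js gs://BUCKET_NAME/udf/process.js

# Run the template with the UDF parameters added.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME,\
javascriptTextTransformGcsPath=gs://BUCKET_NAME/udf/process.js,\
javascriptTextTransformFunctionName=process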

Use a dead-letter table

While the job is running, the pipeline might fail to write individual messages to BigQuery. Possible errors include:

  • Serialization errors, including badly formatted JSON.
  • Type conversion errors, caused by a mismatch between the table schema and the JSON data.
  • Extra fields in the JSON data that are not present in the table schema.

The pipeline writes these errors to a dead-letter table in BigQuery. By default, the pipeline automatically creates a dead-letter table named TABLE_NAME_error_records, where TABLE_NAME is the name of the output table. To use a different name, set the outputDeadletterTable template parameter.
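
To see the dead-letter behavior in action, you can publish a message that doesn't match the table schema and then inspect the error records. This example is an optional addition; the exact columns of the dead-letter table depend on the template version, so the query selects all of them:

# Publish a message whose customer_id can't be converted to INTEGER.
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Bob", "customer_id": "not-a-number"}'

# After a minute or so, inspect the dead-letter table.
bq query --use_legacy_sql=false \
  'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME_error_records` LIMIT 10'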