/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubSubToBigQuery.Options;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
 * from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
 * which occur in the transformation of the data or execution of the UDF will be output to a
 * separate errors table in BigQuery. The errors table will be created if it does not exist prior to
 * execution. Both output and error tables are specified by the user as template parameters.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The Pub/Sub topic exists.
 *   <li>The BigQuery output table exists.
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_PubSub_Subscription_to_BigQuery.md">README
 * for Subscription</a> or <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_PubSub_to_BigQuery.md">README
 * for Topic</a> for instructions on how to use or modify this template.
 */
@Template(
    name = "PubSub_Subscription_to_BigQuery",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub Subscription to BigQuery",
    description =
        "The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub subscription and writes them to a BigQuery table. "
            + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
            + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
    optionsClass = Options.class,
    skipOptions = "inputTopic",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-subscription-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
      "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
    },
    streaming = true,
    supportsAtLeastOnce = true,
    supportsExactlyOnce = true)
@Template(
    name = "PubSub_to_BigQuery",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub Topic to BigQuery",
    description =
        "The Pub/Sub Topic to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic and writes them to a BigQuery table. "
            + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "
            + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",
    optionsClass = Options.class,
    skipOptions = "inputSubscription",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",
      "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."
    },
    hidden = true,
    streaming = true,
    supportsAtLeastOnce = true)
public class PubSubToBigQuery {
  /**
   * The log to output status messages to.
   *
   * <p>NOTE(review): no statement in this class currently writes to this logger.
   */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);
  /**
   * The tag for the main output for the UDF. Passed as the success tag of the {@code InvokeUDF}
   * step in {@link PubsubMessageToTableRow}.
   */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
  /**
   * The tag for the main output of the json transformation. The collection under this tag is what
   * {@link #run(Options)} writes to the BigQuery output table.
   */
  public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};
  /**
   * The tag for the dead-letter output of the udf. Passed as the failure tag of the {@code
   * InvokeUDF} step in {@link PubsubMessageToTableRow}.
   */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
  /**
   * The tag for the dead-letter output of the json to table row transform. Passed as the failure
   * tag of the {@code JsonToTableRow} step in {@link PubsubMessageToTableRow}.
   */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};
  /**
   * The default suffix for error tables if dead letter table is not specified. {@link
   * #run(Options)} hands this suffix, together with the output table spec, to the helper that
   * resolves the dead-letter table name.
   */
  public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
  /**
   * Pubsub message/string coder for pipeline. Registered with the pipeline's coder registry at the
   * start of {@link #run(Options)}.
   */
  public static final FailsafeElementCoder<PubsubMessage, String> CODER =
      FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
  /**
   * String/String Coder for FailsafeElement. Set explicitly on the collection of wrapped BigQuery
   * insert errors produced in {@link #run(Options)}.
   */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
  /**
   * The {@link Options} class provides the custom execution options passed by the executor at the
   * command-line.
   *
   * <p>Both {@code @Template} declarations on {@link PubSubToBigQuery} use this interface as their
   * options class; each one skips the input option it does not use ({@code inputTopic} for the
   * subscription template, {@code inputSubscription} for the topic template).
   */
  public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
    /** Returns the BigQuery table that successfully converted records are written to. */
    @TemplateParameter.BigQueryTable(
        order = 1,
        description = "BigQuery output table",
        helpText =
            "The BigQuery output table location, in the format `<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>`")
    ValueProvider<String> getOutputTableSpec();
    /** Sets the BigQuery output table spec. */
    void setOutputTableSpec(ValueProvider<String> value);
    /**
     * Returns the Pub/Sub topic to read from. Only read by {@link PubSubToBigQuery#run(Options)}
     * when {@link #getUseSubscription()} is {@code false}.
     */
    @TemplateParameter.PubsubTopic(
        order = 2,
        description = "Input Pub/Sub topic",
        helpText = "The Pub/Sub topic to read the input from.")
    ValueProvider<String> getInputTopic();
    /** Sets the input Pub/Sub topic. */
    void setInputTopic(ValueProvider<String> value);
    /**
     * Returns the Pub/Sub subscription to read from. Only read by {@link
     * PubSubToBigQuery#run(Options)} when {@link #getUseSubscription()} is {@code true}.
     */
    @TemplateParameter.PubsubSubscription(
        order = 3,
        description = "Pub/Sub input subscription",
        helpText =
            "The Pub/Sub input subscription to read from, in the format `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>`.")
    ValueProvider<String> getInputSubscription();
    /** Sets the input Pub/Sub subscription. */
    void setInputSubscription(ValueProvider<String> value);
    /**
     * Returns whether the pipeline reads from the subscription ({@code true}) or from the topic
     * ({@code false}, the declared default). This is a plain {@code Boolean} rather than a {@link
     * ValueProvider}; the {@code @TemplateCreationParameter} annotations pin a value per template
     * name (presumably applied when the template is built; confirm against the template tooling).
     */
    @TemplateCreationParameter(template = "PubSub_to_BigQuery", value = "false")
    @TemplateCreationParameter(template = "PubSub_Subscription_to_BigQuery", value = "true")
    @Description(
        "This determines whether the template reads from a Pub/sub subscription or a topic")
    @Default.Boolean(false)
    Boolean getUseSubscription();
    /** Sets whether the pipeline reads from the subscription instead of the topic. */
    void setUseSubscription(Boolean value);
    /**
     * Returns the optional BigQuery table for records that could not be written to the output
     * table. {@link PubSubToBigQuery#run(Options)} passes this value, the output table spec and
     * {@link PubSubToBigQuery#DEFAULT_DEADLETTER_TABLE_SUFFIX} to the helper that picks the
     * effective dead-letter table.
     */
    @TemplateParameter.BigQueryTable(
        order = 5,
        optional = true,
        description =
            "Table for messages failed to reach the output table (i.e., Deadletter table)",
        helpText =
            "The BigQuery table to use for messages that fail to reach the output table, in the format of `<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>`. If the table doesn't exist, it is created during pipeline execution. If not specified, `OUTPUT_TABLE_SPEC_error_records` is used.")
    ValueProvider<String> getOutputDeadletterTable();
    /** Sets the dead-letter table spec. */
    void setOutputDeadletterTable(ValueProvider<String> value);
  }
  /**
   * Command-line entry point. Parses and validates {@code args} into {@link Options} and passes
   * them to {@link PubSubToBigQuery#run(Options)}. The {@link PipelineResult} returned by {@code
   * run} is discarded here, so this method does not wait for the pipeline's execution to finish.
   * If blocking execution is required, call {@code run} directly and invoke {@code
   * result.waitUntilFinish()} on the returned {@link PipelineResult}.
   *
   * @param args The command-line args passed by the executor.
   */
  public static void main(String[] args) {
    run(PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class));
  }
  /**
   * Builds the Pub/Sub to BigQuery pipeline from the specified options and starts it. This method
   * returns as soon as {@code pipeline.run()} returns; it does not itself wait for the pipeline to
   * finish. Invoke {@code result.waitUntilFinish()} on the returned object if blocking programmatic
   * execution is required.
   *
   * <p>Stages, in the order they are applied:
   *
   * <ol>
   *   <li>Read messages (with attributes) from the Pub/Sub subscription or topic.
   *   <li>Convert the messages to {@link TableRow}s (UDF, then JSON to TableRow).
   *   <li>Write the converted rows to the BigQuery output table and wrap failed inserts.
   *   <li>Write records that failed the UDF or the conversion to the dead-letter table.
   *   <li>Write records that failed the BigQuery insert to the dead-letter table.
   * </ol>
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(options);

    // Register the PubsubMessage/String FailsafeElement coder for its encoded type.
    pipeline.getCoderRegistry().registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);

    // Stage 1: pick the Pub/Sub source. Only the option belonging to the chosen source is read.
    boolean readFromSubscription = options.getUseSubscription();
    String readStepName = readFromSubscription ? "ReadPubSubSubscription" : "ReadPubSubTopic";
    PubsubIO.Read<PubsubMessage> pubsubSource =
        readFromSubscription
            ? PubsubIO.readMessagesWithAttributes()
                .fromSubscription(options.getInputSubscription())
            : PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic());
    PCollection<PubsubMessage> messages = pipeline.apply(readStepName, pubsubSource);

    // Stage 2: run the UDF and the JSON conversion; failures land in the dead-letter tags.
    PCollectionTuple convertedTableRows =
        messages.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

    // Stage 3: stream the successfully converted rows into the (pre-existing) output table.
    BigQueryIO.Write<TableRow> outputTableSink =
        BigQueryIO.writeTableRows()
            .withoutValidation()
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withExtendedErrorInfo()
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
            .to(options.getOutputTableSpec());
    WriteResult writeResult =
        convertedTableRows.get(TRANSFORM_OUT).apply("WriteSuccessfulRecords", outputTableSink);

    // Stage 3, continued: turn every failed insert into a String/String FailsafeElement.
    PCollection<FailsafeElement<String, String>> failedInserts =
        writeResult
            .getFailedInsertsWithErr()
            .apply(
                "WrapInsertionErrors",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via((BigQueryInsertError insertError) -> wrapBigQueryInsertError(insertError)))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    // Stage 4: merge the UDF and conversion failures and write them to the dead-letter table.
    PCollection<FailsafeElement<PubsubMessage, String>> udfFailures =
        convertedTableRows.get(UDF_DEADLETTER_OUT);
    PCollection<FailsafeElement<PubsubMessage, String>> conversionFailures =
        convertedTableRows.get(TRANSFORM_DEADLETTER_OUT);
    PCollectionList.of(ImmutableList.of(udfFailures, conversionFailures))
        .apply("Flatten", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            ErrorConverters.WritePubsubMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    ValueProviderUtils.maybeUseDefaultDeadletterTable(
                        options.getOutputDeadletterTable(),
                        options.getOutputTableSpec(),
                        DEFAULT_DEADLETTER_TABLE_SUFFIX))
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());

    // Stage 5: write the records that failed the BigQuery insert to the dead-letter table.
    failedInserts.apply(
        "WriteFailedRecords",
        ErrorConverters.WriteStringMessageErrors.newBuilder()
            .setErrorRecordsTable(
                ValueProviderUtils.maybeUseDefaultDeadletterTable(
                    options.getOutputDeadletterTable(),
                    options.getOutputTableSpec(),
                    DEFAULT_DEADLETTER_TABLE_SUFFIX))
            .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
            .build());

    return pipeline.run();
  }
  /**
   * Resolves the dead-letter table to use. When {@code deadletterTable} yields a non-null value it
   * is returned unchanged; otherwise the value of {@code outputTableSpec} with {@code
   * defaultDeadLetterTableSuffix} appended is returned.
   *
   * @param deadletterTable The user-provided dead-letter table, possibly yielding {@code null}.
   * @param outputTableSpec The output table spec the default name is derived from.
   * @param defaultDeadLetterTableSuffix The suffix appended to the output table spec.
   * @return A provider combining both inputs as described above.
   */
  private static ValueProvider<String> maybeUseDefaultDeadletterTable(
      ValueProvider<String> deadletterTable,
      ValueProvider<String> outputTableSpec,
      String defaultDeadLetterTableSuffix) {
    SerializableFunction<TranslatorInput<String, String>, String> chooseTable =
        new SerializableFunction<TranslatorInput<String, String>, String>() {
          @Override
          public String apply(TranslatorInput<String, String> input) {
            // X carries the user-provided table, Y the main output table spec.
            String explicitTable = input.getX();
            String mainTable = input.getY();
            return explicitTable != null
                ? explicitTable
                : mainTable + defaultDeadLetterTableSuffix;
          }
        };
    return DualInputNestedValueProvider.of(deadletterTable, outputTableSpec, chooseTable);
  }
  /**
   * The {@link PubsubMessageToTableRow} class is a {@link PTransform} which turns incoming {@link
   * PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery, applying an
   * optional UDF to the input on the way. Both the UDF execution and the conversion to {@link
   * TableRow} are done in a fail-safe way: each element is wrapped, together with its original
   * payload, in a {@link FailsafeElement}, and failures are routed to dead-letter outputs.
   *
   * <p>The resulting {@link PCollectionTuple} holds the following {@link PCollection}s:
   *
   * <ul>
   *   <li>{@link PubSubToBigQuery#UDF_OUT} - All {@link FailsafeElement} records successfully
   *       processed by the optional UDF.
   *   <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - All {@link FailsafeElement} records which
   *       failed processing during the UDF execution.
   *   <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - All records successfully converted from JSON to
   *       {@link TableRow} objects.
   *   <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - All {@link FailsafeElement} records
   *       which couldn't be converted to table rows.
   * </ul>
   */
  static class PubsubMessageToTableRow
      extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {

    /** Options supplying the JavaScript UDF location, function name and reload interval. */
    private final Options options;

    PubsubMessageToTableRow(Options options) {
      this.options = options;
    }

    @Override
    public PCollectionTuple expand(PCollection<PubsubMessage> input) {
      // Wrap every message in a FailsafeElement so that failures can be recovered across the
      // transforms that follow.
      PCollection<FailsafeElement<PubsubMessage, String>> wrappedMessages =
          input.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()));

      // Run the JavaScript UDF; successes and failures are split by tag.
      PCollectionTuple udfResults =
          wrappedMessages.apply(
              "InvokeUDF",
              FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                  .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                  .setFunctionName(options.getJavascriptTextTransformFunctionName())
                  .setReloadIntervalMinutes(
                      options.getJavascriptTextTransformReloadIntervalMinutes())
                  .setSuccessTag(UDF_OUT)
                  .setFailureTag(UDF_DEADLETTER_OUT)
                  .build());
      PCollection<FailsafeElement<PubsubMessage, String>> udfSuccesses = udfResults.get(UDF_OUT);

      // Only the records that made it through the UDF are converted to TableRow objects.
      PCollectionTuple conversionResults =
          udfSuccesses.apply(
              "JsonToTableRow",
              FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
                  .setSuccessTag(TRANSFORM_OUT)
                  .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                  .build());

      // Bundle all four collections into the single tuple this transform returns.
      return PCollectionTuple.of(UDF_OUT, udfSuccesses)
          .and(UDF_DEADLETTER_OUT, udfResults.get(UDF_DEADLETTER_OUT))
          .and(TRANSFORM_OUT, conversionResults.get(TRANSFORM_OUT))
          .and(TRANSFORM_DEADLETTER_OUT, conversionResults.get(TRANSFORM_DEADLETTER_OUT));
    }
  }
  /**
   * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
   * {@link FailsafeElement} class so errors can be recovered from and the original message can be
   * output to an error records table.
   */
  static class PubsubMessageToFailsafeElementFn
      extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {

    /**
     * Emits one {@link FailsafeElement} per input message, pairing the original message with its
     * payload bytes decoded as a UTF-8 string.
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
      PubsubMessage original = context.element();
      String payloadText = new String(original.getPayload(), StandardCharsets.UTF_8);
      context.output(FailsafeElement.of(original, payloadText));
    }
  }
}