Task Overview

This example enables us to record and store changing gas prices and see how they change over time. To do so, we need to:

  1. Create a table in the Redshift database and insert the initial gas price data into it.
  2. Add new rows of gas price data as they come in and check to see how existing values are historized.

Download and Import Sample Pipelines

Before we start, you need to download, import, and configure the pipelines associated with this example:

  1. Download the Redshift_Iteration_Initialization_Example ZIP file. It contains two pipelines:
    • Redshift_Iteration_Initialization_Example: Creates a DB table with initial data in Redshift. Once this is done, it upserts data into the DB.
    • Redshift_Select_Write: Retrieves the latest data from Redshift and writes it to a file, enabling us to see how the data in Redshift changes with every upsert iteration.
  2. Import the pipelines into SnapLogic and configure the Redshift Snaps with your Redshift account.

Pipeline Properties

The Redshift_Iteration_Initialization_Example pipeline creates a Gas Dimensions (gas_dim_test) table and inserts new gas price values into it. To do so, it needs to initialize the DB using one branch of the pipeline, and then iteratively upsert new data using other branches. To control which branch of the pipeline gets triggered, two parameters must be declared at the pipeline level:

  • INITIALIZE: Indicates whether the task being performed is one of initialization.
  • SCD2_ITERATION: Indicates whether the task being performed is an iterative upsert.

Later, in the pipeline, we shall use these parameters to create trigger configurations to control when each child pipeline is executed.
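
The way the two parameters select a branch can be sketched in Python as follows. This is only an illustration of the Filter checks described in this example, not SnapLogic code: the function name select_branch is hypothetical, parameter values are assumed to arrive as strings (which is why the Filter expressions wrap them in parseInt()), and the value 4 for the truncate branch is an assumption based on the "Check Iteration 4" Snap.

```python
def select_branch(params: dict) -> str:
    """Return the name of the branch that would run for the given parameters."""
    # Assumption: parameter values are strings and are parsed to integers,
    # mirroring parseInt() in the Filter expressions.
    initialize = int(params.get("INITIALIZE", "0"))
    iteration = int(params.get("SCD2_ITERATION", "0"))

    # Check Initialize: parseInt(_INITIALIZE) > 0
    if initialize > 0:
        return "Create DB Table and Upload Initial Data"

    # Every other branch also passes through Check Not Initialize,
    # which requires INITIALIZE to be exactly 0.
    if initialize != 0:
        return "none"

    if iteration in (1, 2, 3):
        return f"Upload Updated Data (Iteration {iteration})"
    if iteration == 4:  # assumed value for the Check Iteration 4 Snap
        return "Truncate DB Table"
    return "none"


if __name__ == "__main__":
    print(select_branch({"INITIALIZE": "1", "SCD2_ITERATION": "0"}))
    print(select_branch({"INITIALIZE": "0", "SCD2_ITERATION": "2"}))
```

Note that when INITIALIZE is greater than 0, no iteration branch runs, whatever the value of SCD2_ITERATION.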

Understanding the Sample Pipelines

This section describes the two pipelines used in this example and explains how they work together.

The Redshift_Iteration_Initialization_Example Pipeline

The Redshift_Iteration_Initialization_Example pipeline comprises three branches, each described below as a pipeline:

  1. Create DB Table and Upload Initial Data: Creates the DB table and uploads three rows of data.
  2. Upload Updated Data: Uploads new data associated with the rows created by Pipeline 1 above.
  3. Truncate DB Table: Clears the contents of the DB table, resetting the pipeline for reuse from scratch.

Create DB Table and Upload Initial Data

This pipeline contains the following Snaps:

Snap Name | Snap | Purpose | Settings | Comments

Initialization Data | JSON Generator Snap | Contains the initial data for the Gas Dimensions DB table.

Contains the initial upload data:

[
    {
     "premium": 4.13,
     "regular": 3.83,
     "station_id": "U01",
     "zipcode": "94402"
    },
    {
     "premium": 3.05,
     "regular": 2.75,
     "station_id": "V01",
     "zipcode": "94402"
    },
    {
     "premium": 3.45,
     "regular": 3.25,
     "station_id": "C01",
     "zipcode": "94402"
    }
]




The data is organized into four columns:

  • premium: Lists the price of premium fuel.
  • regular: Lists the price of regular fuel.
  • station_id: Lists the identifier of the gas station from where the data was collected.
  • zipcode: Lists the zip code in which the gas station operates.

The important thing to note here is that, while gas rates may change, the data in the station_id and zipcode fields will not. Thus, these can be used as key fields later in the pipeline.

Check Initialize | Filter Snap | Checks whether the DB table can be created.

Contains the following configuration in the Filter field:


parseInt(_INITIALIZE) > 0


This Snap checks whether the value of the INITIALIZE pipeline parameter is greater than 0. If it is, then the Snap executes the Snaps that follow it; else, it exits.

Thus, when we need to create the gas_dim_test table, we set the INITIALIZE parameter to 1. Once the pipeline completes execution, we change the value of the INITIALIZE parameter to 0. This ensures that the table is not re-created.

Map Attributes to Redshift | Mapper Snap | Maps attributes in the input data to column headers that must be created in the Redshift DB.

Contains the following mappings:

  • Date.now(): $start_date: Instructs the DB to take the current date-time as the time from which the uploaded data is active.
  • null: $end_date: Sets end_date to 'null'.
  • true: $active: Maps the value 'true' to indicate that a record is active.
  • $station_id: $station_id: Maps station_id in the input data to station_id in the Redshift DB table.
  • $zipcode: $zipcode: Maps zipcode in the input data to zipcode in the Redshift DB table.
  • $premium: $premium: Maps premium in the input data to premium in the Redshift DB table.
  • $regular: $regular: Maps regular in the input data to regular in the Redshift DB table.
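
The effect of these mappings on a single input record can be sketched in Python as follows. This is a minimal illustration, not the Mapper Snap itself: the function name map_to_redshift_row is hypothetical, and the timestamp format is an assumption chosen to resemble the start_date values shown in the outputs later in this example.

```python
from datetime import datetime
from typing import Optional


def map_to_redshift_row(record: dict, now: Optional[datetime] = None) -> dict:
    """Build the row that the mappings above would produce for one record."""
    now = now or datetime.now()
    return {
        # Date.now() -> $start_date (millisecond precision is an assumption)
        "start_date": now.isoformat(timespec="milliseconds"),
        # null -> $end_date
        "end_date": None,
        # true -> $active
        "active": True,
        # The remaining fields are copied through unchanged.
        "station_id": record["station_id"],
        "zipcode": record["zipcode"],
        "premium": record["premium"],
        "regular": record["regular"],
    }


if __name__ == "__main__":
    sample = {"premium": 4.13, "regular": 3.83, "station_id": "U01", "zipcode": "94402"}
    print(map_to_redshift_row(sample))
```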

Create Table in Redshift with Input Data | Redshift - Insert | Creates a DB table in Redshift.

Contains the following settings:

  • Schema Name: "public"
  • Table Name: "gas_dim_test"

This creates a table named "gas_dim_test" in the "public" schema in Redshift with the data in the Initialization Data Snap.

Upload Updated Data

This pipeline updates the data in the gas_dim_test DB table in three iterations. This pipeline contains the following Snaps:

Snap Name | Snap | Purpose | Settings | Comments

Iteration 1

SCD2 Data (Iteration 1) | JSON Generator | Contains the updated data for the gas_dim_test DB table.

Contains the following data:

[
    {
     "premium": 4.25,
     "regular": 3.98,
     "station_id": "U01",
     "zipcode": "94402"
    }
]




Iteration 1 updates the data associated with station_id U01.

Check Iteration 1 | Filter Snap | Checks whether the first data update iteration should be executed.

Contains the following setting in the Filter expression field:

parseInt(_SCD2_ITERATION) == 1


If, in the pipeline properties, the value of the SCD2_ITERATION parameter is 1, and the value of the INITIALIZE parameter is 0, then this pipeline is executed.

Check Not Initialize | Filter Snap | Checks whether the INITIALIZE pipeline parameter is set to 0.

Contains the following setting in the Filter expression field:

parseInt(_INITIALIZE) == 0


Iteration 2
SCD2 Data (Iteration 2) | JSON Generator | Contains the updated data for the gas_dim_test DB table.

Contains the following data:

[
    {
     "premium": 4.00,
     "regular": 3.71,
     "station_id": "U01",
     "zipcode": "94402"
    },
    {
     "premium": 3.99,
     "regular": 3.50,
     "station_id": "V01",
     "zipcode": "94402"
    }    
]




Iteration 2 updates the data associated with station_id U01 and V01.

Check Iteration 2 | Filter Snap | Checks whether the second data update iteration should be executed.

Contains the following setting in the Filter expression field:

parseInt(_SCD2_ITERATION) == 2


If, in the pipeline properties, the value of the SCD2_ITERATION parameter is 2, and the value of the INITIALIZE parameter is 0, then this pipeline is executed.

Check Not Initialize | Filter Snap | Checks whether the INITIALIZE pipeline parameter is set to 0.

Contains the following setting in the Filter expression field:

parseInt(_INITIALIZE) == 0


Iteration 3
SCD2 Data (Iteration 3) | JSON Generator | Contains the updated data for the gas_dim_test DB table.

Contains the following data:

[
    {
     "premium": 4.33,
     "regular": 3.98,
     "station_id": "U01",
     "zipcode": "94402"
    },
    {
     "premium": 4.12,
     "regular": 3.78,
     "station_id": "V01",
     "zipcode": "94402"
    },
    {
     "premium": 4.30,
     "regular": 3.90,
     "station_id": "C01",
     "zipcode": "94402"
    }        
]




Iteration 3 updates the data associated with station_id U01, V01, and C01.

Check Iteration 3 | Filter Snap | Checks whether the third data update iteration should be executed.

Contains the following setting in the Filter expression field:

parseInt(_SCD2_ITERATION) == 3


If, in the pipeline properties, the value of the SCD2_ITERATION parameter is 3, and the value of the INITIALIZE parameter is 0, then this pipeline is executed.

Check Not Initialize | Filter Snap | Checks whether the INITIALIZE pipeline parameter is set to 0.

Contains the following setting in the Filter expression field:

parseInt(_INITIALIZE) == 0


Processing the Output of the Three Iterations
Union | Union Snap | Takes the output from whichever iteration was triggered upstream and passes it on to the downstream Snaps.

Prepare Upload Data | Redshift - SCD2 | Prepares the data collected by the upstream Union Snap.

Contains settings that ensure that the correct DB table is updated, and that the appropriate fields are read the same way as they were created in the pipeline above.

Click the screenshot below to review the settings in detail.

These settings instruct Redshift to read the public.gas_dim_test table as follows:

  • Natural Key: The values in the following fields will never change: station_id, zipcode.
  • Cause-historization fields: When these values change, create a new row containing the new values. Mark the existing value as old.
  • SCD fields
    • All rows containing current data must contain the value 'true' in the 'active' field.
    • Similarly, all rows containing historical data must contain the value 'false' in the 'active' field.
    • The start date of the current row must always be the current date-time.
    • Assign the end date to the current data when updated data is received.
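
The historization rules above can be sketched in plain Python as follows. This is a simplified illustration of the SCD2 idea, not the Redshift - SCD2 Snap's implementation. The function name apply_scd2 is hypothetical, the choice of premium and regular as cause-historization fields is an assumption (the actual fields are set in the Snap's settings), and the sketch uses one timestamp both to end-date the old row and to start the new one.

```python
from datetime import datetime

NATURAL_KEY = ("station_id", "zipcode")
CAUSE_HISTORIZATION = ("premium", "regular")  # assumed fields


def apply_scd2(table: list, incoming: list, now: datetime) -> list:
    """Return a new list of rows after historizing `incoming` against `table`."""
    stamp = now.isoformat(timespec="milliseconds")
    result = [dict(row) for row in table]  # copy so the input is not modified

    for record in incoming:
        key = tuple(record[f] for f in NATURAL_KEY)
        # Find the currently active row with the same natural key, if any.
        current = next(
            (r for r in result
             if r["active"] and tuple(r[f] for f in NATURAL_KEY) == key),
            None,
        )
        if current is not None:
            if all(current[f] == record[f] for f in CAUSE_HISTORIZATION):
                continue  # nothing changed, so no new version is needed
            # Mark the existing row as historical and end-date it.
            current["end_date"] = stamp
            current["active"] = False
        # Add the new values as the current row.
        result.append(
            {"start_date": stamp, "end_date": None, "active": True, **record}
        )
    return result


if __name__ == "__main__":
    initial = apply_scd2(
        [],
        [{"premium": 4.13, "regular": 3.83, "station_id": "U01", "zipcode": "94402"}],
        datetime(2018, 7, 5, 7, 10, 39, 418000),
    )
    updated = apply_scd2(
        initial,
        [{"premium": 4.25, "regular": 3.98, "station_id": "U01", "zipcode": "94402"}],
        datetime(2018, 7, 5, 7, 12, 33, 88000),
    )
    for row in updated:
        print(row)
```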


Upsert Updated Data | Redshift - Bulk Upsert | Upserts the data prepared by the Redshift - SCD2 Snap to the gas_dim_test DB table.

Contains the following settings:

Key columns: These are the columns used to check for existing entries in the target table:
  • station_id
  • zipcode

Once this Snap completes execution, the updated data is populated into the DB table, the 'active' column in the row containing the old data is updated to now read 'false', and a new row containing the updated data is added, where the 'active' column reads 'true'.

Truncate DB Table

Once the simulation is run, and you have seen how the Redshift - SCD2 and Redshift - Bulk Upsert Snaps work together to update and historize DB data as new data arrives, you may want to clear the table so that an empty table is available when the simulation is run again.

This pipeline contains the following Snaps:

  • Check Iteration 4: Checks whether the fourth iteration should be executed. If the check succeeds, the downstream Snaps are executed; else, the pipeline exits.
  • Check Not Initialize: Checks whether the INITIALIZE parameter in the pipeline properties is set to '0'. If the check succeeds, the downstream Snaps are executed; else, the pipeline exits.
  • Redshift - Execute: Runs the following command on the gas_dim_test table to clear all its contents:

    truncate table public.gas_dim_test


The Redshift_Select_Write Pipeline

Run this pipeline after each iteration to view the updated output.

This pipeline contains the following Snaps:

  • Redshift - Select: Reads the contents of the public.gas_dim_test DB table.
  • JSON Formatter: Formats the data received from the Redshift - Select Snap as a JSON file.
  • File Writer: Saves the JSON data as RS_File.json in the SLDB.
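
The format-and-write half of this pipeline can be sketched in Python as follows. This is an illustration only: the function name write_rows is hypothetical, the rows are passed in directly rather than read from Redshift, and the file is written to the local working directory rather than to the SLDB.

```python
import json
from pathlib import Path


def write_rows(rows: list, path: str = "RS_File.json") -> Path:
    """Serialize the selected rows as JSON and save them to a file."""
    target = Path(path)
    # Equivalent in spirit to JSON Formatter followed by File Writer.
    target.write_text(json.dumps(rows, indent=4))
    return target


if __name__ == "__main__":
    sample_rows = [
        {
            "start_date": "2018-07-05T07:10:39.418",
            "end_date": None,
            "active": True,
            "station_id": "U01",
            "zipcode": "94402",
            "premium": 4.13,
            "regular": 3.83,
        }
    ]
    print(write_rows(sample_rows))
```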

Changes Made to the public.gas_dim_test DB Table with Each Iteration

As the values entered in the SCD2_ITERATION and INITIALIZE pipeline parameters change, and as the pipelines are run iteratively, the public.gas_dim_test DB table is created and iteratively updated. This section lists out the contents of the table at the end of each iteration.

Event | Triggered Pipeline | What Happens Here | Output

Initialization | Create DB Table and Upload Initial Data

The table is created, and data from the Initialization Data Snap is uploaded into it.

No. of rows of data: 3


[
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.13,
        "regular": 3.83
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "C01",
        "zipcode": "94402",
        "premium": 3.45,
        "regular": 3.25
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.05,
        "regular": 2.75
    }
]


Iteration 1 | Upload Updated Data

New data corresponding to the U01 station ID is uploaded, the existing row of data is end-dated, and the active status of the existing data is changed to false. The active status of the new data is saved as true.

No. of rows of data: 4

  • 3 current
  • 1 historical (for U01)


[
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "C01",
        "zipcode": "94402",
        "premium": 3.45,
        "regular": 3.25
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:12:33.087",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.13,
        "regular": 3.83
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.05,
        "regular": 2.75
    },
    {
        "start_date": "2018-07-05T07:12:33.088",
        "end_date": null,
        "active": true,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.25,
        "regular": 3.98
    }
]


Iteration 2 | Upload Updated Data

New data corresponding to the U01 and V01 station IDs is uploaded, the existing rows of data are end-dated, and the active status of the existing data is changed to false. The active status of the new data is saved as true.

No. of rows of data: 6

  • 3 current
  • 3 historical
    • 2 historical rows of station ID U01 data
    • 1 historical row of station ID V01 data


[
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:12:33.087",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.13,
        "regular": 3.83
    },
    {
        "start_date": "2018-07-05T07:41:09.779",
        "end_date": null,
        "active": true,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.99,
        "regular": 3.5
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:41:09.779",
        "active": false,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.05,
        "regular": 2.75
    },
    {
        "start_date": "2018-07-05T07:12:33.088",
        "end_date": "2018-07-05T07:41:09.779",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.25,
        "regular": 3.98
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "C01",
        "zipcode": "94402",
        "premium": 3.45,
        "regular": 3.25
    },
    {
        "start_date": "2018-07-05T07:41:09.779",
        "end_date": null,
        "active": true,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4,
        "regular": 3.71
    }
]


Iteration 3 | Upload Updated Data

New data corresponding to the U01, V01, and C01 station IDs is uploaded, the existing rows of data are end-dated, and the active status of the existing data is changed to false. The active status of the new data is saved as true.

No. of rows of data: 9

  • 3 current
  • 6 historical
    • 3 historical rows of station ID U01 data
    • 2 historical rows of station ID V01 data
    • 1 historical row of station ID C01 data


[
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:12:33.087",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.13,
        "regular": 3.83
    },
    {
        "start_date": "2018-07-05T07:41:09.779",
        "end_date": "2018-07-05T07:44:49.878",
        "active": false,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.99,
        "regular": 3.5
    },
    {
        "start_date": "2018-07-05T07:44:49.878",
        "end_date": null,
        "active": true,
        "station_id": "C01",
        "zipcode": "94402",
        "premium": 4.3,
        "regular": 3.9
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:41:09.779",
        "active": false,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.05,
        "regular": 2.75
    },
    {
        "start_date": "2018-07-05T07:12:33.088",
        "end_date": "2018-07-05T07:41:09.779",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.25,
        "regular": 3.98
    },
    {
        "start_date": "2018-07-05T07:44:49.878",
        "end_date": null,
        "active": true,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.33,
        "regular": 3.98
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:44:49.878",
        "active": false,
        "station_id": "C01",
        "zipcode": "94402",
        "premium": 3.45,
        "regular": 3.25
    },
    {
        "start_date": "2018-07-05T07:41:09.779",
        "end_date": "2018-07-05T07:44:49.878",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4,
        "regular": 3.71
    },
    {
        "start_date": "2018-07-05T07:44:49.878",
        "end_date": null,
        "active": true,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 4.12,
        "regular": 3.78
    }
]
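
The row counts listed for Iteration 3 can be checked with a short script. The sketch below copies only the station_id and active values from the Iteration 3 output above, in the same order, and tallies current and historical rows. The variable names are illustrative.

```python
from collections import Counter

# (station_id, active) pairs copied from the Iteration 3 output, in order.
rows = [
    ("U01", False), ("V01", False), ("C01", True),
    ("V01", False), ("U01", False), ("U01", True),
    ("C01", False), ("U01", False), ("V01", True),
]

# Current rows are those still marked active.
current = sum(1 for _, active in rows if active)

# Historical rows are grouped by station to match the breakdown in the text.
historical = Counter(station for station, active in rows if not active)

print("total rows:", len(rows))
print("current rows:", current)
print("historical rows:", sum(historical.values()), dict(historical))
```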


Iteration 4 | Truncate DB Table | Clears the contents of the gas_dim_test DB table.


[]





Related Information

Downloads

...