Task OverviewThis example enables us to record and store changing gas prices and see how they change over time. To do so, we need to: - Insert a table into the Redshift database and gas price data.
- Add new rows of gas price data as they come in and check to see how existing values are historized.
Download and Import Sample PipelinesBefore we start, you need to download, import, and configure the pipelines associated with this example: - Download the Redshift_Iteration_Initialization_Example ZIP file. It contains two pipelines:
- Redshift_Iteration_Initialization_Example: Creates a DB table with initial data in Redshift. Once this is done, it upserts data into the DB.
- Redshift_Select_Write: Retrieves the latest data from Redshift and writes it to a file, enabling us to see how the data in Redshift changes with every upsert iteration.
- Import the pipelines into SnapLogic and configure the Redshift Snaps with your Redshift account.
Pipeline PropertiesThe Redshift_Iteration_Initialization_Example pipeline creates a Gas Dimensions (gas_dim_test) table and inserts new gas price values into it. To do so, it needs to initialize the DB using one branch of the pipeline, and then iteratively upsert new data using other branches. To control which branch of the pipeline gets triggered, two parameters must be declared at the pipeline level: - INITIALIZE: Indicates whether the task being performed is one of initialization.
- SCD2_ITERATION: Indicates whether the task being performed is an iterative upsert.
Later, in the pipeline, we shall use these parameters to create trigger configurations to control when each child pipeline is executed.
Understanding the Sample PipelinesThis section describes the two pipelines used in this example and explains how they work together. The Redshift_Iteration_Initialization_Example Pipeline
The Redshift_Iteration_Initialization_Example pipeline comprises three pipelines: - Create DB Table and Upload Initial Data: Creates the DB table and uploads three rows of data.
- Upload Updated Data: Uploads new data associated with the rows created by Pipeline 1 above.
- Truncate DB Table: Clears the contents of the DB table, resetting the pipeline for reuse from scratch.
Create DB Table and Upload Initial DataThis pipeline contains the following Snaps: Snap Name | Snap | Purpose | Settings | Comments |
---|
Initialization Data | JSON Generator Snap | Contains the initial data for the Gas Dimensions DB table. | Contains the initial upload data: Expand |
---|
title | View initial upload data |
---|
|
Paste code macro |
---|
[
{
"premium": 4.13,
"regular": 3.83,
"station_id": "U01",
"zipcode": "94402"
},
{
"premium": 3.05,
"regular": 2.75,
"station_id": "V01",
"zipcode": "94402"
},
{
"premium": 3.45,
"regular": 3.25,
"station_id": "C01",
"zipcode": "94402"
}
] |
|
| The data is organized into four columns: - premium: Lists the price of premium fuel.
- regular: Lists the price of regular fuel.
- station_id: Lists the identifier of the gas station from where the data was collected.
- zipcode: Lists the zip code in which the gas station operates.
The important thing to note here is that, while gas rates may change, the data in the station_id and zipcode fields will not. Thus, these can be used as key fields later in the pipeline. | Check Initialize | Filter Snap | Checks whether the DB table can be created. | Contains the following configuration in the Filter field:
Paste code macro |
---|
parseInt(_INITITALIZE) > 0 |
| This Snap checks whether the value of the INITIALIZE pipeline parameter is greater than 0. If it is, then the Snap executes the Snaps that follow it; else, it exits. Thus, when we need to create the gas_dim table, we set the INITIALIZE parameter to 1. Once the pipeline completes execution, we change the value of the Initialize parameter to 0. This ensures that the table is not re-created. | Map Attributes to Redshift | Mapper Snap | Maps attributes in the input data to column headers that must be created in the Redshift DB. | Contains the following mappings: - Date.now(): $start_date: Instructs the DB to take the current date-time as the time from which the uploaded data is active.
- null: $end_date: Sets end_date to 'null'.
- true: $active: Maps the value 'true' to indicate that a record is active.
- $station_id: $station_id: Maps station_id in the input data to station_id in the Redshift DB table.
- $zipcode: $zipcode: Maps zipcode in the input data to zipcode in the Redshift DB table.
- $premium: $premium: Maps premium in the input data to premium in the Redshift DB table.
- $regular: $regular: Maps regular in the input data to regular in the Redshift DB table.
|
| Create Table in Redshift with Input Data | Redshift - Insert | Creates a DB table in Redshift. | Contains the following settings: - Schema Name: "public"
- Table Name: "gas_dim_test"
| This creates a table named "gas_dim_test" in the "public" schema in Redshift with the data in the Initialization Data Snap. |
Upload Updated DataThis pipeline updates the data in the gas_dim_test DB table in three iterations. This pipeline contains the following Snaps: Snap Name | Snap | Purpose | Settings | Comments |
---|
Iteration 1 | SCD2 Data (Iteration 1) | JSON Generator | Contains the updated data for the gas_dim_test DB table. | Contains the following data: Expand |
---|
title | View the data used for Iteration 1 |
---|
|
Paste code macro |
---|
[
{
"premium": 4.25,
"regular": 3.98,
"station_id": "U01",
"zipcode": "94402"
}
] |
|
| Iteration 1 updates the data associated with station_id U01. | Check Iteration 1 | Filter Snap | Checks whether the first data update iteration should be executed. | Contains the following setting in the Filter expression field: Paste code macro |
---|
parseInt(_SCD2_ITERATION) == 1 |
| If, in the pipeline properties, the value of the SCD2_ITERATION parameter is 1, and the value of the INITIALIZE parameter is 0, then this pipeline is executed. | Check Not Initialize | Filter Snap | Contains the following setting in the Filter expression field: Paste code macro |
---|
parseInt(_INITITALIZE) == 0 |
| Iteration 2 | SCD2 Data (Iteration 2) | JSON Generator | Contains the updated data for the gas_dim_test DB table. | Contains the following data: Expand |
---|
title | View the data used for Iteration 2 |
---|
|
Paste code macro |
---|
[
{
"premium": 4.00,
"regular": 3.71,
"station_id": "U01",
"zipcode": "94402"
},
{
"premium": 3.99,
"regular": 3.50,
"station_id": "V01",
"zipcode": "94402"
}
] |
|
| Iteration 2 updates the data associated with station_id U01 and V01. | Check Iteration 2 | Filter Snap | Checks whether the second data update iteration should be executed. | Contains the following setting in the Filter expression field: Paste code macro |
---|
parseInt(_SCD2_ITERATION) == 2 |
| If, in the pipeline properties, the value of the SCD2_ITERATION parameter is 2, and the value of the INITIALIZE parameter is 0, then this pipeline is executed. | Check Not Initialize | Filter Snap | Contains the following setting in the Filter expression field: Paste code macro |
---|
parseInt(_INITITALIZE) == 0 |
| Iteration 3 | SCD2 Data (Iteration 3) | JSON Generator | Contains the updated data for the gas_dim_test DB table. | Contains the following data: Expand |
---|
title | View the data used for Iteration 3 |
---|
|
Paste code macro |
---|
[
{
"premium": 4.33,
"regular": 3.98,
"station_id": "U01",
"zipcode": "94402"
},
{
"premium": 4.12,
"regular": 3.78,
"station_id": "V01",
"zipcode": "94402"
},
{
"premium": 4.30,
"regular": 3.90,
"station_id": "C01",
"zipcode": "94402"
}
] |
|
| Iteration 3 updates the data associated with station_id U01, V01, and C01. | Check Iteration 3 | Filter Snap | Checks whether the third data update iteration should be executed. | Contains the following setting in the Filter expression field: Paste code macro |
---|
parseInt(_SCD2_ITERATION) == 3 |
| If, in the pipeline properties, the value of the SCD2_ITERATION parameter is 2, and the value of the INITIALIZE parameter is 0, then this pipeline is executed. | Check Not Initialize | Filter Snap |
| Contains the following setting in the Filter expression field: Paste code macro |
---|
parseInt(_INITITALIZE) == 0 |
| Processing the Output of the Three Iterations | Union | Union Snap | Takes the output from the iteration triggered upline and passes it on to the downstream Snaps. |
|
| Prepare Upload Data | | Prepares the data collected by the upstream Union Snap. | Contains settings that ensure that the correct DB table is updated, and that the appropriate fields are read the same way as they were created in the pipeline above. Click the screenshot below to review the settings in detail.
| These settings basically instruct Redshift to read the public.gas_dim table as follows:
| Upsert Updated Data | | Upserts the data prepared by the Redshift - SCD2 Snap to the gas_dim_test DB table. | Contains the following settings: Key columns: These are the columns used to check for existing entries in the target table: Note |
---|
For the Bulk Upsert Snap to work, at least one Natural key must be included in the Key columns field; else the upsert will fail. |
| Once this Snap completes execution, the updated data is populated into the DB table, the 'active' column in the row containing the old data is updated to now read 'false', and a new row containing the updated data is added, where the 'active' column reads 'true'. |
Truncate DB TableOnce the simuation is run, and you have seen how the Redshift - SCD 2 and Redshift - Upsert Snaps work together to update and historize DB data as new data arrives, you may want to clear the concerned table to ensure that, when the simulation is run again, an empty table is available. This pipeline contains the following Snaps: The Redshift_Select_Write PipelineRun this pipeline after each iteration to view the updated output. This pipeline contains the following Snaps: - Redshift - Select: Reads the contents of the public.gas_dim_test DB table.
- JSON Formatter: Formats the data received from the Redshift - Select Snap as a JSON file.
- File Writer: Saves the JSON data as RS_File.json in the SLDB.
Changes Made to the Public.gas_dim DB Table with Each IterationAs the values entered in the SCD2_ITERATION and INITIALIZE pipeline parameters change, and as the pipelines are run iteratively, the public.gas_dim_test DB table is created and iteratively updated. This section lists out the contents of the table at the end of each iteration. Event | Triggered Pipeline | What Happens Here | Output |
---|
Initialization | Create DB Table and Upload Initial Data | The table is created, and data from the Initialization Data Snap is uploaded into it. No. of rows of data: 3 |
Paste code macro |
---|
[
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": null,
"active": true,
"station_id": "U01",
"zipcode": "94402",
"premium": 4.13,
"regular": 3.83
},
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": null,
"active": true,
"station_id": "C01",
"zipcode": "94402",
"premium": 3.45,
"regular": 3.25
},
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": null,
"active": true,
"station_id": "V01",
"zipcode": "94402",
"premium": 3.05,
"regular": 2.75
}
] |
| Iteration 1 | Upload Updated Data | New data corresponding to the U01 station ID is uploaded, the existing row of data is end-dated, and the active status of the existing data is changed to false. The active status of the new data is saved as true. No. of rows of data: 4 - 3 current
- 1 historical (for U01)
|
Paste code macro |
---|
[
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": null,
"active": true,
"station_id": "C01",
"zipcode": "94402",
"premium": 3.45,
"regular": 3.25
},
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": "2018-07-05T07:12:33.087",
"active": false,
"station_id": "U01",
"zipcode": "94402",
"premium": 4.13,
"regular": 3.83
},
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": null,
"active": true,
"station_id": "V01",
"zipcode": "94402",
"premium": 3.05,
"regular": 2.75
},
{
"start_date": "2018-07-05T07:12:33.088",
"end_date": null,
"active": true,
"station_id": "U01",
"zipcode": "94402",
"premium": 4.25,
"regular": 3.98
}
] |
| Iteration 2 | Upload Updated Data | New data corresponding to the U01 and V01 station IDs is uploaded, the existing rows of data are end-dated, and the active status of the existing data is changed to false. The active status of the new data is saved as true. No. of rows of data: 6 - 3 current
- 3 historical
- 2 historical rows of station ID U01 data
- 1 historical row of station ID V01 data
|
Paste code macro |
---|
[
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": "2018-07-05T07:12:33.087",
"active": false,
"station_id": "U01",
"zipcode": "94402",
"premium": 4.13,
"regular": 3.83
},
{
"start_date": "2018-07-05T07:41:09.779",
"end_date": null,
"active": true,
"station_id": "V01",
"zipcode": "94402",
"premium": 3.99,
"regular": 3.5
},
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": "2018-07-05T07:41:09.779",
"active": false,
"station_id": "V01",
"zipcode": "94402",
"premium": 3.05,
"regular": 2.75
},
{
"start_date": "2018-07-05T07:12:33.088",
"end_date": "2018-07-05T07:41:09.779",
"active": false,
"station_id": "U01",
"zipcode": "94402",
"premium": 4.25,
"regular": 3.98
},
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": null,
"active": true,
"station_id": "C01",
"zipcode": "94402",
"premium": 3.45,
"regular": 3.25
},
{
"start_date": "2018-07-05T07:41:09.779",
"end_date": null,
"active": true,
"station_id": "U01",
"zipcode": "94402",
"premium": 4,
"regular": 3.71
}
] |
| Iteration 3 | Upload Updated Data | New data corresponding to the C01 station IDs is uploaded, and the active status of the existing data is changed to false. New data corresponding to the U01 and V01 station IDs is uploaded, the existing rows of data are end-dated, and the active status of the existing data is changed to false. The active status of the new data is saved as true. No. of rows of data: 9 - 3 current
- 6 historical
- 3 historical rows of station ID U01 data
- 2 historical row of station ID V01 data
- 1 historical row of station ID C01 data
|
Paste code macro |
---|
[
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": "2018-07-05T07:12:33.087",
"active": false,
"station_id": "U01",
"zipcode": "94402",
"premium": 4.13,
"regular": 3.83
},
{
"start_date": "2018-07-05T07:41:09.779",
"end_date": "2018-07-05T07:44:49.878",
"active": false,
"station_id": "V01",
"zipcode": "94402",
"premium": 3.99,
"regular": 3.5
},
{
"start_date": "2018-07-05T07:44:49.878",
"end_date": null,
"active": true,
"station_id": "C01",
"zipcode": "94402",
"premium": 4.3,
"regular": 3.9
},
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": "2018-07-05T07:41:09.779",
"active": false,
"station_id": "V01",
"zipcode": "94402",
"premium": 3.05,
"regular": 2.75
},
{
"start_date": "2018-07-05T07:12:33.088",
"end_date": "2018-07-05T07:41:09.779",
"active": false,
"station_id": "U01",
"zipcode": "94402",
"premium": 4.25,
"regular": 3.98
},
{
"start_date": "2018-07-05T07:44:49.878",
"end_date": null,
"active": true,
"station_id": "U01",
"zipcode": "94402",
"premium": 4.33,
"regular": 3.98
},
{
"start_date": "2018-07-05T07:10:39.418",
"end_date": "2018-07-05T07:44:49.878",
"active": false,
"station_id": "C01",
"zipcode": "94402",
"premium": 3.45,
"regular": 3.25
},
{
"start_date": "2018-07-05T07:41:09.779",
"end_date": "2018-07-05T07:44:49.878",
"active": false,
"station_id": "U01",
"zipcode": "94402",
"premium": 4,
"regular": 3.71
},
{
"start_date": "2018-07-05T07:44:49.878",
"end_date": null,
"active": true,
"station_id": "V01",
"zipcode": "94402",
"premium": 4.12,
"regular": 3.78
}
] |
| Iteration 4 | Truncate DB Table | Clears the contents of the gas_dim_test DB table. |
|
|