On this Page

Overview

Snap Type:

Read

Description:

This Snap provides the functionality of SCD (slowly changing dimension) Type 2 on a target Redshift table. The Snap executes one SQL lookup request per set of input documents to avoid making a request for every input record. Its output is typically a stream of documents for the Redshift - Bulk Upsert Snap, which updates or inserts rows into the target table. Therefore, this Snap must be connected to the Redshift - Bulk Upsert Snap to accomplish the complete SCD2 functionality.

ETL Transformations and Data Flow

This Snap enables the following ETL operations/flows:

  1. Take the incoming document from the upstream snap and perform a lookup operation in the database, producing one or two documents in the output view.
  2. If a record exists in the database (with the values provided in the input document), generate two output documents; otherwise generates only one.
  3. Feed the output documents to the Redshift Bulk Upsert Snap, which will insert them into the destination table to preserve history.

Input & Output

  • Expected upstream Snaps: Any Snap, such as a Mapper or JSON Parser Snap, whose output contains a map of key-value entries.
  • Expected downstream Snaps: Any Snap, such as the Redshift Bulk Upsert or Structure Snap, that accepts documents containing data organized as key-value pairs.
  • Input Each document in the input view should contain a data map of key-value entries. The input data must contain data in the Natural Key and Cause-historization fields.
  • Output: Each document in the output view contains a data map of key-value entries for all fields of a row in the target Redshift table.

Modes

Prerequisites:
  • Redshift database installed
  • Able to connect to the database from the desired plex nodes
Limitations and Known Issues:
Configurations:

Account & Access

This Snap uses account references created on the Accounts page of SnapLogic Manager to handle access to this endpoint. See Configuring Redshift Accounts for information on setting up this type of account.

Views:

InputThis Snap has exactly one input view and expects documents in the view.
OutputThis Snap allows zero or one output views and produces documents in the view.
ErrorThis Snap has at most one error view and produces zero or more documents in the view.


Settings

Label

Required. The name for the Snap. You can modify this to be more specific, especially if you have more than one of the same Snap in your pipeline.

Schema name



The name of the database schema that contains the table whose data you want to update. Selecting a schema filters the Table name list to only show tables created in the selected schema. If you do not specify a schema in the Schema name field, the Table name field lists out all tables with the name you specify in all schemas in the database.

Default value: [None]

The values can be passed using the pipeline parameters but not the upstream parameter fields.


Table name
The name of the table that contains the data you want to update.

Default value: [None]

The values can be passed using the pipeline parameters but not the upstream parameter fields.


Natural key

Required. Names of fields that identify a unique row in the target table. The identity key cannot be used as the Natural key, since a current row and its historical rows cannot have the same natural key value.

The values can be passed using the pipeline parameters but not the upstream parameter fields.


Example:  id (Each record has to have a unique value.)

Default value:  [None]

Cause-historization fields

Required. Names of fields where any change in values causes the historization of an existing row and the insertion of a new 'current' row.

Example: gold bullion rate

Default value:  [None]

SCD Fields

Required. Enter the field names you want to use as dimension fields–or select them from the suggestion list. The columns in the table specified in the Table name field above are used to populate the suggestion list.

Example:

 Meaning
 
 Field Value
 Current row 
 
 current_row 1
 Historical row 
 
 current_row 0
 Start date
 
 start_date $start_date
 End date end_date
 
 $end_date


Default value:
 

 Meaning
 
 Field Value
 Current row 
 
 [None] 1
 Historical row 
 
 [None] 0
 Start date
 
 [None] Date.now()
 End date [None]
 
 Date.now()
 


Ignore unchanged rows

Specifies whether the Snap must ignore writing unchanged rows from the source table to the target table. If you enable this option, the Snap generates a corresponding document in the target only if the Cause-historization column in the source row is changed. Else, the Snap does not generate any corresponding document in the target. 

Default value: Not selected

Snap Execution


Default Value: Validate & Execute
Example: Execute only

Select an option to specify how the Snap must be executed. Available options are:

  • Validate & Execute: Performs limited execution of the Snap (up to 50 records) during Pipeline validation; performs full execution of the Snap (unlimited records) during Pipeline execution.

  • Execute only: Performs full execution of the Snap during Pipeline execution; does not execute the Snap during Pipeline validation.

  • Disabled: Disables the Snap and, by extension, its downstream Snaps.

Troubleshooting


ErrorReasonResolution

type "e" does not exist

This issue occurs due to incompatibilities with the recent upgrade in the Postgres JDBC drivers.

Download the latest 4.1 Amazon Redshift driver here and use this driver in your Redshift Account configuration and retry running the Pipeline.

Example


Typical Snap Configurations

Do it Yourself

Task Overview

This example enables us to record and store changing gas prices and see how they change over time. To do so, we need to:

  1. Insert a table into the Redshift database and gas price data.
  2. Add new rows of gas price data as they come in and check to see how existing values are historized.

Download and Import Sample Pipelines

Before we start, you need to download, import, and configure the pipelines associated with this example:

  1. Download the Redshift_Iteration_Initialization_Example ZIP file. It contains two pipelines:
    • Redshift_Iteration_Initialization_Example: Creates a DB table with initial data in Redshift. Once this is done, it upserts data into the DB.
    • Redshift_Select_Write: Retrieves the latest data from Redshift and writes it to a file, enabling us to see how the data in Redshift changes with every upsert iteration.
  2. Import the pipelines into SnapLogic and configure the Redshift Snaps with your Redshift account.

Pipeline Properties

The Redshift_Iteration_Initialization_Example pipeline creates a Gas Dimensions (gas_dim_test) table and inserts new gas price values into it. To do so, it needs to initialize the DB using one branch of the pipeline, and then iteratively upsert new data using other branches. To control which branch of the pipeline gets triggered, two parameters must be declared at the pipeline level:

  • INITIALIZE: Indicates whether the task being performed is one of initialization.
  • SCD2_ITERATION: Indicates whether the task being performed is an iterative upsert.

Later, in the pipeline, we shall use these parameters to create trigger configurations to control when each child pipeline is executed.

Understanding the Sample Pipelines

This section describes the two pipelines used in this example and explains how they work together.

The Redshift_Iteration_Initialization_Example Pipeline

The Redshift_Iteration_Initialization_Example pipeline comprises three pipelines:

  1. Create DB Table and Upload Initial Data: Creates the DB table and uploads three rows of data.
  2. Upload Updated Data: Uploads new data associated with the rows created by Pipeline 1 above.
  3. Truncate DB Table: Clears the contents of the DB table, resetting the pipeline for reuse from scratch.
Create DB Table and Upload Initial Data

This pipeline contains the following Snaps:

Snap NameSnapPurposeSettingsComments
Initialization DataJSON Generator SnapContains the initial data for the Gas Dimensions DB table.

Contains the initial upload data:


 
[
    {
     "premium": 4.13,
     "regular": 3.83,
     "station_id": "U01",
     "zipcode": "94402"
    },
    {
     "premium": 3.05,
     "regular": 2.75,
     "station_id": "V01",
     "zipcode": "94402"
    },
    {
     "premium": 3.45,
     "regular": 3.25,
     "station_id": "C01",
     "zipcode": "94402"
    }
]




The data is organized into four columns:

  • premium: Lists the price of premium fuel.
  • regular: Lists the price of regular fuel.
  • station_id: Lists the identifier of the gas station from where the data was collected.
  • zipcode: Lists the zip code in which the gas station operates.

The important thing to note here is that, while gas rates may change, the data in the station_id and zipcode fields will not. Thus, these can be used as key fields later in the pipeline.

Check InitializeFilter SnapChecks whether the DB table can be created.

Contains the following configuration in the Filter field:


 
parseInt(_INITITALIZE) > 0


This Snap checks whether the value of the INITIALIZE pipeline parameter is greater than 0. If it is, then the Snap executes the Snaps that follow it; else, it exits.

Thus, when we need to create the gas_dim table, we set the INITIALIZE parameter to 1. Once the pipeline completes execution, we change the value of the Initialize parameter to 0. This ensures that the table is not re-created.

Map Attributes to RedshiftMapper SnapMaps attributes in the input data to column headers that must be created in the Redshift DB.

Contains the following mappings:

  • Date.now(): $start_date: Instructs the DB to take the current date-time as the time from which the uploaded data is active.
  • null: $end_date: Sets end_date to 'null'.
  • true: $active: Maps the value 'true' to indicate that a record is active.
  • $station_id: $station_id: Maps station_id in the input data to station_id in the Redshift DB table.
  • $zipcode: $zipcode: Maps zipcode in the input data to zipcode in the Redshift DB table.
  • $premium: $premium: Maps premium in the input data to premium in the Redshift DB table.
  • $regular: $regular: Maps regular in the input data to regular in the Redshift DB table.

Create Table in Redshift with Input DataRedshift - InsertCreates a DB table in Redshift.

Contains the following settings:

  • Schema Name: "public"
  • Table Name: "gas_dim_test"

This creates a table named "gas_dim_test" in the "public" schema in Redshift with the data in the Initialization Data Snap.

Upload Updated Data

This pipeline updates the data in the gas_dim_test DB table in three iterations. This pipeline contains the following Snaps:

Snap NameSnapPurposeSettingsComments

Iteration 1

SCD2 Data (Iteration 1)JSON GeneratorContains the updated data for the gas_dim_test DB table.

Contains the following data:


[
    {
     "premium": 4.25,
     "regular": 3.98,
     "station_id": "U01",
     "zipcode": "94402"
    }
]




Iteration 1 updates the data associated with station_id U01.
Check Iteration 1Filter SnapChecks whether the first data update iteration should be executed.

Contains the following setting in the Filter expression field:

 
parseInt(_SCD2_ITERATION) == 1


If, in the pipeline properties, the value of the SCD2_ITERATION parameter is 1, and the value of the INITIALIZE parameter is 0, then this pipeline is executed.
Check Not InitializeFilter Snap

Contains the following setting in the Filter expression field:

 
parseInt(_INITITALIZE) == 0


Iteration 2
SCD2 Data (Iteration 2)JSON GeneratorContains the updated data for the gas_dim_test DB table.

Contains the following data:


[
    {
     "premium": 4.00,
     "regular": 3.71,
     "station_id": "U01",
     "zipcode": "94402"
    },
    {
     "premium": 3.99,
     "regular": 3.50,
     "station_id": "V01",
     "zipcode": "94402"
    }    
]




Iteration 2 updates the data associated with station_id U01 and V01.
Check Iteration 2Filter SnapChecks whether the second data update iteration should be executed.

Contains the following setting in the Filter expression field:

 
parseInt(_SCD2_ITERATION) == 2


If, in the pipeline properties, the value of the SCD2_ITERATION parameter is 2, and the value of the INITIALIZE parameter is 0, then this pipeline is executed.
Check Not InitializeFilter Snap

Contains the following setting in the Filter expression field:

 
parseInt(_INITITALIZE) == 0


Iteration 3
SCD2 Data (Iteration 3)JSON GeneratorContains the updated data for the gas_dim_test DB table.

Contains the following data:


[
    {
     "premium": 4.33,
     "regular": 3.98,
     "station_id": "U01",
     "zipcode": "94402"
    },
    {
     "premium": 4.12,
     "regular": 3.78,
     "station_id": "V01",
     "zipcode": "94402"
    },
    {
     "premium": 4.30,
     "regular": 3.90,
     "station_id": "C01",
     "zipcode": "94402"
    }        
]




Iteration 3 updates the data associated with station_id U01, V01, and C01.
Check Iteration 3Filter SnapChecks whether the third data update iteration should be executed.

Contains the following setting in the Filter expression field:

 
parseInt(_SCD2_ITERATION) == 3


If, in the pipeline properties, the value of the SCD2_ITERATION parameter is 2, and the value of the INITIALIZE parameter is 0, then this pipeline is executed.
Check Not InitializeFilter Snap

Contains the following setting in the Filter expression field:

 
parseInt(_INITITALIZE) == 0


Processing the Output of the Three Iterations
UnionUnion SnapTakes the output from the iteration triggered upline and passes it on to the downstream Snaps.

Prepare Upload Data
Redshift - SCD2
Prepares the data collected by the upstream Union Snap.

Contains settings that ensure that the correct DB table is updated, and that the appropriate fields are read the same way as they were created in the pipeline above.

Click the screenshot below to review the settings in detail.

These settings basically instruct Redshift to read the public.gas_dim table as follows:

  • Natural Key: The values in the following fields will never change: station_id, zipcode.
  • Cause-historization fields: When these values change, create a new row containing the new values. Mark the existing value as old.
  • SCD fields
    • All rows containing current data must contain the value 'true' in the 'active' field.
    • Similarly, all fields containing historical data must contain the value 'false' in the 'active' field.
    • The start date of the current row must always be the current date-time.
    • Assign the end date to the current data when updated data is received.


Upsert Updated Data
Redshift - Bulk Upsert
Upserts the data prepared by the Redshift - SCD2 Snap to the gas_dim_test DB table.

Contains the following settings:

Key columns: These are the columns used to check for existing entries in the target table:
  • station_id
  • zipcode

For the Bulk Upsert Snap to work, at least one Natural key must be included in the Key columns field; else the upsert will fail.


Once this Snap completes execution, the updated data is populated into the DB table, the 'active' column in the row containing the old data is updated to now read 'false', and a new row containing the updated data is added, where the 'active' column reads 'true'.

Truncate DB Table

Once the simuation is run, and you have seen how the Redshift - SCD 2 and Redshift - Upsert Snaps work together to update and historize DB data as new data arrives, you may want to clear the concerned table to ensure that, when the simulation is run again, an empty table is available.

This pipeline contains the following Snaps:

  • Check Iteration 4: Checks whether the fourth iteration should be executed. If the check succeeds, the downstream Snaps are executed; else, the pipeline exits.
  • Check Not Initialize: Checks whether the INITIALIZE parameter in the pipeline properties is set to '0'. If the check succeeds, the downstream Snaps are executed; else, the pipeline exits.
  • Redshift - Execute: Runs the following command on the gas_dim_test table to clear all its contents:

     
    truncate table public.gas_dim_test


The Redshift_Select_Write Pipeline

Run this pipeline after each iteration to view the updated output.

This pipeline contains the following Snaps:

  • Redshift - Select: Reads the contents of the public.gas_dim_test DB table.
  • JSON Formatter: Formats the data received from the Redshift - Select Snap as a JSON file.
  • File Writer: Saves the JSON data as RS_File.json in the SLDB.

Changes Made to the Public.gas_dim DB Table with Each Iteration

As the values entered in the SCD2_ITERATION and INITIALIZE pipeline parameters change, and as the pipelines are run iteratively, the public.gas_dim_test DB table is created and iteratively updated. This section lists out the contents of the table at the end of each iteration.

EventTriggered PipelineWhat Happens HereOutput
InitializationCreate DB Table and Upload Initial Data

The table is created, and data from the Initialization Data Snap is uploaded into it.

No. of rows of data: 3


[
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.13,
        "regular": 3.83
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "C01",
        "zipcode": "94402",
        "premium": 3.45,
        "regular": 3.25
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.05,
        "regular": 2.75
    }
]


Iteration 1Upload Updated Data

New data corresponding to the U01 station ID is uploaded, the existing row of data is end-dated, and the active status of the existing data is changed to false. The active status of the new data is saved as true.

No. of rows of data: 4

  • 3 current
  • 1 historical (for U01)


[
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "C01",
        "zipcode": "94402",
        "premium": 3.45,
        "regular": 3.25
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:12:33.087",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.13,
        "regular": 3.83
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.05,
        "regular": 2.75
    },
    {
        "start_date": "2018-07-05T07:12:33.088",
        "end_date": null,
        "active": true,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.25,
        "regular": 3.98
    }
]


Iteration 2Upload Updated Data

New data corresponding to the U01 and V01 station IDs is uploaded, the existing rows of data are end-dated, and the active status of the existing data is changed to false. The active status of the new data is saved as true.

No. of rows of data: 6

  • 3 current
  • 3 historical
    • 2 historical rows of station ID U01 data
    • 1 historical row of station ID V01 data


[
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:12:33.087",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.13,
        "regular": 3.83
    },
    {
        "start_date": "2018-07-05T07:41:09.779",
        "end_date": null,
        "active": true,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.99,
        "regular": 3.5
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:41:09.779",
        "active": false,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.05,
        "regular": 2.75
    },
    {
        "start_date": "2018-07-05T07:12:33.088",
        "end_date": "2018-07-05T07:41:09.779",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.25,
        "regular": 3.98
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": null,
        "active": true,
        "station_id": "C01",
        "zipcode": "94402",
        "premium": 3.45,
        "regular": 3.25
    },
    {
        "start_date": "2018-07-05T07:41:09.779",
        "end_date": null,
        "active": true,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4,
        "regular": 3.71
    }
]


Iteration 3Upload Updated Data

New data corresponding to the C01 station IDs is uploaded, and the active status of the existing data is changed to false.

New data corresponding to the U01 and V01 station IDs is uploaded, the existing rows of data are end-dated, and the active status of the existing data is changed to false. The active status of the new data is saved as true.

No. of rows of data: 9

  • 3 current
  • 6 historical
    • 3 historical rows of station ID U01 data
    • 2 historical row of station ID V01 data
    • 1 historical row of station ID C01 data


[
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:12:33.087",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.13,
        "regular": 3.83
    },
    {
        "start_date": "2018-07-05T07:41:09.779",
        "end_date": "2018-07-05T07:44:49.878",
        "active": false,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.99,
        "regular": 3.5
    },
    {
        "start_date": "2018-07-05T07:44:49.878",
        "end_date": null,
        "active": true,
        "station_id": "C01",
        "zipcode": "94402",
        "premium": 4.3,
        "regular": 3.9
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:41:09.779",
        "active": false,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 3.05,
        "regular": 2.75
    },
    {
        "start_date": "2018-07-05T07:12:33.088",
        "end_date": "2018-07-05T07:41:09.779",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.25,
        "regular": 3.98
    },
    {
        "start_date": "2018-07-05T07:44:49.878",
        "end_date": null,
        "active": true,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4.33,
        "regular": 3.98
    },
    {
        "start_date": "2018-07-05T07:10:39.418",
        "end_date": "2018-07-05T07:44:49.878",
        "active": false,
        "station_id": "C01",
        "zipcode": "94402",
        "premium": 3.45,
        "regular": 3.25
    },
    {
        "start_date": "2018-07-05T07:41:09.779",
        "end_date": "2018-07-05T07:44:49.878",
        "active": false,
        "station_id": "U01",
        "zipcode": "94402",
        "premium": 4,
        "regular": 3.71
    },
    {
        "start_date": "2018-07-05T07:44:49.878",
        "end_date": null,
        "active": true,
        "station_id": "V01",
        "zipcode": "94402",
        "premium": 4.12,
        "regular": 3.78
    }
]


Iteration 4Truncate DB TableClears the contents of the gas_dim_test DB table.


 
[]




Downloads