...

Customer churn is a major problem for service providers: losing customers means losing revenue and may point to deficiencies in the service. There are many reasons why customers decide to leave a service. With data analytics and machine learning, we can identify the key factors behind churn, create a retention plan, and predict which customers are likely to churn.

The live demo is available at our Machine Learning Showcase.

...

  1. Profiling: Use Profile Snap from ML Analytics Snap Pack to compute statistics of this dataset.
  2. Data Preparation: Perform data preparation on this dataset using Snaps from ML Data Preparation Snap Pack.
  3. Cross Validation: Use Cross Validator (Classification) Snap from ML Core Snap Pack to perform 10-fold cross validation on various machine learning algorithms. The result tells us the accuracy of each algorithm on the churn prediction task.
  4. Model Building: Use Trainer (Classification) Snap from ML Core Snap Pack to build the logistic regression model based on this dataset; the model is then serialized and stored.
  5. Model Hosting: Use Predictor (Classification) Snap from ML Core Snap Pack to host the model and build the API using Ultra Task.
  6. API Testing: Use REST Post Snap to send a sample request to the Ultra Task to make sure the API is working as expected.
  7. Visualization API: Use Remote Python Script Snap from ML Core Snap Pack to host an API that provides a data visualization of a selected field in the dataset.

We are going to build 8 pipelines: one for each objective, except Cross Validation, which uses two pipelines to automate trying multiple algorithms. Each of these pipelines is described in the Pipelines section below.
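To make the cross validation step concrete: outside SnapLogic, the same idea can be sketched in a few lines of scikit-learn, as below. This is only a conceptual analogue of what the Cross Validator (Classification) Snap does for us inside the pipeline, not its implementation; the file name and the list of algorithms are just examples.

# Conceptual sketch of 10-fold cross validation over a few algorithms.
# The Cross Validator (Classification) Snap does this without code;
# the file name and algorithms here are only illustrative.
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.tree import DecisionTreeClassifier

df = pd.read_csv("churn.csv")                   # hypothetical local copy of the dataset
X = pd.get_dummies(df.drop(columns=["Churn"]))  # one-hot encode categorical fields
y = df["Churn"]

for name, model in [("logistic regression", LogisticRegression(max_iter=1000)),
                    ("decision tree", DecisionTreeClassifier())]:
    scores = cross_val_score(model, X, y, cv=10)  # 10-fold cross validation
    print(name, scores.mean())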

Pipelines

Profiling

The File Reader Snap reads the dataset, which is in CSV format. Then, the CSV Parser Snap parses the dataset into documents. In SnapLogic, there are two types of input/output views: the diamond shape is the binary view, while the circle shape is the document view. Parser Snaps convert binary into documents, while Formatter Snaps convert documents into binary.
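To make the binary/document distinction concrete, the sketch below shows in plain Python roughly what parsing and formatting mean: binary CSV bytes become a list of documents (one dictionary per row), and documents are formatted back into binary. The sample rows are made up, and the Snaps do all of this for us without any code.

import csv
import io

# A binary view carries raw bytes; a document view carries structured records.
csv_bytes = b"customerID,tenure,Churn\n0001,12,No\n0002,1,Yes\n"

# "Parsing": binary -> documents (one dictionary per row), like the CSV Parser Snap.
documents = list(csv.DictReader(io.StringIO(csv_bytes.decode("utf-8"))))

# "Formatting": documents -> binary, like a CSV Formatter Snap would do.
out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=documents[0].keys())
writer.writeheader()
writer.writerows(documents)
csv_bytes_again = out.getvalue().encode("utf-8")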

...

The File Reader Snap reads the dataset produced by the Data Preparation pipeline, which is in JSON format. Then, the JSON Parser Snap converts the binary data into documents. We do not need the Type Converter Snap since the JSON file already contains the data types. Then, the Trainer (Classification) Snap trains the model using the logistic regression algorithm. The model consists of two parts: metadata describing the schema (field names and types) of the dataset, and the actual model. Both the metadata and the model are serialized. If the Readable property in the Trainer (Classification) Snap is selected, a human-readable representation of the model is also included in the output. Finally, the model is written as a JSON file to SLFS using the JSON Formatter and File Writer Snaps.
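For a concrete picture of such a model artifact, here is a rough sketch using scikit-learn and pickle: schema metadata and a serialized model stored side by side in one JSON file. This is only an illustration of the idea, not the Trainer Snap's internal format, and the file names are placeholders.

import base64
import json
import pickle

import pandas as pd
from sklearn.linear_model import LogisticRegression

df = pd.read_json("churn_prepared.json")        # hypothetical output of the Data Preparation pipeline
X = pd.get_dummies(df.drop(columns=["Churn"]))
y = df["Churn"]

model = LogisticRegression(max_iter=1000).fit(X, y)

# Metadata describes the schema of the training data; the model itself is serialized.
artifact = {
    "metadata": {col: str(dtype) for col, dtype in X.dtypes.items()},
    "model": base64.b64encode(pickle.dumps(model)).decode("ascii"),
}
with open("model.json", "w") as f:
    json.dump(artifact, f)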

Model Hosting

This pipeline is scheduled as an Ultra Task to provide a REST API that is accessible by external applications. The File Reader and JSON Parser Snaps read the model built by the previous pipeline. The Predictor (Classification) Snap takes the data from API requests and the model from the JSON Parser Snap to produce predictions. The Filter Snap authenticates the requests by checking the token, which can be changed in the pipeline parameters. The Extract Params Snap (Mapper) extracts the required fields from the request. The Prepare Response Snap (Mapper) maps the prediction to $content.pred, which will be the response body. This Snap also adds headers to allow Cross-Origin Resource Sharing (CORS).


The image on the left below shows the expressions in the Extract Params Snap that extract the required fields from the request. The image on the right shows the configuration of the Prepare Response Snap.
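Since those screenshots are not reproduced here, the sketch below approximates the two mappings in plain Python: pull the token and parameters out of the incoming request, then wrap the prediction in $content.pred. The function names and the CORS header value are illustrative; the real logic lives in the Snap settings.

# Rough illustration of the Mappers around the Predictor (Classification) Snap.

def extract_params(request_doc, expected_token):
    # Filter + Extract Params: authenticate, then keep only the model inputs.
    if request_doc.get("token") != expected_token:
        return None                        # the Filter Snap would drop this request
    return request_doc.get("params", {})

def prepare_response(prediction):
    # Prepare Response: map the prediction to $content.pred; the Snap also adds
    # CORS headers so browsers can call the API from another origin.
    response = {"content": {"pred": prediction}}
    cors_headers = {"Access-Control-Allow-Origin": "*"}   # illustrative header
    return response, cors_headers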

Building the API

To deploy this pipeline as a REST API, click the calendar icon in the toolbar. Either Triggered Task or Ultra Task can be used.

A Triggered Task is suited to batch processing since it starts a new pipeline instance for each request. An Ultra Task is better for providing a REST API to external applications that require low latency. In this case, an Ultra Task is preferable. A bearer token is not needed here since the Filter Snap performs authentication inside the pipeline.

To get the URL, click Show tasks in this project in Manager in the Create Task window. Click the small triangle next to the task, then Details. The task details show the URL.

API Testing

In this pipeline, a sample request is generated by the JSON Generator Snap. The request is sent to the Ultra Task by the REST Post Snap. The Mapper Snap extracts the response, which is in $response.entity.

Below is the content of the JSON Generator Snap. It contains $token and $params, which will be included in the request body sent by the REST Post Snap.

The REST Post Snap sends the request to the Ultra Task. The URL can be found on the Manager page, as described above. In some cases, you need to select Trust all certificates in the REST Post Snap.
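The same test can also be reproduced outside SnapLogic with a few lines of Python, as sketched below. The URL, token, and field values are placeholders; use the Ultra Task URL from the Manager page and your own token. Passing verify=False plays roughly the same role as Trust all certificates.

import requests

URL = "https://example.snaplogic.io/.../ultra-task"    # placeholder; use the URL from Manager
payload = {
    "token": "my-secret-token",                        # placeholder token
    "params": {                                        # illustrative customer record
        "SeniorCitizen": "No",
        "tenure": 1,
        "MonthlyCharges": 70.35,
    },
}

resp = requests.post(URL, json=payload, verify=False)  # verify=False ~ Trust all certificates
print(resp.json())                                     # response body with the prediction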

The output of the REST Post Snap is shown below. The Mapper Snap extracts $response.entity from the response. In this case, the model predicts that this customer is likely to churn, with a probability of 63.6%.

Visualization API

This pipeline reads the dataset using File Reader and JSON Parser Snaps. The Group By N Snap packs all documents (rows) in the dataset into one big document. Then, the dataset is buffered in the Remote Python Script Snap. When a request comes in, the Remote Python Script Snap draws a visualization and outputs it as HTML, which can be rendered in the UI. This pipeline is scheduled as an Ultra Task to provide an API to external applications. The instructions are the same as described in the previous section.
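As a rough illustration, the document produced by the Group By N Snap looks like the sketch below: the whole dataset is packed into a single document under the group key. The row values are made up.

# Illustrative shape of the single document produced by the Group By N Snap.
dataset_doc = {
    "group": [
        {"customerID": "0001", "SeniorCitizen": "0", "Churn": "No"},
        {"customerID": "0002", "SeniorCitizen": "1", "Churn": "Yes"},
        # ... one entry per row of the dataset
    ]
}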

Below is the configuration of the Extract Params Snap. It extracts the information from the request that the Remote Python Script Snap needs to draw a visualization.
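As a made-up example, a request document arriving at the Remote Python Script Snap might look like the following after the Extract Params Snap. The keys match the ones read in snaplogic_process below; the values are only illustrative.

# Illustrative request document; keys follow the ones read in snaplogic_process.
request_doc = {
    "original_field": "MonthlyCharges",   # column name in the dataset (illustrative)
    "display_field": "Monthly Charges",   # label shown on the y-axis
    "type": "number",                     # "number" or "dropdown"
    "options": [0],                       # for numeric fields, options[0] sets the y-axis start
}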

Python Script

Below is the script from the Remote Python Script Snap used in this pipeline. The script has three important functions:

...

The first function (snaplogic_init) is executed before consuming the input data. The second function (snaplogic_process) is called on each incoming document. The last function (snaplogic_final) is executed after all incoming documents have been consumed by snaplogic_process.

First, we use SLTool.ensure to automatically install the required libraries. The SLTool class contains useful methods: ensure, execute, encode, decode, etc. In this case, we need bokeh, the library we use to draw visualizations.

In snaplogic_process, the requests are buffered until the dataset is loaded. The dataset from the Group By N Snap is in row["group"]. Once the dataset is loaded, the requests can be processed. In this case, numeric and categorical fields will result in different types of visualization.

from snaplogic.tool import SLTool as slt

slt.ensure("bokeh", "0.13.0")

# Imports
import os
from collections import Counter
import pandas as pd
from bokeh.io import output_file, save
from bokeh.models import ColumnDataSource, FactorRange
from bokeh.transform import factor_cmap
from bokeh.plotting import figure

# Global Variables
df = None
request_queue = []
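# tmp_root is expected to be provided by the Remote Python Script Snap's execution environment.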
tmp_path = os.path.join(tmp_root, "viz.html")
output_file(tmp_path)


# This function will be executed once before consuming the data.
def snaplogic_init():
    return None


# This function will be executed on each document from the upstream snap.
def snaplogic_process(row):
    global df
    output_list = []

    if "group" in row:
        # Load dataset and convert into DataFrame.
        df = pd.DataFrame(row["group"])
        # Replace 1 and 0 in $SeniorCitizen field with "Yes" and "No".
        df["SeniorCitizen"] = df["SeniorCitizen"].replace("1", "Yes")
        df["SeniorCitizen"] = df["SeniorCitizen"].replace("0", "No")
    else:
        request_queue.append(row)

    if df is not None:
        while len(request_queue) > 0:
            try:
                request = request_queue.pop(0)
                original_field = request["original_field"]
                display_field = request["display_field"]
                field_type = request["type"]
                options = request["options"]

                # Plot numeric field.
                if field_type == "number":
                    p = figure(x_axis_label="Churn", y_axis_label=display_field,
                               x_range=df["Churn"].unique(), plot_width=600, plot_height=580)
                    p.vbar(df["Churn"],
                           top=df[original_field].add([df[original_field].max() / 100] * len(df)),
                           bottom=df[original_field],
                           width=0.7, line_color=None, fill_alpha=0.008)
                    p.y_range.start = options[0]
                    save(p)
                    with open(tmp_path, "r") as tmp_file:
                        viz_html = tmp_file.read()
                    output_list.append({"viz": viz_html})

                # Plot categorical field.
                elif field_type == "dropdown":
                    churn_unique = df["Churn"].unique()
                    col_unique = df[original_field].unique()
                    counter = Counter()
                    for i in range(0, len(df)):
                        counter[(df["Churn"][i], df[original_field][i])] += 1
                    palette = ["#c9d9d3", "#718dbf", "#e84d60"]
                    x = [(churn_value, col_value) for churn_value in churn_unique for col_value in col_unique]
                    source = ColumnDataSource(data=dict(x=x, counts=[counter[point] for point in x]))
                    p = figure(x_axis_label="Churn", y_axis_label="Count", plot_width=600, plot_height=580,
                               x_range=FactorRange(*x))
                    p.vbar(x="x", top="counts", width=0.9, source=source, line_color=None,
                           fill_color=factor_cmap("x", palette=palette, factors=col_unique, start=1, end=2))
                    p.y_range.start = 0
                    p.x_range.range_padding = 0.1
                    p.xaxis.major_label_orientation = 1
                    save(p)
                    with open(tmp_path, "r") as tmp_file:
                        viz_html = tmp_file.read()
                    output_list.append({"viz": viz_html})

                # Ignore invalid field type.
                else:
                    output_list.append(slt.get_drop_doc())

            except:
                output_list.append({"viz": "The request is not valid."})

    if "group" in row:
        # Do not output if the input document is a dataset. This is needed in order to preserve lineage property.
        output_list.append(slt.get_drop_doc())

    return output_list


# This function will be executed after consuming all documents from the upstream snap.
def snaplogic_final():
    return None

Downloads

...

