Problem Scenario

Taxonomy is the science of classifying organisms, including plants, animals, and microorganisms. In September 1936, R. A. Fisher published a paper titled "The Use of Multiple Measurements in Taxonomic Problems". The paper includes four measurements (sepal length, sepal width, petal length, and petal width) for 150 flowers, with 50 samples of each type of Iris flower: Iris setosa, Iris versicolor, and Iris virginica. The author demonstrated that it is possible to find linear functions of these measurements that are good enough to distinguish the types of Iris flowers.

Description

Almost 100 years later, the Iris dataset is one of the best-known datasets among people who study machine learning and data science. It poses a multi-class classification problem with four numeric features. The screenshot below shows a preview of the dataset; there are three types of Iris flowers: setosa, versicolor, and virginica. The sepal and petal measurements are in centimeters. You can find more details about this dataset here. If you are familiar with Python, you can also get this dataset from the scikit-learn library as described here. We will build a simple neural network to tackle this classification problem. Then, we will host the model as an API inside the SnapLogic platform.
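For readers who want to inspect the data outside SnapLogic, the following is a minimal sketch of loading the dataset with scikit-learn's load_iris function. It assumes scikit-learn is installed locally; the variable names are illustrative and are not used by the pipelines in this tutorial.

```python
# Minimal sketch: load the Iris dataset bundled with scikit-learn.
# Assumes scikit-learn is installed; variable names are illustrative only.
from sklearn.datasets import load_iris

iris = load_iris()

features = iris.data                # 150 rows x 4 measurements, in centimeters
targets = iris.target               # integer class per row: 0, 1, or 2
class_names = iris.target_names     # names that the integer classes refer to

print(features.shape)
print(list(class_names))
print(iris.feature_names)
```

Note that scikit-learn labels the measurement columns differently (for example, "sepal length (cm)") from the field names used in the scripts in this tutorial.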


Objectives

  1. Model Building: Use the Remote Python Script Snap from the ML Core Snap Pack to deploy a Python script that trains a neural network model on the Iris flower dataset.
  2. Model Testing: Test the model with a few samples.
  3. Model Hosting: Use the Remote Python Script Snap from the ML Core Snap Pack to deploy a Python script that hosts the model, and schedule an Ultra Task to provide an API.
  4. API Testing: Use the REST Post Snap to send a sample request to the Ultra Task to make sure the API works as expected.

We are going to build 4 pipelines: Model Building, Model Testing, Model Hosting, and API Testing; and an Ultra Task to accomplish the above objectives. Each of these pipelines is described in the Pipelines section below.

Pipelines

Model Building

In this pipeline, the File Reader Snap reads the training set containing 100 samples. Then, the Remote Python Script Snap trains a neural network model. The output consists of two parts: target_encoder, which describes the mapping between each encoded class and the actual Iris flower name, and model, which is the serialized model. The output is converted into JSON format and saved on the SnapLogic File System (SLFS) using the JSON Formatter Snap and the File Writer Snap.


The Remote Python Script Snap executes the Python script on a Remote Python Executor (RPE). If no account is provided, the Snap assumes an RPE at localhost:5301 without a token.

Below is the output of the Remote Python Script Snap.

Python Script

Below is the script from the Remote Python Script Snap used in this pipeline. There are 3 main functions: snaplogic_init, snaplogic_process, and snaplogic_final. The first function (snaplogic_init) is executed before any input data is consumed. The second function (snaplogic_process) is called on each incoming document. The last function (snaplogic_final) is executed after all incoming documents have been consumed by snaplogic_process.
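The calling order described above can be simulated locally. The sketch below is only an illustration: run_script is a hypothetical stand-in for the Remote Python Executor, whose actual implementation is not shown in this document, and the three functions simply count the documents they receive.

```python
# Hypothetical local harness that mimics the calling order described above.
# It is NOT the Remote Python Executor; it only illustrates the lifecycle.

seen = []

def snaplogic_init():
    # Called once, before any input document is consumed.
    seen.clear()
    return None

def snaplogic_process(row):
    # Called once for each incoming document.
    seen.append(row)
    return None

def snaplogic_final():
    # Called once, after every document has been processed.
    return {"count": len(seen)}

def run_script(documents):
    """Call the three functions in order and collect the non-None outputs."""
    outputs = []
    snaplogic_init()
    for document in documents:
        result = snaplogic_process(document)
        if result is not None:
            outputs.append(result)
    final_result = snaplogic_final()
    if final_result is not None:
        outputs.append(final_result)
    return outputs

outputs = run_script([{"sepal_length": 5.1}, {"sepal_length": 6.3}])
print(outputs)  # [{'count': 2}]
```

Because snaplogic_process returns None here, the only output comes from snaplogic_final, which is the same shape the Model Building script uses: accumulate per document, emit once at the end.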


In snaplogic_init, we create a new session. In snaplogic_process, we extract the features and target from each incoming document and store them in lists. Once we have all the data, we build the neural network model in snaplogic_final. We start by converting the list of features to a numpy array and encoding the Iris flower names as integers using LabelEncoder from the scikit-learn library; then we apply one-hot encoding. Our neural network model has 1 hidden layer with 16 neurons. We train the model with the adam optimizer (epochs=50, batch_size=10, matching the script below). After training, we save the model to a temporary file and encode its bytes with base64, and we use SL.encode to serialize the target encoder. The model and target encoder are now in text format and are sent to the next Snap along with the training history.

# Imports
import base64
import os
import uuid
import jsonpickle
import jsonpickle.ext.numpy as jsonpickle_numpy
jsonpickle_numpy.register_handlers()
import numpy
import tensorflow
import keras
import sklearn.preprocessing

# Global Variables
features = []
targets = []

# This function will be executed once before consuming the data.
def snaplogic_init():
    session = tensorflow.Session()
    keras.backend.set_session(session)
    return None

# This function will be executed on each document from the upstream snap.
def snaplogic_process(row):
    global features
    global targets
    # Extract features and target from row.
    features.append([row["sepal_length"], row["sepal_width"], row["petal_length"], row["petal_width"]])
    targets.append(row["class"])
    return None

# This function will be executed after consuming all documents from the upstream snap.
def snaplogic_final():
    global features
    global targets
    # Convert features to numpy array.
    features = numpy.array(features)
    # Encode targets and do one hot encoding.
    target_encoder = sklearn.preprocessing.LabelEncoder()
    target_encoder.fit(targets)
    targets_encoded = target_encoder.transform(targets)
    targets_onehot = keras.utils.np_utils.to_categorical(targets_encoded)
    # Define model.
    model = keras.models.Sequential()
    model.add(keras.layers.Dense(16, activation="relu", input_dim=4))
    model.add(keras.layers.Dense(3, activation="softmax"))
    # Compile model.
    model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
    # Train model.
    history = model.fit(features, targets_onehot, epochs=50, batch_size=10, verbose=0)
    # Encode target encoder.
    target_encoder_json = SL.encode(target_encoder)
    # Encode model.
    model_tmp_path = SL.get_random_path()
    model.save(model_tmp_path)
    with open(model_tmp_path, "rb") as model_file:
        model_bin = model_file.read()
    model_base64 = base64.b64encode(model_bin)
    os.remove(model_tmp_path)
    # Output target encoder and model to output view.
    return {"target_encoder": target_encoder_json, "model": model_base64, "model_history": history.history}
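The label encoding and one-hot encoding steps in the script above can be illustrated without any library. The sketch below is a simplified, pure-Python approximation of what LabelEncoder and to_categorical do, not their actual implementations; the helper names and the sample labels are assumptions made for illustration.

```python
# Pure-Python approximation of the encoding steps (illustrative helpers only).

def fit_label_encoder(labels):
    """Return the sorted distinct class names; the list index is the encoded class."""
    return sorted(set(labels))

def transform(classes, labels):
    """Map each class name to its integer index."""
    return [classes.index(label) for label in labels]

def to_one_hot(encoded, num_classes):
    """Turn each integer class into a vector with a single 1.0."""
    return [[1.0 if i == c else 0.0 for i in range(num_classes)] for c in encoded]

def inverse_transform(classes, encoded):
    """Map integer classes back to class names, as done at prediction time."""
    return [classes[c] for c in encoded]

# Sample labels are assumed; the real training file may spell them differently.
targets = ["versicolor", "setosa", "virginica", "setosa"]

classes = fit_label_encoder(targets)
encoded = transform(classes, targets)
one_hot = to_one_hot(encoded, len(classes))

print(classes)   # ['setosa', 'versicolor', 'virginica']
print(encoded)   # [1, 0, 2, 0]
print(one_hot[0])  # [0.0, 1.0, 0.0]
```

The one-hot vectors have three entries, which is why the output layer in the script has 3 units with a softmax activation.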

Model Testing

In the bottom flow, the File Reader Snap reads the neural network model built in the previous pipeline. In the top flow, the CSV Generator Snap contains 3 samples. The correct labels are setosa, versicolor, and virginica, respectively.

The left picture below shows the content of the CSV Generator Snap. The right picture below shows the predictions from Remote Python Script Snap along with the confidence level.


Python Script

The input of the Remote Python Script Snap can be either the neural network model or a sample. If it is the model, we use SL.decode to deserialize the target encoder and base64 to decode the model. If the incoming document is a sample, we add it to the queue. Once the model is loaded, we apply it to the samples in the queue and output the predictions. To preserve the lineage property in an Ultra Task, SL.get_drop_doc() is returned for the document that carries the model.

# Imports
import base64
import os
import uuid
import jsonpickle
import jsonpickle.ext.numpy as jsonpickle_numpy
jsonpickle_numpy.register_handlers()
import numpy
import tensorflow
import keras
import sklearn.preprocessing

# Global Variables
model = None
request_queue = []

# This function will be executed once before consuming the data.
def snaplogic_init():
    return None

# This function will be executed on each document from the upstream snap.
def snaplogic_process(row):
    global model
    global request_queue

    # Create a list of outputs to be sent to the downstream snap.
    output_list = []

    is_model = False
    # Load model.
    if "model" in row:
        is_model = True
        model = {}
        model["target_encoder"] = SL.decode(row["target_encoder"])
        model_tmp_path = SL.get_random_path()
        model_bin = base64.b64decode(row["model"])
        with open(model_tmp_path, "wb") as model_file:
            model_file.write(model_bin)
        model["graph"] = tensorflow.Graph()
        with model["graph"].as_default():
            model["session"] = tensorflow.Session()
            with model["session"].as_default():
                model["model"] = keras.models.load_model(model_tmp_path)
                model["model"]._make_predict_function()
        os.remove(model_tmp_path)
    # Add new request to the queue.
    else:
        request_queue.append(row)

    # If the model is ready, process requests in the queue.
    if model is not None:
        while len(request_queue) > 0:
            try:
                request = request_queue.pop(0)
                feature = [request["sepal_length"], request["sepal_width"], request["petal_length"], request["petal_width"]]
                feature = numpy.array([feature])
                with model["graph"].as_default():
                    with model["session"].as_default():
                        pred_list = model["model"].predict(feature)
                        pred = pred_list[0].argmax()
                        conf_level = pred_list[0][pred]
                pred = model["target_encoder"].inverse_transform([pred])[0]
                output_list.append({"pred": pred, "conf": conf_level})
            except Exception:
                output_list.append({"pred": "The request is not valid."})

    if is_model:
        # Do not output a model document to preserve lineage property.
        output_list.append(SL.get_drop_doc())
    
    return output_list

# This function will be executed after consuming all documents from the upstream snap.
def snaplogic_final():
    return None
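The buffering logic in the script above (queue requests until the model document arrives, then drain the queue) can be isolated in a small library-free sketch. Here fake_predict is a hypothetical stand-in for the Keras model and CLASS_NAMES replaces the target encoder; the SnapLogic-specific SL.get_drop_doc() call is deliberately left out.

```python
# Library-free sketch of the "buffer until the model is ready" pattern.
# fake_predict and CLASS_NAMES are hypothetical stand-ins for the real model
# and target encoder.

CLASS_NAMES = ["setosa", "versicolor", "virginica"]

model = None
request_queue = []

def fake_predict(features):
    """Stand-in for model.predict: returns fixed class probabilities."""
    return [0.7, 0.2, 0.1]

def process(row):
    global model
    outputs = []
    if "model" in row:
        # In the real script the model is deserialized here.
        model = row["model"]
    else:
        request_queue.append(row)
    # Drain the queue only once a model is available.
    if model is not None:
        while request_queue:
            request = request_queue.pop(0)
            probabilities = model(request["features"])
            best = max(range(len(probabilities)), key=probabilities.__getitem__)
            outputs.append({"pred": CLASS_NAMES[best], "conf": probabilities[best]})
    return outputs

early = process({"features": [5.1, 3.5, 1.4, 0.2]})  # arrives before the model
late = process({"model": fake_predict})              # model arrives, queue drains

print(early)  # []
print(late)   # [{'pred': 'setosa', 'conf': 0.7}]
```

The design choice is that a request never fails just because it arrived before the model document; it waits in the queue and is answered as soon as the model is loaded.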

Model Hosting

This pipeline is scheduled as an Ultra Task to provide a REST API that is accessible by external applications. The core components of this pipeline are the File Reader, JSON Parser, Union, and Remote Python Script Snaps, which are the same as in the Model Testing pipeline. Instead of taking the data from the CSV Generator, the Remote Python Script Snap takes the data from the API request. The Filter Snap authenticates the request by checking the token, which can be changed in the pipeline parameters. The Extract Params Snap (Mapper) extracts the required fields from the request. The Prepare Response Snap (Mapper) maps the prediction to $content.pred and the confidence level to $content.conf, which together form the response body. This Snap also adds headers to allow Cross-Origin Resource Sharing (CORS).
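As a rough analogy for what the Filter, Extract Params, and Prepare Response Snaps do, the sketch below expresses the same three steps as plain Python functions. Everything here is an assumption made for illustration: the token value, the parameter field names (taken from the fields the script reads), the placement of the headers, and the permissive Access-Control-Allow-Origin value. The actual Snaps are configured with expressions in the pipeline, not with this code.

```python
# Illustrative analogy only; the real pipeline uses Filter and Mapper Snaps.

EXPECTED_TOKEN = "change-me"  # hypothetical pipeline parameter value
FEATURE_NAMES = ("sepal_length", "sepal_width", "petal_length", "petal_width")

def is_authorized(request, expected_token=EXPECTED_TOKEN):
    """Filter step: let the request through only if its token matches."""
    return request.get("token") == expected_token

def extract_params(request):
    """Extract Params step: keep only the fields the model needs."""
    params = request.get("params", {})
    return {name: params[name] for name in FEATURE_NAMES}

def prepare_response(prediction):
    """Prepare Response step: build the body and add a CORS header."""
    return {
        "content": {"pred": prediction["pred"], "conf": prediction["conf"]},
        "headers": {"Access-Control-Allow-Origin": "*"},  # assumed header value
    }

request = {
    "token": "change-me",
    "params": {"sepal_length": 5.1, "sepal_width": 3.5,
               "petal_length": 1.4, "petal_width": 0.2, "extra": "ignored"},
}

if is_authorized(request):
    sample = extract_params(request)
    response = prepare_response({"pred": "setosa", "conf": 0.88})
    print(sample)
    print(response["content"])
```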


Building API

To deploy this pipeline as a REST API, click the calendar icon in the toolbar. Either a Triggered Task or an Ultra Task can be used.

A Triggered Task is good for batch processing since it starts a new pipeline instance for each request. An Ultra Task is good for providing a REST API to external applications that require low latency. In this case, the Ultra Task is preferable. A bearer token is not needed here because the Filter Snap performs authentication inside the pipeline.

To get the URL, click Show tasks in this project in Manager in the Create Task window. Click the small triangle next to the task, then click Details. The task details appear along with the URL.

API Testing

In this pipeline, a sample request is generated by the JSON Generator Snap. The request is sent to the Ultra Task by the REST Post Snap. The Mapper Snap extracts the response, which is in $response.entity.


Below is the content of the JSON Generator Snap. It contains $token and $params which will be included in the request body sent by REST Post Snap.

The REST Post Snap gets the URL from the pipeline parameters. Your URL can be found in the Manager page. In some cases, it is required to check Trust all certificates in the REST Post Snap.


The output of the REST Post Snap is shown below. The last Mapper Snap extracts $response.entity from the response. In this case, the prediction is Iris setosa with a confidence level of 0.88.
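An external application could call the same API without SnapLogic. The sketch below builds an equivalent POST request with Python's standard library. The URL and token are placeholders, and the parameter field names are assumed from the fields the hosting script reads; the request is constructed but not sent, because the placeholder URL does not point to a real task.

```python
# Sketch of a client request to the Ultra Task, using only the standard library.
# TASK_URL and the token are placeholders; replace them with your own values.
import json
import urllib.request

TASK_URL = "https://example.com/your-ultra-task-url"  # placeholder, not a real task

def build_request(url, token, params):
    """Build a JSON POST request whose body carries the token and the sample."""
    body = json.dumps({"token": token, "params": params}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

request = build_request(
    TASK_URL,
    "change-me",
    {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2},
)

print(request.get_method())
print(request.data.decode("utf-8"))

# To actually send the request against a real task URL:
# with urllib.request.urlopen(request) as response:
#     print(json.loads(response.read().decode("utf-8")))
```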