Problem Scenario
Taxonomy is the science of classifying organisms including plants, animals, and microorganisms. In September 1936, R. A. Fisher published a paper named "The Use of Multiple Measurements in Taxonomic Problems". In this paper, four measurements (sepal length, sepal width, petal length, and petal width) of 150 flowers are included. There are 50 samples of each type of Iris flower: Iris setosa, Iris versicolor, and Iris virginica. The author demonstrated that it is possible to find linear functions of the measurements that are good enough to distinguish the types of Iris flowers.
Description
Almost 100 years later, the Iris dataset is one of the best-known datasets for people who study machine learning and data science. It poses a multi-class classification problem with four numeric features. The screenshot below shows a preview of this dataset; there are three types of Iris flowers: setosa, versicolor, and virginica. The numbers indicating the size of the sepal and petal are in centimeters. You can find more details about this dataset here. If you are familiar with Python, you can also get this dataset from the scikit-learn library as described here. We will build a simple neural network to tackle this classification problem. Then, we will host the model as an API inside the SnapLogic platform.
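To make the shape of the data concrete, the sketch below parses three well-known rows of the Iris dataset (one per species) into documents. The column names mirror the fields read by the training script later on this page; the exact header and label spelling in your own training file are assumptions here.

```python
import csv
import io

# Three rows of the Iris dataset, one per species (sizes in centimeters).
# The header names are an assumption that mirrors the fields used by the
# training script on this page.
SAMPLE_CSV = """sepal_length,sepal_width,petal_length,petal_width,class
5.1,3.5,1.4,0.2,setosa
7.0,3.2,4.7,1.4,versicolor
6.3,3.3,6.0,2.5,virginica
"""

FEATURE_NAMES = ["sepal_length", "sepal_width", "petal_length", "petal_width"]


def parse_rows(text):
    """Parse CSV text into a list of documents with numeric features."""
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        row = {name: float(record[name]) for name in FEATURE_NAMES}
        row["class"] = record["class"]
        rows.append(row)
    return rows


rows = parse_rows(SAMPLE_CSV)
for row in rows:
    print(row)
```

Each document carries four numeric features and one class label, which is the multi-class setting described above.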
Objectives
- Model Building: Use the Remote Python Script Snap from the ML Core Snap Pack to deploy a Python script that trains a neural network model on the Iris flower dataset.
- Model Testing: Test the model with a few samples.
- Model Hosting: Use the Remote Python Script Snap from the ML Core Snap Pack to deploy a Python script that hosts the model, and schedule an Ultra Task to provide an API.
- API Testing: Use the REST Post Snap to send a sample request to the Ultra Task to make sure the API is working as expected.
We are going to build four pipelines (Model Building, Model Testing, Model Hosting, and API Testing) and an Ultra Task to accomplish the above objectives. Each of these pipelines is described in the Pipelines section below.
Pipelines
Model Building
In this pipeline, the File Reader Snap reads the training set containing 100 samples. Then, the Remote Python Script Snap trains the model using a neural network algorithm. The model consists of two parts: target_encoder, which describes the mapping between the encoded class and the actual Iris flower name, and the serialized model itself. The model is converted into JSON format and saved on SnapLogic File System (SLFS) using the JSON Formatter Snap and the File Writer Snap.
The Remote Python Script Snap executes the Python script on the Remote Python Executor (RPE). If no account is provided, it assumes the RPE is at localhost:5301 without a token.
Below is the output of the Remote Python Script Snap.
Python Script
Below is the script from the Remote Python Script Snap used in this pipeline. There are three main functions: snaplogic_init, snaplogic_process, and snaplogic_final. The first function (snaplogic_init) will be executed before consuming input data. The second function (snaplogic_process) will be called on each of the incoming documents. The last function (snaplogic_final) will be executed after all incoming documents have been consumed by snaplogic_process.
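The call order of the three functions can be illustrated with a small stand-in driver. This is only a sketch: run_script is a hypothetical helper, not the Remote Python Executor's code, and it assumes that a None return value produces no output document.

```python
# Records the order in which the three functions are called.
seen = []


def snaplogic_init():
    # Called once, before any document is consumed.
    seen.append("init")
    return None


def snaplogic_process(row):
    # Called once per incoming document; the return value goes downstream.
    seen.append("process")
    return {"doubled": row["value"] * 2}


def snaplogic_final():
    # Called once, after all documents have been consumed.
    seen.append("final")
    return {"count": seen.count("process")}


def run_script(documents):
    """Hypothetical driver: init, then process per document, then final."""
    outputs = []
    snaplogic_init()
    for doc in documents:
        result = snaplogic_process(doc)
        if result is not None:  # assumption: None means "no output"
            outputs.append(result)
    final_result = snaplogic_final()
    if final_result is not None:
        outputs.append(final_result)
    return outputs


outputs = run_script([{"value": 1}, {"value": 2}])
print(seen)
print(outputs)
```

In the training script, snaplogic_process only collects data and returns None, so the single output document comes from snaplogic_final.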
First of all, we get the SL class from here. This class contains useful methods: ensure, execute, encode, decode, etc. The script below can be placed at the top to get the SL class.
```python
# Get SnapLogic utility class called SL.
import urllib.request
sl_url = "https://snaplogic-ds-public.s3.amazonaws.com/snaplogic-py/sl.py"
exec(urllib.request.urlopen(sl_url).read().decode("utf-8"))
```
Then we use SL.ensure to automatically install the Python libraries required by this script. In this case, we need jsonpickle, scikit-learn, keras, and tensorflow. The prebuilt tensorflow 1.5.0 packages do not use AVX CPU optimizations, so this version is recommended for older CPUs.
```python
# Ensure libraries.
SL.ensure("jsonpickle", "1.0")
SL.ensure("scikit-learn", "0.20.0")
SL.ensure("keras", "2.2.4")
SL.ensure("tensorflow", "1.5.0")
```
In snaplogic_init, we create a new session. In snaplogic_process, we extract the features and target from each incoming document, then store them in lists. Once we have all the data, we build the neural network model in snaplogic_final. We start by converting the list of features to a numpy array and encoding the Iris flower names as integers using LabelEncoder from the scikit-learn library; then we apply one-hot encoding. At this point, the features and targets are numpy arrays. Our neural network model has 1 hidden layer with 16 neurons, and we train it with the adam optimizer (epochs=50, batch_size=10). After training the model, we save it to a temporary file and serialize it using base64, and we use SL.encode to serialize the target encoder. The model and target encoder, now in text format, are sent to the next Snap along with the training history.
```python
# Imports
import base64
import os
import uuid
import jsonpickle
import jsonpickle.ext.numpy as jsonpickle_numpy
jsonpickle_numpy.register_handlers()
import numpy
import tensorflow
import keras
import sklearn.preprocessing

# Global Variables
features = []
targets = []

# This function will be executed once before consuming the data.
def snaplogic_init():
    session = tensorflow.Session()
    keras.backend.set_session(session)
    return None

# This function will be executed on each document from the upstream snap.
def snaplogic_process(row):
    global features
    global targets
    # Extract features and target from row.
    features.append([row["sepal_length"], row["sepal_width"], row["petal_length"], row["petal_width"]])
    targets.append(row["class"])
    return None

# This function will be executed after consuming all documents from the upstream snap.
def snaplogic_final():
    global features
    global targets
    # Convert features to numpy array.
    features = numpy.array(features)
    # Encode targets and do one hot encoding.
    target_encoder = sklearn.preprocessing.LabelEncoder()
    target_encoder.fit(targets)
    targets_encoded = target_encoder.transform(targets)
    targets_onehot = keras.utils.np_utils.to_categorical(targets_encoded)
    # Define model.
    model = keras.models.Sequential()
    model.add(keras.layers.Dense(16, activation="relu", input_dim=4))
    model.add(keras.layers.Dense(3, activation="softmax"))
    # Compile model.
    model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
    # Train model.
    history = model.fit(features, targets_onehot, epochs=50, batch_size=10, verbose=0)
    # Encode target encoder.
    target_encoder_json = SL.encode(target_encoder)
    # Encode model.
    model_tmp_path = SL.get_random_path()
    model.save(model_tmp_path)
    with open(model_tmp_path, "rb") as model_file:
        model_bin = model_file.read()
    model_base64 = base64.b64encode(model_bin)
    os.remove(model_tmp_path)
    # Output target encoder and model to output view.
    return {"target_encoder": target_encoder_json, "model": model_base64, "model_history": history.history}
```
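The target encoding step in the script can be hard to picture, so here is a dependency-free sketch of the same idea. It is not the scikit-learn or Keras implementation: the helper names below are hypothetical, and the sketch only assumes that class names are mapped to integers in sorted order, as LabelEncoder does.

```python
def fit_label_encoder(labels):
    """Return the sorted distinct class names; the list index is the code."""
    return sorted(set(labels))


def encode_labels(classes, labels):
    """Map each class name to its integer code."""
    return [classes.index(label) for label in labels]


def one_hot(encoded, num_classes):
    """Turn integer codes into one-hot rows, one column per class."""
    return [
        [1.0 if column == value else 0.0 for column in range(num_classes)]
        for value in encoded
    ]


targets = ["versicolor", "setosa", "virginica", "setosa"]
classes = fit_label_encoder(targets)
encoded = encode_labels(classes, targets)
onehot = one_hot(encoded, len(classes))

print(classes)
print(encoded)
print(onehot)
```

The three columns of the one-hot rows line up with the three softmax outputs of the last layer, which is why the prediction script can map the index of the largest output back to a flower name through the target encoder.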
Model Testing
In the bottom flow, the File Reader Snap reads the neural network model built in the previous pipeline from SLFS. In the top flow, the CSV Generator Snap contains 3 samples. The correct labels are setosa, versicolor, and virginica, respectively.
The left picture below shows the content of CSV Generator Snap. The right picture below shows the predictions from Remote Python Script Snap along with the confidence level.
Python Script
The input of the Remote Python Script Snap can be either the neural network model or a sample. If it is the model, we use SL.decode to deserialize the target encoder and base64 to decode the model, which is then loaded into memory. If the incoming document is a sample, we add it to the queue. Once the model is loaded, we apply the model to the samples in the queue and output the predictions. In order to preserve the lineage property in an Ultra Task, SL.get_drop_doc() is returned for the document describing the model.
```python
# Imports
import base64
import os
import uuid
import jsonpickle
import jsonpickle.ext.numpy as jsonpickle_numpy
jsonpickle_numpy.register_handlers()
import numpy
import tensorflow
import keras
import sklearn.preprocessing

# Global Variables
model = None
request_queue = []

# This function will be executed once before consuming the data.
def snaplogic_init():
    return None

# This function will be executed on each document from the upstream snap.
def snaplogic_process(row):
    global model
    global request_queue
    # Create a list of outputs to be sent to the downstream snap.
    output_list = []
    is_model = False
    if "model" in row:
        # Load model.
        is_model = True
        model = {}
        model["target_encoder"] = SL.decode(row["target_encoder"])
        model_tmp_path = SL.get_random_path()
        model_bin = base64.b64decode(row["model"])
        with open(model_tmp_path, "wb") as model_file:
            model_file.write(model_bin)
        model["graph"] = tensorflow.Graph()
        with model["graph"].as_default():
            model["session"] = tensorflow.Session()
            with model["session"].as_default():
                model["model"] = keras.models.load_model(model_tmp_path)
                model["model"]._make_predict_function()
        os.remove(model_tmp_path)
    else:
        # Add new request to the queue.
        request_queue.append(row)
    # If the model is ready, process requests in the queue.
    if model is not None:
        while len(request_queue) > 0:
            try:
                request = request_queue.pop(0)
                feature = [request["sepal_length"], request["sepal_width"], request["petal_length"], request["petal_width"]]
                feature = numpy.array([feature])
                with model["graph"].as_default():
                    with model["session"].as_default():
                        pred_list = model["model"].predict(feature)
                pred = pred_list[0].argmax()
                conf_level = pred_list[0][pred]
                pred = model["target_encoder"].inverse_transform([pred])[0]
                output_list.append({"pred": pred, "conf": conf_level})
            except:
                output_list.append({"pred": "The request is not valid."})
    if is_model:
        # Do not output a model document to preserve lineage property.
        output_list.append(SL.get_drop_doc())
    return output_list

# This function will be executed after consuming all documents from the upstream snap.
def snaplogic_final():
    return None
```
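The queueing behavior of this script matters because the model document and the samples can arrive in any order. The sketch below isolates that pattern without TensorFlow. Everything in it is illustrative: fake_model is a hypothetical stand-in with arbitrary thresholds, not the trained network, and the drop document returned by the real script is left out.

```python
CLASSES = ["setosa", "versicolor", "virginica"]

model = None
request_queue = []


def fake_model(features):
    """Hypothetical stand-in for the network: returns class probabilities."""
    petal_length = features[2]
    # Arbitrary thresholds, chosen only to make the sketch runnable.
    if petal_length < 2.5:
        return [0.9, 0.05, 0.05]
    if petal_length < 5.0:
        return [0.1, 0.8, 0.1]
    return [0.05, 0.15, 0.8]


def process(row):
    """Buffer samples until the model arrives, then drain the queue."""
    global model
    outputs = []
    if "model" in row:
        model = fake_model  # stand-in for deserializing the real model
    else:
        request_queue.append(row)
    if model is not None:
        while request_queue:
            request = request_queue.pop(0)
            probs = model([
                request["sepal_length"], request["sepal_width"],
                request["petal_length"], request["petal_width"],
            ])
            # The prediction is the most probable class; its probability
            # is reported as the confidence level.
            best = max(range(len(probs)), key=probs.__getitem__)
            outputs.append({"pred": CLASSES[best], "conf": probs[best]})
    return outputs


sample = {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2}
before_model = process(sample)               # buffered, nothing emitted yet
after_model = process({"model": "..."})      # model arrives, queue is drained
print(before_model, after_model)
```

A sample that arrives before the model is held back rather than rejected, and it is answered as soon as the model document has been processed.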
Model Hosting
This pipeline is scheduled as an Ultra Task to provide a REST API that is accessible by external applications. The core components of this pipeline are the File Reader, JSON Parser, Union, and Remote Python Script Snaps, which are the same as in the Model Testing pipeline. Instead of taking the data from the CSV Generator Snap, the Remote Python Script Snap takes the data from an API request. The Filter Snap is used to authenticate the request by checking the token, which can be changed in the pipeline parameters. The Extract Params Snap (Mapper) extracts the required fields from the request. The Prepare Response Snap (Mapper) maps the prediction to $content.pred and the confidence level to $content.conf, which will be the response body. This Snap also adds headers to allow Cross-Origin Resource Sharing (CORS).
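The logic wrapped around the prediction script (token check, parameter extraction, and response preparation) can be summarized in plain Python. This is a sketch rather than what the Snaps execute: the function names, the token value, the request layout with token and params keys, and the single CORS header are assumptions made for illustration.

```python
FEATURE_NAMES = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

# Illustrative CORS header; the headers added by the pipeline may differ.
CORS_HEADERS = {"Access-Control-Allow-Origin": "*"}


def check_token(request, expected_token):
    """Authenticate the request by comparing its token."""
    return request.get("token") == expected_token


def extract_params(request):
    """Keep only the fields the prediction script needs."""
    params = request.get("params", {})
    return {name: params[name] for name in FEATURE_NAMES}


def prepare_response(prediction):
    """Wrap the prediction as the response body and add CORS headers."""
    return {
        "content": {"pred": prediction["pred"], "conf": prediction["conf"]},
        "headers": dict(CORS_HEADERS),
    }


def handle_request(request, expected_token, predict):
    """Run the three steps; unauthenticated requests yield no response here."""
    if not check_token(request, expected_token):
        return None
    return prepare_response(predict(extract_params(request)))


def stub_predict(features):
    # Hypothetical stand-in for the Remote Python Script Snap.
    return {"pred": "setosa", "conf": 0.9}


params = {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2}
accepted = handle_request({"token": "my-secret-token", "params": params}, "my-secret-token", stub_predict)
rejected = handle_request({"token": "wrong", "params": params}, "my-secret-token", stub_predict)
print(accepted)
print(rejected)
```

Keeping authentication inside the pipeline is what makes a separate bearer token on the Task unnecessary.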
Building API
To deploy this pipeline as a REST API, click the calendar icon in the toolbar. Either a Triggered Task or an Ultra Task can be used.
Triggered Task is good for batch processing since it starts a new pipeline instance for each request. Ultra Task is good for providing a REST API to external applications that require low latency. In this case, Ultra Task is preferable. A bearer token is not needed here since the Filter Snap will perform authentication inside the pipeline.
In order to get the URL, click Show tasks in this project in Manager in the Create Task window. Click the small triangle next to the task, then Details. The task details will show up with the URL.
After creating the Ultra Task, you can test it with a sample request.
API Testing
In this pipeline, a sample request is generated by the JSON Generator Snap. The request is sent to the Ultra Task by the REST Post Snap. The Mapper Snap is used to extract the response, which is in $response.entity.
Below is the content of the JSON Generator Snap. It contains $token and $params, which will be included in the request body sent by the REST Post Snap.
The REST Post Snap gets the URL from the pipeline parameters. Your URL can be found on the Manager page. In some cases, it is required to check Trust all certificates in the REST Post Snap.
The output of the REST Post Snap is shown below. The last Mapper Snap is used to extract $response.entity from the response. In this case, the prediction is Iris setosa with a confidence level of 0.88.
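The same request can also be prepared outside SnapLogic, for example from a Python client. The sketch below only builds the request and does not send it. The URL and token are placeholders, and the body layout (token and params keys with the four feature names) is an assumption based on the description above.

```python
import json
import urllib.request


def build_request(url, token, params):
    """Build a JSON POST request carrying the token and the sample."""
    body = json.dumps({"token": token, "params": params}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder values: replace them with your Ultra Task URL and your token.
TASK_URL = "https://example.com/your-ultra-task-url"
TOKEN = "my-secret-token"

request = build_request(
    TASK_URL,
    TOKEN,
    {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2},
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))

# To actually call the API, uncomment the lines below:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

If the task uses a certificate that the client does not trust, the call fails at the TLS step, which is the situation the Trust all certificates option addresses in the REST Post Snap.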