PySpark


Snap type: Write

Description:

This Snap executes a PySpark script. It formats and executes a 'spark-submit' command in a command line interface and then monitors the execution status. If the script executes successfully with an exit code of 0, the Snap produces output documents with the status. If the script produces a standard output, it is also included in the output document. If the script fails (with a non-zero exit code), the Snap produces an error document in the error view. For more details on the 'spark-submit' command, refer to the Apache Spark documentation.
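The way the Snap assembles and runs the command can be sketched roughly as follows. This is a simplified illustration of the behavior described above, not the Snap's actual implementation; the function names and property handling are assumptions.

```python
import os
import subprocess

def build_spark_submit(spark_home, submit_args, script_path, script_args):
    """Assemble a spark-submit command line from Snap-style properties.

    If spark_home is set, the binary under its bin/ subdirectory is used;
    otherwise spark-submit must be on the PATH.
    """
    if spark_home:
        binary = os.path.join(spark_home, "bin", "spark-submit")
    else:
        binary = "spark-submit"
    return [binary] + submit_args + [script_path] + script_args

def run(command, timeout=None):
    """Run the command and return (exit_code, stdout, stderr).

    A zero exit code corresponds to the Snap's success path; any other
    exit code corresponds to an error document.
    """
    proc = subprocess.run(command, capture_output=True, text=True, timeout=timeout)
    return proc.returncode, proc.stdout, proc.stderr

cmd = build_spark_submit(
    "/opt/spark",
    ["--master", "yarn", "--deploy-mode", "cluster"],
    "wordcount.py",
    ["hdfs:///tmp/sample.txt", "hdfs:///tmp/output.dir/"],
)
# cmd[0] is "/opt/spark/bin/spark-submit"
```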

Input & Output

  • Input: This Snap can have an upstream Snap that provides a document output view, such as Mapper or JSON Generator. If an upstream Snap is connected, this Snap executes once per input document and produces a document in the output view or an error document in the error view. Each input document is used to evaluate expression properties in the Snap.

  • Output: This Snap can have a downstream Snap with a document input view, such as Mapper or JSON Formatter.


Prerequisites:

The Snap must be executed in a Groundplex on a Spark cluster node or an edge node.

Limitations and Known Issues:

None at this time.

Configurations:

Account & Access

No account is required.

Views

Input: This Snap has at most one document input view.
Output: This Snap has at most one document output view and produces one output document for each execution of the PySpark script.
Error: This Snap has at most one document error view and produces zero or more documents in the view. The error document contains fields for the error, reason, resolution, stack trace, and the spark-submit command. The reason field contains the exit code; the spark-submit command is formatted by the Snap based on the values of the Snap properties. The error document may also contain standard and error outputs if available.
Troubleshooting: The Snap produces an error document if the given PySpark script fails to execute. To troubleshoot, it may help to execute the script in a command line interface on the Groundplex where the Snap is executed.

Settings

Label

Required. The name for the Snap. You can modify this to be more specific, especially if you have more than one of the same Snap in your pipeline.

Spark home

The Spark home directory, under whose bin/ subdirectory the spark-submit command is located. If this property is empty, the Snap looks for a value for "SPARK_HOME" or "CDH_SPARK_HOME" in the environment variables or system properties.

Example: /opt/cloudera/parcels/CDH-5.8.4-1.cdh5.8.4.p0.5/lib/spark

Default value: [None]
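The lookup order described above can be sketched as follows. This is an illustration of the documented fallback behavior, not the Snap's actual resolution code.

```python
import os

def resolve_spark_home(property_value=None):
    """Resolve the Spark home directory as the docs describe: use the
    Snap property if set, otherwise fall back to the SPARK_HOME or
    CDH_SPARK_HOME environment variable."""
    if property_value:
        return property_value
    for name in ("SPARK_HOME", "CDH_SPARK_HOME"):
        value = os.environ.get(name)
        if value:
            return value
    return None  # spark-submit must then be found on the PATH
```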

Spark submit args

The arguments for the spark-submit command, if any.

Examples:
  • $sparkSubmitArgs
  • _sparkSubmitArgs
  • --master yarn --deploy-mode cluster (to submit the PySpark script to YARN)
  • --principal snaplogic/admin@cloudev.snaplogic.com --keytab /snaplogic.keytab.new (to submit the PySpark script to a Kerberos-enabled cluster)

Default value: [None]

Edit PySpark script

Required. This property enables you to edit a PySpark script. A 'wordcount' sample script is included with the Snap. Click the property to open the editor, edit the script, and save. To try the sample script, enter the path to an input text file in the Script args property. In the script editor, a script can be exported, imported, or generated from a template as required.

Default value: 'wordcount' sample script

Script args

The arguments for the PySpark script.

Example: hdfs:///tmp/sample.txt  hdfs:///tmp/output.dir/ (input file and output directory for 'wordcount' sample script)

Default value: [None]

YARN RM (host:port)

The hostname and port number of the YARN Resource Manager in 'host:port' format. This property is required to stop a PySpark job in progress.

Default value: [None]


Note: If YARN is not used to submit the PySpark script, stopping the Snap does not stop the job submitted to Spark.
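Stopping a YARN application generally goes through the Resource Manager. As an illustration of what such a stop request looks like (an assumption about the mechanism; the Snap's internals are not documented), the YARN Resource Manager REST API accepts a PUT to the application's state resource:

```python
import json

def build_kill_request(yarn_rm, application_id):
    """Build the URL and JSON body for killing a YARN application via the
    Resource Manager REST API (Cluster Application State API).

    yarn_rm is in 'host:port' format, as in the Snap property above.
    """
    url = "http://%s/ws/v1/cluster/apps/%s/state" % (yarn_rm, application_id)
    body = json.dumps({"state": "KILLED"})
    return url, body

url, body = build_kill_request("rm-host:8088", "application_1510000000000_0001")
# PUT this body with Content-Type: application/json to stop the job
```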

Timeout (sec)

Timeout limit in seconds. If this property is negative or empty, the Snap does not time out and waits until spark-submit returns a result.

Example:   600 (10 minutes)

Default value: -1
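The timeout semantics can be sketched with Python's subprocess module, under the assumption that the Snap drives spark-submit as a child process (a sketch, not the actual implementation):

```python
import subprocess

def effective_timeout(value):
    """Map the Snap's Timeout setting to a subprocess timeout:
    a negative or empty value means no timeout (wait for spark-submit)."""
    if value is None or value == "" or value < 0:
        return None
    return value

def run_spark_submit(command, timeout_setting=-1):
    """Run a spark-submit command honoring the documented timeout semantics.
    Raises subprocess.TimeoutExpired when the limit is exceeded."""
    return subprocess.run(command, capture_output=True, text=True,
                          timeout=effective_timeout(timeout_setting))
```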

Snap execution

Select one of the three modes in which the Snap executes. Available options are:
  • Validate & Execute: Performs limited execution of the Snap, and generates a data preview during Pipeline validation. Subsequently, performs full execution of the Snap (unlimited records) during Pipeline runtime.
  • Execute only: Performs full execution of the Snap during Pipeline execution without generating preview data.
  • Disabled: Disables the Snap and all Snaps that are downstream from it.

Temporary Files

During execution, data processing on Snaplex nodes occurs principally in-memory as streaming and is unencrypted. When larger datasets that exceed the available compute memory are processed, the Snap writes Pipeline data to local storage, unencrypted, to optimize performance. These temporary files are deleted when the Snap/Pipeline execution completes. You can configure the location of the temporary data in the Global properties table of the Snaplex's node properties, which can also help avoid Pipeline errors due to the unavailability of space. For more information, see Temporary Folder in Configuration Options.

Examples


Basic Use Case

Below is a sample pipeline to demonstrate how the PySpark Snap executes the PySpark script:


The PySpark Snap is configured as:

 

The following PySpark script (written for Python 2) is executed; it is reproduced here for easy reference:
### PySpark Word Count script template ###
################################################################
# If you are not familiar with Spark Programming,              #
# please check Spark programming guide for more detail.        #
# http://spark.apache.org/docs/latest/programming-guide.html   #
################################################################

import sys
from operator import add
from pyspark.sql import SQLContext
from pyspark import SparkContext

if __name__ == "__main__":
    # the script expects two command-line arguments: input file and output path
    if len(sys.argv) < 3:
        print >> sys.stderr, "Usage: wordcount <input file> <output file>"
        exit(-1)
    # get contexts
    sc = SparkContext()
    sqlContext = SQLContext(sc)
    # do word-count
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    # define column names for CSV output
    df = sqlContext.createDataFrame(counts)
    oldColumns = df.schema.names
    newColumns = ["word", "count"]
    newdf = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), df)
    # write wordcount result to output file with overwrite mode
    newdf.write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").save(sys.argv[2])
    # print wordcount result to standard output
    print "word, count"
    for (word, count) in output:
        print "%s, %i" % (word, count)


A preview of the output upon successful execution of the Snap:


The exported pipeline is available in the Downloads section below.

Typical Configuration

The key configuration of the Snap lies in how values are passed to it. Values can be passed to the Snap:

  • Without Expressions:

Values are passed to the Snap directly.

  • With Expressions:
    • Using Pipeline parameters:

Values are passed as pipeline parameters:

 


Downloads

Important steps to successfully reuse Pipelines

  1. Download and import the Pipeline into SnapLogic.
  2. Configure Snap accounts as applicable.
  3. Provide Pipeline parameters as applicable.

File: Basic Use Case_PySpark_ Wordcount.slp (modified Nov 07, 2017 by Mohammed Iqbal)

Snap Pack History

Release | Snap Pack Version | Type | Updates
February 2024 | main25112 | Stable | Updated and certified against the current SnapLogic Platform release.
November 2023 | main23721 | Stable | Updated and certified against the current SnapLogic Platform release.
August 2023 | main22460 | Stable | Updated and certified against the current SnapLogic Platform release.
May 2023 | main21015 | Stable | Upgraded with the latest SnapLogic Platform release.
February 2023 | main19844 | Stable | Upgraded with the latest SnapLogic Platform release.
November 2022 | main18944 | Stable | Upgraded with the latest SnapLogic Platform release.
August 2022 | main17386 | Stable | Upgraded with the latest SnapLogic Platform release.
4.29 | main15993 | Stable | Upgraded with the latest SnapLogic Platform release.
4.28 | main14627 | Stable | Upgraded with the latest SnapLogic Platform release.
4.27 | main12833 | Stable | Upgraded with the latest SnapLogic Platform release.
4.26 | main11181 | Stable | Upgraded with the latest SnapLogic Platform release.
4.25 | main9554 | Stable | Upgraded with the latest SnapLogic Platform release.
4.24 | main8556 | Stable | Upgraded with the latest SnapLogic Platform release.
4.23 Patch | 423patches7671 | Latest | Fixes an issue with the PySpark Snap by removing the dependency on the json-path library, thus avoiding a conflict between the external library version and the SnapLogic json-path.jar.
4.23 | main7430 | Stable | Upgraded with the latest SnapLogic Platform release.
4.22 | main6403 | Latest | Upgraded the Jython engine from version 2.7-b3 (a beta version from 2014) to the current version, 2.7.2 (March 2020). See the breaking changes note for the Script Snap and the deprecated Execute Script Snap in the Limitations and Known Issues section for potential impacts of this upgrade.
4.21 | snapsmrc542 | Stable | Adds a new Cleanup method in the ScriptHook interface associated with the Script Snap. This method enables the Script Snap to automatically execute a clean-up once the configured script completes executing.
4.20 | snapsmrc535 | Stable | Upgraded with the latest SnapLogic Platform release.
4.19 | snapsmrc528 | Stable | Upgraded with the latest SnapLogic Platform release.
4.18 | snapsmrc523 | Latest | Enhanced the PySpark Snap to work in Kerberized Hadoop clusters with the addition of a Kerberos account.
4.17 Patch | ALL7402 | Latest | Pushed automatic rebuild of the latest version of each Snap Pack to SnapLogic UAT and Elastic servers.
4.17 | snapsmrc515 | Stable | Added the Snap Execution field to all Standard-mode Snaps. In some Snaps, this field replaces the existing Execute during preview check box.
4.16 | snapsmrc508 | Stable | Upgraded with the latest SnapLogic Platform release.
4.15 | snapsmrc500 | Stable | Upgraded with the latest SnapLogic Platform release.
4.14 | snapsmrc490 | Stable | Upgraded with the latest SnapLogic Platform release.
4.13 | snapsmrc486 | Stable | Upgraded with the latest SnapLogic Platform release.
4.12 | snapsmrc480 | Stable | Updated the PySpark Snap to enable users to stop a PySpark job in progress by pressing the STOP button.
4.11 | snapsmrc465 | Stable | Added the PySpark Snap, which is supported on a Groundplex on an edge node and on a Hadooplex node.
4.10 | snapsmrc414 | Stable | Upgraded with the latest SnapLogic Platform release.
4.9 | snapsmrc405 | Stable | Upgraded with the latest SnapLogic Platform release.
4.8 | snapsmrc398 | Stable | Upgraded with the latest SnapLogic Platform release.
4.7 | snapsmrc382 | Stable | Upgraded with the latest SnapLogic Platform release.
4.6.0 | snapsmrc344 | Stable | Upgraded with the latest SnapLogic Platform release.
4.4.1 | - | Latest | Updated the default generated template to ensure compatibility with both JDK 7 and 8 JSR-223 script engines.
March 2015 | - | Stable | Script Snap: a Generate Template link has been added within the Edit Script page to provide a template for each of the supported languages.
January 2015 | - | Stable | Script Snap: optional Execute during preview property added.
December 20, 2014 | - | Stable | Documentation enhancement: updated script samples with a sample pipeline.
May 2014 | - | Stable | The Script Snap was introduced in this release.
Initial Release (June 2013) | - | Stable | The Execute Script Snap was introduced in this release.