
Snap type:

Write

Description:

This Snap executes a PySpark script. It formats and executes a 'spark-submit' command in a command-line interface, and then monitors the execution status. If the script executes successfully (exit code 0), the Snap produces an output document with the status; any standard output produced by the script is also included in the output document. If the script fails (a non-zero exit code), the Snap produces an error document in the error view. For more details on the 'spark-submit' command, refer to the Apache Spark documentation.
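As an illustration of what the Snap does conceptually, the sketch below assembles a spark-submit command line from property-like values. The function name and formatting details are hypothetical; the Snap's actual command construction is internal to the product.

```python
import os

def build_spark_submit_command(spark_home, submit_args, script_path, script_args):
    # Hypothetical sketch: assemble a spark-submit command line from
    # Snap-style settings. The Snap's real formatting is internal.
    cmd = [os.path.join(spark_home, "bin", "spark-submit")]
    cmd.extend(submit_args.split())
    cmd.append(script_path)
    cmd.extend(script_args.split())
    return cmd

cmd = build_spark_submit_command(
    "/opt/spark",
    "--master yarn --deploy-mode cluster",
    "/tmp/wordcount.py",
    "hdfs:///tmp/sample.txt hdfs:///tmp/output.dir/")
# Running this command and checking its exit code mirrors the Snap's
# behavior: exit code 0 -> output document, non-zero -> error document.
```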

Input & Output

  • Input: This Snap can have an upstream Snap with a document output view, such as Mapper or JSON Generator. If an upstream Snap is connected, this Snap executes once per input document and produces a document in the output view or an error document in the error view. Each input document is used to evaluate expression properties in the Snap.

  • Output: This Snap can have a downstream Snap with a document input view, such as Mapper or JSON Formatter.

Modes

Prerequisites:

The Snap must be executed in a Groundplex on a Spark cluster node or an edge node.

Limitations and Known Issues:

None at this time.

Configurations:

Account & Access

No account is required.

Views

Input: This Snap has at most one document input view.
Output: This Snap has at most one document output view and produces one output document for each execution of the PySpark script.
Error: This Snap has at most one document error view and produces zero or more documents in the view. The error document contains fields for the error, reason, resolution, stack trace, and the spark-submit command. The reason field contains the exit code. The spark-submit command is formatted by the Snap based on the values of the Snap properties. The error document may also contain the standard and error outputs, if available.
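For reference, an error document with the fields listed above might look like the following. The field names follow the description; the values are purely illustrative.

```python
# Illustrative error document; the values are made up, only the field
# names follow the error-view description.
error_doc = {
    "error": "PySpark script execution failed",
    "reason": "exit code: 1",
    "resolution": "Verify the PySpark script and the spark-submit arguments",
    "stacktrace": "Traceback (most recent call last): ...",
    "spark-submit command": "spark-submit /tmp/wordcount.py ...",
}
```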


Troubleshooting: The Snap produces an error document if a given PySpark script fails to execute. For troubleshooting, it may be helpful to execute the script in a command-line interface on the Groundplex where the Snap is executed.

Settings

Label

Required. The name for the Snap. You can modify this to be more specific, especially if you have more than one of the same Snaps in your pipeline.

Spark home

The Spark home directory, where the spark-submit command is located under the bin/ subdirectory. If this property is empty, the Snap tries to find a value for "SPARK_HOME" or "CDH_SPARK_HOME" in the environment variables or system properties.

Example: /opt/cloudera/parcels/CDH-5.8.4-1.cdh5.8.4.p0.5/lib/spark

Default value: [None]
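The lookup order described above can be sketched as follows. `resolve_spark_home` is a hypothetical helper, not part of the Snap's API, and the sketch covers only the environment-variable portion of the lookup.

```python
import os

def resolve_spark_home(configured=None):
    # Documented lookup order: the property value first, then the
    # SPARK_HOME and CDH_SPARK_HOME environment variables.
    for candidate in (configured,
                      os.environ.get("SPARK_HOME"),
                      os.environ.get("CDH_SPARK_HOME")):
        if candidate:
            return candidate
    return None
```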

Spark submit args

The arguments for the spark-submit command if any.

Example:
  • $sparkSubmitArgs
  • _sparkSubmitArgs
  • --master yarn --deploy-mode cluster (to submit the PySpark script to YARN)
  • --principal snaplogic/admin@cloudev.snaplogic.com --keytab /snaplogic.keytab.new (to submit the PySpark script to a Kerberos-enabled cluster)

Default value: [None]

Edit PySpark script

Required. This property enables you to edit a PySpark script. A 'word-count' sample script is included with the Snap. Click the property to open the script editor, edit the script, and save. To try the sample script, enter a file path to an input text file in the Script args property. In the script editor, a script can be exported or imported, and a template can be generated as required.

Default value: 'wordcount' sample script

Script args

The arguments for the PySpark script.

Example: hdfs:///tmp/sample.txt  hdfs:///tmp/output.dir/ (input file and output directory for 'wordcount' sample script)

Default value: [None]

YARN RM (host:port)


The hostname and port number of the YARN Resource Manager, in the 'host:port' format. This property is required to be able to stop a PySpark job in progress.

Default value: [None]


Note

If YARN is not used to submit the PySpark script, stopping the Snap will not stop the job submitted to Spark.
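Stopping a YARN-submitted job amounts to asking the Resource Manager to kill the application. A minimal sketch using the YARN ResourceManager REST API (assuming the REST API is enabled on the RM; the host and application ID below are hypothetical):

```python
import json
from urllib import request

def kill_yarn_app(rm_host_port, app_id, send=True):
    # PUT /ws/v1/cluster/apps/{app_id}/state with {"state": "KILLED"},
    # per the YARN ResourceManager REST API.
    url = "http://%s/ws/v1/cluster/apps/%s/state" % (rm_host_port, app_id)
    body = json.dumps({"state": "KILLED"}).encode("utf-8")
    if not send:  # allow inspecting the request without a live cluster
        return url, body
    req = request.Request(url, data=body, method="PUT",
                          headers={"Content-Type": "application/json"})
    with request.urlopen(req) as resp:
        return resp.status

url, body = kill_yarn_app("yarn-rm.example.com:8088",
                          "application_1500000000000_0001", send=False)
```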


Timeout (sec)

The timeout limit in seconds. If negative or empty, the Snap does not time out and waits until spark-submit returns the result.

Example:   600 (10 minutes)

Default value: -1
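The timeout semantics can be illustrated with Python's subprocess module: a positive value bounds the wait, while an empty or negative value waits indefinitely. `run_with_timeout` is a hypothetical helper, and the `echo` command stands in for spark-submit.

```python
import subprocess

def run_with_timeout(cmd, timeout_sec=-1):
    # A non-positive or missing timeout means "wait until the command
    # returns", mirroring the Snap's default of -1.
    timeout = timeout_sec if timeout_sec and timeout_sec > 0 else None
    completed = subprocess.run(cmd, capture_output=True, text=True,
                               timeout=timeout)
    return completed.returncode, completed.stdout

rc, out = run_with_timeout(["echo", "word, count"], timeout_sec=600)
```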


Examples


Basic Use Case

Below is a sample pipeline to demonstrate how the PySpark Snap executes the PySpark script:


The PySpark Snap is configured as:

 

The following PySpark Script is executed:


The PySpark script for easy reference:
### PySpark Word Count script template ###
################################################################
# If you are not familiar with Spark Programming,              #
# please check Spark programming guide for more detail.        #
# http://spark.apache.org/docs/latest/programming-guide.html   #
# Note: this sample uses Python 2 syntax (print statements,    #
# xrange, reduce) and the Spark 1.x spark-csv package.         #
################################################################

import sys
from operator import add
from pyspark.sql import SQLContext
from pyspark import SparkContext

if __name__ == "__main__":
    # require two arguments: the input file and the output file
    if len(sys.argv) < 3:
        print >> sys.stderr, "Usage: wordcount <input file> <output file>"
        exit(-1)
    # get contexts
    sc = SparkContext()
    sqlContext = SQLContext(sc)
    # do word-count
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    # define column names for CSV output
    df = sqlContext.createDataFrame(counts)
    oldColumns = df.schema.names
    newColumns = ["word", "count"]
    newdf = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), df)
    # write wordcount result to output file with overwrite mode
    newdf.write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").save(sys.argv[2])
    # print wordcount result to standard output
    print "word, count"
    for (word, count) in output:
        print "%s, %i" % (word, count)



A preview of the output upon successful execution of the Snap:


The exported pipeline is available in the Downloads section below.

Typical Configuration

The key configuration of the Snap lies in how values are passed to it. Values can be passed to the Snap:

  • Without Expressions:

Values are passed to the Snap directly.

  • With Expressions:
    • Using Pipeline parameters:

Values are passed as pipeline parameters:

 


Downloads


