PySpark
In this article
Overview
You can use this Snap to execute a PySpark script. It formats and executes a 'spark-submit' command in a command line interface, and then monitors the execution status.
Snap Type
The Pyspark Snap is a Write-type Snap.
Support for Ultra Pipelines
Works in Ultra Pipelines.
Prerequisites
The Snap must be executed in a Groundplex on a Spark cluster node or an edge node.
Snap Views
Type | Format | Number of Views | Examples of Upstream and Downstream Snaps | Description |
---|---|---|---|---|
Input | Document
|
| JSON Generator | If the upstream Snap is connected, this Snap executes after each input document and produces a document in the output view or an error document in the error view. Each input document is used to evaluate expression properties in the Snap. |
Output | Document |
| Mapper | If the script executes successfully with an exit code 0, the Snap produces output documents with the status. If the script is coded to produce a standard output, it is also included in the output document. It produces one output document for each execution of the PySpark script. If the script fails (with an exit code other than 0), the Snap produces an error document in the error view. |
Error | Error handling is a generic way to handle errors without data loss or Snap execution failure. You can handle the errors that the Snap might encounter when running the pipeline with one of the following options from the When errors occur list under the Views tab. The available options are:
Learn more about Error handling in Pipelines. |
Snap Settings
Field | Field Type | Description |
Label* | String | Specify the name for the Snap. You can modify this to be more specific, especially if you have more than one of the same Snaps in your pipeline. Default value: PySpark |
Spark home | String/Expression | Specify the Spark home directory where spark-submit command is located under the bin/ subdirectory. If this property is empty, the Snap tries to find a value for "SPARK_HOME" or "CDH_SPARK_HOME" in the environment variables or system properties. Default value: None |
Spark Submit Command | Dropdown list | Spark-submit is a command-line tool provided by Apache Spark for submitting applications written in PySpark to a Spark cluster. It handles the submission of your script and controls where the application runs. Choose the Spark command to run your PySpark application on a cluster. The available options are:
Ensure the program is accessible from the Spark Home under the bin folder. Learn more about the Spark submit command. Default value: spark-submit |
Spark submit args | String/Expression | Specify the arguments for the spark-submit command, if any. Default value: [None] _sparkSubmitArgs --master yarn --deploy-mode cluster (to submit the PySpark script to YARN) --principal snaplogic/admin@cloudev.snaplogic.com --keytab /snaplogic.keytab.new (to submit the PySpark script to Kerberos-enabled cluster) |
Edit PySpark script | Button | This property enables you to edit a PySpark script. A 'word-count' sample script is included with the Snap. Click to open an editor and save. To try the sample script, enter a file path to an input text file in the Script args property. In the script editor, a script can be exported, imported, or a template can be generated as required. Learn more: RDD Programming Guide - Spark 4.0.0 Documentation Default value: 'wordcount' sample script |
Script args | String/Expression | Specify the arguments for the PySpark script. Default value: [None] |
YARN RM (host:port) | String/Expression | Specify the hostname and port number of the Yarn Resource Manager in the 'host:port' format. This property is required to stop a PySpark job in progress. Default value: [None] If YARN is not used to submit the PySpark script, then stopping the Snap will not halt the job submitted to Spark. |
Timeout (sec) | String/Expression | Specify the timeout limit in seconds. If negative or empty, the Snap will not time out until spark-submit returns the result. Default value: -1 |
Dropdown list |
Troubleshooting
The Snap produces an error document if a given PySpark script fails to execute. It may be helpful in troubleshooting to execute the script in a command line interface of a Groundplex where the Snap is executed.
Example
Execute PySpark script
The following example pipeline demonstrates how the PySpark Snap executes the PySpark script:
Configure the PySpark Snap as follows:
The following PySpark Script is executed:
PySpark Word Count script template
### PySpark Word Count script template ###
################################################################
# If you are not familiar with Spark Programming, #
# please check Spark programming guide for more detail. #
# http://spark.apache.org/docs/latest/programming-guide.html #
################################################################
import sys
from operator import add
from pyspark.sql import SQLContext
from pyspark import SparkContext
if __name__ == "__main__":
# check if the number of arguments is 2
if len(sys.argv) < 3:
print >> sys.stderr, "Usage: wordcount <input file> <output file>"
exit(-1)
# get contexts
sc = SparkContext()
sqlContext = SQLContext(sc)
# do word-count
lines = sc.textFile(sys.argv[1], 1)
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
output = counts.collect()
# define column names for CSV output
df = sqlContext.createDataFrame(counts)
oldColumns = df.schema.names
newColumns = ["word", "count"]
newdf = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), df)
# write wordcount result to output file with overwrite mode
newdf.write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").save(sys.argv[2])
# print wordcount result to standard output
print "word, count"
for (word, count) in output:
print "%s, %i" % (word, count)
On validation, the Snap produces output as shown below:
Downloads
Have feedback? Email documentation@snaplogic.com | Ask a question in the SnapLogic Community
© 2017-2025 SnapLogic, Inc.