PySpark

Overview

You can use this Snap to execute a PySpark script. It formats and executes a 'spark-submit' command in a command line interface, and then monitors the execution status.

(Screenshot: PySpark Snap overview)
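Conceptually, the Snap's behavior resembles the following sketch: it builds a spark-submit command line from the Snap settings, runs it, and interprets the exit status. This is an illustration only, not the Snap's actual implementation; all paths and values shown are hypothetical.

# Conceptual sketch only -- not the Snap's actual implementation.
# It shows how a spark-submit command line could be assembled from the Snap
# settings and how the exit status maps to output vs. error documents.
import os
import subprocess

spark_home = "/opt/spark"                                            # "Spark home" (hypothetical path)
submit_args = ["--master", "yarn", "--deploy-mode", "cluster"]       # "Spark submit args"
script_path = "/tmp/wordcount.py"                                    # script saved from "Edit PySpark script"
script_args = ["hdfs:///tmp/sample.txt", "hdfs:///tmp/output.dir/"]  # "Script args"

cmd = [os.path.join(spark_home, "bin", "spark-submit")] + submit_args + [script_path] + script_args

# Run the command and wait, honoring a timeout like the "Timeout (sec)" setting.
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)

if result.returncode == 0:
    print("success:", result.stdout)    # roughly what ends up in the output document
else:
    print("failure:", result.stderr)    # roughly what ends up in the error document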

 

Snap Type

The PySpark Snap is a Write-type Snap.

Support for Ultra Pipelines

Works in Ultra Pipelines.

Prerequisites

The Snap must be executed in a Groundplex on a Spark cluster node or an edge node.

Snap Views

Input

  • Format: Document
  • Number of Views: Min: 0, Max: 1
  • Example of upstream Snap: JSON Generator
  • Description: If an upstream Snap is connected, this Snap executes once for each input document and produces a document in the output view or an error document in the error view. Each input document is used to evaluate expression properties in the Snap.

Output

  • Format: Document
  • Number of Views: Min: 0, Max: 1
  • Example of downstream Snap: Mapper
  • Description: If the script executes successfully with exit code 0, the Snap produces an output document with the status. If the script is coded to produce standard output, it is also included in the output document. The Snap produces one output document for each execution of the PySpark script. If the script fails (with an exit code other than 0), the Snap produces an error document in the error view.

Error

Error handling is a generic way to handle errors without data loss or Snap execution failure. You can handle the errors that the Snap might encounter when running the pipeline with one of the following options from the When errors occur list under the Views tab. The available options are:

  • Stop Pipeline Execution: Stops the current pipeline execution when the Snap encounters an error.

  • Discard Error Data and Continue: Ignores the error, discards that record, and continues with the remaining records.

  • Route Error Data to Error View: Routes the error data to an error view without stopping the Snap execution.

Learn more about Error handling in Pipelines.

 

Snap Settings

Field

Field Type

Description

Label*

String

Specify a name for the Snap. You can modify this to be more specific, especially if you have more than one of the same Snap in your pipeline.

Default value: PySpark
Example: PySpark

Spark home

String/Expression

Specify the Spark home directory; the spark-submit command is located in its bin/ subdirectory. If this field is empty, the Snap tries to find a value for "SPARK_HOME" or "CDH_SPARK_HOME" in the environment variables or system properties.

Default value: None
Example: /opt/cloudera/parcels/CDH-5.8.4-1.cdh5.8.4.p0.5/lib/spark

Spark Submit Command

Dropdown list

Spark-submit is a command-line tool provided by Apache Spark for submitting applications written in PySpark to a Spark cluster. It handles the submission of your script and controls where the application runs.

Choose the Spark command to run your PySpark application on a cluster. The available options are:

  • spark-submit: This is the generic command-line tool to submit applications to Apache Spark.

  • spark2-submit: Specifically refers to the Apache Spark 2.x binary. Use this option when your pipeline relies on APIs or behaviors specific to Spark 2.

  • spark3-submit: Specifically refers to Apache Spark 3.x. Use this option when you want to take advantage of the new features introduced in Spark 3, such as adaptive query execution, new SQL functions, and improved ANSI compliance.

Ensure that the selected program is available in the bin folder under the Spark home. Learn more about the Spark submit command. To confirm which Spark runtime a given command launches, see the version-check sketch after this field's examples.

Default value: spark-submit
Example: spark2-submit
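If you are unsure which Spark runtime a given submit command launches, a small diagnostic script such as the following sketch can be submitted through the Snap to print the version (it assumes Spark 2.x or later, where SparkSession is available). This script is an illustration and is not part of the Snap.

# Minimal version check: submit with the selected command
# (spark-submit, spark2-submit, or spark3-submit) to confirm the Spark runtime.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("version-check").getOrCreate()
print(spark.version)   # printed to standard output, so it appears in the Snap's output document
spark.stop()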

Spark submit args

String/Expression

Specify the arguments for the spark-submit command, if any.

Default value: [None]
Examples:
  • $sparkSubmitArgs
  • _sparkSubmitArgs
  • --master yarn --deploy-mode cluster (to submit the PySpark script to YARN)
  • --principal snaplogic/admin@cloudev.snaplogic.com --keytab /snaplogic.keytab.new (to submit the PySpark script to a Kerberos-enabled cluster)

Edit PySpark script

Button

Opens an editor in which you can edit the PySpark script; click the button to open the editor and save your changes when done. A 'wordcount' sample script is included with the Snap. To try the sample script, enter a path to an input text file and an output directory in the Script args field. In the script editor, you can export or import a script, or generate a template as required. Learn more: RDD Programming Guide - Spark 4.0.0 Documentation

Default value: 'wordcount' sample script

Script args

String/Expression

Specify the arguments for the PySpark script. These are passed to the script on the spark-submit command line; see the sketch after this field's examples.

Default value: [None]
Example: hdfs:///tmp/sample.txt  hdfs:///tmp/output.dir/ (input file and output directory for 'wordcount' sample script)
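Because Script args are passed straight through as command-line arguments, the script reads them from sys.argv. A minimal sketch (variable names are illustrative):

# Script args appear in sys.argv; sys.argv[0] is the script itself.
import sys

input_path = sys.argv[1]    # e.g. hdfs:///tmp/sample.txt
output_dir = sys.argv[2]    # e.g. hdfs:///tmp/output.dir/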

YARN RM (host:port)

String/Expression

Specify the hostname and port number of the Yarn Resource Manager in the 'host:port' format. This property is required to stop a PySpark job in progress. 

Default value: [None]
Example: rm01.hadoop.cluster:8032

If YARN is not used to submit the PySpark script, then stopping the Snap will not halt the job submitted to Spark.

Timeout (sec)

String/Expression

Specify the timeout limit in seconds. If the value is negative or empty, the Snap does not time out and waits until spark-submit returns a result.

Default value: -1
Example:   600 (10 minutes)


Troubleshooting

The Snap produces an error document if the PySpark script fails to execute. When troubleshooting, it can be helpful to run the script from a command line on the Groundplex node where the Snap executes.
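When writing or debugging a script, keep in mind the behavior described in the Snap Views section: standard output is captured into the Snap's output document, and a non-zero exit code produces an error document. A hedged sketch of that convention:

# Illustrative sketch of how a script's stdout and exit code surface through the Snap.
import sys

job_succeeded = True   # hypothetical outcome of the Spark job

if job_succeeded:
    print("rows_processed=42")        # stdout is included in the Snap's output document
else:
    sys.stderr.write("job failed\n")
    sys.exit(1)                       # non-zero exit code -> error document in the error view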


Example

Execute PySpark script

The following example pipeline demonstrates how the PySpark Snap executes the PySpark script:

Configure the PySpark Snap as follows:

(Screenshot: PySpark Snap configuration for this example)

The following PySpark script is executed:

PySpark Word Count script template

### PySpark Word Count script template ###
################################################################
# If you are not familiar with Spark Programming,              #
# please check Spark programming guide for more detail.        #
# http://spark.apache.org/docs/latest/programming-guide.html   #
################################################################
import sys
from operator import add
from pyspark.sql import SQLContext
from pyspark import SparkContext

if __name__ == "__main__":
    # check if the number of arguments is 2
    if len(sys.argv) < 3:
        print >> sys.stderr, "Usage: wordcount <input file> <output file>"
        exit(-1)

    # get contexts
    sc = SparkContext()
    sqlContext = SQLContext(sc)

    # do word-count
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()

    # define column names for CSV output
    df = sqlContext.createDataFrame(counts)
    oldColumns = df.schema.names
    newColumns = ["word", "count"]
    newdf = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]),
                   xrange(len(oldColumns)), df)

    # write wordcount result to output file with overwrite mode
    newdf.write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").save(sys.argv[2])

    # print wordcount result to standard output
    print "word, count"
    for (word, count) in output:
        print "%s, %i" % (word, count)
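The template above uses Python 2 syntax (print statements, xrange, the reduce built-in) and the Spark 1.x SQLContext API. On clusters that run Python 3 with Spark 2.x or 3.x (spark2-submit or spark3-submit), an equivalent script might look like the following sketch; it is an adaptation for illustration and is not the template shipped with the Snap.

# Word count adapted for Python 3 and the SparkSession API (Spark 2.x/3.x).
# Illustrative sketch only; not included with the Snap.
import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: wordcount <input file> <output file>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession.builder.appName("wordcount").getOrCreate()
    sc = spark.sparkContext

    # count words in the input file
    lines = sc.textFile(sys.argv[1], 1)
    counts = (lines.flatMap(lambda x: x.split(' '))
                   .map(lambda x: (x, 1))
                   .reduceByKey(add))

    # write the result as CSV with a header, using the built-in csv writer (Spark 2+)
    df = spark.createDataFrame(counts, ["word", "count"])
    df.write.mode("overwrite").option("header", "true").csv(sys.argv[2])

    # print the result to standard output
    print("word, count")
    for word, count in counts.collect():
        print("%s, %i" % (word, count))

    spark.stop()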

 

On validation, the Snap produces an output document containing the execution status and the script's standard output (the word counts).

 

Downloads