Advanced Spark Programming

Spark provides two types of shared variables − broadcast variables and accumulators.

  • Broadcast variables − used to efficiently distribute large values.
  • Accumulators − used to aggregate information across the worker nodes, such as counters and sums.

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage.

The data broadcasted this way is cached in serialized form and is deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code given below shows this −

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

Output −

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster, so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after its broadcast, in order to ensure that all nodes get the same value of the broadcast variable.
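For illustration, the following sketch (reusing the broadcastVar created above) reads the broadcast value on the driver and then references it inside a map transformation; the parallelized index array here is just an arbitrary example −

scala> broadcastVar.value

scala> sc.parallelize(Array(0, 1, 2)).map(i => broadcastVar.value(i)).collect()

Note that the closure refers to broadcastVar.value rather than the original array, so the data is shipped to each node only once.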

Accumulators

Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If accumulators are created with a name, they will be displayed in Spark’s UI. This can be useful for understanding the progress of running stages (NOTE − this is not yet supported in Python).

An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.

The code given below shows an accumulator being used to add up the elements of an array −

scala> val accum = sc.accumulator(0) 
 
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

If you want to see the output of the above code, use the following command −

scala> accum.value 

Output −

res2: Int = 10 
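As noted above, accumulators created with a name show up in Spark’s UI. The sketch below is a minimal example of this, using the same accumulator API; the name "My Counter" is arbitrary −

scala> val namedAccum = sc.accumulator(0, "My Counter")

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => namedAccum += x)

scala> namedAccum.value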

Numeric RDD Operations

Spark allows you to do different operations on numeric data, using one of the predefined API methods. Spark’s numeric operations are implemented with a streaming algorithm that allows building the model one element at a time.

These operations are computed and returned as a StatCounter object by calling the stats() method.

The following is a list of numeric methods available in StatCounter.

S.No	Method & Meaning
1	count() − Number of elements in the RDD.
2	mean() − Average of the elements in the RDD.
3	sum() − Total value of the elements in the RDD.
4	max() − Maximum value among all elements in the RDD.
5	min() − Minimum value among all elements in the RDD.
6	variance() − Variance of the elements.
7	stdev() − Standard deviation of the elements.

If you want to use only one of these methods, you can call the corresponding method directly on the RDD.
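As an illustration, the sketch below builds an RDD of doubles, retrieves all the statistics at once through stats(), and also calls one of the methods directly on the RDD; the sample values are arbitrary −

scala> val nums = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))

scala> val summary = nums.stats()    // StatCounter holding count, mean, sum, max, min, variance, stdev

scala> summary.mean

scala> nums.mean()    // the same statistic, computed directly on the RDD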
