Aggregating with Apache Spark

An aggregate in mathematics is defined as a « collective amount, sum, or mass arrived at by adding or putting together all components, elements, or parts of an assemblage or group without implying that the resulting total is whole. » While there are many uses for aggregation in data science–examples include log aggregation, spatial aggregation, and network aggregation–it always pertains to some form of summation or collection. In this article, we’ll look at the mechanics of aggregation in Apache Spark, a top-level Apache project that is popularly used for lightning-fast cluster computing.

Aggregation is abstract in theory, but we use it all the time in the real world. Imagine a garden (call it the JavaWorld garden) that is divided into four quadrants: east, west, south, and north. Each quadrant is landscaped with beautiful roses of different varieties, from Alba to Mini-flora to Polyantha. Three people are assigned to each quadrant of the garden, and each person is tasked with picking all of the roses in their section of the quadrant. In total, twelve people are picking roses in the garden.

Our task is to find the aggregate number of all the flowers picked. We’ll then divide that number by 12, to determine the average number of flowers picked by each person.


Get the source code for the example applications demonstrated in this article: « Aggregating with Apache Spark. » Created by Ravishankar Nair for JavaWorld.

Aggregation with the Streams API

At first, the problem statement appears very simple. Let’s consider the first part: find the aggregate number of roses picked from the entire garden. Starting in Java 8, we can use Stream to represent a sequence of elements and run various computational operations on those elements. In this case, we could use it to process the elements of an array, reducing them to a function able to take in another function, like so:

Listing 1. Aggregation with Stream

public class NormalAggregate
public static void main(String[] args)
/* First store number of flowers picked by each person in an
array called flowers */
int[] flowers = new int[]{11,12,13,24,25, 26, 35,36,37, 24,15,16};
int noofpersons=12; int sum = IntStream.of(flowers).reduce( 0,(a, b) -> a + b); System.out.println("The no of flowers aggregated: " + sum);
System.out.println("The average flowers picked per person: " + (float)sum/(float)noofpersons); System.out.println("Another way to find aggregate :" + IntStream.of(flowers).sum());

Streams and pipes

If you’ve ever worked with pipes in Unix, the code in Listing 1 will look familiar. Here’s how we would use a pipe to filter out the Apache logs for the month of August:

Listing 1. Filtering with pipes

ls -al | grep Aug

As its name indicates, a pipe (|) is a component in a Unix pipeline, taking output from one command and letting it flow to the next one. Going back to the problem in Java, we would first store the number of flowers picked by each person in an array called flowers. Using a lambda function, we would then add the elements with an accumulator value of 0. Internally, the parameter starts with the initial element of the array, then adds in succession until it reaches the last element:

Listing 2. Calculating in aggregate with Stream

int accumulator = 0;
for( int i = 0; i < flowers.length; i++)
accumulator += flowers[i];

So far, so good. Now let’s see what happens when we distribute the load.

A better approach: Parallel aggregation with Java threads

Multicore CPUs are common enough now that we can take a multithreaded approach to solving aggregation. Assuming for the sake of example that we have an Intel i7 four-core processor, we would start by dividing the array, with a lower and upper bound for each available processor. We’d then calculate the aggregate. Listing 3 shows the most important part of the solution. You can find the rest in the source code for this article.

Listing 3. Multithreaded calculation using multicore processing

public static int parallelAggregate(int[] arr, int threads) { int size = (int) Math.ceil(arr.length * 1.0 / threads); ParallelAggregation[] individualTotals = new ParallelAggregation[threads]; for (int i = 0; i < threads; i++) { individualTotals[i] = new ParallelAggregation(arr, i * size, (i + 1) * size); individualTotals[i].start(); } try { for (ParallelAggregation sum : indivdualTotals) { sum.join(); } } catch (InterruptedException e) { } int total = 0; for (ParallelAggregation sum : individualTotals) { total += sum.getPerThreadAggregate(); } return total; }

The function in Listing 3 starts by dividing the size of the array almost equally among the number of threads. We can then run parallel threads, taking the sum from each. Finally, we add all of the sums to get the aggregate.

Here’s how we would invoke the above method:

public static int parallelAggregate(int[] arr){
return parallelAggregate(arr, Runtime.getRuntime().availableProcessors());

Figure 1 is a screenshot of the number of processors used to run this test in our standalone system:

jw sparkaggregate fig1Ravishankar Nair
Figure 1. Number of available processors

The function in Listing 3 uses all of the available processors, with each processor running its own thread. Figure 2 shows the details of the Intel i7 processor.

jw sparkaggregate fig2Ravishankar Nair
Figure 2. Screenshot of Intel processor details

Figure 2 shows that we have up to four threads available in the i7 Intel processor, so our program will divide the number of elements in the array into four equal sizes and calculate the aggregate. For the sake of comparison, we will run the aggregation using both threadless and multithreaded aggregation. For fun, let’s assume we have 500 million flower pickers:

Listing 4. Threaded and multithreaded aggregation

public static void main(String[] args)
{ java.util.Random rand = new java.util.Random(); int[] flowers = new int[500000000]; for (int i = 0; i < flowers.length; i++) { flowers[i] = rand.nextInt(50) + 1; // simulate 1..50 }
long start = System.currentTimeMillis();
System.out.println("Single: " + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
System.out.println("Parallel: " + (System.currentTimeMillis() - start)); }

Figure 3 shows the sample output from an intel i7-5500 CPU based system.

jw sparkaggregate fig3Ravishankar Nair
Figure 3. Output of threadless and multithreaded aggregations

Comparing threadless and threaded approaches

The program in Listing 4 works, and it produces accurate results for both multithreaded and threadless solutions. But how would these solutions scale for big data? Let’s say we wanted to aggregate for trillions of flower pickers. Figure 4 shows the results for the multithreaded program:

jw sparkaggregate fig4Ravishankar Nair
Figure 4. Output with a trillion flower pickers

The program doesn’t have sufficient processing power for an array of that size.  Next, we run the program again. This time we keep the number of flower pickers to 500 million, but increase our thread count from four to 1,000.

jw sparkaggregate fig5Ravishankar Nair
Figure 5. Output for the same program with 1000 threads

The program runs this time, but the output isn’t as promising as it was when running four threads on four processors. In multicore, a common load-balancing recommendation is n+1 threads, with n being the number of CPU cores available. That way, n threads can work with the CPU while one thread waits for disk I/O. Having fewer threads would not fully utilize the CPU resource (because at some point there will always be I/O to wait for), while having more threads forces them to fight for CPU.

Threads come at a cost, which pays off when you have the dedicated CPU cores to run your code. On a single-core CPU, a single-process (threadless) solution is usually faster. Threads alone don’t automatically increase the speed of processing, but they do require more work. As you saw, threads also don’t necessarily scale for big data. Given these issues, our search for an ideal solution is not over.

MapReduce: When scalability is the goal

In order to solve our problem statement and aggregate at a scale of one trillion or more flower pickers, we need a solution that can scale without dependency on the underlying machine’s processing capacity. We want accuracy, consistency, fault tolerance, fail safety (gradual degradation), and efficient resource utilization.

MapReduce 2, with YARN, is a good framework for this challenge. The script below generates 500 million random numbers of flowers between 1 and 50:

Listing 5. Aggregation with MapReduce and YARN

 for i in {1 .. 500000000 }; do echo $[($RANDOM % 50 +1)]; done > test.dat

To run this example for yourself, find the complete « test.dat » script in the source download. Create a directory in your HDFS and place the test data file inside, then follow the instructions.

Here’s the mapper for the MapReduce aggregation:

Listing 6. Aggregation mapper

public void map(Object key,Text value,Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString(), " \t\n\r\f,.:;?![]'"); while (tokenizer.hasMoreTokens()) { // make the words lowercase so words like "an" and "An" are counted as one word String s = tokenizer.nextToken().toLowerCase().trim(); IntWritable val = new IntWritable(Integer.parseInt(s)); word.set("aggregate"); context.write(word, val); } }

The map function in Listing 6 reads the input data line by line. It then creates a key called aggregate. It emits every number on each line, along with the aggregate key as a tuple. Note that we’ve used the same key for the reducer in Listing 7, thus directing the output from both map and reduce to a single node for aggregation.

Listing 7. Aggregation reducer

public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } total.set(sum); // this writes the word and the count, like this: // ("aggregate", 2) context.write(key, total); }

The reducer gets the emitted key and list of values from the mapper and aggregates all values. Because we’ve used a constant key, the reduce operation occurs in the same node. Using the reducer class as a combiner in the driver program makes this MapReduce program very efficient for executing on a large cluster.

Using MapReduce with YARN, we can efficiently execute data for a trillion rows on a two-node cluster. If we need to add even more flower pickers, we can scale by increasing application nodes. The inherent design of MapReduce allows speculative execution, which ensures fail safety by running a filed or slow task in a secondary node, where a copy of the original data resides. Figure 6 shows the output for a successful single-node execution.

jw sparkaggregate fig6Ravishankar Nair
Figure 6. MapReduce output

InnoValeur | Data Science | Smart Data | Machine Learning | AI

Publier un commentaire

Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur comment les données de vos commentaires sont utilisées.