Bringing Apache Spark™ Closer to SIMD and GPU

by Gita Koblents, Kazuaki Ishizaki, Hiroshi Inoue

Accelerating the Apache Spark™ execution engine has always been a focus of the Spark development community. As a result, significant performance improvements were delivered in Spark 2.0 compared with Spark 1.6. Most of the improvements were implemented as part of Project Tungsten.

The goal of Project Tungsten is to push Spark performance closer to the hardware limits by introducing:

  • Customized memory management to eliminate the overhead of JVM object model and garbage collection
  • Cache-aware algorithms and data structures to exploit memory hierarchy
  • New DataFrame and Dataset APIs to take advantage of structured data access
  • Code generation that exploits modern compilers and CPUs

So far, Project Tungsten has successfully improved Spark performance on CPUs. One of the longer-term goals is to take advantage of the SIMD and GPU capabilities of the underlying hardware. These improvements can be achieved in several ways:

  • Implement SIMD- and GPU-enabled Spark APIs, such as Alternating Least Squares (ALS).
  • Allow programmers to implement arbitrary RDD operations in native code, for example by writing GPU kernels in CUDA with the GPUEnabler package.
  • Combine Spark with other libraries, such as TensorFlow, that provide new APIs for GPU exploitation in specific domains.
  • Implicitly translate arbitrary DataFrame and Dataset operations into SIMD or GPU code in the execution engine or with the help of a just-in-time (JIT) compiler.

In this article, we discuss how arbitrary DataFrame and Dataset operations originally written using the high-level Spark API (Scala, Java, or Python) can be automatically compiled into SIMD-enabled or GPU-enabled native code by a JIT compiler. In this model, the JIT compiler compiles code generated by the Spark execution engine into sequential code, SIMD instructions, or GPU kernels, based on the hardware capabilities and runtime conditions on a particular node.

In contrast to the other approaches, our SIMD and GPU exploitation is completely implicit: it requires no involvement from Spark programmers while accelerating the DataFrame and Dataset operations in their code. We think our approach has its own advantages and complements the approaches listed above.

We first give a brief introduction to Tungsten, identify some of the automatic parallelization challenges we encountered, describe the general approach for solving them, and finally, outline a roadmap for the JIT-based SIMD and GPU exploitation in Spark. We will use subsequent blogs to describe individual Spark changes and the improvements they provide.

Is our work beneficial only with SIMD and GPU?

Even though the Spark execution model is inherently data parallel, we found that a few changes need to be applied to Tungsten data representation and code generation to make them more suitable for SIMD and GPU exploitation. While some of these changes are specifically designed to make the JIT compiler's parallelization task easier, most contributions provide substantial improvements on their own. For example, we achieved the following results when running on CPU without SIMD:

  • Up to 3.4 times speed-up on programs that use cached DataFrames and Datasets
  • Up to 16.6 times speed-up on programs that use DataFrames and Datasets that contain array elements

The full results that we have achieved so far, along with the corresponding JIRA numbers, are listed in the subsequent discussion, "The Roadmap to SIMD and GPU", toward the end of this post.

Introduction to Tungsten

Tungsten introduced two main APIs: DataFrame and Dataset. Let's take a quick look at simple examples of both. In this first example, a DataFrame is created by using the toDF method on an RDD.

// Take each integer element in the range [1, 16] and create a <Long, Double> pair where the double value is 2.5X the long element value

 val df = sparkContext.parallelize(1 to 16, 1).map(i => (i.toLong, i * 2.5)).toDF("l", "d")
 // cache
 df.cache
 df.count  // force cache creation
 val c = df.filter("l > 8").filter("d > 24").count
 print(s"COUNT:$c\n")
 //COUNT:7

In this example, each DataFrame element contains a Long and a Double field. The elements meeting the conditions specified by the two filter operators are counted by the count method. The same task implemented using Dataset would look like this:

 case class Data(l: Long, d: Double)
 // Enumerate each <Long, Double> pair where the double value is 2.5X the long element value in the range [1, 16]
 val ds = Seq(Data(1, 2.5), Data(2, 5.0), ..., Data(16, 40.0)).toDS()
 // cache
 ds.cache
 ds.count  // force cache creation 
 val c = ds.filter(e => e.l > 8).filter(e => e.d > 24).count
 print(s"COUNT:$c\n")
 //COUNT:7

The main difference between the two interfaces is that DataFrame accepts only SQL-style operators (such as filter, select, and sum), while Dataset can run arbitrary user code specified in a closure. Dataset also provides strong type checking while DataFrame does not. To fully optimize these programs, Spark provides a component called Catalyst that consists of an SQL optimizer and a Java code generator. Catalyst performs the following steps to exploit modern compilers and hardware:

  1. The optimizer applies SQL-level optimizations.
  2. The code generator emits a single loop written in Java that accesses the customized storage and applies multiple operators to it, one row at a time (referred to as whole-stage code generation).
  3. The generated code is compiled by a lightweight Java compiler (called Janino) into class files, which in turn are loaded and executed by the JVM.
  4. Finally, a JIT compiler can compile some of the generated Java methods into native code.

As an example, here's a pseudocode representation of the Java code generated by Catalyst for the small DataFrame program above:

count = 0;
while (next row exists) {    // row-at-a-time iteration
  row = getRow();            // materialize the next row
  l = row.getLong(0);        // read column l through a method call
  if (l <= 8) continue;      // first filter
  d = row.getDouble(1);      // read column d through a method call
  if (d <= 24) continue;     // second filter
  count++;
}
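
You don't have to take our word for this shape: in Spark 2.x you can print the actual Java source produced by whole-stage code generation with the debugCodegen helper. For example (a sketch assuming a spark-shell session where the df from the earlier example is in scope):

import org.apache.spark.sql.execution.debug._
// Print the generated Java source for the query plan of this DataFrame
df.filter("l > 8").filter("d > 24").debugCodegen()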

You can see that this generated code can't be easily translated into native SIMD or GPU code because it's hard to recognize the while loop as parallelizable. In addition, there are some overheads that should be eliminated even without the SIMD or GPU exploitation.

We'll describe the changes that we deem necessary to exploit SIMD and GPU in the code generated by Catalyst.

JIT-based approach to SIMD and GPU

There are two ways to bring the generated code closer to exploiting SIMD and GPU:

  • The execution engine could generate SIMD or GPU code explicitly: for example, by using LLVM IR or OpenCL.
  • The execution engine could still generate Java code and then let the JIT compiler produce SIMD-enabled or GPU-enabled native code.

We chose the JIT-based approach for a few reasons:

  • Code generation in Spark happens on the driver node. The Java code is sent to worker nodes where it's compiled by Janino and executed by the JVM. For that reason, the generated code is common among all the executors and is not specialized for the actual hardware it will run on. We would like the JIT compiler to generate appropriate native code based on the actual hardware capabilities and runtime conditions on a specific node. We believe that the JIT compiler is most suitable for this task due to its dynamic features, runtime profiling, and recompilation techniques.
  • The Java code generated by Catalyst contains calls to other generated code, to internal Spark classes, and to the Java Class Library. In addition, because it is automatic, the code generator might produce some redundant code. An optimizing compiler is needed to inline the calls and remove any redundant code before converting the program into SIMD or GPU code.

At the same time, to facilitate aggressive optimizations by the JIT compiler, the generated Java code should be as clear and as concise as possible. Since Java bytecode does not support statement annotations, and we don't want to introduce any proprietary interface, we need to convey the semantics of the generated code to the JIT compiler using standard Java. For example, we would like to make it easy for the JIT compiler to deduce that the loop iterations are independent of each other, so that it can parallelize the loop.

The following are the requirements for a loop to eventually be SIMDized or offloaded to the GPU by the JIT compiler:

  1. The loop should be a counted loop (for example, controlled by a local induction variable that increases from a lower to an upper bound) to facilitate loop parallelization.
  2. It should directly access consecutive array elements in a column-oriented storage.
  3. It should have as few branches as possible because branches are costly for both SIMD and GPU.
  4. It should contain no calls, or only calls that can be inlined.

Note that the loop in our example above simply iterates through all the rows of the DataFrame and counts the elements that meet the conditions specified by the two filter operators. Ideally, it should look like this:

// initialize the columnar storage: one primitive array per column
long[] col0_l = {1, 2, ..., 16}
double[] col1_d = {2.5, 5.0, ..., 40.0}

count = 0
for (idx = 0; idx < 16; idx++) {  // counted loop
  get e.l from col0_l[idx]  // consecutive array elements
  get e.d from col1_d[idx]  // consecutive array elements
  if ((e.l <= 8) || (e.d <= 24)) continue  // single combined branch
  count++
}

Presenting the loop as a simple counted loop should help the JIT compiler to generate optimal native code. On the other hand, for the reasons described above, the code generator might not be able to produce such a perfect loop. To achieve our final goal of automatic parallelization, we therefore need to strike a balance between changes to Spark and to the JIT compiler.
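
For reference, the ideal loop above can be rendered as the following runnable Scala snippet (our sketch, not the code Catalyst emits): columnar data in primitive arrays, one counted loop, a single combined branch, and no calls inside the loop body.

// Columnar storage: one primitive array per column
val col0_l: Array[Long] = Array.tabulate(16)(i => (i + 1).toLong)
val col1_d: Array[Double] = Array.tabulate(16)(i => (i + 1) * 2.5)

var count = 0
var idx = 0
while (idx < col0_l.length) {  // counted loop over consecutive elements
  if (col0_l(idx) > 8 && col1_d(idx) > 24) count += 1  // single combined branch
  idx += 1
}
println(s"COUNT:$count")  // prints COUNT:7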

The Roadmap to SIMD and GPU

Because we need some cooperation between Apache Spark and the JIT compiler in order to produce the most efficient native code, we can divide the required changes into two categories:

Spark improvements:

  • Data representation. We would like to make sure that the column-oriented storage has the most efficient layout to facilitate direct and consecutive access to the DataFrame and Dataset elements, including the ones that contain complex data structures. We also want to make the cache method that creates such storage as fast as possible. (A simplified sketch of such a layout follows this list.)
  • Code generation. Some changes in the way code is generated need to be made to assist the JIT compiler with automatic parallelization. For example, currently, the generated loop is not a counted loop. In addition, some unnecessary data transformations from columnar to row-based format are present inside the loop. These should be eliminated.
  • Dataset improvements. Code generation for a Dataset includes a conversion from the customized Spark format to the Scala representation so that it can be consumed by the closure function. One way to eliminate this conversion is to transform the bytecode of the closure function so that it accesses the internal storage directly. This is a big project in itself and we'll talk about it in some of our future blogs.
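
As promised above, here is a simplified illustration of what "direct and consecutive access" means. This is a hypothetical sketch, not Spark's actual cached-storage classes, which are more elaborate; the layout idea, however, is the same: the values of each column live in one contiguous primitive array, so a scan reads consecutive memory locations with no per-row object overhead.

// A hypothetical, simplified columnar cache for the <Long, Double> schema
// used in the earlier examples.
class SimpleColumnarCache(numRows: Int) {
  val col0_l = new Array[Long](numRows)    // column "l", stored contiguously
  val col1_d = new Array[Double](numRows)  // column "d", stored contiguously

  def put(row: Int, l: Long, d: Double): Unit = {
    col0_l(row) = l
    col1_d(row) = d
  }
}

A scan over such a cache is exactly the counted loop shown in the previous section.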

JIT compiler improvements:

  • Redundant code elimination. The JIT compiler applies advanced optimizations to eliminate redundant code across one or multiple methods, by using method inlining.
  • Loop dependence analysis. The JIT compiler needs to be able to perform basic loop-dependence analysis so that loop iterations can be executed in parallel (see the example after this list).
  • SIMD and GPU transformations. Finally, the JIT compiler has to transform the loop into SIMD instructions or into a GPU kernel.
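
To make the loop-dependence requirement concrete, here is a minimal Scala illustration (ours, not Spark code) of the two cases the analysis must distinguish:

val n = 1024
val a = new Array[Double](n)
val b = new Array[Double](n)
val out = new Array[Double](n)

// Independent iterations: out(i) touches no other iteration's data,
// so the loop can be vectorized or offloaded.
for (i <- 0 until n) out(i) = a(i) + b(i)

// Loop-carried dependence: iteration i reads a(i - 1), which iteration
// i - 1 wrote, so the iterations cannot simply run in parallel.
for (i <- 1 until n) a(i) = a(i - 1) + b(i)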

So far, we've implemented the data representation and code generation changes in Spark. These changes have provided considerable performance improvements on some commonly used DataFrame and Dataset operations:

[Table: measured speed-ups for common DataFrame and Dataset operations, with the corresponding Spark JIRA numbers]

Some of the pull requests are still under review but we hope they'll be merged into Spark soon.

We plan to implement the necessary changes in the JIT compiler as part of the soon-to-be-open-sourced J9 JVM (called Open J9) and in the open source Eclipse OMR project.

Conclusion

Generating highly optimized native code that exploits all the capabilities of the underlying hardware, including SIMD and GPU, is one of the ultimate goals of Project Tungsten. However, our investigation has shown that a number of challenges remain in achieving this goal.

In this blog, we've summarized our findings and outlined a roadmap based on the JIT approach. We've already implemented some of the Spark improvement items in this roadmap. Most of the changes provided considerable performance improvements on their own, even without the actual SIMD and GPU exploitation. We plan to dedicate our subsequent blogs to describing those changes in more detail and also to provide updates on our future progress in bringing Apache Spark closer to SIMD and GPU.

References
1. Spark 2.0: https://databricks.com/blog/2016/07/26/introducing-apache-spark-2-0.html
2. Project Tungsten: https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
3. DataFrames and Datasets: http://spark.apache.org/docs/latest/sql-programming-guide.html
4. ALS: https://github.com/IBMSparkGPU/CUDA-MLlib/tree/master/als
5. GPUEnabler: http://www.spark.tc/gpu-acceleration-on-apache-spark-2
6. TensorFlow: https://databricks.com/blog/2016/01/25/deep-learning-with-apache-spark-and-tensorflow.html
7. Catalyst: https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html
8. Whole-stage code generation: https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html
9. Janino: http://janino-compiler.github.io/janino/
10. Open J9: https://www.infoq.com/news/2016/09/JavaOne-2016-IBM-Keynote-OpenJ9
11. Eclipse OMR: https://github.com/eclipse/omr
