Target audience: Beginner
Estimated reading time: 4 minutes
This post describes a methodology to manage the Spark context while testing your application using ScalaTest.
Table of contents
Introduction
Testing an Apache Spark application using ScalaTest seems quite simple when dealing with a single test:
- Specify your Spark context configuration, SparkConf
- Create the Spark context
- Add the test code related to your application
- Clean up resources (Spark context, Akka context, file handles...)
This post introduces two basic ScalaTest methods, beforeAll and afterAll, of the trait BeforeAndAfterAll, to manage the context life cycle of your test application.
Wrapping the Spark context
The objective is to create a small framework that creates or retrieves an existing Spark context before executing a test, and closes it after the test completes, regardless of whether the test succeeded.
The initialization of the Spark context consists of specifying the configuration for the sequence of tests. Some of these parameters can be defined dynamically through a simple parameterization. The context is created only if it is not already defined within the scope of the test, using the getOrCreate method.
import org.apache.spark.{SparkConf, SparkContext}

trait SparkContextWrapper {
  protected[this] var sc: SparkContext = _

  def getContext: SparkContext = {
    val conf = new SparkConf().setAppName("Test-App")
      .set("spark.executor.instances", "2")
      .set("spark.driver.memory", "2g")
      .set("spark.executor.memory", "2g")
      .set("spark.executor.cores", "2")
      .set("spark.serializer",
           "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.spill.compress", "true")
      .set("spark.shuffle.memoryFraction", "0.4")
      .set("spark.io.compression.codec", "snappy")
      .set("spark.network.timeout", "600")
    // Reuse an existing context if one is already running in this JVM
    sc = SparkContext.getOrCreate(conf.setMaster("local[4]"))
    sc.setLogLevel("ERROR")
    sc
  }

  def close(): Unit = {
    if (sc != null) sc.stop()
  }
}
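As mentioned above, some of the configuration parameters can be defined dynamically. Here is a minimal sketch of such a parameterization, assuming the suite overrides a few knobs; the names appName, numExecutorCores, and executorMem are illustrative, not part of the original code:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical variant of SparkContextWrapper: the configuration values
// are supplied by the concrete test suite instead of being hard-coded.
trait ParameterizedSparkContextWrapper {
  // Assumed knobs; override them in the test suite as needed.
  protected def appName: String = "Test-App"
  protected def numExecutorCores: Int = 2
  protected def executorMem: String = "2g"

  protected[this] var sc: SparkContext = _

  def getContext: SparkContext = {
    val conf = new SparkConf()
      .setAppName(appName)
      .set("spark.executor.cores", numExecutorCores.toString)
      .set("spark.executor.memory", executorMem)
      .setMaster(s"local[$numExecutorCores]")
    // Reuse an existing context if one is already running in this JVM
    sc = SparkContext.getOrCreate(conf)
    sc.setLogLevel("ERROR")
    sc
  }

  def close(): Unit = if (sc != null) sc.stop()
}
```

A suite can then tune the context for a given group of tests by overriding the knobs, without touching the wrapper itself.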
ScalaTest context
The solution is to let the ScalaTest methods manage the life cycle of the Spark context for testing.
import org.scalatest.{BeforeAndAfterAll, Suite}

trait SparkContextForTesting
    extends SparkContextWrapper with BeforeAndAfterAll {
  self: Suite =>

  override def beforeAll(): Unit = {
    getContext
    super.beforeAll()
  }

  override def afterAll(): Unit = {
    close
    super.afterAll()
  }
}
Note: The BeforeAndAfterAll trait can only be mixed into a test suite that inherits the Suite trait, hence the self-type annotation. The method beforeAll is automatically called before the first test of your test suite, and the method afterAll is called after your last test completes, whether those tests succeeded or not.
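When a suite stacks several traits that each perform teardown, the ScalaTest documentation suggests chaining afterAll through try/finally, so the Spark context is stopped even if an inherited afterAll throws. A sketch of that variant of the trait above:

```scala
import org.scalatest.{BeforeAndAfterAll, Suite}

// Variant of SparkContextForTesting with a more defensive teardown.
trait RobustSparkContextForTesting
    extends SparkContextWrapper with BeforeAndAfterAll {
  self: Suite =>

  override def beforeAll(): Unit = {
    getContext
    super.beforeAll()
  }

  override def afterAll(): Unit = {
    // Run the inherited teardown first; stop the Spark context
    // even if super.afterAll throws an exception.
    try super.afterAll()
    finally close
  }
}
```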
import org.apache.spark.sql.SQLContext
import org.scalatest.{FlatSpec, Matchers}

class MyTest
    extends FlatSpec
    with Matchers
    with SparkContextForTesting {

  val x: Int = -4

  "My application" should "pass my first test" in {
    val sqlContext = new SQLContext(sc)
    // ....
  }

  // ... other tests

  it should "pass my last test" in {
    // the context is cleaned up after this test completes
  }
}
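To make the pattern concrete, here is a sketch of what a self-contained test might look like once SparkContextForTesting is mixed in; the data and assertion are illustrative only:

```scala
import org.scalatest.{FlatSpec, Matchers}

class WordCountTest
    extends FlatSpec
    with Matchers
    with SparkContextForTesting {

  // sc is initialized by beforeAll (via getContext) before this test runs
  "A word count" should "count occurrences in a small RDD" in {
    val words = sc.parallelize(Seq("spark", "test", "spark"))
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _).collectAsMap()
    counts("spark") shouldBe 2
  }
}
```

No explicit setup or teardown appears in the suite itself: the context is created before the first test and stopped after the last one, even if an assertion fails.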
"That's all folks!"