Testing Spark tasks with PyTest, Mock and Luigi
TL;DR
In this blog post I briefly describe how to test PySpark tasks using a combination of [Luigi](), [PyTest]() and [Mock]().
Intro
At TrustYou we have a lot of [Hadoop streaming]() jobs. Most of them are written in Python (and some in Pig). One of the things that bothered me about working this way is that testing can become complicated, as simulating the cluster setting imposes some restrictions.
Although not the only reason, the complexity of testing such processing pipelines might contribute to skipping testing altogether, mostly under the belief that it is not needed or not worth the effort. The trickiest part is that a problem in one stage of a data processing pipeline might only become evident in a downstream stage, making debugging difficult.
Luckily, Spark and PySpark make testing simpler, as they allow running a Spark application on a local cluster while keeping all the high-level abstractions, such as [DataFrames](), available. Combined with PyTest, Luigi and PyTest fixtures, this makes testing such pipelines much more approachable.
PySpark Tasks with Luigi
Let's start with the basics of how to run a PySpark job with Luigi. Luigi has the concept of a [Task](), which is basically a step in a data pipeline, for example dumping data from a database or running a MapReduce job. To run a Spark job you simply need to set the Spark configuration in the Luigi configuration file (`luigi.cfg`) and create a class that inherits from [`luigi.contrib.spark.PySparkTask`]():
```python
from luigi.contrib.spark import PySparkTask
from luigi.contrib.hdfs import HdfsTarget

class SamplePySparkTask(PySparkTask):

    # Spark options can be set as class attributes
    driver_memory = '4g'
    executor_memory = '16g'
    num_executors = 8
    executor_cores = 2

    def main(self, sparkContext):
        # This is where we implement the job logic
        pass

    def output(self):
        # After main has run, this file must exist for the
        # task to be considered complete
        return HdfsTarget('myresult.txt')

    def requires(self):
        # This should return either a task or a required file
        pass
```
Above is the basic structure of a task. The `main` method receives the Spark context as a parameter. Luigi does not care what we do with the context, as long as the target declared in `output` exists once the task has run.
Now let's test, for example, a task that loads a CSV with the following structure.
TODO find out how make a good looking table with Bootstrap
Our little Spark task will group by customer and month, compute the average, and output the result as a JSON Lines file using the following format:
```json
{
    "customer": "Mario X.",
    "month": "June",
    "average": 123.42
}
```
The necessary Luigi configuration would be as follows (assuming Spark is installed):
```ini
[spark]
master: local
```
Testing with fixtures
To run a Luigi pipeline we need a Luigi configuration loaded into memory. In a real-world pipeline it will contain Luigi-specific configuration along with application-specific settings.
This is a good use case for a fixture: a reusable piece of setup code that PyTest runs automatically and injects into every test that declares it as an argument.
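As a minimal sketch of the idea, using the stdlib `configparser` to stand in for Luigi's configuration machinery:

```python
import configparser

import pytest

# Minimal luigi.cfg-style configuration, as in the [spark] section above
SAMPLE_CONFIG = """\
[spark]
master: local
"""

@pytest.fixture
def spark_config():
    # Parse the configuration into memory once per test that requests it
    parser = configparser.ConfigParser()
    parser.read_string(SAMPLE_CONFIG)
    return parser

def test_spark_runs_locally(spark_config):
    # Any test that names the fixture as an argument receives the parsed config
    assert spark_config.get('spark', 'master') == 'local'
```

Running `pytest` on this file executes the fixture before the test and passes its return value in as `spark_config`.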