Running Luigi Hadoop JobTask in a Virtual Environment

Virtual Environments and Hadoop Streaming

If you are using Python to write Hadoop streaming jobs, you might have experienced the problem of keeping the nodes supplied with the required packages. Furthermore, if you happen to have different sets of jobs, workflows or pipelines that require different versions of packages, you might find yourself in a not so comfortable situation.

A former work colleague wrote about how to alleviate this by using Python's virtual environments. I am going to assume you have quickly browsed the article and are wondering how to do something similar, but with Luigi.

Before talking about Luigi, here is a summary of running streaming jobs with virtualenvs (without Luigi):

Normally, if you don't need a virtualenv, you write a Python script for the mapper and one for the reducer and, assuming the data you need to process is already on HDFS, you call them something like this:

[mc@host]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="My Cool MR Job" \
-files reducer.py,mapper.py \
-mapper mapper.py \
-reducer reducer.py
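
For readers who have not written streaming scripts before, here is a minimal sketch of the kind of logic mapper.py and reducer.py could contain. The word count task and the function names (map_lines, reduce_lines, simulate) are my own assumptions for illustration, not the scripts used in this post. The mapper emits tab-separated key/value lines, and the reducer relies on receiving those lines sorted by key.

```python
from itertools import groupby


def map_lines(lines):
    """Mapper logic: emit one 'word<TAB>1' record per word."""
    for line in lines:
        for word in line.split():
            yield "%s\t1" % word


def reduce_lines(lines):
    """Reducer logic: sum the counts per word (input must be sorted by key)."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield "%s\t%d" % (word, sum(int(count) for _, count in group))


def simulate(lines):
    """Local stand-in for the map -> sort -> reduce flow of a streaming job."""
    return list(reduce_lines(sorted(map_lines(lines))))


# In a real mapper.py you would loop over sys.stdin instead, for example:
#     for record in map_lines(sys.stdin):
#         print(record)
```

Keeping the logic in plain functions like this makes it possible to try a job locally on a few lines of input before submitting it to the cluster.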

So here mapper.py is the mapper and reducer.py is the reducer. Nothing new if you have used Python for Hadoop Streaming. Now, let's assume we want a particular module that is not installed at the system level on the nodes of the cluster. Say, spaCy:

[mc@host]$ virtualenv-2.7 demoenv
New python executable in demoenv/bin/python2.7 ... done.
[mc@host]$ source demoenv/bin/activate
(demoenv)[mc@host]$ pip install spacy
Collecting spacy
...
Successfully built plac semver
Installing collected packages: cymem, preshed, murmurhash, thinc, semver, sputnik, cloudpickle, plac, spacy
Successfully installed cloudpickle-0.2.1 cymem-1.31.1 murmurhash-0.26.3 plac-0.9.1 preshed-0.46.3 semver-2.4.1 spacy-0.100.6 sputnik-0.9.3 thinc-5.0.7
(demoenv)[mc@host]$ deactivate
[mc@host]$ virtualenv-2.7 --relocatable demoenv
[mc@host]$ cd demoenv; zip --quiet --recurse-paths ../demoenv.zip *; cd ..
[mc@host]$ hadoop fs -put -f demoenv.zip

I make the virtualenv relocatable so that it can be moved and both the binaries and libraries are referenced using relative paths. Bear in mind that the documentation also says that this feature is experimental and has some caveats, so use it at your own risk. I also compress it and upload it to HDFS.

Now, to run it we need to do two things: change the shebang of the scripts to point to the venv, and point to the archive with the -archives parameter when running the Hadoop streaming job. Assuming we are creating a link to the archive with the name demoenv, we change the beginning of mapper.py and reducer.py:

#!./demoenv/bin/python

import spacy
....

And then we execute:

[mc@host]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="My Cool MR Job" \
-files reducer.py,mapper.py \
-archives hdfs://[host]:8020/user/mc/demoenv.zip#demoenv \
-mapper mapper.py \
-reducer reducer.py

Note the -archives parameter with the symlink name after the #. That name has to match the path used in the shebang.
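
Since a mismatch between the symlink and the shebang only shows up once the job fails on the cluster, it can be worth checking before submitting. The following is a small sketch of such a check. The helper names (archive_link_name, shebang_uses_link) are hypothetical, and the sketch assumes the archive is always given with an explicit #name fragment, as in the command above.

```python
def archive_link_name(archive_uri):
    """Return the symlink name after '#', e.g. 'demoenv' for '...demoenv.zip#demoenv'."""
    _, separator, fragment = archive_uri.partition("#")
    if not separator or not fragment:
        raise ValueError("expected an archive of the form <uri>#<link name>")
    return fragment


def shebang_uses_link(script_text, archive_uri):
    """Check that the script's shebang starts with ./<link name>/ for the archive."""
    lines = script_text.splitlines()
    if not lines or not lines[0].startswith("#!"):
        return False
    tokens = lines[0][2:].split()
    if not tokens or tokens[0].startswith("/"):
        # No interpreter given, or an absolute path that cannot point into the archive.
        return False
    # Drop the leading "." (and empty parts) so "./demoenv/bin/python" -> "demoenv", ...
    parts = [part for part in tokens[0].split("/") if part not in ("", ".")]
    return bool(parts) and parts[0] == archive_link_name(archive_uri)
```

For example, calling shebang_uses_link with the contents of mapper.py and the value passed to -archives tells you whether the two agree before the job is launched.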

Running Luigi HadoopJobTask in a Python Venv

So far I have shown nothing new, just a compressed version of Hennig's post. Until now it was impossible to do something similar with Luigi unless you created a JobRunner by basically rewriting (i.e. copying and pasting) some of Luigi's code. So I decided to make a small contribution to Luigi that would allow me to implement something similar to what is described in the previous section.

With that change in Luigi's code, it is easy to create a new base class that pulls the virtual environment location from Luigi's configuration and sets up a runner that passes the parameter to add the archive in the underlying Hadoop streaming call.

I created VenvJobTask, which reads the virtual environment location from the configuration. The environment can be local or it can be located on HDFS. The class overrides the job_runner method to set up the Python executable path properly (so no shebang modification is needed in this case). It references a small custom runner class that changes the default behavior of DefaultHadoopJobRunner to pass the appropriate parameter.
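
To make the idea concrete without depending on Luigi, here is a rough sketch of the two values such a task has to derive from the configured venv location: the archive argument with its symlink, and the Python executable path inside that symlink. This is not the VenvJobTask code itself. The function name, the [hadoop] section, the venv key and the default link name are all assumptions made for illustration.

```python
import configparser


def venv_runner_settings(config_text, link_name="venv"):
    """Derive the archive argument and Python executable from a venv location.

    Assumes a hypothetical config entry of the form:
        [hadoop]
        venv = <local path or HDFS URI of the zipped virtualenv>
    """
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    location = parser.get("hadoop", "venv")  # hypothetical section and key
    # The archive is unpacked on each node behind a symlink called link_name ...
    archive = "%s#%s" % (location, link_name)
    # ... so the interpreter can be addressed relative to that symlink.
    python_executable = "./%s/bin/python" % link_name
    return archive, python_executable
```

Because the runner is told which interpreter to use, the scripts themselves do not need the shebang change from the previous section.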

At the time of writing, this change has not yet been released, so it will probably be part of Luigi 2.1.2. In the meantime, you can install Luigi directly from GitHub. I tested VenvJobTask on Python 2.7.6 and Hadoop 2.7.1 in Hortonworks HDP 2.3.
