## Virtual Environments and Hadoop Streaming

If you are using Python to write Hadoop streaming jobs, you have probably experienced the hassle of keeping every node provisioned with the required packages. Furthermore, if you have different jobs, workflows or pipelines that require different versions of those packages, you may find yourself in a not so comfortable situation.

A former work colleague wrote about how to alleviate this by using Python's virtual environments. I am going to assume you have quickly browsed through that article and are now wondering how to do something similar with Luigi.

Before talking about Luigi, here is a summary of running streaming jobs with virtualenvs (without Luigi):

Normally, if you don't need a virtualenv, you write one Python script for the mapper and one for the reducer, and, assuming the data you need to process is already on HDFS, you call them with something like this:

```
[mc@host]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="My Cool MR Job" \
    -files reducer.py,mapper.py \
    -mapper mapper.py \
    -reducer reducer.py
```

Here mapper.py is the mapper and reducer.py is the reducer. Nothing new if you have used Python for Hadoop Streaming before. Now, let's assume we want a particular module that is not installed at the system level on the nodes of the cluster. Say, spaCy:

```
[mc@host]$ virtualenv-2.7 demoenv
New python executable in demoenv/bin/python2.7 ... done.
[mc@host]$ source demoenv/bin/activate
(demoenv)[mc@host]$ pip install spacy
Collecting spacy
...
Successfully built plac semver
Installing collected packages: cymem, preshed, murmurhash, thinc, semver, sputnik, cloudpickle, plac, spacy
Successfully installed cloudpickle-0.2.1 cymem-1.31.1 murmurhash-0.26.3 plac-0.9.1 preshed-0.46.3 semver-2.4.1 spacy-0.100.6 sputnik-0.9.3 thinc-5.0.7
(demoenv)[mc@host]$ deactivate
[mc@host]$ virtualenv-2.7 --relocatable demoenv
[mc@host]$ cd demoenv; zip --quiet --recurse-paths ../demoenv.zip *
```


I make the virtualenv relocatable so that it can be moved around, with both the binaries and the libraries referenced using relative paths. Bear in mind that the documentation also says this feature is experimental and has some caveats, so use it at your own risk. I also compress it and upload it to HDFS.
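
For completeness, the upload step might look something like this (the destination path is an assumption, chosen to match the -archives URI used further below):

```
[mc@host]$ hdfs dfs -put demoenv.zip /user/mc/demoenv.zip
```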

Now to run it we need to do two things: change the shebang of the scripts to point to the virtualenv, and point to the archive with the -archives parameter when running the Hadoop streaming job. Assuming we create a link to the archive with the name demoenv, we change the beginning of mapper.py and reducer.py:

```python
#!./demoenv/bin/python

import spacy
...
```
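
To make this concrete, a mapper built on the packaged environment could look roughly like the following word-count-style sketch. This is purely illustrative and not from the original post; the exact spaCy calls and the model name depend on the version installed in the virtualenv:

```python
#!./demoenv/bin/python
# Illustrative sketch only: a simple word-count style mapper. The spaCy API
# shown here may differ from the version installed in your virtualenv.
from __future__ import print_function

import sys

import spacy

nlp = spacy.load('en')  # model name is an assumption

for line in sys.stdin:
    doc = nlp(line.strip().decode('utf-8'))
    for token in doc:
        # Hadoop streaming expects tab-separated key/value pairs on stdout.
        print((u'%s\t1' % token.text).encode('utf-8'))
```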


And then we execute:

```
[mc@host]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="My Cool MR Job" \
    -files reducer.py,mapper.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -archives hdfs://[host]:8020/user/mc/demoenv.zip#demoenv
```


Note the -archives parameter with the symlink. That symlink has to match the path specified in the shebang.

So far I have shown nothing new, just a condensed version of Hennig's post. Until now it was not possible to do something similar with Luigi unless you created a JobRunner by basically rewriting (i.e. copying and pasting) some of Luigi's code. So I decided to make a small contribution to Luigi that allows me to implement something similar to what was described in the previous section.
I created the VenvJobTask, which reads the virtual environment location from the configuration. The environment can be local or it can be located on HDFS. The task overrides the job_runner method to set up the Python executable path properly (so no shebang modification is needed in this case). It references a small custom runner class that changes the default behavior of DefaultHadoopJobRunner to pass the appropriate parameters.
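
Roughly, such a task might be sketched as follows. This is not the actual implementation from the contribution: the [hadoop] python-venv configuration key, the archives argument to HadoopJobRunner and the hard-coded demoenv symlink are all assumptions for illustration, and the sketch omits the part that points Hadoop at the virtualenv's interpreter.

```python
# Rough sketch only, not the actual VenvJobTask from the contribution.
# The "python-venv" config key and the `archives` argument are assumptions
# and may not exist in your Luigi version.
import luigi.configuration
import luigi.contrib.hadoop


class VenvJobTask(luigi.contrib.hadoop.JobTask):
    """Streaming job whose mapper/reducer run inside a packaged virtualenv."""

    def job_runner(self):
        config = luigi.configuration.get_config()
        # Zipped virtualenv, either a local path or an hdfs:// URI.
        venv = config.get('hadoop', 'python-venv')
        streaming_jar = config.get('hadoop', 'streaming-jar')
        # Ship the archive with the job and unpack it under ./demoenv,
        # mirroring the -archives trick from the manual example above.
        return luigi.contrib.hadoop.HadoopJobRunner(
            streaming_jar=streaming_jar,
            archives=['%s#demoenv' % venv],
        )
```

A concrete job would then subclass VenvJobTask and define its mapper and reducer as usual.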