Using Amazon’s Elastic Map Reduce to compute recommendations with Apache Mahout 0.8


Apache Mahout is a “scalable machine learning library” which, among other things, contains implementations of various single-node and distributed recommendation algorithms. In my last blog post I described how to implement an on-line recommender system processing data on a single node. But what if the data is too large to fit into memory (say, more than 100M preference data points)? Then we have no choice but to take a look at Mahout’s distributed recommender implementation!

The distributed recommender is based on Apache Hadoop; it’s a job which takes as input a list of user preferences, computes an item co-occurrence matrix, and outputs top-K recommendations for each user. For an introduction to how this works and how to run it locally, see for example this blog post.
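
For reference, the job expects its input as a plain text file with one preference per line, in Mahout’s usual userID,itemID[,preferenceValue] CSV format; since we’ll be running with --booleanData, the preference value can be omitted. A tiny, made-up sample could look like this:

1,101
1,102
1,103
2,101
2,104
3,102
3,104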

We can of course run this job on a custom Hadoop cluster, but it’s much faster (and less painful) to just use a pre-configured one, like Amazon’s Elastic MapReduce (EMR). There’s a slight problem, though. The latest Hadoop version available on EMR is 1.0.3, and it ships with jars for Apache Lucene 2.9.4, while the recommender job depends on Lucene 4.3.0. This results in the following beautiful stack trace:

2013-10-04 11:05:03,921 FATAL org.apache.hadoop.mapred.Child (main): Error running child : java.lang.NoSuchMethodError: org.apache.lucene.util.PriorityQueue.<init>(I)V
  at org.apache.mahout.math.hadoop.similarity.cooccurrence.TopElementsQueue.<init>(TopElementsQueue.java:33)
	at org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob$UnsymmetrifyMapper.map(RowSimilarityJob.java:405)
	at org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob$UnsymmetrifyMapper.map(RowSimilarityJob.java:389)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:771)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
	at org.apache.hadoop.mapred.Child.main(Child.java:249)

How to solve this? Well, we “just” need to update Lucene in the EMR Hadoop installation. We can use a bootstrap action for that. Here are the exact steps:

1. Download lucene-4.3.0.tgz (e.g. from here) and upload it into an S3 bucket; make the file public.
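
If you have the AWS command-line tools installed, the upload and the public ACL can be done in one go. This is just a sketch, with bucket_name and bucket_path standing in for your actual bucket and path:

aws s3 cp lucene-4.3.0.tgz s3://bucket_name/bucket_path/lucene-4.3.0.tgz --acl public-read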

2. Upload this script to the bucket as well; call it e.g. update-lucene.sh:

#!/bin/bash
# Bootstrap action: runs on every EMR node and swaps the bundled Lucene 2.9.4
# jars in Hadoop's lib directory for the Lucene 4.3.0 ones.
cd /home/hadoop
wget https://s3.amazonaws.com/bucket_name/bucket_path/lucene-4.3.0.tgz
tar -xzf lucene-4.3.0.tgz
# remove the old Lucene jars
cd lib
rm lucene-*.jar
cd ..
# copy all Lucene 4.3.0 jars into Hadoop's lib directory
cd lucene-4.3.0
find . | grep lucene- | grep jar$ | xargs -I {} cp {} ../lib

This script will be run on each Hadoop node and will update the Lucene version. Make sure to edit the script with the correct bucket name and bucket path, so that the wget points at the public Lucene archive uploaded in step 1.
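
If you want to double-check that the bootstrap action did its job, you can SSH into one of the nodes once the cluster is up and list the Lucene jars in Hadoop’s lib directory; only the 4.3.0 jars should be left:

ls /home/hadoop/lib | grep lucene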

3. Upload mahout-core-0.8-job.jar (it ships with the Mahout 0.8 distribution) to the bucket as well.

4. Finally, we need to upload the input data into S3. Output data will be saved on S3 as well.
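
Assuming the AWS CLI again and the bucket layout used throughout this post, uploading the jar and the input data (steps 3 and 4) could look roughly like this:

aws s3 cp mahout-core-0.8-job.jar s3://bucket_name/bucket_path/mahout-core-0.8-job.jar
aws s3 cp input.dat s3://bucket_name/input.dat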

5. Now we can start setting up the EMR job flow. Go to the EMR page in Amazon’s console and start creating a new job flow. We’ll be using the “Amazon Distribution” Hadoop version and “Custom JAR” as the job type.

[Screenshot: creating a new job flow with the “Amazon Distribution” Hadoop version and the “Custom JAR” job type]

6. The “JAR location” must point to the place where we’ve uploaded the Mahout jar, e.g. s3n://bucket_name/bucket_path/mahout-core-0.8-job.jar (make sure to change this to point to the real bucket!). As for the jar arguments, we’ll be running the RecommenderJob and using the log-likelihood similarity:

org.apache.mahout.cf.taste.hadoop.item.RecommenderJob 
--booleanData 
--similarityClassname SIMILARITY_LOGLIKELIHOOD 
--output s3n://bucket_name/output 
--input s3n://bucket_name/input.dat

That’s also the place to specify where the input data on S3 is, and where output should be written.

[Screenshot: “Custom JAR” step configuration with the JAR location and the RecommenderJob arguments]
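
By default RecommenderJob outputs ten recommendations per user; if you want a different number, it also accepts a --numRecommendations flag (a standard Mahout option, nothing EMR-specific), so the argument list would become:

org.apache.mahout.cf.taste.hadoop.item.RecommenderJob
--booleanData
--similarityClassname SIMILARITY_LOGLIKELIHOOD
--numRecommendations 20
--output s3n://bucket_name/output
--input s3n://bucket_name/input.dat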

7. Then we can choose how many machines we want to use. This depends of course on the size of the input data and how fast you want the results. The main thing to change here is the “core instance group” count. 2 is a reasonable default for testing.

[Screenshot: instance configuration, with the “core instance group” count set for the job]

8. We can leave the advanced options as they are.

9. Now we get to one of the more important steps: setting up bootstrap actions. We’ll need to set up two:

  • a Memory Intensive Configuration (otherwise you’ll see an OOM quickly)
  • our custom update-lucene action (the path should point to S3, e.g. s3://bucket_name/bucket_path/update-lucene.sh)

[Screenshot: bootstrap actions setup with the Memory Intensive Configuration and the custom update-lucene.sh action]

And that’s it! You can now create and run the job flow, and after a couple of minutes/hours/days you’ll have the results waiting on S3.
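
Each part-r-* file in the output directory contains one line per user: the user ID followed by the top-K recommended items and their scores. Fetching and inspecting the results could look like this (again assuming the AWS CLI; the scores in the comment are made up):

aws s3 cp --recursive s3://bucket_name/output ./output
head ./output/part-r-00000
# each line looks like: 3	[101:1.28,103:1.13]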

Adam

  • Guillaume Pitel

    Funny that you consider that 100M data points do not fit into memory. Plus, even for huge datasets, the data doesn’t HAVE to fit into memory; you can just stream it from disk, like Hadoop does.

    With a reasonable workstation, one can deal with multi-billion data points easily, and one certainly shouldn’t use Mahout for less than a few trillion data points.

  • http://www.warski.org/ Adam Warski

    These numbers come both from my tests, and from the “Mahout in Action” book (intro to chapter 6). I’m not saying that it’s not possible to do recommendations for such a dataset using one machine, just not with Mahout (a very interesting recent example of the streaming approach you are writing about is http://www.slideshare.net/akyrola/largescale-recommendation-systems-on-just-a-pc). This also depends on what algorithm you are using (is it neighbourhood-based user-user recommendations, item-based, etc.).

    However, I am not familiar with a widely-used open-source recommendation library which would implement the disk-streaming scenario. So any pointers appreciated :)

  • Guillaume Pitel

    Mahout is a wonderful piece of work, but the use cases that are advertised for it have to be taken with a grain of salt. 100M data points are, if we take the very worst case (64-bit indexes and 64-bit FP values, COO storage), 2.4 GB of RAM. It’s 1.2 GB if you use 32-bit indexes and floats (more than enough for this kind of stuff), and likely about 500 MB with a proper CSR or CSC storage.

    For the “streaming” stuff, even a basic swapping mechanism is enough to take care of this if your access pattern is the right one. But if you don’t want the OS to take care of it (and you’d be very right), SciKit-learn (I think they use numpy for it) or GraphChi can probably work with hundreds of gigabytes of data points.

  • http://www.warski.org/ Adam Warski

    Well, that depends on how much RAM you have ;) Sure, having tens of GB isn’t uncommon, but there’s a point where this stops scaling (sooner rather than later, as you need twice the amount to refresh the recommender while still keeping the old one).

    Anyway, RAM isn’t the only problem; response times are a problem as well – the more data you have, the slower the recommendations are going to be (when using the single-node Mahout recommenders). For an on-line system there’s some threshold above which the requests simply take too long.

    Pre-computing might be a good idea in such a case; and that’s exactly what the distributed algorithms do.

    The presentation that I linked earlier is about GraphChi, and that’s what I’m going to explore next :)

  • neha khandelwal

    http://www.youtube.com/watch?v=DNUliYXrSZo In my opinion, for more and clearer information one should follow this link and watch the video.

    This video gives you clarity on Mahout.
