Should I learn/use MapReduce, or some other type of parallelization for this task?

After talking with a friend of mine from Google, I'd like to implement some kind of Job/Worker model for updating my dataset.

This dataset mirrors a 3rd party service's data, so, to do the update, I need to make several remote calls to their API. I think a lot of time will be spent waiting for responses from this 3rd party service. I'd like to speed things up, and make better use of my compute hours, by parallelizing these requests and keeping many of them open at once, as they wait for their individual responses.
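To give a rough idea of what I mean, something like the sketch below is the kind of parallel fetching I'm picturing (the endpoint URL and helper names are made up, and this just uses a thread pool with the requests library rather than any particular framework):

    # Sketch only: the endpoint and fetch_user_data() are placeholders for
    # the real 3rd party API. A thread pool keeps many HTTP requests open
    # at once instead of waiting for each response serially.
    from concurrent.futures import ThreadPoolExecutor
    import requests

    def fetch_user_data(user_id):
        # One blocking call to the (hypothetical) 3rd party API.
        resp = requests.get("https://api.example.com/users/%s" % user_id)
        resp.raise_for_status()
        return user_id, resp.json()

    def fetch_all(user_ids, max_in_flight=50):
        # Up to max_in_flight requests are in flight at any one time.
        with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
            return dict(pool.map(fetch_user_data, user_ids))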

Before I explain my specific dataset and get into the problem, I'd like to clarify what answers I'm looking for:

  1. Is this a flow that would be well suited to parallelizing with MapReduce?
  2. If yes, would this be cost effective to run on Amazon's Elastic MapReduce service, which bills by the hour and rounds hours up when the job is complete? (I'm not sure exactly what counts as a "Job", so I don't know exactly how I'll be billed.)
  3. If no, is there another system/pattern I should use? And is there a library that will help me do this in Python (on AWS, using EC2 + EBS)?
  4. Are there any problems you see with the way I've designed this job flow?

Ok, now onto the details:

The dataset consists of users who have favorite items and who follow other users. The aim is to be able to update each user's queue -- the list of items the user will see when they load the page, based on the favorite items of the users she follows. But, before I can crunch the data and update a user's queue, I need to make sure I have the most up-to-date data, which is where the API calls come in.

There are two calls I can make:

  • Get Followed Users -- Which returns all the users being followed by the requested user, and
  • Get Favorite Items -- Which returns all the favorite items of the requested user.

After I call get followed users for the user being updated, I need to update the favorite items for each user being followed. Only when all of the favorites are returned for all the users being followed can I start processing the queue for that original user. This flow looks like:

[Flow diagram: Updating UserX's Queue]

Jobs in this flow include (a rough sketch of them follows this list):

  • Start Updating Queue for user -- kicks off the process by fetching the users followed by the user being updated, storing them, and then creating Get Favorites jobs for each user.
  • Get Favorites for user -- Requests, and stores, a list of favorites for the specified user, from the 3rd party service.
  • Calculate New Queue for user -- Processes a new queue, now that all the data has been fetched, and then stores the results in a cache which is used by the application layer.
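Purely as a sketch of the ordering constraint above (the fetch/store helper names here are invented, not real code I have):

    # Sketch only: get_followed_users(), get_favorite_items(), store_*(),
    # build_queue_from_favorites() and cache_queue() are invented helpers.
    def start_updating_queue(user_id):
        followed = get_followed_users(user_id)       # remote API call
        store_followed(user_id, followed)
        # Fan out one Get Favorites job per followed user.
        return [("get_favorites", followed_id) for followed_id in followed]

    def get_favorites_job(followed_id):
        favorites = get_favorite_items(followed_id)  # remote API call
        store_favorites(followed_id, favorites)

    def calculate_new_queue(user_id):
        # Must only run after every get_favorites_job for this user's
        # followed users has finished.
        queue = build_queue_from_favorites(user_id)
        cache_queue(user_id, queue)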

So, again, my questions are:

  1. Is this a flow that would be well suited to parallelizing with MapReduce? I don't know if it would let me start the process for UserX, fetch all the related data, and come back to processing UserX's queue only after that's all done.
  2. If yes, would this be cost effective to run on Amazon's Elastic MapReduce service, which bills by the hour and rounds hours up when the job is complete? Is there a limit on how many "threads" I can have waiting on open API requests if I use their service?
  3. If no, is there another system/pattern I should use? And is there a library that will help me do this in Python (on AWS, using EC2 + EBS)?
  4. Are there any problems you see with the way I've designed this job flow?

Thanks for reading; I'm looking forward to some discussion with you all.

Edit, in response to JimR:

Thanks for a solid reply. In my reading since I wrote the original question, I've leaned away from using MapReduce. I haven't decided for sure yet how I want to build this, but I'm beginning to feel MapReduce is better suited to distributing/parallelizing compute load, whereas I'm really just looking to parallelize HTTP requests.

What would have been my "reduce" task, the part that takes all the fetched data and crunches it into results, isn't that computationally intensive. I'm pretty sure it's going to wind up being one big SQL query that executes for a second or two per user.

So, what I'm leaning towards is:

  • A non-MapReduce Job/Worker model, written in Python. A friend of mine at Google turned me on to Python for this, since it's low overhead and scales well.
  • Using Amazon EC2 as a compute layer. I think this means I also need an EBS volume to store my database.
  • Possibly using Amazon's Simple Queue Service (SQS). It sounds like this third Amazon service is designed to keep track of job queues, move results from one task into the inputs of another, and gracefully handle failed tasks. It's very cheap, and may be worth using instead of a custom job-queue system.
Asked Nov 21 '10 by Jordan Feldstein


1 Answer

The work you describe is probably a good fit for a job server, a queue, or a combination of the two. It could certainly also work as a set of MapReduce steps.

For a job server, I recommend looking at Gearman. The documentation isn't awesome, but the presentations do a great job documenting it, and the Python module is fairly self-explanatory too.

Basically, you create functions in the job server, and these functions get called by clients via an API. The functions can be called either synchronously or asynchronously. In your example, you probably want to asynchronously add the "Start update" job. That will do whatever preparatory tasks are needed, and then asynchronously call the "Get followed users" job. That job will fetch the followed users and then call the "Update followed users" job, which will submit all the "Get Favourites for UserA/B/C" jobs together in one go and synchronously wait for the results of all of them. When they have all returned, it will call the "Calculate new queue" job.
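A very rough sketch of that fan-out-and-wait step with the python-gearman module (the task names and the fetch helper are placeholders, and the exact calls may vary between module versions):

    # Sketch only: fetch_followed_users() and the task names are
    # placeholders for your own code.
    import gearman

    client = gearman.GearmanClient(['localhost:4730'])

    def update_followed_users(worker, job):
        user_id = job.data
        followed = fetch_followed_users(user_id)   # remote API call
        # Fan out one "get_favourites" job per followed user and wait
        # synchronously until they have all completed.
        jobs = [{'task': 'get_favourites', 'data': followed_id}
                for followed_id in followed]
        client.submit_multiple_jobs(jobs, background=False,
                                    wait_until_complete=True)
        # All the data is fetched; kick off the final crunch asynchronously.
        client.submit_job('calculate_new_queue', user_id, background=True)
        return 'done'

    worker = gearman.GearmanWorker(['localhost:4730'])
    worker.register_task('update_followed_users', update_followed_users)
    worker.work()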

This job-server-only approach will initially be a bit less robust, since ensuring that you handle errors and any down servers and persistence properly is going to be fun.

For a queue, SQS is an obvious choice. It is rock-solid, and very quick to access from EC2, and cheap. And way easier to set up and maintain than other queues when you're just getting started.

Basically, you will put a message onto the queue, much like you would submit a job to the job server above, except you probably won't do anything synchronously. Instead of making the "Get Favourites For UserA" and so forth calls synchronously, you will make them asynchronously, and then have a message that says to check whether all of them are finished. You'll need some sort of persistence (a SQL database you're familiar with, or Amazon's SimpleDB if you want to go fully AWS) to track whether the work is done - you can't check on the progress of a job in SQS (although you can in other queues). The message that checks whether they are all finished will do the check - if they're not all finished, don't do anything, and then the message will be retried in a few minutes (based on the visibility_timeout). Otherwise, you can put the next message on the queue.
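As a rough sketch of that "check whether everything is finished" message (using boto3 purely as an illustration; all_favourites_fetched() and build_queue() stand in for your own persistence check and queue calculation):

    # Sketch only: all_favourites_fetched() and build_queue() are
    # placeholders for your own persistence layer and queue calculation.
    import json
    import boto3

    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName='userqueue-updates')

    def handle_messages():
        for message in queue.receive_messages(MaxNumberOfMessages=10,
                                              WaitTimeSeconds=20):
            body = json.loads(message.body)
            if body['type'] == 'check_favourites_done':
                if not all_favourites_fetched(body['user_id']):
                    # Not done yet: leave the message alone so it becomes
                    # visible again after the visibility timeout and gets
                    # retried later.
                    continue
                queue.send_message(MessageBody=json.dumps(
                    {'type': 'calculate_new_queue',
                     'user_id': body['user_id']}))
            elif body['type'] == 'calculate_new_queue':
                build_queue(body['user_id'])
            # Only delete the message once its work has actually been done.
            message.delete()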

This queue-only approach should be robust, assuming you don't consume queue messages by mistake without doing the work. Making a mistake like that is hard to do with SQS - you really have to try. Don't use auto-consuming queues or protocols - if you error out, you might not be able to ensure that you put a replacement message back on the queue.

A combination of queue and job server may be useful in this case. You can get away with not having a persistence store to check job progress - the job server will allow you to track job progress. Your "get favourites for users" message could place all the "get favourites for UserA/B/C" jobs into the job server. Then, put a "check all favourites fetching done" message on the queue with a list of tasks that need to be complete (and enough information to restart any jobs that mysteriously disappear).

For bonus points:

Doing this as a MapReduce should be fairly easy.

Your first job's input will be a list of all your users. The map will take each user, get the followed users, and output lines for each user and their followed user:

"UserX" "UserA"
"UserX" "UserB"
"UserX" "UserC"

An identity reduce step will leave this unchanged. This will form the second job's input. The map for the second job will fetch the favourites for each line (you may want to put memcached in front of the API so that UserA's favourites are only fetched once, even if both UserX and UserY follow UserA), and output a line for each favourite:

"UserX" "UserA" "Favourite1"
"UserX" "UserA" "Favourite2"
"UserX" "UserA" "Favourite3"
"UserX" "UserB" "Favourite4"

The reduce step for this job will convert this to:

 "UserX" [("UserA", "Favourite1"), ("UserA", "Favourite2"), ("UserA", "Favourite3"), ("UserB", "Favourite4")]

At this point, you might have another MapReduce job to update your database for each user with these values, or you might be able to use some of the Hadoop-related tools like Pig, Hive, and HBase to manage your database for you.

I'd recommend using the EC2 management commands in Cloudera's Distribution for Hadoop to create and tear down your Hadoop cluster on EC2 (their AMIs have Python set up on them), and using something like Dumbo (on PyPI) to create your MapReduce jobs, since it lets you test them on your local/dev machine without access to Hadoop.

Good luck!

Answered by Neil Blakey-Milner