P2P-MapReduce

Introduction

MapReduce is a programming model widely used in data centers for processing large data sets in a highly parallel way. Current MapReduce systems are based on master-slave architectures that do not cope well with dynamic node participation, since they are mostly designed for conventional parallel computing platforms. In dynamic computing environments, by contrast, node churn and failures (including master failures) are likely, because nodes join and leave the network at an unpredictable rate. P2P-MapReduce is a framework developed at the University of Calabria that exploits a peer-to-peer model to manage intermittent node participation, master failures and job recovery in a decentralized way, providing a more robust MapReduce middleware for dynamic computing environments.

The P2P-MapReduce framework is currently available as an open-source research prototype. It employs the JXTA framework for managing the peer-to-peer network, and code from the Apache Hadoop project for managing the MapReduce computations.

Installation

Distribution file

The distribution file of P2P-MapReduce 1.0 contains:

  • The core library of the P2P-MapReduce framework: P2PMapReduce.jar
  • All the required external libraries inside the ‘lib’ directory:
    Library name | Description | Dependencies
    JXSE_2.7.jar | JXTA 2.7 core library (http://jxta.kenai.com/) | bcprov-jdk16-145.jar; h2-1.2.127.jar; org.mortbay.jetty.jar; netty-3.1.5.GA.jar; httptunnel-0.92.jar
    bcprov-jdk16-145.jar | Used by JXTA for encryption | -
    h2-1.2.127.jar | Used by JXTA for the H2 implementation of the cache manager (Cm) | -
    org.mortbay.jetty.jar | Required by the JXTA HTTP transport | javax.servlet.jar
    javax.servlet.jar | Required by Jetty | -
    netty-3.1.5.GA.jar | Required by the JXTA TCP transport | -
    httptunnel-0.92.jar | Required by JXTA | -
    commons-net-ftp-2.0.jar | Used for the implementation of an FTP client | -
    ftplet-api-1.0.5.jar | FTP Server API | -
    ftpserver-core-1.0.5.jar | FTP Server API implementation | mina-core-2.0.0-RC1.jar; slf4j-api-1.5.2.jar
    mina-core-2.0.0-RC1.jar | - | mina-core-2.0.0-RC1.jar; slf4j-api-1.5.2.jar
    slf4j-api-1.5.2.jar | - | slf4j-log4j12-1.5.2.jar
    slf4j-log4j12-1.5.2.jar | - | log4j-1.2.14.jar
    log4j-1.2.14.jar | - | -
  • Configuration files inside the ‘conf’ directory:
    • P2PMapReduce.properties: A property file for specifying some configuration parameters of P2P-MapReduce
    • user.properties: FTP user configurations
  • Binary files:
    • startNode.sh: Simple bash script to start a new P2P-MapReduce node on a Unix/Linux platform

How to install

To install the P2P-MapReduce framework just extract P2P-MapReduce-1.0.tar.gz into a directory of your choice, calling it <installation_dir>. The directory structure will be as follows:

  • <installation_dir>
    • P2PMapReduce.jar
    • lib/
      • <all the required external libraries>
    • conf/
      • <all the configuration files>
    • src/
      • <all the source files>
    • doc/
      • <Javadoc files>
    • startNode.sh

Run a computing node

All the configurable framework parameters can be modified in the conf/P2PMapReduce.properties file. The most important parameters are:

Key | Description | Default value
direct.find.waiting.time | Time (seconds) to wait for query responses from the network | 3
peridoc.publish.timeout | Time interval (seconds) to wait before publishing an updated version of a resource in the network | 60
adv.expiration.time | Maximum time period (seconds) for which a discovered resource is still considered available | 120
coordinator.master.ratio | Master/slave ratio. Accepted values are those in the interval [0,1). The special value 0 indicates one master only; values between 0 and 1 indicate the desired ratio between masters and slaves | 0.2
job.backupmanager.required | Desired number of backup masters for a single job. All non-negative values are valid. The special value -1 indicates that all the available masters must be used as backup masters | 2
mapreduce.job.inputsplit.size | Minimum input split size (MBytes) for MapReduce files | 1
gui | Set to true to start the graphical interface | true
logging.level | Logging level of the P2P-MapReduce framework (see the java.util.logging package) | WARNING
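
As an illustration of how these keys and defaults fit together, the following sketch reads a few of them with java.util.Properties and falls back to the default values listed above. It assumes the file follows the standard Java properties format; the NodeSettings class is a hypothetical helper written for this guide and is not part of the framework.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helper: holds a few P2P-MapReduce settings with the documented defaults. */
class NodeSettings {
    final int directFindWaitingTime;      // seconds
    final double coordinatorMasterRatio;  // must be in [0,1)
    final int backupManagersRequired;     // -1 means "all available masters"
    final boolean gui;

    NodeSettings(Properties p) {
        directFindWaitingTime = Integer.parseInt(p.getProperty("direct.find.waiting.time", "3").trim());
        coordinatorMasterRatio = Double.parseDouble(p.getProperty("coordinator.master.ratio", "0.2").trim());
        backupManagersRequired = Integer.parseInt(p.getProperty("job.backupmanager.required", "2").trim());
        gui = Boolean.parseBoolean(p.getProperty("gui", "true").trim());

        // Range checks taken from the parameter descriptions in the table.
        if (coordinatorMasterRatio < 0.0 || coordinatorMasterRatio >= 1.0) {
            throw new IllegalArgumentException("coordinator.master.ratio must be in [0,1)");
        }
        if (backupManagersRequired < -1) {
            throw new IllegalArgumentException("job.backupmanager.required must be -1 or non-negative");
        }
    }

    public static void main(String[] args) throws IOException {
        // Inline text stands in for the content of conf/P2PMapReduce.properties.
        Properties p = new Properties();
        p.load(new StringReader("coordinator.master.ratio=0\ngui=false\n"));
        NodeSettings s = new NodeSettings(p);
        System.out.println("ratio=" + s.coordinatorMasterRatio
                + " gui=" + s.gui + " backups=" + s.backupManagersRequired);
    }
}
```

To check a real installation, the same constructor can be fed a Properties object loaded from a FileReader opened on conf/P2PMapReduce.properties.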

Start a node

For the P2P-MapReduce system to work, at least two nodes have to be started, so that there is at least one master node and one slave node. To start a P2P-MapReduce node, execute the script startNode.sh available in <installation_dir>.

Node GUI

When a node is started with the parameter gui=true, a graphical interface is shown:

p2p-mapreduce-fig1-small.png

Figure 1

The window title indicates the node type (master (M) or slave (S)) and the node ID (e.g., D39F03). The window includes three panels: Net, Exec and Log. The Log panel shows the application log output. The Net panel (shown in Figure 1) shows network information. Its upper part shows JXTA network information such as GroupID, PeerID and CoordinatorID. In the middle, two lists show all the known slave nodes and all the known master nodes, respectively. Each list entry shows some node information:

  • node ID
  • node load
  • node information version number

The Exec panel consists of three lists that summarize the jobs or tasks managed by the node. If the node is a master (Figure 2), the panel shows the jobs it manages as the primary master and those it manages as a backup master. For each of them, some summary information is shown:

  • job ID
  • job state
  • completion status (percentage) of the map and reduce phases

p2p-mapreduce-fig2-small.png

Figure 2

By double-clicking on one of the list entries, an information window associated with the job will be shown (Figure 3). The information includes:

  • Long version of the job ID
  • ID of the primary master associated with the job
  • number of tasks submitted (in execution, failed and successfully completed)
  • list of all the backup masters for the job
  • list of the tasks submitted for the job, chronologically ordered

p2p-mapreduce-fig3-small.png

Figure 3

If a node is a slave (Figure 4) the list of all managed tasks will be shown.

p2p-mapreduce-fig4-small.png

Figure 4

Job submission

When at least one master node is available in the network, it is possible to submit a new job. To submit a job, you must write your own application that configures the job and submits it.
As an example, the canonical WordCount application is considered below.

Application environment setup

To run your application correctly, you must set up the application environment. First, the P2PMapReduce.jar file and all the libraries inside the ‘lib’ directory must be added to your application class path. Second, the ‘conf’ directory must be copied into your application working directory, referred to below as <application_working_dir>.
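
A quick way to verify the second step before submitting a job is a small preflight check. The sketch below only tests that the two configuration files listed in the distribution are present under the working directory; the EnvironmentCheck class is a hypothetical helper for this guide, not part of the framework.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: checks that the 'conf' directory was copied into the working directory. */
class EnvironmentCheck {
    // Configuration files shipped in the 'conf' directory of the distribution.
    private static final String[] REQUIRED = {
        "conf/P2PMapReduce.properties",
        "conf/user.properties"
    };

    /** Returns the required files that are missing under the given working directory. */
    static List<String> missingEntries(Path applicationWorkingDir) {
        List<String> missing = new ArrayList<>();
        for (String relative : REQUIRED) {
            if (!Files.isRegularFile(applicationWorkingDir.resolve(relative))) {
                missing.add(relative);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Uses the current directory unless a working directory is passed as first argument.
        Path workingDir = Paths.get(args.length > 0 ? args[0] : ".");
        List<String> missing = missingEntries(workingDir);
        System.out.println(missing.isEmpty() ? "Environment looks complete" : "Missing: " + missing);
    }
}
```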

Writing Mapper and Reducer classes

To specify the map and reduce functions, you need to write two classes that extend the Mapper and Reducer abstract classes, respectively, as in Apache Hadoop.
The code of the map and reduce classes used for the WordCount example, WordCountMapper and WordCountReducer, is reported below:

// WordCountMapper.java
// Imports of the P2P-MapReduce classes (Mapper, StringWritable, IntWritable) are omitted:
// their package names are not given in this guide (see the Javadoc in doc/).
import java.io.IOException;
import java.util.StringTokenizer;

public class WordCountMapper extends
        Mapper<Long, String, StringWritable, IntWritable> {
    // Characters that separate one word from the next
    private static final String separator = " \n\t\r\b\f\"\'\\.,;:'()-/[]{}+*=!?<>&|%";

    private final static IntWritable ONE = new IntWritable(1);

    // Emits the pair (word, 1) for every token found in the input value
    @Override
    public void map(Long key, String value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value, separator);
        while (tokenizer.hasMoreTokens()) {
            StringWritable word = new StringWritable(tokenizer.nextToken());
            context.write(word, ONE);
        }
    }
}

// WordCountReducer.java
import java.io.IOException;

public class WordCountReducer extends
        Reducer<StringWritable, IntWritable, StringWritable, IntWritable> {
    // Sums all the counts emitted for the same word
    @Override
    public void reduce(StringWritable key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.getValue();
        }
        context.write(key, new IntWritable(sum));
    }
}

Once you have written the map and reduce classes, you must package them into a single jar file, calling it <job.jar>, together with all the other classes they require.
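
To see what the map and reduce functions compute without starting any node, the same logic can be tried locally in plain Java. The sketch below reuses the delimiter string of WordCountMapper and replaces the framework with an in-memory map; LocalWordCount is a hypothetical class for experimentation only and does not use the P2P-MapReduce API.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

/** Hypothetical local stand-in for the WordCount job: no framework classes involved. */
class LocalWordCount {
    // Same delimiter set used by WordCountMapper.
    private static final String SEPARATOR = " \n\t\r\b\f\"\'\\.,;:'()-/[]{}+*=!?<>&|%";

    /** Tokenizes the text (map step) and sums one count per occurrence (reduce step). */
    static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new TreeMap<>();
        StringTokenizer tokenizer = new StringTokenizer(text, SEPARATOR);
        while (tokenizer.hasMoreTokens()) {
            // The mapper would emit (word, 1); merge() plays the role of the reducer's sum.
            counts.merge(tokenizer.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {be=2, not=1, or=1, to=2}
        System.out.println(count("to be, or not to be?"));
    }
}
```

Note that tokens are compared case-sensitively, so "The" and "the" are counted as different words; the same holds for the mapper above, which passes tokens through unchanged.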

Job configuration

The configuration of a MapReduce job is very similar to the Apache Hadoop Job configuration, except that some different classes must be used instead. This is an example of a job configuration:

Job job = new Job(); 
job.setJobName("Word Count");
job.setInputFormatClass(TextFileInputFormat.class);
job.setOutputFormatClass(TextFileOutputFormat.class);

TextFileInputFormat.addLocalInputPath(job, "<path in local fs to input file or directory>"); 

job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(StringWritable.class);
job.setMapOutputValueClass(IntWritable.class);

job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(StringWritable.class);
job.setOutputValueClass(IntWritable.class);

job.setJar("<path in local fs to <job.jar> file>");

job.setNumReduceTasks(3);

Instead of the Hadoop TextInputFormat and TextOutputFormat classes, P2P-MapReduce provides TextFileInputFormat and TextFileOutputFormat. In particular, the TextFileInputFormat.addLocalInputPath method must be used to provide the path, on the local file system, to the job input files or directories.

Submit a job

Unlike in Hadoop, to submit a job you must create a UserNode instance.
This is the code needed to submit a job through a UserNode:

UserNode userNode = UserNode.createUserNode();
userNode.submit(job);

Alternatively, you can use the UserNode.submit() method that takes a Runnable object as its second parameter:

UserNode userNode = UserNode.createUserNode();
Runnable actionAtCompletion = <runnable creation code>;
userNode.submit(job, actionAtCompletion);

This allows you to specify an action that must be executed after the job has been successfully completed. In this way you can also construct a simple chain of jobs by submitting the next job with all the previous job output files as input files, like this:

// Declared final so that they can be referenced inside the anonymous class
final UserNode userNode = UserNode.createUserNode();
final Job firstJob = <firstJob creation code>;
Runnable firstJobCompletionAction = new Runnable() {
    public void run() {
        Job secondJob = <second_job_creation_code>;
        // Use every output file of the first job as an input file of the second job
        FileSystemView fsv = userNode.getFileSystemView();
        File jobOutputDir = fsv.createFile(firstJob.getConfiguration().get(MRJobConfig.JOB_OUTPUT_DIR_ATTR));
        for (File outputFile : jobOutputDir.listFiles()) {
            TextFileInputFormat.addLocalInputPath(secondJob, outputFile.getPath());
        }
        Runnable secondJobCompletionAction = <runnable creation code>;
        userNode.submit(secondJob, secondJobCompletionAction);
    }
};
userNode.submit(firstJob, firstJobCompletionAction);
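
The chaining pattern itself, where each completion action submits the next job, can be tried in isolation. The following sketch is only an illustration of the callback structure: FakeUserNode is a hypothetical stand-in that "runs" a job synchronously by recording its name, whereas the real UserNode presumably invokes the action when the job completes on the network.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrates job chaining through completion callbacks, without the framework. */
class JobChainSketch {
    /** Hypothetical stand-in for UserNode: records the job, then runs the completion action. */
    static class FakeUserNode {
        final List<String> completed = new ArrayList<>();

        void submit(String jobName, Runnable actionAtCompletion) {
            completed.add(jobName);      // pretend the job ran successfully
            actionAtCompletion.run();    // then trigger the completion action
        }
    }

    /** Submits the jobs one after another; returns them in completion order. */
    static List<String> runChain(List<String> jobNames) {
        FakeUserNode node = new FakeUserNode();
        submitFrom(node, jobNames, 0);
        return node.completed;
    }

    // Each completion action submits the next job of the chain, if any.
    private static void submitFrom(FakeUserNode node, List<String> jobs, int index) {
        if (index >= jobs.size()) {
            return;
        }
        node.submit(jobs.get(index), () -> submitFrom(node, jobs, index + 1));
    }

    public static void main(String[] args) {
        System.out.println(runChain(Arrays.asList("first", "second", "third")));
    }
}
```

The lambda used here requires Java 8 or later; with older Java versions the same structure can be written with anonymous Runnable classes, as in the listing above.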

Job execution

During job execution, the user node receives updates from the primary master and, in case of node failures, from the network module. Whenever a reduce task completes, the user node is notified by the primary master and starts retrieving the reduce output into the local output directory “<application_working_dir>/ftp/<job_id>/output”.
If the job completes successfully, all the reduce output files will be located in this directory.
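
As a small convenience, the output location can be computed and listed from your own application. The sketch below builds the path from the directory layout described above; JobOutputLocator is a hypothetical helper, and the job ID used in main ("job42") is a made-up placeholder, since the actual ID format is not described in this guide.

```java
import java.io.File;

/** Hypothetical helper: locates the reduce output files of a job on the user node. */
class JobOutputLocator {
    /** Builds <application_working_dir>/ftp/<job_id>/output. */
    static File outputDir(File applicationWorkingDir, String jobId) {
        File ftpDir = new File(applicationWorkingDir, "ftp");
        return new File(new File(ftpDir, jobId), "output");
    }

    /** Lists the output files; returns an empty array if the directory does not exist yet. */
    static File[] outputFiles(File applicationWorkingDir, String jobId) {
        File[] files = outputDir(applicationWorkingDir, jobId).listFiles();
        return files == null ? new File[0] : files;
    }

    public static void main(String[] args) {
        File workingDir = new File(".");
        // "job42" is a placeholder job ID used only for illustration.
        for (File f : outputFiles(workingDir, "job42")) {
            System.out.println(f.getPath());
        }
    }
}
```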

Download

The current P2P-MapReduce package (1.0, dated 26th of September 2011) can be downloaded here:

  • P2P-MapReduce framework

Copyright (C) 2008-2011 University of Calabria – Dept. of Electronics, Computer Science and Systems

How to cite

F. Marozzo, D. Talia, P. Trunfio, "P2P-MapReduce: Parallel data processing in dynamic Cloud environments". Journal of Computer and System Sciences, vol. 78, n. 5, pp. 1382-1402, Elsevier Science, 2012.

People

This project was designed and developed by Francesco De Luca, Fabrizio Marozzo, Domenico Talia and Paolo Trunfio.
