
Reduce phase fails with "Task attempt failed to report status for 600 seconds. Killing!" Solution?

The reduce phase of the job fails with:

# of failed Reduce Tasks exceeded allowed limit.

The reason why each task fails is:

Task attempt_201301251556_1637_r_000005_0 failed to report status for 600 seconds. Killing!

Problem in detail:

The map phase takes in records of the format: time, rid, data.

The data is a list of data elements, each with its count.

e.g. a,1 b,4 c,7 is the data of one record.

For each data element of a record, the mapper outputs the data of the whole record, e.g.:

key:(time, a), val:(rid, data) key:(time, b), val:(rid, data) key:(time, c), val:(rid, data)

Each reduce call receives all the values that share a key, across all records. e.g. key:(time, a), val:(rid1, data) and key:(time, a), val:(rid2, data) reach the same reduce instance.

It does some processing here and outputs similar rids.
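To make the key layout described above concrete, here is a minimal plain-Java sketch of the emission step (no Hadoop dependency). The class name `KeyEmissionSketch`, the space-separated key and value strings, and the `|`-joined data layout are assumptions for illustration, inferred from how the reducer below tokenizes its input; the actual mapper is not shown in the question.

```java
import java.util.ArrayList;
import java.util.List;

final class KeyEmissionSketch {

    /** One emitted pair: key = "time element", value = "rid data". */
    static final class Pair {
        final String key;
        final String value;

        Pair(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Emits one pair per data element of a record.
     * elements looks like {"a,1", "b,4", "c,7"} (assumed layout).
     */
    static List<Pair> emit(String time, String rid, String[] elements) {
        // Join the whole record into one value string, e.g. "a|1|b|4|c|7".
        StringBuilder data = new StringBuilder();
        for (String e : elements) {
            if (data.length() > 0) {
                data.append('|');
            }
            data.append(e.replace(',', '|'));
        }

        // The full record is repeated once for every element it contains.
        List<Pair> out = new ArrayList<Pair>();
        for (String e : elements) {
            String element = e.substring(0, e.indexOf(','));
            out.add(new Pair(time + " " + element, rid + " " + data));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Pair p : emit("t1", "rid1", new String[] {"a,1", "b,4", "c,7"})) {
            System.out.println(p.key + " -> " + p.value);
        }
    }
}
```

Note that under this scheme a record with k elements is shipped to the reducers k times, so the reduce-side input can be much larger than the raw input.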

My program runs without trouble on a small dataset such as 10 MB, but fails with the error above when the data grows to, say, 1 GB. I don't know why this happens. Please help!

Reduce code:

There are two classes below:

  • VCLReduce0Split
  • CoreSplit

a. VCLReduce0Split

public class VCLReduce0Split extends MapReduceBase implements Reducer<Text, Text, Text, Text>{
    //  @SuppressWarnings("unchecked")
        public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

            String key_str = key.toString();
            StringTokenizer stk = new StringTokenizer(key_str);
            String t = stk.nextToken();

            HashMap<String, String> hmap = new HashMap<String, String>();

            while(values.hasNext())
            {
                StringBuffer sbuf1 = new StringBuffer(); 
                String val = values.next().toString();
                StringTokenizer st = new StringTokenizer(val);

                String uid = st.nextToken();

                String data = st.nextToken();

                     int total_size = 0;

                     StringTokenizer stx = new StringTokenizer(data,"|");

                     StringBuffer sbuf = new StringBuffer();

                     while(stx.hasMoreTokens())
                     {
                         String data_part = stx.nextToken();
                         String data_freq = stx.nextToken();

                    //   System.out.println("data_part:----->"+data_part+" data_freq:----->"+data_freq);
                         sbuf.append(data_part);
                         sbuf.append("|");
                         sbuf.append(data_freq);
                         sbuf.append("|");
                     }
                /*     
                     for(int i = 0; i<parts.length-1; i++)
                     {
                         System.out.println("data:--------------->"+data);
                         int part_size = Integer.parseInt(parts[i+1]);
                         sbuf.append(parts[i]);
                         sbuf.append("|");
                         sbuf.append(part_size);
                         sbuf.append("|");
                         total_size = part_size+total_size;
                         i++;
                     }*/

                sbuf1.append(String.valueOf(total_size));
                sbuf1.append(",");
                sbuf1.append(sbuf);
                if(uid.equals("203664471")){
                //  System.out.println("data:--------------------------->"+data+" tot_size:---->"+total_size+" sbuf:------->"+sbuf);
                }
                hmap.put(uid, sbuf1.toString());

            }

            float threshold = (float)0.8;

            CoreSplit obj = new CoreSplit();


            ArrayList<CustomMapSimilarity> al = obj.similarityCalculation(t, hmap, threshold);

            for(int i = 0; i<al.size(); i++)
            {
                CustomMapSimilarity cmaps = al.get(i);
                String xy_pair = cmaps.getRIDPair();
                String similarity = cmaps.getSimilarity();
                output.collect(new Text(xy_pair), new Text(similarity));
            }


         }
    }

b. CoreSplit

package com.a;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.commons.collections.map.MultiValueMap;

public class CoreSplit{


     public ArrayList<CustomMapSimilarity> similarityCalculation(String time, HashMap<String,String>hmap, float t)
     {

         ArrayList<CustomMapSimilarity> als = new ArrayList<CustomMapSimilarity>();
         ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();

        Iterator<String> iter = hmap.keySet().iterator();

        MultiValueMap index = new MultiValueMap();

        String RID;
        TreeMap<String, Integer> hmap2;
        Iterator<String> iter1;

        int size;
        float prefix_size;
        HashMap<String, Float> alpha;
        HashMap<String, CustomMapOverlap> hmap_overlap;

        String data;

        while(iter.hasNext())
            {
                RID = (String)iter.next();

                String data_val = hmap.get(RID);

                StringTokenizer st = new StringTokenizer(data_val,",");
            //    System.out.println("data_val:--**********-->"+data_val+" RID:------------>"+RID+" time::---?"+time);
                String RIDsize = st.nextToken();
                size = Integer.parseInt(RIDsize);
                data = st.nextToken();


                StringTokenizer st1 = new StringTokenizer(data,"\\|");


                String[] parts = data.split("\\|");

            //  hmap2 = (TreeMap<String, Integer>)hmap.get(RID);
        //      iter1 = hmap2.keySet().iterator();

            //  size = hmap_size.get(RID);

                prefix_size = (float)(size-(0.8*size)+1); 

                if(size==1)
                {
                    prefix_size = 1;
                }

                alpha = new HashMap<String, Float>();

                hmap_overlap = new HashMap<String, CustomMapOverlap>();

        //      Iterator<String> iter2 = hmap2.keySet().iterator();

                int prefix_index = 0;

                int pi=0;

                for(float j = 0; j<=prefix_size; j++)
                {

                    boolean prefix_chk = false;
                    prefix_index++;
                    String ptoken = parts[pi];
            //      System.out.println("data:---->"+data+" ptoken:---->"+ptoken);
                    float val = Float.parseFloat(parts[pi+1]);
                    float temp_j = j;
                     j = j+val;
                     boolean j_l = false ;
                     float prefix_contri = 0;
                     pi= pi+2;

                     if(j>prefix_size)
                        {

                            // prefix_contri = j-temp_j;
                             prefix_contri = prefix_size-temp_j;

                            if(prefix_contri>0)
                            {
                                 j_l = true;
                                 prefix_chk = false;

                            }
                            else
                            {
                                prefix_chk = true;                              
                            }
                        }                   


                    if(prefix_chk == false){


                        filters(index, ptoken, RID, hmap,t, size, val, j_l, alpha, hmap_overlap, j, prefix_contri);


                    CustomMapPrefixTokens cmapt = new CustomMapPrefixTokens(RID,j);
                    index.put(ptoken, cmapt);

                }

            }


                als = calcSimilarity(time, RID, hmap, alpha, hmap_overlap);

                for(int i = 0; i<als.size(); i++)
                {
                    if(als.get(i).getRIDPair()!=null)
                    {
                        alsim.add(als.get(i));

                    }
                }

            }

         return alsim;

     }


     public void filters(MultiValueMap index, String ptoken, String RID, HashMap<String, String> hmap, float t, int size, float val, boolean j_l, HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap, float j, float prefix_contri)
     {
            @SuppressWarnings("unchecked")

            ArrayList<CustomMapPrefixTokens> positions_list = (ArrayList<CustomMapPrefixTokens>) index.get(ptoken);

            if((positions_list!=null) &&(positions_list.size()!=0))
            {

                CustomMapPrefixTokens cmapt ;
                String y;
                Iterator<String> iter3;
                int y_size = 0;
                float check_size = 0;
            //  TreeMap<String, Integer> hmapy;
                float RID_val=0;
                float y_overlap = 0;
                float ubound = 0;
                ArrayList<Float> fl = new ArrayList<Float>();

              StringTokenizer st;

            for(int k = 0; k<positions_list.size(); k++)
            {
                cmapt = positions_list.get(k);

                if(!cmapt.getRID().equals(RID))
                {

                 y = hmap.get(cmapt.getRID());

                // iter3 = y.keySet().iterator();

                 String yRID = cmapt.getRID();

                 st = new StringTokenizer(y,",");

                 y_size = Integer.parseInt(st.nextToken());

                 check_size = (float)0.8*(size);

                if(y_size>=check_size)
                {

                    //hmapy = hmap.get(yRID);

                    String y_data = st.nextToken();

                    StringTokenizer st1 = new StringTokenizer(y_data,"\\|");


                    while(st1.hasMoreTokens())
                    {
                        String token = st1.nextToken();
                        if(token.equals(ptoken))
                        {

                            String nxt_token = st1.nextToken();
                    //      System.out.println("ydata:--->"+y_data+" nxt_token:--->"+nxt_token);
                            RID_val = (float)Integer.parseInt(nxt_token);
                            break;
                        }
                    }

                 //    RID_val = (float) hmapy.get(ptoken); 
                     float alpha1 = (float)(0.8/1.8)*(size+y_size);

                     fl = overlapCalc(alpha1, size, y_size, cmapt, j, alpha, j_l,RID_val,val,prefix_contri);

                     ubound = fl.get(0);
                     y_overlap = fl.get(1);


                    positionFilter(ubound, alpha1, cmapt, y_overlap, hmap_overlap);

                  }

                }   
            }
        }



     }


   public void positionFilter( float ubound,float alpha1, CustomMapPrefixTokens cmapt, float y_overlap, HashMap<String, CustomMapOverlap> hmap_overlap)
   {

     float y_overlap_total = 0;

            if(null!=hmap_overlap.get(cmapt.getRID()))
            {

            y_overlap_total = hmap_overlap.get(cmapt.getRID()).getOverlap();

            if((y_overlap_total+ubound)>=alpha1)
            {

                CustomMapOverlap cmap_tmp = hmap_overlap.get(cmapt.getRID());

                float y_o_t = y_overlap+y_overlap_total;

                cmap_tmp.setOverlap(y_o_t);
                hmap_overlap.put(cmapt.getRID(),cmap_tmp);

            }
            else
            {
                float n = 0;
                hmap_overlap.put(cmapt.getRID(), new CustomMapOverlap(cmapt.getRID(),n));
            }

            }
            else
            {
                CustomMapOverlap cmap_tmp = new CustomMapOverlap(cmapt.getRID(),y_overlap);
                hmap_overlap.put(cmapt.getRID(), cmap_tmp);

            }

   }

   public ArrayList<Float> overlapCalc(float alpha1, int size, int y_size, CustomMapPrefixTokens cmapt, float j, HashMap<String, Float> alpha, boolean j_l, float RID_val, float val, float prefix_contri )
   {

            alpha.put(cmapt.getRID(), alpha1);
            float min1 = y_size-cmapt.getPosition();
            float min2 = size-j;
            float min = 0;

            float y_overlap = 0;

            if(min1<min2)
            {
                min = min1;
            }
            else
            {
                min = min2;
            }
            if(j_l==true)
            {
                val = prefix_contri;    
            }                                       
            if(RID_val<val)
            {
                y_overlap = RID_val;
            }
            else
            {
                y_overlap = val;
            }

            float ubound = y_overlap+min;

            ArrayList<Float> fl = new ArrayList<Float>();
            fl.add(ubound);
            fl.add(y_overlap);

            return fl;

   }


     public ArrayList<CustomMapSimilarity> calcSimilarity( String time, String RID, HashMap<String,String> hmap , HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap)
     {

         float jaccard = 0;

         CustomMapSimilarity cms = new CustomMapSimilarity(null, null);   
         ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();

        Iterator<String> iter = hmap_overlap.keySet().iterator();

        while(iter.hasNext())
        {
            String key = (String)iter.next();

            CustomMapOverlap val = (CustomMapOverlap)hmap_overlap.get(key);

            float overlap = (float)val.getOverlap();

            if(overlap>0)
            {

               String yRID = val.getRID();

              String RIDpair = RID+" "+yRID;

             jaccard = unionIntersection(hmap, RIDpair);

             if(jaccard>0.8)
                {
                    cms = new CustomMapSimilarity(time+" "+RIDpair, String.valueOf(jaccard));
                    alsim.add(cms);
                }

            }

        }

         return alsim;

     }


     public float unionIntersection( HashMap<String,String> hmap, String RIDpair)
     {


            StringTokenizer st = new StringTokenizer(RIDpair);

            String xRID = st.nextToken();

            String yRID = st.nextToken();

            String xdata = hmap.get(xRID);

            String ydata = hmap.get(yRID);


            int total_union = 0;

            int xval = 0;
            int yval = 0;
            int part_union = 0;

            int total_intersect = 0;

        //  System.out.println("xdata:------*************>"+xdata);

            StringTokenizer xtokenizer = new StringTokenizer(xdata,",");
            StringTokenizer ytokenizer = new StringTokenizer(ydata,",");
        //  String[] xpart = xdata.split(",");
        //  String[] ypart = ydata.split(",");

            xtokenizer.nextToken();
            ytokenizer.nextToken();

            String datax = xtokenizer.nextToken();
            String datay = ytokenizer.nextToken();


            HashMap<String,Integer> x = new HashMap<String, Integer>();
            HashMap<String,Integer> y = new HashMap<String, Integer>();


            String [] xparts;

                 xparts = datax.toString().split("\\|");


              String [] yparts;

                 yparts = datay.toString().split("\\|");


                 for(int i = 0; i<xparts.length-1; i++)
                 {
                     int part_size = Integer.parseInt(xparts[i+1]);
                     x.put(xparts[i], part_size);

                     i++;
                 }

                 for(int i = 0; i<yparts.length-1; i++)
                 {
                     int part_size = Integer.parseInt(yparts[i+1]);
                     y.put(yparts[i], part_size);

                     i++;
                 }


             Set<String> xset = x.keySet();
             Set<String> yset = y.keySet();

            for(String elm:xset )
            {

                yval = 0;

                xval = (Integer)x.get(elm);

                part_union = 0;
                int part_intersect = 0;
                if(yset.contains(elm)){

                    yval = (Integer) y.get(elm);

                if(xval>yval)
                {
                    part_union = xval;
                    part_intersect = yval;
                }
                else
                {
                    part_union = yval;
                    part_intersect = xval;
                }
                total_intersect = total_intersect+part_intersect;
                }
                else
                {
                    part_union = xval;
                }

                total_union = total_union+part_union;


            }


            for(String elm: yset)
            {
                part_union = 0;

                if(!xset.contains(elm))
                {
                    part_union = (Integer) y.get(elm);
                    total_union = total_union+part_union;
                }

            }

            float jaccard = (float)total_intersect/total_union;

         return jaccard;

     }

}
Mahalakshmi Lakshminarayanan asked Mar 07 '13 20:03


2 Answers

The timeouts might be caused by a long-running computation in your reducer that never reports progress back to the Hadoop framework. There are different ways to resolve this:

I. Increasing the timeout in mapred-site.xml:

<property>
  <name>mapred.task.timeout</name>
  <value>1200000</value>
</property>

The default is 600000 ms = 600 seconds.

II. Reporting progress every x records, as in the Reducer example in the javadoc:

public void reduce(K key, Iterator<V> values,
                          OutputCollector<K, V> output, 
                          Reporter reporter) throws IOException {
   int noValues = 0;
   while (values.hasNext()) {
     V value = values.next();
     ++noValues;

     // report progress every 10 values
     if ((noValues%10) == 0) {
       reporter.progress();
     }

     // ...
   }
}

Optionally, you can also increment a custom counter, as in the same example:

reporter.incrCounter(NUM_RECORDS, 1);
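In the reducer from the question, the expensive part appears to be the call to similarityCalculation, which runs after the values loop has finished, so progress would also have to be reported from inside that computation. The following is only a sketch of the idea: `Heartbeat` is a hypothetical stand-in for the progress() method of Hadoop's Reporter so that the example runs without Hadoop, and the all-pairs loop is a placeholder for the real similarity code.

```java
import java.util.Arrays;
import java.util.List;

final class ProgressSketch {

    /** Hypothetical stand-in for the progress() method of Hadoop's Reporter. */
    interface Heartbeat {
        void progress();
    }

    /**
     * Compares every pair of records (quadratic work) and pings the
     * heartbeat once every `interval` comparisons.
     * Returns the number of comparisons performed.
     */
    static long compareAllPairs(List<String> records, Heartbeat hb, int interval) {
        long comparisons = 0;
        for (int i = 0; i < records.size(); i++) {
            for (int j = i + 1; j < records.size(); j++) {
                // Placeholder for the real similarity computation.
                records.get(i).equals(records.get(j));
                comparisons++;
                if (comparisons % interval == 0) {
                    hb.progress();
                }
            }
        }
        return comparisons;
    }

    public static void main(String[] args) {
        final int[] pings = {0};
        long n = compareAllPairs(Arrays.asList("a", "b", "c", "d", "e"),
                new Heartbeat() {
                    public void progress() {
                        pings[0]++;
                    }
                }, 3);
        System.out.println(n + " comparisons, " + pings[0] + " progress calls");
    }
}
```

In a real job, the Reporter received by reduce() would be passed down to the long-running method in place of the Heartbeat used here.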
harpun answered Oct 11 '22 01:10


It's possible that the reducer has consumed all of its Java heap space, or that GC is running so frequently that the reducer gets no chance to report status to the master and is therefore killed.
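One way to test the heap hypothesis is to log heap usage from inside the reducer at intervals. A minimal sketch using only java.lang.Runtime; the class name `HeapSketch` is hypothetical, and where to call it from is left open:

```java
final class HeapSketch {

    /** Fraction of the maximum heap currently in use, between 0 and 1. */
    static double usedFraction() {
        Runtime rt = Runtime.getRuntime();
        long used = rt.totalMemory() - rt.freeMemory();
        return (double) used / rt.maxMemory();
    }

    public static void main(String[] args) {
        System.out.println("heap used fraction: " + usedFraction());
    }
}
```

If the fraction stays close to 1 shortly before a task is killed, the heap explanation becomes more plausible.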

Another possibility is that one of the reducers is receiving heavily skewed data, i.e. a very large number of records for a particular key.
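To get a feel for how skew would hurt here, one can count how many values each key receives and estimate the worst-case number of record pairs per key. This is a sketch under assumptions: `SkewSketch` and its methods are hypothetical names, and n(n-1)/2 is only an upper bound, since the prefix and position filters in the question's code are meant to prune most pairs.

```java
import java.util.HashMap;
import java.util.Map;

final class SkewSketch {

    /** Counts how many values each key would receive. */
    static Map<String, Integer> groupSizes(String[] keys) {
        Map<String, Integer> sizes = new HashMap<String, Integer>();
        for (String k : keys) {
            Integer c = sizes.get(k);
            sizes.put(k, c == null ? 1 : c + 1);
        }
        return sizes;
    }

    /** Worst-case number of record pairs within a group of n records. */
    static long pairCount(long n) {
        return n * (n - 1) / 2;
    }

    public static void main(String[] args) {
        String[] keys = {"t1 a", "t1 a", "t1 b", "t1 a"};
        for (Map.Entry<String, Integer> e : groupSizes(keys).entrySet()) {
            System.out.println(e.getKey() + ": " + e.getValue()
                    + " values, up to " + pairCount(e.getValue()) + " pairs");
        }
    }
}
```

Because the pair count grows quadratically, a key that receives 100 times more records can cost up to roughly 10,000 times more work, which would be consistent with a job that finishes on 10 MB but times out on 1 GB.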

Try increasing the Java heap of the task JVMs by setting mapred.child.java.opts to:

-Xmx2048m

Also, try reducing the number of reduce tasks that run in parallel on each TaskTracker by setting the following property to a lower value than it currently has (the default is 2):

mapred.tasktracker.reduce.tasks.maximum

Amar answered Oct 11 '22 01:10