5 Stimmen

Wenn Sie Hadoop zum ersten Mal verwenden, läuft der MapReduce-Auftrag nicht in der Reduce-Phase

Ich schrieb einen einfachen Map-Reduce-Auftrag, der Daten aus dem DFS einlesen und einen einfachen Algorithmus darauf anwenden sollte. Als ich versuchte, ihn zu debuggen, beschloss ich, die Mapper einfach einen einzigen Satz von Schlüsseln und Werten ausgeben zu lassen, während die Reducer einen völlig anderen Satz ausgeben. Ich lasse diesen Auftrag auf einem Hadoop 20.2-Cluster mit einem einzigen Knoten laufen. Nach Beendigung des Auftrags enthält die Ausgabe lediglich die Werte, die von den Mappern ausgegeben wurden, was mich zu der Annahme veranlasst, dass der Reducer nicht ausgeführt wird. Ich wäre sehr dankbar, wenn mir jemand einen Einblick geben könnte, warum mein Code eine solche Ausgabe erzeugt. Ich habe versucht, die outputKeyClass und outputValueClass auf verschiedene Dinge sowie die setMapOutputKeyClass und setMapOutputValueClass auf verschiedene Dinge zu setzen. Derzeit sind die kommentierten Codeabschnitte der Algorithmus, den ich ausführe, aber ich habe die Map- und Reduce-Methoden geändert, um einfach bestimmte Werte auszugeben. Auch hier enthält die Ausgabe des Auftrags nur die Werte, die vom Mapper ausgegeben wurden. Hier ist die Klasse, die ich zur Ausführung des Auftrags verwendet habe:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CalculateHistogram {

    public static class HistogramMap extends Mapper<LongWritable, Text, LongWritable, Text> {

        private static final int R = 100;
        private int n = 0;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (n == 0) {
                StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
                int counter = 0;
                while (tokens.hasMoreTokens()) {
                    String token = tokens.nextToken();
                    if (tokens.hasMoreTokens()) {
                        context.write(new LongWritable(-2), new Text("HI"));
                        //context.write(new LongWritable(counter), new Text(token));
                    }
                    counter++;
                    n++;
                }
            } else {
                n++;
                if (n == R) {
                    n = 0;
                }

            }
        }
    }

    public static class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, HistogramBucket> {

        private final static int R = 10;

        public void reduce(LongWritable key, Iterator<Text> values, Context context)
                                            throws IOException, InterruptedException {
            if (key.toString().equals("-1")) {
                //context.write(key, new HistogramBucket(key));
            }
            Text t = values.next();
            for (char c : t.toString().toCharArray()) {
                if (!Character.isDigit(c) && c != '.') {
                    //context.write(key, new HistogramBucket(key));//if this isnt a numerical attribute we ignore it
                }
            }
            context.setStatus("Building Histogram");
            HistogramBucket i = new HistogramBucket(key);
            i.add(new DoubleWritable(Double.parseDouble(t.toString())));
            while (values.hasNext()) {
                for (int j = 0; j < R; j++) {
                    t = values.next();
                }
                if (!i.contains(Double.parseDouble(t.toString()))) {
                    context.setStatus("Writing a value to the Histogram");
                    i.add(new DoubleWritable(Double.parseDouble(t.toString())));
                }
            }

            context.write(new LongWritable(55555555), new HistogramBucket(new LongWritable(55555555)));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "MRDT - Generate Histogram");
        job.setJarByClass(CalculateHistogram.class);
        job.setMapperClass(HistogramMap.class);
        job.setReducerClass(HistogramReduce.class);

        //job.setOutputValueClass(HistogramBucket.class);

        //job.setMapOutputKeyClass(LongWritable.class);
        //job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

11voto

Helmut Zechmann Punkte 894

Die Signatur Ihrer reduce-Methode ist falsch. Ihre Methodensignatur enthält Iterator<Text> . Sie müssen eine Iterable<Text> .

Ihr Code überschreibt nicht die reduce Methode der Reducer Basisklasse. Aus diesem Grund ist die Standardimplementierung, die von der Reducer Basisklasse verwendet wird. Diese Implementierung ist eine Identitätsfunktion.

Verwenden Sie die @Override um Fehler wie diesen zu vermeiden.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X