7 Stimmen

Mehrere Textzeilen in einer einzigen Karte

Ich habe versucht, Hadoop zu verwenden, um N Zeilen an ein einzelnes Mapping zu senden. Es ist nicht erforderlich, dass die Zeilen bereits aufgeteilt sind.

Ich habe versucht, NLineInputFormat zu verwenden, aber das sendet N Zeilen Text aus den Daten zu jedem Mapper eine Zeile zu einer Zeit [aufgeben nach der N-ten Zeile].

Ich habe versucht, die Option einzustellen, und es werden nur N Zeilen an Eingaben angenommen, die jeweils um 1 Zeile an jede Karte gesendet werden:

    job.setInt("mapred.line.input.format.linespermap", 10);

Ich habe eine Mailingliste gefunden, die mir empfiehlt, LineRecordReader::next zu überschreiben, aber das ist nicht so einfach, da die internen Datenelemente alle privat sind.

Ich habe gerade die Quelle für NLineInputFormat überprüft und es hart codiert LineReader, so überschreiben wird nicht helfen.

Außerdem verwende ich Hadoop 0.18 für die Kompatibilität mit dem Amazon EC2 MapReduce.

7voto

Peter Wippermann Punkte 3793

Sie müssen Ihr eigenes Eingabeformat zu implementieren . Sie haben dann auch die Möglichkeit, Ihren eigenen Datensatzleser zu definieren.

Leider müssen Sie eine getSplits()-Methode definieren. Meiner Meinung nach wird dies schwieriger sein als die Implementierung des Datensatzlesers: Diese Methode muss eine Logik zur Zerlegung der Eingabedaten implementieren.

Siehe den folgenden Auszug aus "Hadoop - The definitive guide" (ein großartiges Buch, das ich immer empfehlen kann!):

Hier ist die Schnittstelle:

public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job, 
                                     Reporter reporter) throws IOException;
}

Der JobClient ruft die Methode getSplits() auf und übergibt die gewünschte Anzahl von Map Tasks als Argument numSplits. Diese Zahl ist als Hinweis zu verstehen, da InputFormat-Implemen- tationen InputFormat-Implementierungen können eine andere Anzahl von Splits zurückgeben als in numSplits. Nachdem der Client die Splits berechnet hat, sendet er sie an den Jobtracker, der Der Jobtracker verwendet ihre Speicherorte, um Map-Tasks zu planen, die sie auf den Tasktrackern verarbeiten.

Auf einem Tasktracker übergibt die Map-Task den Split an die Methode getRecordReader() auf InputFormat, um einen RecordReader für diesen Split zu erhalten. Ein RecordReader ist wenig mehr als ein Iterator über Datensätze, und die Map-Aufgabe verwendet einen, um Schlüssel-Wert-Paare für Datensätze zu erzeugen, die sie an die map-Funktion weitergibt. Ein Codeschnipsel (basierend auf dem Code in MapRunner) veranschaulicht die Idee:

K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
  mapper.map(key, value, output, reporter);
}

3voto

Matt Fortier Punkte 1163

Ich habe dieses Problem vor kurzem gelöst, indem ich einfach mein eigenes InputFormat erstellt habe, das NLineInputFormat überschreibt und einen benutzerdefinierten MultiLineRecordReader anstelle des Standard-LineReader implementiert.

Ich habe mich für die Erweiterung von NLineInputFormat entschieden, weil ich die gleiche Garantie haben wollte, dass genau N Zeilen pro Split vorhanden sind.

Dieser Rekordleser ist fast unverändert übernommen von http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

Das einzige, was ich geändert habe, ist die Eigenschaft für maxLineLength die jetzt die neue API verwendet, und der Wert für NLINESTOPROCESS die aus dem NLineInputFormat gelesen wird setNumLinesPerSplit() anstatt fest kodiert zu sein (für mehr Flexibilität).

Hier ist das Ergebnis:

public class MultiLineInputFormat extends NLineInputFormat{
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) {
        context.setStatus(genericSplit.toString());
        return new MultiLineRecordReader();
    }

    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text>{
        private int NLINESTOPROCESS;
        private LineReader in;
        private LongWritable key;
        private Text value = new Text();
        private long start =0;
        private long end =0;
        private long pos =0;
        private int maxLineLength;

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (start == end) {
                return 0.0f;
            }
            else {
                return Math.min(1.0f, (pos - start) / (float)(end - start));
            }
        }

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException, InterruptedException {
            NLINESTOPROCESS = getNumLinesPerSplit(context);
            FileSplit split = (FileSplit) genericSplit;
            final Path file = split.getPath();
            Configuration conf = context.getConfiguration();
            this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength",Integer.MAX_VALUE);
            FileSystem fs = file.getFileSystem(conf);
            start = split.getStart();
            end= start + split.getLength();
            boolean skipFirstLine = false;
            FSDataInputStream filein = fs.open(split.getPath());

            if (start != 0){
                skipFirstLine = true;
                --start;
                filein.seek(start);
            }
            in = new LineReader(filein,conf);
            if(skipFirstLine){
                start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (key == null) {
                key = new LongWritable();
            }
            key.set(pos);
            if (value == null) {
                value = new Text();
            }
            value.clear();
            final Text endline = new Text("\n");
            int newSize = 0;
            for(int i=0;i<NLINESTOPROCESS;i++){
                Text v = new Text();
                while (pos < end) {
                    newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength));
                    value.append(v.getBytes(),0, v.getLength());
                    value.append(endline.getBytes(),0, endline.getLength());
                    if (newSize == 0) {
                        break;
                    }
                    pos += newSize;
                    if (newSize < maxLineLength) {
                        break;
                    }
                }
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            } else {
                return true;
            }
        }
    }

}

1voto

kawaa Punkte 11

Ich denke, dass Sie in Ihrem Fall dem Delegationsmuster folgen und einen Wrapper um LineRecordReader implementieren können, der die erforderlichen Methoden überschreibt, z. B. next() (oder nextKeyValue() in der neuen API), um den Wert auf eine Verkettung von N Zeilen statt auf eine Zeile zu setzen.

Ich habe die beispielhafte Implementierung von ParagraphRecordReader gegoogelt, die LineRecordReader verwendet, um Eingabedaten Zeile für Zeile zu lesen (und zu verketten), bis sie entweder auf EOF oder eine Leerzeile stoßen. Dann wird ein Paar zurückgegeben, dessen Wert ein Absatz (statt einer Zeile) ist. Außerdem ist das ParagraphInputFormat für diesen ParagraphRecordReader so einfach wie das Standard-TextInputFormat.

Die notwendigen Links zu dieser Implementierung und ein paar Worte dazu finden Sie im folgenden Beitrag: http://hadoop-mapreduce.blogspot.com/2011/03/little-more-complicated-recordreaders.html .

Am besten

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X