50 Stimmen

Einfaches Java Map/Reduce-Framework

Kann mir jemand einen Hinweis auf ein einfaches, quelloffenes Map/Reduce-Framework/API für Java geben? Es scheint nicht zu viel Beweise für eine solche Sache existieren, aber jemand anderes könnte anders wissen.

Das Beste, was ich finden kann, ist natürlich Hadoop MapReduce, aber das erfüllt nicht das Kriterium "einfach". Ich brauche nicht die Möglichkeit, verteilte Aufträge auszuführen, sondern nur etwas, mit dem ich Map/Reduce-ähnliche Aufträge auf einem Multi-Core-Rechner in einer einzigen JVM mit standardmäßiger Gleichzeitigkeit im Stil von Java5 ausführen kann.

Es ist nicht schwer, selbst etwas zu schreiben, aber ich möchte es lieber nicht tun.

18voto

chaostheory Punkte 1519

Haben Sie sich die Akka ? Obwohl akka eigentlich ein verteiltes, auf dem Actor-Modell basierendes Concurrency-Framework ist, kann man viele Dinge einfach mit wenig Code implementieren. Es ist einfach, die Arbeit in Teile aufzuteilen, und es nutzt automatisch die Vorteile einer Multi-Core-Maschine sowie die Möglichkeit, mehrere Maschinen für die Verarbeitung von Arbeit zu verwenden. Im Gegensatz zur Verwendung von Threads fühlt sich das für mich natürlicher an.

Ich habe eine Java map reduce Beispiel mit Akka. Es ist nicht das einfachste Map-Reduce-Beispiel, da es Futures verwendet, aber es sollte Ihnen eine ungefähre Vorstellung davon vermitteln, worum es geht. Es gibt mehrere wichtige Dinge, die mein Map-Reduce-Beispiel demonstriert:

  • Wie wird die Arbeit aufgeteilt?
  • Wie man die Arbeit zuweist: akka hat ein wirklich einfaches Nachrichtensystem sowie einen Arbeitsteiler, dessen Zeitplan man konfigurieren kann. Sobald ich gelernt hatte, wie man es benutzt, konnte ich nicht mehr aufhören. Es ist einfach so einfach und flexibel. Ich habe alle vier CPU-Kerne in kürzester Zeit genutzt. Das ist wirklich großartig für die Implementierung von Diensten.
  • Wie weiß man, wann die Arbeit erledigt ist und das Ergebnis verarbeitet werden kann? Dies ist der Teil, der am schwierigsten und verwirrendsten zu verstehen ist, wenn Sie nicht bereits mit Futures vertraut sind. Sie müssen nicht unbedingt Futures verwenden, es gibt auch andere Möglichkeiten. Ich habe sie nur verwendet, weil ich etwas kürzeres wollte, damit die Leute sich zurechtfinden.

Wenn Sie irgendwelche Fragen haben, StackOverflow hat tatsächlich eine awesome akka QA Abschnitt.

12voto

Lukas Eder Punkte 194234

Ich denke, es ist erwähnenswert, dass diese Probleme ab Java 8 der Vergangenheit angehören. Ein Beispiel:

int heaviestBlueBlock =
    blocks.filter(b -> b.getColor() == BLUE)
          .map(Block::getWeight)
          .reduce(0, Integer::max);

Mit anderen Worten: Single-Node MapReduce ist in Java 8 verfügbar .

Für weitere Einzelheiten siehe Brian Goetz' Präsentation über das Projekt Lambda

10voto

Peter Lawrey Punkte 511323

Ich verwende die folgende Struktur

int procs = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(procs);

List<Future<TaskResult>> results = new ArrayList();
for(int i=0;i<tasks;i++)
    results.add(es.submit(new Task(i)));
for(Future<TaskResult> future:results)
    reduce(future);

8voto

Gareth Davis Punkte 27204

Ich weiß, dass dies vielleicht ein bisschen spät ist, aber vielleicht möchten Sie einen Blick auf die JSR166y ForkJoin Klassen aus JDK7.

Es gibt eine zurückportierte Bibliothek, die unter JDK6 ohne Probleme funktioniert, so dass Sie nicht bis zum nächsten Jahrtausend warten müssen, um sie auszuprobieren. Sie ist irgendwo zwischen einem Raw Executor und Hadoop angesiedelt und bietet einen Rahmen für die Arbeit an Map-Reduce-Jobs innerhalb der aktuellen JVM.

6voto

xan Punkte 7181

Ich habe vor ein paar Jahren, als ich einen 8-Kern-Rechner bekam, ein Einzelstück für mich erstellt, aber ich war nicht sehr glücklich damit. Es war nie so einfach zu bedienen, wie ich gehofft hatte, und speicherintensive Aufgaben ließen sich nicht gut skalieren.

Wenn Sie nichts bekommen réel Antworten kann ich noch mehr geben, aber der Kern ist:

public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Iterator<TMapInput> inputIterator) {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
        while (inputIterator.hasNext()) {
            TMapInput m = inputIterator.next();
            Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
            futureSet.add(f);
            Thread.sleep(10);
        }
        while (!futureSet.isEmpty()) {
            Thread.sleep(5);
            for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) {
                Future<TMapOutput> f = fit.next();
                if (f.isDone()) {
                    fit.remove();
                    TMapOutput x = f.get();
                    m_reducer.reduce(x);
                }
            }
        }
        return m_reducer.getResult();
    }
}

EDIT: Aufgrund eines Kommentars ist hier eine Version ohne sleep . Der Trick ist die Verwendung CompletionService die im Wesentlichen eine blockierende Warteschlange von abgeschlossenen Future s.

 public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Collection<TMapInput> input) {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        CompletionService<TMapOutput> futurePool = 
                  new ExecutorCompletionService<TMapOutput>(pool);
        Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
        for (TMapInput m : input) {
            futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
        }
        pool.shutdown();
        int n = futureSet.size();
        for (int i = 0; i < n; i++) {
            m_reducer.reduce(futurePool.take().get());
        }
        return m_reducer.getResult();
    }

Ich möchte auch anmerken, dass es sich um einen sehr destillierten Map-Reduce-Algorithmus handelt, der einen einzigen Reduce-Worker enthält, der sowohl die Reduce- als auch die Merge-Operation durchführt.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X