Ich habe vor ein paar Jahren, als ich einen 8-Kern-Rechner bekam, ein Einzelstück für mich erstellt, aber ich war nicht sehr glücklich damit. Es war nie so einfach zu bedienen, wie ich gehofft hatte, und speicherintensive Aufgaben ließen sich nicht gut skalieren.
Wenn Sie nichts bekommen réel Antworten kann ich noch mehr geben, aber der Kern ist:
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Iterator<TMapInput> inputIterator) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
while (inputIterator.hasNext()) {
TMapInput m = inputIterator.next();
Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
futureSet.add(f);
Thread.sleep(10);
}
while (!futureSet.isEmpty()) {
Thread.sleep(5);
for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) {
Future<TMapOutput> f = fit.next();
if (f.isDone()) {
fit.remove();
TMapOutput x = f.get();
m_reducer.reduce(x);
}
}
}
return m_reducer.getResult();
}
}
EDIT: Aufgrund eines Kommentars ist hier eine Version ohne sleep
. Der Trick ist die Verwendung CompletionService
die im Wesentlichen eine blockierende Warteschlange von abgeschlossenen Future
s.
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Collection<TMapInput> input) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
CompletionService<TMapOutput> futurePool =
new ExecutorCompletionService<TMapOutput>(pool);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
for (TMapInput m : input) {
futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
}
pool.shutdown();
int n = futureSet.size();
for (int i = 0; i < n; i++) {
m_reducer.reduce(futurePool.take().get());
}
return m_reducer.getResult();
}
Ich möchte auch anmerken, dass es sich um einen sehr destillierten Map-Reduce-Algorithmus handelt, der einen einzigen Reduce-Worker enthält, der sowohl die Reduce- als auch die Merge-Operation durchführt.