47 Stimmen

Wie implementiere ich die Aufgabenpriorisierung mit einem ExecutorService in Java 5?

Ich implementiere einen Thread-Pooling-Mechanismus, in dem ich Aufgaben mit unterschiedlichen Prioritäten ausführen möchte. Ich möchte einen netten Mechanismus haben, mit dem ich eine Aufgabe mit hoher Priorität an den Dienst übermitteln kann, damit sie vor anderen Aufgaben geplant wird. Die Priorität der Aufgabe ist eine intrinsische Eigenschaft der Aufgabe selbst (ob ich diese Aufgabe als Callable oder eine Runnable ist für mich nicht wichtig).

Oberflächlich betrachtet sieht es so aus, als könnte ich eine PriorityBlockingQueue als die Aufgabenwarteschlange in meiner ThreadPoolExecutor , aber diese Warteschlange enthält Runnable Objekte, bei denen es sich auch um die Runnable Aufgaben, die ich ihr vorgelegt habe. Außerdem, wenn ich Folgendes eingereicht habe Callable Aufgaben, ist es nicht klar, wie dies jemals abgebildet werden könnte.

Gibt es eine Möglichkeit, dies zu tun? Ich würde wirklich lieber nicht meine eigene Rolle für diese, da ich viel wahrscheinlicher ist, es falsch, dass Weg zu bekommen.

(Nebenbei bemerkt: Ja, ich bin mir der Möglichkeit bewusst, dass Arbeitsplätze mit niedrigerer Priorität in einem solchen Fall verhungern könnten. Extrapunkte (?!) für Lösungen, die eine vernünftige Garantie für Fairness bieten)

16voto

Mike Punkte 18429

Ich habe dieses Problem auf eine vernünftige Art und Weise gelöst, und ich beschreibe es im Folgenden für mich und alle anderen, die auf dieses Problem mit den Java Concurrent-Bibliotheken stoßen.

Mit einer PriorityBlockingQueue als Mittel zum Festhalten von Aufgaben für die spätere Ausführung ist in der Tat ein Schritt in die richtige Richtung. Das Problem ist, dass die PriorityBlockingQueue muss generisch instanziiert werden und enthält Runnable Instanzen, und es ist unmöglich, die compareTo (oder ähnliches) auf einer Runnable Schnittstelle.

Nun zur Lösung des Problems. Bei der Erstellung des Executors muss dieser mit einem PriorityBlockingQueue . Die Warteschlange sollte außerdem einen benutzerdefinierten Comparator erhalten, um eine ordnungsgemäße Sortierung vorzunehmen:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Nun, ein Blick auf CustomTaskComparator :

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

Bis zu diesem Punkt sieht alles ziemlich einfach aus. Hier wird es ein bisschen klebrig. Unser nächstes Problem ist die Erstellung von FutureTasks durch den Executor. Im Executor müssen wir die Funktion newTaskFor als so:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

Donde c ist die Callable Aufgabe, die wir versuchen, auszuführen. Werfen wir nun einen Blick auf CustomFutureTask :

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Beachten Sie die getTask Methode. Wir werden das später verwenden, um die ursprüngliche Aufgabe aus dieser CustomFutureTask die wir geschaffen haben.

Und schließlich wollen wir die ursprüngliche Aufgabe, die wir auszuführen versuchen, ändern:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

Sie können sehen, dass wir die Comparable in der Aufgabe, die an den eigentlichen Comparator para MyType .

Und schon haben Sie eine angepasste Priorisierung für einen Executor unter Verwendung der Java-Bibliotheken! Es braucht ein wenig Biegung, aber es ist die sauberste, die ich in der Lage gewesen, mit zu kommen. Ich hoffe, das ist hilfreich für jemanden!

8voto

Adam Jaskiewicz Punkte 10844

Auf den ersten Blick sieht es so aus, als könnten Sie eine Schnittstelle für Ihre Aufgaben definieren, die die Runnable o Callable<T> y Comparable . Dann wickeln Sie eine ThreadPoolExecutor mit einer PriorityBlockingQueue als Warteschlange, und akzeptieren Sie nur Aufgaben, die Ihre Schnittstelle implementieren.

Unter Berücksichtigung Ihres Kommentars scheint es eine Möglichkeit zu sein, die ThreadPoolExecutor und überschreiben Sie die submit() Methoden. Siehe AbstractExecutorService um zu sehen, wie die Standardeinstellungen aussehen; sie verpacken lediglich die Runnable o Callable in einem FutureTask y execute() es. Ich würde dies wahrscheinlich tun, indem ich eine Wrapper-Klasse schreibe, die Folgendes implementiert ExecutorService und delegiert an eine anonyme innere ThreadPoolExecutor . Wickeln Sie sie in etwas ein, das Ihre Priorität hat, so dass Ihre Comparator zugreifen kann.

4voto

Sie können diese Hilfsklassen verwenden:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

UND

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

UND diese Hilfsmethode:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

UND dann verwenden Sie es so:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

4voto

thedarkpassenger Punkte 6590

Ich werde versuchen, dieses Problem anhand eines voll funktionsfähigen Codes zu erklären. Aber bevor ich in den Code eintauche, möchte ich etwas über PriorityBlockingQueue erklären

PriorityBlockingQueue : PriorityBlockingQueue ist eine Implementierung von BlockingQueue. Sie nimmt die Aufgaben mit ihrer Priorität entgegen und gibt die Aufgabe mit der höchsten Priorität zuerst zur Ausführung frei. Wenn zwei Aufgaben die gleiche Priorität haben, müssen wir eine benutzerdefinierte Logik bereitstellen, um zu entscheiden, welche Aufgabe zuerst ausgeführt wird.

Kommen wir gleich zum Code.

Klasse der Fahrer : Diese Klasse erstellt einen Executor, der Aufgaben annimmt und sie später zur Ausführung übergibt. Hier erstellen wir zwei Aufgaben, eine mit NIEDRIGER Priorität und die andere mit HOHER Priorität. Hier weisen wir den Executor an, MAX von 1 Threads auszuführen und die PriorityBlockingQueue zu verwenden.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);

    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM,"Medium");
    executor.execute(new MyFutureTask(task));

}

MyTask-Klasse : MyTask implementiert Runnable und akzeptiert die Priorität als Argument im Konstruktor. Wenn diese Aufgabe ausgeführt wird, gibt sie eine Nachricht aus und versetzt den Thread dann für 1 Sekunde in den Ruhezustand.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

MyFutureTask-Klasse : Da wir PriorityBlocingQueue zur Aufnahme unserer Tasks verwenden, müssen unsere Tasks in FutureTask verpackt werden und unsere Implementierung von FutureTask muss die Schnittstelle Comparable implementieren. Die Comparable-Schnittstelle vergleicht die Priorität von 2 verschiedenen Tasks und übergibt den Task mit der höchsten Priorität zur Ausführung.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Prioritätsklasse : Selbsterklärend Prioritätsklasse.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }

}

Wenn wir nun dieses Beispiel ausführen, erhalten wir folgende Ausgabe

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

Auch wenn wir die Aufgabe mit NIEDRIGER Priorität zuerst, die mit HOHER Priorität aber später eingereicht haben, wird jede Aufgabe mit höherer Priorität zuerst ausgeführt, da wir eine PriorityBlockingQueue verwenden.

1voto

Daniel Hári Punkte 6308

Bei meiner Lösung bleibt die Reihenfolge der Übermittlung von Aufgaben mit gleichen Prioritäten erhalten. Es ist eine Verbesserung der folgenden Lösung respuesta

Reihenfolge der Aufgabenausführung basiert auf:

  1. Priorität
  2. Bestellung abschicken (innerhalb derselben Priorität)

Tester-Klasse:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

Ergebnis:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

Die erste Aufgabe ist A1, weil es in der Warteschlange keine höhere Priorität gab, als sie eingefügt wurde. B-Aufgaben haben eine Priorität von 1 und werden daher früher ausgeführt, A-Aufgaben haben eine Priorität von 0 und werden daher später ausgeführt, aber die Ausführungsreihenfolge folgt der Reihenfolge der Einreichung: B1, B2, B3, ... A2, A3, A4 ...

Die Lösung:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = o2.getPriority() - o1.getPriority();
                    return priorityResult != 0 ? priorityResult
                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X