3 Stimmen

Executor/Queue Prozess nur letzte bekannte Aufgabe

Ich bin auf der Suche nach einigen gleichzeitigen Code zu schreiben, die ein Ereignis verarbeiten wird. Diese Verarbeitung kann eine lange Zeit dauern.

Während dieses Ereignis verarbeitet wird, sollte es eingehende Ereignisse aufzeichnen und dann die letzten eingehenden Ereignisse verarbeiten, wenn es wieder laufen kann. (Die anderen Ereignisse können weggeworfen werden). Dies ist ein wenig wie eine FILO-Warteschlange, aber ich muss nur ein Element in der Warteschlange speichern.

Im Idealfall möchte ich meinen neuen Executor in meine unten dargestellte Ereignisverarbeitungsarchitektur einbinden.

public class AsyncNode<I, O> extends AbstractNode<I, O>  {
    private static final Logger log = LoggerFactory.getLogger(AsyncNode.class);
    private Executor executor;

    public AsyncNode(EventHandler<I, O> handler, Executor executor) {
        super(handler);
        this.executor = executor;
    }

    @Override
    public void emit(O output) {
        if (output != null) {
            for (EventListener<O> node : children) {
                node.handle(output);
            }
        }
    }

    @Override
    public void handle(final I input) {

        executor.execute(new Runnable() {

            @Override
            public void run() {
                try{
                emit(handler.process(input));
                }catch (Exception e){
                    log.error("Exception occured whilst processing input." ,e);
                    throw e;
                }

            }
        });

    }

}

3voto

Peter Lawrey Punkte 511323

Ich würde beides nicht tun. Ich würde eine AtomicReference auf das Ereignis, das Sie verarbeiten möchten und fügen Sie eine Aufgabe, um es in eine destruktive Weise zu verarbeiten.

final AtomicReference<Event> eventRef =

public void processEvent(Event event) {
   eventRef.set(event);
   executor.submit(new Runnable() {
       public vodi run() {
           Event e = eventRef.getAndSet(null);
           if (e == null) return;
           // process event
       }
   }
}

Dadurch wird das nächste Ereignis immer nur dann verarbeitet, wenn der Executor frei ist, ohne dass der Executor oder die Warteschlange (die für andere Dinge verwendet werden kann) angepasst wird.

Dies gilt auch für Ereignisse mit Schlüsseln, d. h. Sie möchten das letzte Ereignis für einen Schlüssel verarbeiten.

3voto

James Mudd Punkte 1317

Ich denke, der Schlüssel dazu ist die "Wegwerfpolitik", die Sie auf Ihre Executor . Wenn Sie nur die neueste Aufgabe bearbeiten wollen, benötigen Sie eine Warteschlangengröße von eins und eine "Verwerfungsrichtlinie", die die älteste Aufgabe wegwirft. Hier ist ein Beispiel für einen Executor, der dies tun wird

Executor latestTaskExecutor = new ThreadPoolExecutor(1, 1, // Single threaded 
        30L, TimeUnit.SECONDS, // Keep alive, not really important here
        new ArrayBlockingQueue<>(1), // Single element queue
        new ThreadPoolExecutor.DiscardOldestPolicy()); // When new work is submitted discard oldest

Wenn dann Ihre Aufgaben eintreffen, übermitteln Sie sie einfach an diesen Executor. Wenn sich bereits ein Auftrag in der Warteschlange befindet, wird er durch den neuen ersetzt

latestTaskExecutor.execute(() -> doUpdate()));

Hier ist eine Beispielanwendung, die dies zeigt

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatestUpdate {

    private static final Executor latestTaskExecutor = new ThreadPoolExecutor(1, 1, // Single threaded
            30L, TimeUnit.SECONDS, // Keep alive, not really important here
            new ArrayBlockingQueue<>(1), // Single element queue
            new ThreadPoolExecutor.DiscardOldestPolicy()); // When new work is submitted discard oldest

    private static final AtomicInteger counter = new AtomicInteger(0);
    private static final Random random = new Random(); 

    public static void main(String[] args) {
        LatestUpdate latestUpdate = new LatestUpdate();
        latestUpdate.run();
    }

    private void doUpdate(int number) {
        System.out.println("Latest number updated is: " + number);
        try { // Wait a random amount of time up to 5 seconds. Processing the update takes time...
            Thread.sleep(random.nextInt(5000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void run() {
        // Updates a counter every second and schedules an update event
        Thread counterUpdater = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(1000L); // Wait one second
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                counter.incrementAndGet();
                // Schedule this update will replace any existing update waiting 
                latestTaskExecutor.execute(() -> doUpdate(counter.get()));
                System.out.println("New number is: " + counter.get());
            }
        });
        counterUpdater.start(); // Run the thread
    }
}

Dies gilt auch für grafische Benutzeroberflächen (GUIs), bei denen man möchte, dass die GUI nach dem Ende der Aktualisierungen mit dem letzten empfangenen Ereignis konsistent wird.

0voto

DD. Punkte 20583
public class LatestTaskExecutor implements Executor {
    private final AtomicReference<Runnable> lastTask =new AtomicReference<>();
    private final Executor executor;

    public LatestTaskExecutor(Executor executor) {
        super();
        this.executor = executor;
    }

    @Override
    public void execute(Runnable command) {
        lastTask.set(command);
        executor.execute(new Runnable() {
            @Override
            public void run() {
                Runnable task=lastTask.getAndSet(null);
                if(task!=null){
                    task.run();
                }
            }
        });

    }
}

@RunWith( MockitoJUnitRunner.class )
public class LatestTaskExecutorTest {

    @Mock private Executor executor;
    private LatestTaskExecutor latestExecutor;
    @Before
    public void setup(){
        latestExecutor=new LatestTaskExecutor(executor);
    }
    @Test
    public void testRunSingleTask() {
        Runnable run=mock(Runnable.class);
        latestExecutor.execute(run);
        ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class);
        verify(executor).execute(captor.capture());
        captor.getValue().run();
        verify(run).run();
    }

    @Test
    public void discardsIntermediateUpdates(){
        Runnable run=mock(Runnable.class);
        Runnable run2=mock(Runnable.class);
        latestExecutor.execute(run);
        latestExecutor.execute(run2);
        ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class);
        verify(executor,times(2)).execute(captor.capture());
        for (Runnable runnable:captor.getAllValues()){
            runnable.run();
        }
        verify(run2).run();
        verifyNoMoreInteractions(run);
    }
}

0voto

shams Punkte 3400

Diese Antwort ist eine abgewandelte Version der Antwort von DD, die die Übermittlung überflüssiger Aufgaben minimiert.

Eine atomare Referenz wird verwendet, um das letzte Ereignis zu verfolgen. Eine benutzerdefinierte Aufgabe wird an die Warteschlange übermittelt, um möglicherweise ein Ereignis zu verarbeiten. Nur die Aufgabe, die das letzte Ereignis lesen darf, macht tatsächlich weiter und erledigt nützliche Arbeit, bevor sie den atomaren Verweis auf null löscht. Wenn andere Aufgaben eine Chance bekommen und feststellen, dass kein Ereignis zur Verarbeitung verfügbar ist, tun sie einfach nichts und gehen stillschweigend weiter. Die Übermittlung überflüssiger Aufgaben wird vermieden, indem die Anzahl der verfügbaren Aufgaben in der Warteschlange verfolgt wird. Wenn mindestens eine Aufgabe in der Warteschlange ansteht, können wir die Übermittlung der Aufgabe vermeiden, da das Ereignis bearbeitet wird, wenn eine bereits in der Warteschlange stehende Aufgabe aus der Warteschlange entfernt wird.

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class EventExecutorService implements Executor {

    private final Executor executor;
    // the field which keeps track of the latest available event to process
    private final AtomicReference<Runnable> latestEventReference = new AtomicReference<>();
    private final AtomicInteger activeTaskCount = new AtomicInteger(0);

    public EventExecutorService(final Executor executor) {
        this.executor = executor;
    }

    @Override
    public void execute(final Runnable eventTask) {
        // update the latest event
        latestEventReference.set(eventTask);
        // read count _after_ updating event
        final int activeTasks = activeTaskCount.get();

        if (activeTasks == 0) {
            // there is definitely no other task to process this event, create a new task
            final Runnable customTask = new Runnable() {
                @Override
                public void run() {
                    // decrement the count for available tasks _before_ reading event
                    activeTaskCount.decrementAndGet();
                    // find the latest available event to process
                    final Runnable currentTask = latestEventReference.getAndSet(null);
                    if (currentTask != null) {
                        // if such an event exists, process it
                        currentTask.run();
                    } else {
                        // somebody stole away the latest event. Do nothing.
                    }
                }
            };
            // increment tasks count _before_ submitting task
            activeTaskCount.incrementAndGet();
            // submit the new task to the queue for processing
            executor.execute(customTask);
        }
    }
}

0voto

RoK Punkte 881

Die Lösung von James Mudd gefällt mir zwar, aber es wird trotzdem eine zweite Aufgabe in die Warteschlange gestellt, während die vorherige läuft, was unerwünscht sein könnte. Wenn Sie wollen immer ignorieren/verwerfen ankommende Aufgabe, wenn vorherige nicht abgeschlossen ist, können Sie einige Wrapper wie diese machen:

public class DiscardingSubmitter {
private final ExecutorService es = Executors.newSingleThreadExecutor();
private Future<?> future = CompletableFuture.completedFuture(null); //to avoid null check

public void submit(Runnable r){
    if (future.isDone()) {
        future = es.submit(r);
    }else {
        //Task skipped, log if you want
    }
}

}

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X