194 Stimmen

Wie kann man MDC mit Thread-Pools verwenden?

In unserer Software verwenden wir ausgiebig MDC um Dinge wie Sitzungs-IDs und Benutzernamen für Webanfragen zu verfolgen. Dies funktioniert gut, solange es im ursprünglichen Thread läuft.

Allerdings gibt es eine Menge Dinge, die im Hintergrund verarbeitet werden müssen. Dafür verwenden wir die java.concurrent.ThreadPoolExecutor y java.util.Timer Klassen zusammen mit einigen selbstgedrehten asynchron Ausführungsdienstleistungen. Alle diese Dienste verwalten ihren eigenen Thread-Pool.

Das ist es, was Handbuch von Logback über die Verwendung von MDC in einer solchen Umgebung zu sagen hat:

Eine Kopie des zugeordneten Diagnosekontextes kann nicht immer vom initiierenden Thread an Worker-Threads vererbt werden. Dies ist der Fall, wenn java.util.concurrent.Executors für das Thread-Management verwendet wird. Die Methode newCachedThreadPool beispielsweise erstellt einen ThreadPoolExecutor und verfügt wie anderer Thread-Pooling-Code über eine komplizierte Thread-Erstellungslogik.

In solchen Fällen wird empfohlen, dass MDC.getCopyOfContextMap() auf dem ursprünglichen (Master-)Thread aufgerufen wird, bevor eine Aufgabe an den Executor übergeben wird. Wenn die Aufgabe läuft, sollte sie als erste Aktion MDC.setContextMapValues() aufrufen, um die gespeicherte Kopie der ursprünglichen MDC-Werte mit dem neuen vom Executor verwalteten Thread zu verknüpfen.

Das wäre in Ordnung, aber man vergisst sehr leicht, diese Anrufe hinzuzufügen, und es gibt keine einfache Möglichkeit, das Problem zu erkennen, bevor es zu spät ist. Das einzige Anzeichen mit Log4j ist, dass Sie fehlende MDC-Informationen in den Protokollen erhalten, und mit Logback erhalten Sie veraltete MDC-Informationen (da der Thread im Tread-Pool sein MDC von der ersten Aufgabe erbt, die auf ihm ausgeführt wurde). Beides sind ernste Probleme in einem Produktionssystem.

Ich sehe unsere Situation in keiner Weise als etwas Besonderes an, aber ich konnte im Internet nicht viel über dieses Problem finden. Offenbar sind nicht viele Menschen mit diesem Problem konfrontiert, also muss es eine Möglichkeit geben, es zu vermeiden. Was machen wir hier falsch?

2voto

MyKey_ Punkte 768

Ähnlich wie bei den zuvor veröffentlichten Lösungen, ist die newTaskFor Methoden für Runnable y Callable kann überschrieben werden, um das Argument bei der Erstellung der RunnableFuture .

Anmerkung: Folglich ist die executorService 's submit Methode muss anstelle der execute Methode.

Für die ScheduledThreadPoolExecutor El decorateTask Methoden stattdessen überschrieben werden würden.

1voto

Kenston Choi Punkte 2704

Eine andere Variante, die den bisherigen Antworten ähnelt, ist die Implementierung ExecutorService und lassen Sie zu, dass ein Delegat an ihn übergeben wird. Durch die Verwendung von Generika kann der eigentliche Delegat dann immer noch offengelegt werden, falls man einige Statistiken abrufen möchte (solange keine anderen Änderungsmethoden verwendet werden).

Referenzcode:

  • https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common/concurrent/MDCThreadPoolExecutor.java
  • https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common/concurrent/MDCWrappers.java

    public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {

    private final D delegate;
    
    public MDCExecutorService(D delegate) {
        this.delegate = delegate;
    }
    
    @Override
    public void shutdown() {
        delegate.shutdown();
    }
    
    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }
    
    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }
    
    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }
    
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }
    
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(wrap(task));
    }
    
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(wrap(task), result);
    }
    
    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(wrap(task));
    }
    
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks));
    }
    
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
    }
    
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(wrapCollection(tasks));
    }
    
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
    }
    
    @Override
    public void execute(Runnable command) {
        delegate.execute(wrap(command));
    }
    
    public D getDelegate() {
        return delegate;
    }
    
    /* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
    /concurrent/MDCWrappers.java */
    
    private static Runnable wrap(final Runnable runnable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }
    
    private static <T> Callable<T> wrap(final Callable<T> callable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }
    
    private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return (t) -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                consumer.accept(t);
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }
    
    private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
        Collection<Callable<T>> wrapped = new ArrayList<>();
        for (Callable<T> task : tasks) {
            wrapped.add(wrap(task));
        }
        return wrapped;
    }

    }

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X