InheritableThreadLocal and thread pools

Posted on 2024-12-03 05:41:07

I have a problem which I don't really think has a solution, but I'll try here anyway. My application uses a thread pool, and some of the threads in this pool have an inheritable thread-local variable. I've extended the ThreadPoolExecutor class to clear out the thread-local variable (in the afterExecute callback method) when a thread is done executing a task.

I understand that when you have an InheritableThreadLocal variable, the childValue() method is called when the thread is initialized, to derive the child's value from the parent thread's value. However, in my case the next time the thread is used (after being used once), the value of the InheritableThreadLocal variable is null (because it was previously cleared out in afterExecute). Is there a way to access the parent thread's thread-local variable in beforeExecute, so that I can essentially simulate what the childValue method in InheritableThreadLocal does at the time of thread creation?
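
The behaviour described above can be reproduced in a few lines. This is only a minimal sketch: the class name InheritOnceDemo and the CONTEXT variable are made up for illustration, and a single-thread executor is assumed so that both tasks land on the same worker. It shows that inheritance happens once, when the worker thread is created, and not again when the thread is reused.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original question.
class InheritOnceDemo {
    static final InheritableThreadLocal<String> CONTEXT = new InheritableThreadLocal<>();

    /** Returns what two consecutive tasks on the same pooled thread observe. */
    static List<String> observe() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> seen = new ArrayList<>();
        Callable<String> read = CONTEXT::get; // evaluated on the worker thread
        try {
            CONTEXT.set("first");
            // The worker thread is created during this first submit, by the
            // submitting thread, so it inherits "first".
            seen.add(pool.submit(read).get());

            CONTEXT.set("second");
            // The same worker is reused; nothing is inherited a second time.
            seen.add(pool.submit(read).get());
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(observe()); // prints [first, first]
    }
}
```

The second task still sees "first" even though the submitting thread holds "second" by then, which is why clearing the value in afterExecute leaves later tasks with nothing to read.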

Comments (3)

肩上的翅膀 2024-12-10 05:41:07

It sounds like this is a poor use-case for the "inheritable" flavour of thread-locals.

My advice would be to just use a regular ThreadLocal and do the initialization explicitly; e.g. by passing the initial value from the parent thread to the child thread as a parameter or something.

(I was going to suggest that you force initialization of the thread-local in the child thread by having it fetch the value as soon as it starts. But that risks a race condition; e.g. if the parent thread is returned to the pool before the child thread starts executing.)


"I guess what I am asking is if there is a way to access the value of the parent thread's thread local variable from a child thread."

There isn't a way to do this.

And, judging from your other comments, I doubt that you mean "parent" and "child" in the normal sense ... where the parent thread creates the child thread.

But here's an idea. Instead of trying to share a variable between threads, share a fixed value (e.g. a request ID), and use that as a key for a shared Map. Use the Map entries as the shared variables.
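
To make the shared-Map idea concrete, here is a rough sketch. Everything in it is an assumption for illustration: the class name SharedMapDemo, the STATE_BY_REQUEST map, and the "req-42" request ID are invented, and a ConcurrentHashMap is just one reasonable choice for a map touched by several threads.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class illustrating the "fixed key + shared Map" idea.
class SharedMapDemo {
    // Shared store: request ID -> per-request state (names are assumptions).
    static final Map<String, String> STATE_BY_REQUEST = new ConcurrentHashMap<>();

    static String handle(String requestId, String tenant) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        STATE_BY_REQUEST.put(requestId, tenant);
        try {
            // Only the immutable key travels to the worker (captured by the
            // lambda); the worker looks the state up in the shared map.
            Callable<String> task = () -> STATE_BY_REQUEST.get(requestId);
            return pool.submit(task).get();
        } finally {
            // Remove the entry once the request is done to avoid leaks.
            STATE_BY_REQUEST.remove(requestId);
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(handle("req-42", "4711")); // prints 4711
    }
}
```

The key has to reach the worker somehow (here through the lambda's captured variable), so this does not remove the need to pass something explicitly; it just keeps what is passed small and immutable.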

说好的呢 2024-12-10 05:41:07

The constructor of a runnable runs in the thread of the caller, whereas the run method runs in the child thread. You can use this fact to transfer information from the parent to the child thread. See:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadlocalApplication {
    public static final ThreadLocal<String> tenantId = new ThreadLocal<>();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        System.out.println(Thread.currentThread().getName());
        ThreadlocalApplication.tenantId.set("4711");
        executor.submit(new AsyncTask()).get();
        executor.shutdown();
    }

    static class AsyncTask implements Runnable {
        private final String _tenantId;

        public AsyncTask() {
            // Runs on the submitting (caller) thread: capture its value here.
            System.out.println(Thread.currentThread().getName());
            _tenantId = ThreadlocalApplication.tenantId.get();
        }

        @Override
        public void run() {
            // Runs on the pool thread: install the captured value there.
            ThreadlocalApplication.tenantId.set(_tenantId);
            System.out.println(Thread.currentThread().getName());
            System.out.println(ThreadlocalApplication.tenantId.get());
        }
    }
}

And this is the result:

main
main
pool-1-thread-1
4711
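
The same capture-on-the-caller-thread trick can be combined with the beforeExecute and afterExecute hooks mentioned in the question. The following is a sketch under assumptions, not a definitive implementation: ContextAwarePool, Captured and TENANT are hypothetical names, and it relies on ThreadPoolExecutor passing to beforeExecute the same Runnable object that was handed to execute.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical pool that re-installs the submitter's value for every task.
class ContextAwarePool extends ThreadPoolExecutor {
    static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    /** Carries the submitter's value alongside the task. */
    private static final class Captured implements Runnable {
        final Runnable delegate;
        final String tenant = TENANT.get(); // read on the submitting thread

        Captured(Runnable delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        @Override
        public void run() {
            delegate.run();
        }
    }

    ContextAwarePool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    public void execute(Runnable command) {
        // submit() also funnels into execute(), so every task gets wrapped.
        super.execute(new Captured(command));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        if (r instanceof Captured) {
            TENANT.set(((Captured) r).tenant); // now on the worker thread
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        TENANT.remove(); // leave the worker clean for the next task
        super.afterExecute(r, t);
    }

    /** Runs two tasks on one worker, changing the caller's value in between. */
    static List<String> demo() throws Exception {
        ContextAwarePool pool = new ContextAwarePool(1);
        List<String> seen = new ArrayList<>();
        Callable<String> read = TENANT::get;
        try {
            TENANT.set("4711");
            seen.add(pool.submit(read).get());
            TENANT.set("4712");
            seen.add(pool.submit(read).get()); // same worker, fresh value
        } finally {
            pool.shutdown();
            TENANT.remove();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints [4711, 4712]
    }
}
```

Note that the hooks only run on pool workers, so a rejection policy that runs the task on the caller's thread would bypass them.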

幸福%小乖 2024-12-10 05:41:07

I had a similar problem and ended up creating a ForkListeningExecutorService which wraps an ExecutorService (see here). The wrapper sends events to a listener whenever a task is submitted to the executor or when it ends. You can then use the listener to pass whatever you want across the threads. This is what the listener looks like:

  /**
   * Listener that will receive the events around task submission.
   *
   */
  public interface ExecutorServiceListener {

    /**
     * Will be called <b>before</b> any task is submitted to the service. This method will therefore run on the original "parent" thread.
     */
    default void beforeTaskSubmission() {
      // to be overridden by implementations
    }

    /**
     * Will be called <b>after</b> a task is submitted to the service and <b>before</b> the actual execution starts. This method will therefore run on the new "child" thread.
     */
    default void afterTaskSubmission() {
      // to be overridden by implementations
    }

    /**
     * Will be called <b>before</b> a submitted task ends no matter if an exception was thrown or not. This method will therefore run on the new "child" thread just before it's
     * released.
     */
    default void beforeTaskEnds() {
      // to be overridden by implementations
    }
  }

So, instead of extending ThreadPoolExecutor, you can simply wrap it (or whatever other ExecutorService implementation) and pass the state within the listener like this:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// ForkListeningExecutorService and ExecutorServiceListener are the classes described above.

public class ForkListeningExecutorServiceExample {

  private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();

  private static void printThreadMessage(final String message) {
    System.out.println(message
        + ", current thread: " + Thread.currentThread().getName()
        + ", value from threadLocal: " + threadLocal.get());
  }

  public static void main(final String[] args) throws Exception {

    threadLocal.set("MY_STATE");

    final ExecutorService executorService = new ForkListeningExecutorService(
        Executors.newCachedThreadPool(),
        new ExecutorServiceListener() {

          // A single shared field: safe only while one task is submitted at a time.
          private String valueToShare;

          @Override
          public void beforeTaskSubmission() {
            valueToShare = threadLocal.get();
            printThreadMessage("The task is about to be submitted");
          }

          @Override
          public void afterTaskSubmission() {
            threadLocal.set(valueToShare);
            printThreadMessage("The task has been submitted and will start now");
          }

          @Override
          public void beforeTaskEnds() {
            threadLocal.set(null);
            printThreadMessage("The task has finished and thread will be released now");
          }
        });

    executorService.submit(() -> {
      printThreadMessage("The task is running now");
    }).get();

    printThreadMessage("We are back on the main thread");

    // Let the pool's idle worker thread exit so the JVM can terminate promptly.
    executorService.shutdown();
  }
}

The output then looks like this:

The task is about to be submitted, current thread: main, value from threadLocal: MY_STATE
The task has been submitted and will start now, current thread: pool-1-thread-1, value from threadLocal: MY_STATE
The task is running now, current thread: pool-1-thread-1, value from threadLocal: MY_STATE
The task has finished and thread will be released now, current thread: pool-1-thread-1, value from threadLocal: null
We are back on the main thread, current thread: main, value from threadLocal: MY_STATE
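
Since the wrapper class itself is not shown in this thread, here is a rough guess at how such a wrapper might be built. It is explicitly not the answer author's implementation: ListeningWrapperSketch, ListeningExecutorService and STATE are hypothetical names, and extending AbstractExecutorService is just one convenient way to get submit() for free while intercepting execute().

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the ForkListeningExecutorService from the answer.
class ListeningWrapperSketch {
    static final ThreadLocal<String> STATE = new ThreadLocal<>();

    /** Same three callbacks as the listener interface shown above. */
    interface ExecutorServiceListener {
        default void beforeTaskSubmission() {}
        default void afterTaskSubmission() {}
        default void beforeTaskEnds() {}
    }

    static final class ListeningExecutorService extends AbstractExecutorService {
        private final ExecutorService delegate;
        private final ExecutorServiceListener listener;

        ListeningExecutorService(ExecutorService delegate, ExecutorServiceListener listener) {
            this.delegate = delegate;
            this.listener = listener;
        }

        @Override
        public void execute(Runnable command) {
            listener.beforeTaskSubmission();       // on the caller ("parent") thread
            delegate.execute(() -> {
                listener.afterTaskSubmission();    // on the worker ("child") thread
                try {
                    command.run();
                } finally {
                    listener.beforeTaskEnds();     // still on the worker thread
                }
            });
        }

        @Override public void shutdown() { delegate.shutdown(); }
        @Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
        @Override public boolean isShutdown() { return delegate.isShutdown(); }
        @Override public boolean isTerminated() { return delegate.isTerminated(); }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return delegate.awaitTermination(timeout, unit);
        }
    }

    /** Submits one task and returns the value it observed on the worker. */
    static String demo() throws Exception {
        ExecutorServiceListener listener = new ExecutorServiceListener() {
            // One slot only: fine for one submission at a time, racy otherwise.
            private volatile String valueToShare;

            @Override public void beforeTaskSubmission() { valueToShare = STATE.get(); }
            @Override public void afterTaskSubmission() { STATE.set(valueToShare); }
            @Override public void beforeTaskEnds() { STATE.remove(); }
        };
        ExecutorService pool =
            new ListeningExecutorService(Executors.newCachedThreadPool(), listener);
        STATE.set("MY_STATE");
        try {
            Callable<String> read = STATE::get;
            return pool.submit(read).get();
        } finally {
            pool.shutdown();
            STATE.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints MY_STATE
    }
}
```

With concurrent submitters, a listener that stores the value in a single field can hand one caller's value to another caller's task, so a real implementation would probably capture the value per task instead.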