将 ThreadLocal 传播到从 ExecutorService 获取的新线程

发布于 2024-12-02 19:58:25 字数 846 浏览 2 评论 0原文

我正在一个带有超时的单独线程中运行一个进程,使用 ExecutorService 和 Future (示例代码 此处)(线程“生成”发生在 AOP 方面)。

现在,主线程是一个 Resteasy 请求。 Resteasy 使用一个或多个 ThreadLocal 变量来存储我需要在 Rest 方法调用中的某个时刻检索的一些上下文信息。问题是,由于 Resteasy 线程在新线程中运行,因此 ThreadLocal 变量会丢失。

将 Resteasy 使用的任何 ThreadLocal 变量“传播”到新线程的最佳方法是什么?看来 Resteasy 使用多个 ThreadLocal 变量来跟踪上下文信息,我想“盲目地”将所有信息传输到新线程。

我研究了子类化 ThreadPoolExecutor 并使用 beforeExecute 方法将当前线程传递到池中,但我不能找到一种将 ThreadLocal 变量传递到池中的方法。

有什么建议吗?

谢谢

I'm running a process in a separate thread with a timeout, using an ExecutorService and a Future (example code here) (the thread "spawning" takes place in a AOP Aspect).

Now, the main thread is a Resteasy request. Resteasy uses one ore more ThreadLocal variables to store some context information that I need to retrieve at some point in my Rest method call. Problem is, since the Resteasy thread is running in a new thread, the ThreadLocal variables are lost.

What would be the best way to "propagate" whatever ThreadLocal variable is used by Resteasy to the new thread? It seems that Resteasy uses more than one ThreadLocal variable to keep track of context information and I would like to "blindly" transfer all the information to the new thread.

I have looked at subclassing ThreadPoolExecutor and using the beforeExecute method to pass the current thread to the pool, but I couldn't find a way to pass the ThreadLocal variables to the pool.

Any suggestion?

Thanks

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

忆梦 2024-12-09 19:58:25

与线程关联的 ThreadLocal 实例集保存在每个 Thread 的私有成员中。枚举这些的唯一机会是对Thread进行一些反思;这样,您可以覆盖线程字段的访问限制。

一旦获得了 ThreadLocal 集合,您就可以使用 beforeExecute()afterExecute() 钩子复制后台线程。 >ThreadPoolExecutor,或者为您的任务创建一个 Runnable 包装器,拦截 run() 调用以设置取消设置必要的 ThreadLocal 实例。实际上,后一种技术可能效果更好,因为它可以为您提供一个方便的位置来在任务排队时存储 ThreadLocal 值。


更新:这是第二种方法的更具体说明。与我最初的描述相反,包装器中存储的所有内容都是调用线程,在执行任务时会询问该线程。

static Runnable wrap(Runnable task)
{
  Thread caller = Thread.currentThread();
  return () -> {
    Iterable<ThreadLocal<?>> vars = copy(caller);
    try {
      task.run();
    }
    finally {
      for (ThreadLocal<?> var : vars)
        var.remove();
    }
  };
}

/**
 * For each {@code ThreadLocal} in the specified thread, copy the thread's 
 * value to the current thread.  
 * 
 * @param caller the calling thread
 * @return all of the {@code ThreadLocal} instances that are set on current thread
 */
private static Collection<ThreadLocal<?>> copy(Thread caller)
{
  /* Use a nasty bunch of reflection to do this. */
  throw new UnsupportedOperationException();
}

The set of ThreadLocal instances associated with a thread are held in private members of each Thread. Your only chance to enumerate these is to do some reflection on the Thread; this way, you can override the access restrictions on the thread's fields.

Once you can get the set of ThreadLocal, you could copy in the background threads using the beforeExecute() and afterExecute() hooks of ThreadPoolExecutor, or by creating a Runnable wrapper for your tasks that intercepts the run() call to set an unset the necessary ThreadLocal instances. Actually, the latter technique might work better, since it would give you a convenient place to store the ThreadLocal values at the time the task is queued.


Update: Here's a more concrete illustration of the second approach. Contrary to my original description, all that is stored in the wrapper is the calling thread, which is interrogated when the task is executed.

static Runnable wrap(Runnable task)
{
  Thread caller = Thread.currentThread();
  return () -> {
    Iterable<ThreadLocal<?>> vars = copy(caller);
    try {
      task.run();
    }
    finally {
      for (ThreadLocal<?> var : vars)
        var.remove();
    }
  };
}

/**
 * For each {@code ThreadLocal} in the specified thread, copy the thread's 
 * value to the current thread.  
 * 
 * @param caller the calling thread
 * @return all of the {@code ThreadLocal} instances that are set on current thread
 */
private static Collection<ThreadLocal<?>> copy(Thread caller)
{
  /* Use a nasty bunch of reflection to do this. */
  throw new UnsupportedOperationException();
}
涫野音 2024-12-09 19:58:25

根据 @erickson 的回答,我编写了这段代码。它适用于可继承的ThreadLocals。它使用与线程构造函数中使用的相同方法构建可继承线程本地列表。当然,我使用反射来做到这一点。我还重写了执行器类。

public class MyThreadPoolExecutor extends ThreadPoolExecutor
{
   @Override
   public void execute(Runnable command)
   {
      super.execute(new Wrapped(command, Thread.currentThread()));
   }
}

包装:

   private class Wrapped implements Runnable
   {
      private final Runnable task;

      private final Thread caller;

      public Wrapped(Runnable task, Thread caller)
      {
         this.task = task;
         this.caller = caller;
      }

      public void run()
      {
         Iterable<ThreadLocal<?>> vars = null;
         try
         {
            vars = copy(caller);
         }
         catch (Exception e)
         {
            throw new RuntimeException("error when coping Threads", e);
         }
         try {
            task.run();
         }
         finally {
            for (ThreadLocal<?> var : vars)
               var.remove();
         }
      }
   }

复制方法:

public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception
   {
      List<ThreadLocal<?>> threadLocals = new ArrayList<>();
      Field field = Thread.class.getDeclaredField("inheritableThreadLocals");
      field.setAccessible(true);
      Object map = field.get(caller);
      Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
      table.setAccessible(true);

      Method method = ThreadLocal.class
              .getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap"));
      method.setAccessible(true);
      Object o = method.invoke(null, map);

      Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals");
      field2.setAccessible(true);
      field2.set(Thread.currentThread(), o);

      Object tbl = table.get(o);
      int length = Array.getLength(tbl);
      for (int i = 0; i < length; i++)
      {
         Object entry = Array.get(tbl, i);
         Object value = null;
         if (entry != null)
         {
            Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod(
                    "get");
            referentField.setAccessible(true);
            value = referentField.invoke(entry);
            threadLocals.add((ThreadLocal<?>) value);
         }
      }
      return threadLocals;
   }

Based on @erickson answer I wrote this code. It is working for inheritableThreadLocals. It builds list of inheritableThreadLocals using same method as is used in Thread contructor. Of course I use reflection to do this. Also I override the executor class.

public class MyThreadPoolExecutor extends ThreadPoolExecutor
{
   @Override
   public void execute(Runnable command)
   {
      super.execute(new Wrapped(command, Thread.currentThread()));
   }
}

Wrapper:

   private class Wrapped implements Runnable
   {
      private final Runnable task;

      private final Thread caller;

      public Wrapped(Runnable task, Thread caller)
      {
         this.task = task;
         this.caller = caller;
      }

      public void run()
      {
         Iterable<ThreadLocal<?>> vars = null;
         try
         {
            vars = copy(caller);
         }
         catch (Exception e)
         {
            throw new RuntimeException("error when coping Threads", e);
         }
         try {
            task.run();
         }
         finally {
            for (ThreadLocal<?> var : vars)
               var.remove();
         }
      }
   }

copy method:

public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception
   {
      List<ThreadLocal<?>> threadLocals = new ArrayList<>();
      Field field = Thread.class.getDeclaredField("inheritableThreadLocals");
      field.setAccessible(true);
      Object map = field.get(caller);
      Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
      table.setAccessible(true);

      Method method = ThreadLocal.class
              .getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap"));
      method.setAccessible(true);
      Object o = method.invoke(null, map);

      Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals");
      field2.setAccessible(true);
      field2.set(Thread.currentThread(), o);

      Object tbl = table.get(o);
      int length = Array.getLength(tbl);
      for (int i = 0; i < length; i++)
      {
         Object entry = Array.get(tbl, i);
         Object value = null;
         if (entry != null)
         {
            Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod(
                    "get");
            referentField.setAccessible(true);
            value = referentField.invoke(entry);
            threadLocals.add((ThreadLocal<?>) value);
         }
      }
      return threadLocals;
   }
不气馁 2024-12-09 19:58:25

据我了解您的问题,您可以查看 InheritableThreadLocal 意味着将 ThreadLocal 变量从父线程上下文传递到子线程上下文

As I understand your problem, you can have a look at InheritableThreadLocal which is meant to pass ThreadLocal variables from Parent Thread context to Child Thread Context

浪漫之都 2024-12-09 19:58:25

我不喜欢反射方法。替代解决方案是实现执行器包装器并将对象作为 ThreadLocal 上下文直接传递给传播父上下文的所有子线程。

public class PropagatedObject {

    private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>();

   //put, set, merge methods, etc

}

==>

public class ObjectAwareExecutor extends AbstractExecutorService {

    private final ExecutorService delegate;
    private final PropagatedObject objectAbsorber;

    public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){
        this.delegate = delegate;
        this.objectAbsorber = objectAbsorber;
    }
    @Override
    public void execute(final Runnable command) {

        final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get();
        delegate.execute(() -> {
            try{
                objectAbsorber.set(parentContext);
                command.run();
            }finally {
                parentContext.putAll(objectAbsorber.get());
                objectAbsorber.clean();
            }
        });
        objectAbsorber.merge(parentContext);
    }

I don't like Reflection approach. Alternative solution would be to implement executor wrapper and pass object directly as a ThreadLocal context to all child threads propagating a parent context.

public class PropagatedObject {

    private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>();

   //put, set, merge methods, etc

}

==>

public class ObjectAwareExecutor extends AbstractExecutorService {

    private final ExecutorService delegate;
    private final PropagatedObject objectAbsorber;

    public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){
        this.delegate = delegate;
        this.objectAbsorber = objectAbsorber;
    }
    @Override
    public void execute(final Runnable command) {

        final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get();
        delegate.execute(() -> {
            try{
                objectAbsorber.set(parentContext);
                command.run();
            }finally {
                parentContext.putAll(objectAbsorber.get());
                objectAbsorber.clean();
            }
        });
        objectAbsorber.merge(parentContext);
    }
流星番茄 2024-12-09 19:58:25

下面是一个示例,将父线程中的当前 LocaleContext 传递给 CompletableFuture 跨越的子线程[默认情况下它使用 ForkJoinPool]。

只需在 Runnable 块内的子线程中定义您想要执行的所有操作即可。因此,当 CompletableFuture 执行 Runnable 块时,子线程处于控制状态,瞧,您在子线程的 ThreadLocal 中设置了父线程的 ThreadLocal 内容。

这里的问题不是整个ThreadLocal都被复制过来了。仅复制 LocaleContext。由于 ThreadLocal 只能对它所属的线程进行私有访问,因此使用反射并尝试在 Child 中获取和设置是太多古怪的事情,可能会导致内存泄漏或性能下降。

因此,如果您从 ThreadLocal 知道您感兴趣的参数,那么此解决方案的工作方式会更加清晰。

 public void parentClassMethod(Request request) {
        LocaleContext currentLocale = LocaleContextHolder.getLocaleContext();
        executeInChildThread(() -> {
                LocaleContextHolder.setLocaleContext(currentLocale);
                //Do whatever else you wanna do
            }));

        //Continue stuff you want to do with parent thread
}


private void executeInChildThread(Runnable runnable) {
    try {
        CompletableFuture.runAsync(runnable)
            .get();
    } catch (Exception e) {
        LOGGER.error("something is wrong");
    }
}

Here is an example to pass the current LocaleContext in parent thread to the child thread spanned by CompletableFuture[By default it used ForkJoinPool].

Just define all the things you wanted to do in a child thread inside a Runnable block. So when the CompletableFuture execute the Runnable block, its the child thread who is in control and voila you have the parent's ThreadLocal stuff set in Child's ThreadLocal.

The problem here is not the entire ThreadLocal is copied over. Only the LocaleContext is copied. Since the ThreadLocal is of private access to only the Thread it belongs too using Reflection and trying to get and set in Child is all too much of wacky stuff which might lead to memory leaks or performance hit.

So if you know the parameters you are interested from the ThreadLocal, then this solution works way cleaner.

 public void parentClassMethod(Request request) {
        LocaleContext currentLocale = LocaleContextHolder.getLocaleContext();
        executeInChildThread(() -> {
                LocaleContextHolder.setLocaleContext(currentLocale);
                //Do whatever else you wanna do
            }));

        //Continue stuff you want to do with parent thread
}


private void executeInChildThread(Runnable runnable) {
    try {
        CompletableFuture.runAsync(runnable)
            .get();
    } catch (Exception e) {
        LOGGER.error("something is wrong");
    }
}
谁的年少不轻狂 2024-12-09 19:58:25

如果你查看 ThreadLocal 代码,你会发现:

    public T get() {
        Thread t = Thread.currentThread();
        ...
    }

当前线程不能被覆盖。

可能的解决方案:

  1. 看看java 7 fork/join机制(但我认为这是一个坏方法)

  2. 看看< href="http://download.oracle.com/javase/1.4.2/docs/guide/standards/" rel="nofollow">认可机制来覆盖ThreadLocal类在你的JVM。

  3. 尝试重写RESTEasy(你可以使用IDE中的Refactor工具来替换所有ThreadLocal的使用,看起来很简单)

If you look at ThreadLocal code you can see:

    public T get() {
        Thread t = Thread.currentThread();
        ...
    }

current thread cannot be overwritten.

Possible solutions:

  1. Look at java 7 fork/join mechanism (but i think it's a bad way)

  2. Look at endorsed mechanism to overwrite ThreadLocal class in your JVM.

  3. Try to rewrite RESTEasy (you can use Refactor tools in your IDE to replace all ThreadLocal usage, it's look like easy)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文