如何使外部方法可中断?

发布于 2024-10-09 13:23:30 字数 1911 浏览 14 评论 0 原文

问题

我正在通过 ExecutorService。我希望能够中断这些方法,但不幸的是它们本身不检查中断标志。有什么方法可以强制从这些方法引发异常吗?

我知道从任意位置抛出异常是 潜在的危险,就我的具体情况而言,我愿意抓住这个机会并准备好应对后果。

详细信息

“外部方法”是指来自外部库的一些方法,并且我无法修改其代码(我可以,但这将使其成为每当发布新版本时的维护噩梦)。

外部方法的计算成本很高,不受 IO 限制,因此它们不响应常规中断,并且我无法强制关闭通道或套接字或其他东西。正如我之前提到的,它们也不检查中断标志。

该代码在概念上类似于:

// my code
public void myMethod() {
    Object o = externalMethod(x);
}

// External code
public class ExternalLibrary {
    public Object externalMethod(Object) {
        innerMethod1();
        innerMethod1();
        innerMethod1();
    }

    private void innerMethod1() {
        innerMethod2();
        // computationally intensive operations
    }

    private void innerMethod2() {
        // computationally intensive operations
    }
}

我尝试过的

Thread.stop() 理论上会做我想做的事,但它不仅已被弃用,而且仅适用于实际线程,而我' m 使用执行器任务(也可能与未来的任务共享线程,例如在线程池中工作时)。尽管如此,如果没有找到更好的解决方案,我会将我的代码转换为使用老式线程并使用此方法。

我尝试过的另一个选择是使用特殊的“Interruptable”注释来标记 myMethod() 和类似的方法,然后使用 AspectJ (我承认我是新手)来捕获那里的所有方法调用 - 一些就像:

@Before("call(* *.*(..)) && withincode(@Interruptable * *.*(..))")
public void checkInterrupt(JoinPoint thisJoinPoint) {
    if (Thread.interrupted()) throw new ForcefulInterruption();
}

但是 withincode 不会递归到匹配方法调用的方法,因此我必须将此注释编辑到外部代码中。

最后,这类似于我之前的问题 - 尽管一个显着的区别是现在我正在处理一个外部库。

The Problem

I'm running multiple invocations of some external method via an ExecutorService. I would like to be able to interrupt these methods, but unfortunately they do not check the interrupt flag by themselves. Is there any way I can force an exception to be raised from these methods?

I am aware that throwing an exception from an arbitrary location is potentially dangerous, in my specific case I am willing to take this chance and prepared to deal with the consequences.

Details

By "external method" I mean some method(s) that come from an external library, and I cannot modify its code (well I can, but that will make it a maintenance nightmare whenever a new version is released).

The external methods are computationally expensive, not IO-bound, so they don't respond to regular interrupts and I can't forcefully close a channel or a socket or something. As I've mentioned before, they also do not check the interrupt flag.

The code is conceptually something like:

// my code
public void myMethod() {
    Object o = externalMethod(x);
}

// External code
public class ExternalLibrary {
    public Object externalMethod(Object) {
        innerMethod1();
        innerMethod1();
        innerMethod1();
    }

    private void innerMethod1() {
        innerMethod2();
        // computationally intensive operations
    }

    private void innerMethod2() {
        // computationally intensive operations
    }
}

What I've Tried

Thread.stop() will theoretically do what I want, but not only is it deprecated but it is also only available for actual threads, while I'm working with executor tasks (which might also share threads with future tasks, for example when working in a thread pool). Nevertheless, if no better solution is found, I will convert my code to use old-fashioned Threads instead and use this method.

Another option I've tried is to mark myMethod() and similar methods with a special "Interruptable" annotation and then use AspectJ (which I am admittedly a newbie at) for catching all method invocations there - something like:

@Before("call(* *.*(..)) && withincode(@Interruptable * *.*(..))")
public void checkInterrupt(JoinPoint thisJoinPoint) {
    if (Thread.interrupted()) throw new ForcefulInterruption();
}

But withincode isn't recursive to methods called by the matching methods, so I would have to edit this annotation into the external code.

Finally, this is similar to a previous question of mine - though a notable difference is that now I'm dealing with an external library.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(8

花期渐远 2024-10-16 13:23:30

我想到了以下奇怪的想法:

  • 使用字节码修改库,例如 Javassist,在字节码内的各个点引入中断检查。仅仅在方法的开头可能还不够,因为您提到这些外部方法不是递归的,所以您可能想在任何时候强制停止它们。在字节码级别执行此操作也将使其非常敏感,例如,即使外部代码在循环或其他内容中运行,也可以引入中断检查。然而,这会增加一些开销,因此整体性能会变慢。
  • 为外部代码启动单独的进程(例如单独的虚拟机)。中止进程可能比其他解决方案更容易编写代码。缺点是您需要在外部代码和您的代码之间建立某种通信通道,例如 IPC、套接字等。第二个缺点是您需要更多的资源(CPU、内存)来启动新的 VM,并且可能会环境特定。如果您使用外部代码启动几个任务,而不是数百个任务,那么这将起作用。此外,性能会受到影响,但计算本身将与原始计算一样快。可以使用 java.lang.Process.destroy() 强制停止进程,
  • 使用自定义 SecurityManager,它对每个 checkXXX 方法执行中断检查。如果外部代码以某种方式调用特权方法,那么您在这些位置中止可能就足够了。如果外部代码定期读取系统属性,则示例为 java.lang.SecurityManager.checkPropertyAccess(String)。

The following weird ideas come to my mind:

  • Using a byte code modification library, such as Javassist, to introduce the interrupt-checks at various points within the bytecode. Just at the beginning of methods may not be enough, since you mention that these external methods are not recursive, so you may want to forcefully stop them at any point. Doing this at the byte code level would also make it very responsive, e.g. even if the external code is running within a loop or something, it would be possible to introduce the interrupt checks. However, this will add some overhead, so overall performance will be slower.
  • Launching separate processes (e.g. separate VMs) for the external code. Aborting processes may be much easier to code than the other solutions. The disadvantage is that you would need some sort of communication channel between the external code and your code, e.g. IPC, sockets etc. The second disadvantage is that you need much more resources (CPU, memory) to start up new VMs and it may be environment specific. This would work if you start a couple of tasks using the external code, but not hundreds of tasks. Also, the performance would suffer, but the computation itself would be as fast as the original. Processes can be forcefully stopped by using java.lang.Process.destroy()
  • Using a custom SecurityManager, which performs the interrupt check on each of the checkXXX methods. If the external code somehow calls privileged methods, it may be sufficient for you to abort at these locations. An example would be java.lang.SecurityManager.checkPropertyAccess(String) if the external code periodically reads a system property.
心的位置 2024-10-16 13:23:30

这个解决方案也不容易,但它可以工作:使用 Javassist 或 CGLIB,您可以在每个内部方法(可能由主 run() 方法调用的方法)的开头插入代码来检查线程是否处于活动状态,或其他一些标志(如果是其他标志,您还必须添加它以及设置它的方法)。

我建议使用 Javassist/CGLIB,而不是通过代码扩展类,因为您提到它是外部的,并且您不想更改源代码,并且将来可能会更改。因此,即使内部方法名称(或其参数、返回值等)发生变化,在运行时添加中断检查也适用于当前版本以及未来版本。您只需获取该类并在每个非 run() 方法的方法的开头添加中断检查。

This solution isn't easy either, but it could work: Using Javassist or CGLIB, you can insert code at the beginning of each internal method (the ones presumably being called by the main run() method) to check if the thread is alive, or some other flag (if it's some other flag, you'll have to add it as well, along with a method to set it).

I'm proposing Javassist/CGLIB instead of extending the class through code because you mention it's external and you don't want to change the source code, and it may change in the future. So adding the interrupt checks at runtime will work for the current version and also in future versions even if the internal method names change (or their parameters, return values, etc). You just have to take the class and add interrupt checks at the beginning of each method that is not the run() method.

拧巴小姐 2024-10-16 13:23:30

一个选项是:

  1. 使用 JDI 让 VM 连接到自身。
  2. 查找正在运行任务的线程。这并不简单,但由于您可以访问所有堆栈帧,因此它当然是可行的。 (如果您在任务对象中放置唯一的 id 字段,您将能够识别正在执行它的线程。)
  3. 异步停止线程。

尽管我不认为停止的线程会严重干扰执行器(毕竟它们应该是故障安全的),但还有一种不涉及停止线程的替代解决方案。

假设您的任务不会修改系统其他部分的任何内容(这是一个合理的假设,否则您不会尝试将它们击落),您可以做的是使用 JDI 弹出不需要的堆栈帧并正常退出任务。

public class StoppableTask implements Runnable {

private boolean stopped;
private Runnable targetTask;
private volatile Thread runner;
private String id;

public StoppableTask(TestTask targetTask) {
    this.targetTask = targetTask;
    this.id = UUID.randomUUID().toString();
}

@Override
public void run() {
    if( !stopped ) {
        runner = Thread.currentThread();
        targetTask.run();
    } else {
        System.out.println( "Task "+id+" stopped.");
    }
}

public Thread getRunner() {
    return runner;
}

public String getId() {
    return id;
}
}

这是包装所有其他可运行对象的可运行对象。它存储对执行线程的引用(稍后会很重要)和 id,以便我们可以通过 JDI 调用找到它。

public class Main {

public static void main(String[] args) throws IOException, IllegalConnectorArgumentsException, InterruptedException, IncompatibleThreadStateException, InvalidTypeException, ClassNotLoadedException {
    //connect to the virtual machine
    VirtualMachineManager manager = Bootstrap.virtualMachineManager();
    VirtualMachine vm = null;
    for( AttachingConnector con : manager.attachingConnectors() ) {
        if( con instanceof SocketAttachingConnector ) {
            SocketAttachingConnector smac = (SocketAttachingConnector)con;
            Map<String,? extends Connector.Argument> arg = smac.defaultArguments();
            arg.get( "port" ).setValue( "8000");
            arg.get( "hostname" ).setValue( "localhost" );
            vm = smac.attach( arg );
        }
    }

    //start the test task
    ExecutorService service = Executors.newCachedThreadPool();
    StoppableTask task = new StoppableTask( new TestTask() );
    service.execute( task );
    Thread.sleep( 1000 );

    // iterate over all the threads
    for( ThreadReference thread : vm.allThreads() ) {
        //iterate over all the objects referencing the thread
        //could take a long time, limiting the number of referring
        //objects scanned is possible though, as not many objects will
        //reference our runner thread
        for( ObjectReference ob : thread.referringObjects( 0 ) ) {
            //this cast is safe, as no primitive values can reference a thread
            ReferenceType obType = (ReferenceType)ob.type();
            //if thread is referenced by a stoppable task
            if( obType.name().equals( StoppableTask.class.getName() ) ) {

                StringReference taskId = (StringReference)ob.getValue( obType.fieldByName( "id" ));

                if( task.getId().equals( taskId.value() ) ) {
                    //task with matching id found
                    System.out.println( "Task "+task.getId()+" found.");

                    //suspend thread
                    thread.suspend();

                    Iterator<StackFrame> it = thread.frames().iterator();
                    while( it.hasNext() ) {
                        StackFrame frame = it.next();
                        //find stack frame containing StoppableTask.run()
                        if( ob.equals( frame.thisObject() ) ) {
                            //pop all frames up to the frame below run()
                            thread.popFrames( it.next() );
                            //set stopped to true
                            ob.setValue( obType.fieldByName( "stopped") , vm.mirrorOf( true ) );
                            break;
                        }
                    }
                    //resume thread
                    thread.resume();

                }

            }
        }
    }

}
}

作为参考,我测试了它的“库”调用:

public class TestTask implements Runnable {

    @Override
    public void run() {
        long l = 0;
        while( true ) {
            l++;
            if( l % 1000000L == 0 )
                System.out.print( ".");
        }

    }
}

您可以通过使用命令行选项 -agentlib:jdwp=transport=dt_socket,server= 启动 Main 类来尝试一下。 y,地址=本地主机:8000,超时= 5000,挂起= n 。它的工作原理有两个警告。首先,如果正在执行本机代码(帧的 thisObject 为空),则必须等待它完成。其次,finally 块不会被调用,因此各种资源可能会泄漏。

An option is to:

  1. Make the VM connect to itself using JDI.
  2. Look up the thread that's running your task. This isn't trivial, but since you have access to all the stack frames, it is certainly doable. (If you put a unique id field in your task objects, you will be able to identify the thread that is executing it.)
  3. Stop the thread asynchronously.

Although I don't think a stopped thread would seriously interfere with executors (they should be fail-safe after all), there is an alternative solution that doesn't involve stopping the thread.

Provided that your tasks don't modify anything in other parts of the system (which is a fair assumption, otherwise you wouldn't be trying to shoot them down), what you can do is to use JDI to pop unwanted stack frames off and exit the task normally.

public class StoppableTask implements Runnable {

private boolean stopped;
private Runnable targetTask;
private volatile Thread runner;
private String id;

public StoppableTask(TestTask targetTask) {
    this.targetTask = targetTask;
    this.id = UUID.randomUUID().toString();
}

@Override
public void run() {
    if( !stopped ) {
        runner = Thread.currentThread();
        targetTask.run();
    } else {
        System.out.println( "Task "+id+" stopped.");
    }
}

public Thread getRunner() {
    return runner;
}

public String getId() {
    return id;
}
}

This is the runnable that wraps all your other runnables. It stores a reference to the executing thread (will be important later) and an id so we can find it with a JDI call.

public class Main {

public static void main(String[] args) throws IOException, IllegalConnectorArgumentsException, InterruptedException, IncompatibleThreadStateException, InvalidTypeException, ClassNotLoadedException {
    //connect to the virtual machine
    VirtualMachineManager manager = Bootstrap.virtualMachineManager();
    VirtualMachine vm = null;
    for( AttachingConnector con : manager.attachingConnectors() ) {
        if( con instanceof SocketAttachingConnector ) {
            SocketAttachingConnector smac = (SocketAttachingConnector)con;
            Map<String,? extends Connector.Argument> arg = smac.defaultArguments();
            arg.get( "port" ).setValue( "8000");
            arg.get( "hostname" ).setValue( "localhost" );
            vm = smac.attach( arg );
        }
    }

    //start the test task
    ExecutorService service = Executors.newCachedThreadPool();
    StoppableTask task = new StoppableTask( new TestTask() );
    service.execute( task );
    Thread.sleep( 1000 );

    // iterate over all the threads
    for( ThreadReference thread : vm.allThreads() ) {
        //iterate over all the objects referencing the thread
        //could take a long time, limiting the number of referring
        //objects scanned is possible though, as not many objects will
        //reference our runner thread
        for( ObjectReference ob : thread.referringObjects( 0 ) ) {
            //this cast is safe, as no primitive values can reference a thread
            ReferenceType obType = (ReferenceType)ob.type();
            //if thread is referenced by a stoppable task
            if( obType.name().equals( StoppableTask.class.getName() ) ) {

                StringReference taskId = (StringReference)ob.getValue( obType.fieldByName( "id" ));

                if( task.getId().equals( taskId.value() ) ) {
                    //task with matching id found
                    System.out.println( "Task "+task.getId()+" found.");

                    //suspend thread
                    thread.suspend();

                    Iterator<StackFrame> it = thread.frames().iterator();
                    while( it.hasNext() ) {
                        StackFrame frame = it.next();
                        //find stack frame containing StoppableTask.run()
                        if( ob.equals( frame.thisObject() ) ) {
                            //pop all frames up to the frame below run()
                            thread.popFrames( it.next() );
                            //set stopped to true
                            ob.setValue( obType.fieldByName( "stopped") , vm.mirrorOf( true ) );
                            break;
                        }
                    }
                    //resume thread
                    thread.resume();

                }

            }
        }
    }

}
}

And for reference, the "library" call I tested it with:

public class TestTask implements Runnable {

    @Override
    public void run() {
        long l = 0;
        while( true ) {
            l++;
            if( l % 1000000L == 0 )
                System.out.print( ".");
        }

    }
}

You can try it out by launching the Main class with the command line option -agentlib:jdwp=transport=dt_socket,server=y,address=localhost:8000,timeout=5000,suspend=n. It works with two caveats. Firstly, if there is native code being executed (thisObject of a frame is null), you'll have to wait until it's finished. Secondly, finally blocks are not invoked, so various resources may potentially be leaking.

孤千羽 2024-10-16 13:23:30

你写道:

我尝试过的另一个选项是标记 myMethod() 和类似的方法
带有特殊的“可中断”注释,然后使用 AspectJ (
我承认我是一个新手),用于捕获那里的所有方法调用
- 类似于:

@Before("call(* *.*(..)) && insidecode(@Interruptable * *.*(..))")
公共无效checkInterrupt(JoinPoint thisJoinPoint){
    if (Thread.interrupted()) 抛出新的 ForcefulInterruption();
}

但是 withincode 不会递归到匹配调用的方法
方法,所以我必须将此注释编辑到外部
代码。

AspectJ 的想法很好,但是您需要

  • 使用 cflow()cflowbelow() 以便递归地匹配某个控制流(例如类似 @Before (“cflow(执行(@Interruptable * *(..)))”))。
  • 确保还编写您的外部库,而不仅仅是您自己的代码。这可以通过使用二进制编织、检测 JAR 文件的类并将它们重新打包到新的 JAR 文件中或通过在应用程序启动期间(即在类加载期间)应用 LTW(加载时编织)来完成。

如果您的外部库具有可以使用 within() 精确定位的包名称,您甚至可能不需要标记注释。 AspectJ 确实很强大,而且通常有不止一种方法可以解决一个问题。我建议使用它,因为它是为您这样的努力而设计的。

You wrote:

Another option I've tried is to mark myMethod() and similar methods
with a special "Interruptable" annotation and then use AspectJ (which
I am admittedly a newbie at) for catching all method invocations there
- something like:

@Before("call(* *.*(..)) && withincode(@Interruptable * *.*(..))")
public void checkInterrupt(JoinPoint thisJoinPoint) {
    if (Thread.interrupted()) throw new ForcefulInterruption();
}

But withincode isn't recursive to methods called by the matching
methods, so I would have to edit this annotation into the external
code.

The AspectJ idea is good, but you need to

  • use cflow() or cflowbelow() in order to recursively match a certain control flow (e.g. something like @Before("cflow(execution(@Interruptable * *(..)))")).
  • make sure to also weave your external library, not just you own code. This can be done by either using binary weaving, instrumenting the JAR file's classes and re-packaging them into a new JAR file, or by applying LTW (load-time weaving) during application start-up (i.e. during class loading).

You might not even need your marker annotation if your external library has a package name you can pinpoint with within(). AspectJ is really powerful, and often there is more than one way to solve a problem. I would recommend using it because it was made for such endeavours as yours.

无力看清 2024-10-16 13:23:30

我为我的问题找到了一个丑陋的解决方案。它并不漂亮,但它适用于我的情况,所以我将其发布在这里,以防对其他人有所帮助。

我所做的是分析应用程序的库部分,希望能够隔离一小组重复调用的方法 - 例如一些get方法或 equals() 或类似的东西;然后我可以在那里插入以下代码段:

if (Thread.interrupted()) {
    // Not really necessary, but could help if the library does check it itself in some other place:
    Thread.currentThread().interrupt();
    // Wrapping the checked InterruptedException because the signature doesn't declare it:
    throw new RuntimeException(new InterruptedException());
}

要么通过编辑库的代码手动插入,要么通过编写适当的方面自动插入。请注意,如果库尝试捕获并吞下 RuntimeException,则抛出的异常可能会被库不尝试捕获的其他异常替换。

对我来说幸运的是,使用 VisualVM,我能够找到一个单一方法,称为在我对库的具体使用过程中,出现的次数非常多。添加上述代码段后,现在可以正确响应中断了。

这当然是不可维护的,而且没有什么能真正保证库会在其他场景中重复调用此方法;但它对我有用,并且由于分析其他应用程序并在那里插入检查相对容易,我认为这是一个通用的(尽管丑陋的)解决方案。

I have hacked an ugly solution to my problem. It's not pretty, but it works in my case, so I'm posting it here in case it will help anyone else.

What I did was profile the library parts of my application, hoping that I could isolate a small group of methods which are called repeatedly - for example some get methods or equals() or something along these lines; and then I could insert the following code segment there:

if (Thread.interrupted()) {
    // Not really necessary, but could help if the library does check it itself in some other place:
    Thread.currentThread().interrupt();
    // Wrapping the checked InterruptedException because the signature doesn't declare it:
    throw new RuntimeException(new InterruptedException());
}

Either inserting it manually by editing the library's code, or automatically by writing an appropriate aspect. Notice that if the library attempts to catch and swallow a RuntimeException, the thrown exception could be replaced with something else the library doesn't try to catch.

Luckily for me, using VisualVM, I was able to find a single method called a very high number of times during the specific usage I was making of the library. After adding the above code segment, it now properly responds to interrupts.

This is of course not maintainable, plus nothing really guarantees the library will call this method repeatedly in other scenarios; but it worked for me, and since it's relatively easy to profile other applications and insert the checks there, I consider this a generic, if ugly, solution.

祁梦 2024-10-16 13:23:30

如果内部方法具有相似的名称,那么您可以使用 xml (spring/AspectJ) 中的切入点定义而不是注释,因此不需要对外部库进行代码修改。

If internal methods have similar names, then you could use pointcut definition in xml (spring/AspectJ) instead of annotations so no code modification of the external library should be needed.

尸血腥色 2024-10-16 13:23:30

就像 mhaller 所说,最好的选择是启动一个新的流程。由于您的工作不那么合作,因此您永远无法保证线程终止。

解决您的问题的一个很好的解决方案是使用支持“轻量级线程”的任意暂停/停止的库,例如 Akka 代替执行器服务,尽管这可能有点大材小用。

虽然我从未使用过 Akka 并且无法确认它是否按您的预期工作,但文档指出有一个 stop() 用于停止 Actor 的方法。

Like mhaller said, the best option is to launch a new Process. Since your jobs are not that cooperative, you will never have guarantees on the Thread termination.

A nice solution to your problem would be using a library that supports arbitrary pause/stop of ' lightweight threads' such as Akka instead of the executor service, although this may be a bit of an overkill.

Although I've never used Akka and cannot confirm it works as you expect, the docs state there's a stop() method for stopping actors.

拥抱我好吗 2024-10-16 13:23:30

据我所知,有两种使用方面

  • AspecJ 的
  • 方法Spring AOP

AspectJ 是一个定制的编译器,它拦截正在编译的方法(这意味着它无法获取“外部方法”。
Spring AOP(默认情况下)使用类代理在运行时拦截方法(因此它可以拦截“外部方法”。但是 Spring AOP 的问题是它无法代理已经代理的类(AspectJ 可以做到这一点,因为它不代理类)我认为 AspectJ 在这种情况下可以帮助你。

To my knowledge there are two ways of using aspect

  • AspecJ
  • Spring AOP

AspectJ is a customized compiler which intercepts methods that is compiles (meaning it can't get "external methods".
Spring AOP (by default) uses class proxying to intercept methods at runtime (so it can intercept "external methods". but the problem with Spring AOP is that it can't proxy already-proxied classes (which AspectJ can do because it does not proxy classes). I think AspectJ could help you in this case.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文