• Home
  • Articles
    • 日志
    • 妍小言
    • 舒小书
    • 浩然说
    • 生活日记
  • All Tags

上下文信息传递

05 Aug 2019

Reading time ~1 minute

线程上下文信息的传递能够帮助应用将完整的请求进行串联。对于Java直接新建线程,线程上下文信息的传递存在一定的问题。

存在的问题

  1. Java提供InheritableThreadLocal用来传递父线程的上下文信息,但在线程池场景下,这种方式则会失效。
  2. 对于两种不同的线程启动方式(直接新建并启动和提交至线程池)场景,最好能够存在一致的线程上下文信息传递的方案。
  3. 自动传递,上下文信息的传递不是业务人员需要关心的问题。

解决思路

  • 不继续在线程方向上进行扩展,因为无论是直接新建还是使用线程池,线程都存在一定的复杂度,并且JDK已经提供完整的实现,通过继承的方式进行扩展,不利于自动传递。
  • 无论是新建线程还是使用线程是,一般都会传递Runnable的实现作为线程的执行逻辑

思路

  • 能够扩展Runnable的字节码,在Runnable初始化时将线程上下文信息存储到Runnable的成员变量,run()方法添加before环绕,在before逻辑中,将成员变量的信息保存到线程上下文中。
  • 因为Runnable构造方法的调用是在父线程中,run()方法是在子线程中调用,这样处理,可以利用runnable来实现上下文信息的传递。
  • 使用runnable扩展方案,无论是主动创建线程还是使用线程池,都可以解决上下文信息传递的问题(线程池场景下需同时扩展callable)

具体实现

需要对Runnable字节码层面进行扩展,一般有两种编织手段,编译时编织和运行时编织,我们平常经常使用的比较多的Spring AOP就是运行时编织,但这存在一种限制,Bean的实例化需要交由容器来实现,主动创建出来的实例无法进行字节码扩展。

因为Runnable绝大多数的使用方式都是直接创建匿名内部类,所以编织方式不适合运行时编织,采用编译时编织,使用AspectJ对其进行代理,AJC进行编译。

以下是我通过AspectJ对Runnable和Callable字节码扩展的实现:

public aspect RunnableTransactionPropagationAspect {

    public interface RunnableTransactionPropagation {}
    private String RunnableTransactionPropagation.xid = null;

    public void RunnableTransactionPropagation.init() {
        this.xid = Thread.currentThread().getName();
    }
    public String RunnableTransactionPropagation.xid() {
        return this.xid;
    }

    declare parents : java.lang.Runnable+ implements RunnableTransactionPropagation;

    pointcut init() : execution(java.lang.Runnable+.new(..));
    pointcut cut(): (execution(* *..run()) && this(java.lang.Runnable));

    after(RunnableTransactionPropagation m): init() && this(m) {
        m.init();
    }

    before(RunnableTransactionPropagation m): cut() && this(m) {
        String xid = m.xid();
        if (null != xid) {
            System.out.println(xid);
        }
    }
}

public aspect CallableTransactionPropagationAspect {

    public interface CallableTransactionPropagation {}
    private String CallableTransactionPropagation.xid = null;

    public void CallableTransactionPropagation.init() {
        this.xid = Thread.currentThread().getName();
    }
    public String CallableTransactionPropagation.xid() {
        return this.xid;
    }

    declare parents : java.util.concurrent.Callable+ implements CallableTransactionPropagation;

    pointcut init() : execution(java.util.concurrent.Callable+.new(..));
    pointcut cut(): (execution(* *..call()));

    after(CallableTransactionPropagation m): init() && this(m) {
        m.init();
    }

    before(CallableTransactionPropagation m): cut() && this(m) {
        String xid = m.xid();
        if (null != xid) {
            System.out.println(xid);
        }
    }
}


thread