前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从执行上下文角度重新理解.NET(Core)的多线程编程[1]:基于调用链的”参数”传递

从执行上下文角度重新理解.NET(Core)的多线程编程[1]:基于调用链的”参数”传递

作者头像
蒋金楠
发布2020-11-26 17:21:13
1.3K0
发布2020-11-26 17:21:13
举报
文章被收录于专栏:大内老A

线程是操作系统能够进行运算调度的最小单位,操作系统线程进一步被封装成托管的Thread对象,手工创建并管理Thread对象已经成为了所能做到的对线程最细粒度的控制了。后来我们有了ThreadPool,可以更加方便地以池化的方式来使用线程。最后,Task诞生,它结合async/await关键字给与我们完美异步编程模式。但这一切让我们的编程体验越来越好,但是离线程的本质越来越远。被系列文章从“执行上下文传播”这个令开发者相对熟悉的角度来聊聊重新认识我们似乎已经很熟悉的主题。

目录 一、ThreadStatic字段或者ThreadLocal<T>对象 二、CallContext 三、支持跨线程传递吗? 四、IllogicalCallContext和LogicalCallContext 五、AsyncLocal<T>

一、ThreadStatic字段或者ThreadLocal<T>对象

本篇文章旨在解决一个问题:对于一个由多个方法组成的调用链,数据如何在上下游方法之间传递。我想很多人首先想到的就是通过方法的参数进行传递,但是作为方法签名重要组成部分的参数列表代表一种“契约”,往往是不能轻易更改的。既然不能通过参数直接进行传递,那么我们需要一个“共享”的数据容器,上游方法将需要传递的数据放到这个容器中,下游方法在使用的时候从该容器中将所需的数据提取出来。

那么这个共享的容器可以是一个静态字段,当然不行, 因为类型的静态字段类似于一个单例对象,它会被多个并发执行的调用链共享。虽然普通的静态字段不行,但是标注了ThreadStaticAttribute特性的静态字段则可以,因为这样的字段是线程独享的。为了方便演示,我们定义了如下一个CallStackContext类型来表示基于某个调用链的上下文,这是一个字典,用于存放任何需要传递的数据。自增的TraceId字段代码当前调用链的唯一标识。当前的CallStackContext上下文通过静态属性Current获取,可以看出它返回标注了ThreadStaticAttribute特性的静态字段_current。

代码语言:javascript
复制
public class CallStackContext : Dictionary<string, object>
{
    [ThreadStatic]
    private static CallStackContext _current;
    private static int _traceId = 0;
    public static CallStackContext Current { get => _current; set => _current = value; }
    public long TraceId { get; } = Interlocked.Increment(ref _traceId);
}

我们通过如下这个CallStack对象创建一个“逻辑”上的调用链。在初始化的时候,CallStack会创建一个CallStackContext对象并将其放进CallContext对象并对静态字段_current进行复制。该字段会在Dispose方法中被置空,此时标志逻辑调用链生命周期的终止。

代码语言:javascript
复制
public class CallStack : IDisposable
{
        public CallStack() => CallStackContext.Current = new CallStackContext();
        public void Dispose() => CallStackContext.Current = null;
}

我们通过如下的程序来演示针对CallStack和CallStackContext的使用。如代码片段所示,我们利用对象池并发调用Call方法。Call方法内部会依次调用Foo、Bar和Baz三个方法,需要传递的数据体现为一个Guid,我们将当存放在当前CallStackContext中。整个方法Call方法的操作均在创建Callback的using block中执行。

代码语言:javascript
复制
class Program
{
    static void Main()
    {
        for (int i = 0; i < 5; i++)
        {
            ThreadPool.QueueUserWorkItem(_ => Call());
        }
        Console.Read();
    }
    static void Call()
    {
        using (new CallStack())
        {
            CallStackContext.Current["argument"] = Guid.NewGuid();
            Foo();
            Bar();
            Baz();
        }
    }
    static void Foo() => Trace();
    static void Bar() => Trace();
    static void Baz() => Trace();
    static void Trace([CallerMemberName] string methodName = null)
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        var traceId = CallStackContext.Current?.TraceId;
        var argument = CallStackContext.Current?["argument"];
        Console.WriteLine($"Thread: {threadId}; TraceId: {traceId}; Method: {methodName}; Argument:{argument}");
    }
}

为了验证三个方法获取的数据是否正确,我们让它们调用同一个Trace方法,该方法会在控制台上打印出当前线程ID、调用链标识(TraceId)、方法名和获取到的数据。如下所示的是该演示程序执行后的结果,可以看出置于CallContext中的CallStackContext对象帮助我们很好地完成了针对调用链的数据传递。

既然我们可以使用ThreadStatic静态字段,自然也可以使用ThreadLocal<T>对象来代替。如果希望时候后者,我们只需要将CallStackContext改写成如下的形式即可。

代码语言:javascript
复制
public class CallStackContext : Dictionary<string, object>
{
    private static ThreadLocal<CallStackContext> _current = new ThreadLocal<CallStackContext>();
    private static int _traceId = 0;
    public static CallStackContext Current { get => _current.Value; set => _current.Value = value; }
    public long TraceId { get; } = Interlocked.Increment(ref _traceId);
}

二、CallContext

除使用ThreadStatic字段来传递调用链数据之外,我们还可以使用CallContext。顾名思义,CallContext是专门为调用链创建的上下文,我们首先利用它来实现基于调用链的数据传递。如果采用这种解决方案,上述的CallStack和CallStackContext类型可以改写成如下的形式。如代码片段所示,当前的CallStackContext上下文通过静态属性Current获取,可以看出它是通过调用CallContext的静态方法GetData提取的,传入的类型名称作为存放“插槽”的名称。在初始化的时候,CallStack会创建一个CallStackContext对象并将其放进CallContext对应存储插槽中作为当前上下文,该插槽会在Dispose方法中被释放

代码语言:javascript
复制
public class CallStackContext: Dictionary<string, object>
{
    private static  int _traceId = 0;
    public static CallStackContext Current => CallContext.GetData(nameof(CallStackContext)) as CallStackContext;
    public long TraceId { get; } = Interlocked.Increment(ref _traceId);
}

三、支持跨线程传递吗?

对于上面演示的实例来说,调用链中的三个方法(Foo、Bar和Baz)均是在同一个线程中执行的,如果出现了跨线程调用,CallContext是否还能帮助我们实现上下文的快线程传递吗?为了验证CallContext跨线程传递的能力,我们将Call方法改写成如下的形式:Call方法直接调用Foo方法,但是Foo方法针对Bar方法的调用,以及Bar方法针对Baz方法的调用均在一个新创建的线程中进行的。

代码语言:javascript
复制
static void Call()
{
    using (new CallStack())
    {
        CallStackContext.Current["argument"] = Guid.NewGuid();
        Foo();
    }
}
static void Foo()
{
    Trace();
    new Thread(Bar).Start();
}
static void Bar()
{
    Trace();
    new Thread(Baz).Start();
}
static void Baz() => Trace();

再次执行我们我们的程序,不论是采用基于ThreadStatic静态字段,还是采用ThreadLocal<T>对象或者CallContext的解决方法,均会得到如下所示的输出结果。可以看出设置的数据只能在Foo方法中获取到,但是并没有自动传递到异步执行的Bar和Baz方法中。

四、IllogicalCallContext和LogicalCallContext

其实CallContext设置的上下文对象分为IllogicalCallContext和LogicalCallContext两种类型,调用SetData设置的是IllogicalCallContext,它并不具有跨线程传播的能力。如果希望在进行异步调用的时候自动传递到目标线程,必须调用CallContext的LogicalSetData方法设置为LogicalCallContext。所以我们应该将CallStack类型进行如下的改写。

代码语言:javascript
复制
public class CallStack : IDisposable
{
    public CallStack() => CallContext.LogicalSetData(nameof(CallStackContext), new CallStackContext());
    public void Dispose() => CallContext.FreeNamedDataSlot(nameof(CallStackContext));
}

与之相对,获取LogicalCallContext对象的方法也得换成LogicalGetData,为此我们将CallStackContext改写成如下的形式。

代码语言:javascript
复制
public class CallStackContext: Dictionary<string, object>
{
    private static  int _traceId = 0;
    public static CallStackContext Current => CallContext.LogicalGetData(nameof(CallStackContext)) as CallStackContext;
    public long TraceId { get; } = Interlocked.Increment(ref _traceId);
}

再次执行我们程序,依然能够得到希望的结果。

除了将设置和提取当前CallStackContext的方式进行修改(GetData=>LogicalGet; SetData=>LogicalSetData)之外,我们还有另一个解决方案,那就是让放存放在CallContext存储槽的数据类型实现ILogicalThreadAffinative接口。该接口没有定义任何成员,实现类型对应的对象将自动视为LogicalCallContext。对于我们的演示实例来说,我们只需要让CallStackContext实现该接口就可以了。

代码语言:javascript
复制
public class CallStackContext: Dictionary<string, object>, ILogicalThreadAffinative
{
    private static  int _traceId = 0;
    public static CallStackContext Current => CallContext.GetData(nameof(CallStackContext)) as CallStackContext;
    public long TraceId { get; } = Interlocked.Increment(ref _traceId);
}

五、AsyncLocal<T>

CallContext并没有被.NET Core继承下来。也就是,只有.NET Framework才提供针对CallContext的支持,.因为我们有更好的选择,那就是AsyncLocal<T>。如果使用AsyncLocal<T>作为存放调用链上下文的容器,我们的

代码语言:javascript
复制
public class CallStackContext: Dictionary<string, object>, ILogicalThreadAffinative
{
    internal static readonly AsyncLocal<CallStackContext> _contextAccessor = new AsyncLocal<CallStackContext>();
    private static  int _traceId = 0;
    public static CallStackContext Current => _contextAccessor.Value;
    public long TraceId { get; } = Interlocked.Increment(ref _traceId);
}

public class CallStack : IDisposable
{       
    public CallStack() => CallStackContext._contextAccessor.Value = new CallStackContext();
    public void Dispose() => CallStackContext._contextAccessor.Value = null;
}

既然命名为AsyncLocal<T>,自然是支持异步调用。它不仅支持上面演示的直接创建线程的方式,最主要的是支持我们熟悉的await的方式(如下所示)。

代码语言:javascript
复制
class Program
{
    static async Task Main(string[] args)
    {
        for (int i = 0; i < 5; i++)
        {
            ThreadPool.QueueUserWorkItem(_ => Call());
        }
        Console.Read();
        Console.Read();

        async Task Call()
        {
            using (new CallStack())
            {
                CallStackContext.Current["argument"] = Guid.NewGuid();
                await FooAsync();
                await BarAsync();
                await BazAsync();
            }
        }
    }
    static Task FooAsync() => Task.Run(() => Trace());
    static Task BarAsync() => Task.Run(() => Trace());
    static Task BazAsync() => Task.Run(() => Trace());
    static void Trace([CallerMemberName] string methodName = null)
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        var traceId = CallStackContext.Current?.TraceId;
        var argument = CallStackContext.Current?["argument"];
        Console.WriteLine($"Thread: {threadId}; TraceId: {traceId}; Method: {methodName}; Argument:{argument}");
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-11-25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、ThreadStatic字段或者ThreadLocal<T>对象
  • 二、CallContext
  • 三、支持跨线程传递吗?
  • 四、IllogicalCallContext和LogicalCallContext
  • 五、AsyncLocal<T>
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档