当前位置: 首页>前端>正文

pig微服务快速开发框架搭建 微服务开发首选框架


donet 微服务开发 学习-AOP框架基础

  • 目的介绍
  • AOP框架基础
  • 创建简单的熔断降级框架
  • 细化框架
  • 结合asp.net core依赖注入
  • 升级一波


目的介绍

donet 微服务开发 学习

AOP框架基础

如果直接使用Polly,那么就会造成业务代码中混杂大量的业务无关代码。我们使用AOP(如果不了解AOP,请自行参考网上资料)的方式封装一个简单的框架,模仿Spring cloud中的Hystrix。

需要先引入一个支持.Net Core的AOP,目前我发现的最好的.Net Core下的AOP框架是AspectCore(国产,动态织入),其他要不就是不支持.Net Core,要不就是不支持对异步方法进行拦截。MVC Filter

GitHub:https://github.com/dotnetcore/AspectCore-Framework

Install-Package AspectCore.Core

新建控制台项目aoptest1,并添加AspectCore.Core包引用

编写拦截器CustomInterceptorAttribute.cs,一般继承自AbstractInterceptorAttribute

public class CustomInterceptorAttribute : AbstractInterceptorAttribute
{
    //每个被拦截的方法中执行
    public async override Task Invoke(AspectContext context, AspectDelegate next)
    {
        try
        {
            Console.WriteLine("Before service call");
            await next(context);
        }
        catch (Exception)
        {
            Console.WriteLine("Service threw an exception!");
            throw;
        }
        finally
        {
            Console.WriteLine("After service call");
        }
    }
    
}

编写需要被代理拦截的类 Person.cs,在要被拦截的方法上标注CustomInterceptorAttribute 。类需要是public类,方法需要是虚!方法,支持异步方法,因为动态代理是动态生成被代理的类的动态子类实现的。

public class Person
{
    [CustomInterceptor]
    public virtual void Say(string msg)
    {
        Console.WriteLine("service calling..."+msg);
    }
}

通过AspectCore创建代理对象

ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder();
using (IProxyGenerator proxyGenerator=proxyGeneratorBuilder.Build())
{
    Person p = proxyGenerator.CreateClassProxy<Person>();
    p.Say("Hello World");
    Console.WriteLine(p.GetType());
    Console.WriteLine(p.GetType().BaseType);
}

Console.ReadKey();

注意p指向的对象是AspectCore生成的Person的动态子类的对象,直接new Person是无法被,拦截的.

执行结果:

pig微服务快速开发框架搭建 微服务开发首选框架,pig微服务快速开发框架搭建 微服务开发首选框架_pig微服务快速开发框架搭建,第1张

创建简单的熔断降级框架

新建控制台项目 hystrixtest1
新建类Person.cs

public class Person
{
    public virtual async Task<string> HelloAsync(string name)
    {
        Console.WriteLine("hello"+name);
        return "ok";
    }

    public async Task<string> HelloFallBackAsync(string name)
    {
        Console.WriteLine("执行失败"+name);
        return "fail";
    }
}

目标:在执行 HelloAsync 失败的时候自动执行 HelloFallBackAsync ,达到熔断降级编写HystrixCommandAttribute.cs

[AttributeUsage(AttributeTargets.Method)]
public class HystrixCommandAttribute : AbstractInterceptorAttribute
{
    public string FallBackMethod { get; set; }

    public HystrixCommandAttribute(string fallBackMethod)
    {
        this.FallBackMethod = fallBackMethod;
    }

    public override async Task Invoke(AspectContext context, AspectDelegate next)
    {
        try
        {
            await next(context);//执行被拦截的方法
        }
        catch (Exception ex)
        {
            /*
             * context.ServiceMethod 被拦截的方法
             * context.ServiceMethod.DeclaringType 被拦截的方法所在的类
             * context.Implementation 实际执行的对象
             * context.Parameters 方法参数值
             * 如果执行失败,则执行FallBackMethod
             */
            var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod);
            object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters);
            context.ReturnValue = fallBackResult;
            await Task.FromResult(0);
        }
    }
}

修改Person.cs类

public class Person
{
    [HystrixCommand(nameof(HelloFallBackAsync))]
    public virtual async Task<string> HelloAsync(string name)//需要是虚方法
    {
        Console.WriteLine("hello"+name);

        //抛错
        String s = null;
        //s.ToString();

        return "ok";
    }

    public async Task<string> HelloFallBackAsync(string name)
    {
        Console.WriteLine("执行失败"+name);
        return "fail";
    }

    [HystrixCommand(nameof(AddFall))]
    public virtual int Add(int i, int j)
    {
        //抛错
        String s = null;
        //s.ToString();

        return i + j;
    }

    public int AddFall(int i, int j)
    {
        return 0;
    }
}

创建代理对象

ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder();
using (IProxyGenerator proxyGenerator=proxyGeneratorBuilder.Build())
{
    Person p = proxyGenerator.CreateClassProxy<Person>();
    Console.WriteLine(p.HelloAsync("Hello World").Result);
    Console.WriteLine(p.Add(1,2));
}

执行效果

pig微服务快速开发框架搭建 微服务开发首选框架,pig微服务快速开发框架搭建 微服务开发首选框架_AOP_02,第2张

异常执行效果

pig微服务快速开发框架搭建 微服务开发首选框架,pig微服务快速开发框架搭建 微服务开发首选框架_AOP_03,第3张

细化框架

重试: MaxRetryTimes表示最多重试几次,如果为0则不重试, RetrvIntervalMilliseconds表示重试间隔的豪秒数;

熔断: EnableCircuitBreaker是否启用熔断, ExceptionsAllowedBeforeBreaking表示出现错误,几次后熔断, Milliseconds0fBreak表示熔断多长时间(毫秒);
超时: TimeOutMilliseconds执行超过多少毫秒则认为超时(0表示不检测超时)缓存:缓存多少豪秒(0表示不缓存) ,用“类名+方法名+所有参数ToString拼接"做缓存Key.

新建控制台项目aspnetcorehystrix1,并添加AspectCore.Core、Polly包引用

Install-Package AspectCore.Core
Install-Package Polly
Install-Package Microsoft.Extensions.Caching.Memory

编写HystrixCommandAttribute.cs

/// <summary>
/// 熔断框架
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
public class HystrixCommandAttribute : AbstractInterceptorAttribute
{
    #region 属性
    /// <summary>
    /// 最多重试几次,如果为0则不重试
    /// </summary>
    public int MaxRetryTimes { get; set; } = 0;

    /// <summary>
    /// 重试间隔的毫秒数
    /// </summary>
    public int RetryIntervalMilliseconds { get; set; } = 100;

    /// <summary>
    /// 是否启用熔断
    /// </summary>
    public bool EnableCircuitBreater { get; set; } = false;

    /// <summary>
    /// 熔断前出现允许错误几次
    /// </summary>
    public int ExceptionAllowedBeforeBreaking { get; set; } = 3;

    /// <summary>
    /// 熔断多长时间(毫秒 )
    /// </summary>
    public int MillisecondsOfBreak { get; set; } = 1000;

    /// <summary>
    /// 执行超过多少毫秒则认为超时(0表示不检测超时)
    /// </summary>
    public int TimeOutMilliseconds { get; set; } = 0;

    /// <summary>
    /// 缓存多少毫秒(0表示不缓存),用“类名+方法名+所有参数ToString拼接”做缓存Key
    /// </summary>
    public int CacheTTLMilliseconds { get; set; } = 0;

    private Policy policy;

    //缓存
    private static readonly Microsoft.Extensions.Caching.Memory.IMemoryCache memoryCache = new Microsoft.Extensions.Caching.Memory.MemoryCache(new Microsoft.Extensions.Caching.Memory.MemoryCacheOptions());

    /// <summary>
    /// 降级方法名
    /// </summary>
    public string FallBackMethod { get; set; }
    #endregion

    #region 构造函数
    /// <summary>
    /// 熔断框架
    /// </summary>
    /// <param name="fallBackMethod">降级方法名</param>
    public HystrixCommandAttribute(string fallBackMethod)
    {
        this.FallBackMethod = fallBackMethod;
    }
    #endregion



    public override async Task Invoke(AspectContext context, AspectDelegate next)
    {
        //一个HystrixCommand中保持一个policy对象即可
        //其实主要是CircuitBreaker要求对于同一段代码要共享一个policy对象
        //根据反射原理,同一个方法就对应一个HystrixCommandAttribute,无论几次调用,
        //而不同方法对应不同的HystrixCommandAttribute对象,天然的一个policy对象共享
        //因为同一个方法共享一个policy,因此这个CircuitBreaker是针对所有请求的。
        //Attribute也不会在运行时再去改变属性的值,共享同一个policy对象也没问题
        lock (this)
        {
            if (policy==null)
            {
                policy = Policy.Handle<Exception>()
                    .FallbackAsync(async (ctx, t) =>
                    {
                        AspectContext aspectContext = (AspectContext)ctx["aspectContext"];
                        var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod);
                        Object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters);
                        //不能如下这样,因为这是闭包相关,如果这样写第二次调用Invoke的时候context指向的
                        //还是第一次的对象,所以要通过Polly的上下文来传递AspectContext
                        //context.ReturnValue = fallBackResult;
                        aspectContext.ReturnValue = fallBackResult;
                    }, async (ex, t) => { });

                if (MaxRetryTimes>0)//重试
                {
                    policy = policy.WrapAsync(Policy.Handle<Exception>().WaitAndRetryAsync(MaxRetryTimes, i => TimeSpan.FromMilliseconds(RetryIntervalMilliseconds)));
                }

                if (EnableCircuitBreater)//熔断
                {
                    policy = policy.WrapAsync(Policy.Handle<Exception>().CircuitBreakerAsync(ExceptionAllowedBeforeBreaking, TimeSpan.FromMilliseconds(MillisecondsOfBreak)));
                }

                if (TimeOutMilliseconds>0)//超时
                {
                    policy = policy.WrapAsync(Policy.TimeoutAsync(() => TimeSpan.FromMilliseconds(TimeOutMilliseconds), Polly.Timeout.TimeoutStrategy.Pessimistic));
                }

            }
        }

        //把本地调用的AspectContext传递给Polly,主要给FallBackMethod中使用,避免闭包的坑
        Context pollyCtx = new Context();
        pollyCtx["aspectContext"] = context;

        if (CacheTTLMilliseconds>0)
        {
            //用类名+方法名+参数的下划线连接起来作为缓存key
            string cacheKey = "HystrixMethodCacheManager_Key_" + context.ServiceMethod.DeclaringType + "." + context.ServiceMethod + string.Join("_", context.Parameters);

            //尝试去缓存中获取。如果找到了,则直接用缓存中的值做返回值
            if (memoryCache.TryGetValue(cacheKey,out var cacheValue))
            {
                context.ReturnValue = cacheValue;
            }
            else
            {
                //如果缓存中没有,则执行实际被拦截的方法
                await policy.ExecuteAsync(ctx => next(context), pollyCtx);
                //存入缓存中
                using (var cacheEntry=memoryCache.CreateEntry(cacheKey))
                {
                    cacheEntry.Value = context.ReturnValue;
                    cacheEntry.AbsoluteExpiration = DateTime.Now + TimeSpan.FromMilliseconds(CacheTTLMilliseconds);
                }
            }
        }
        else//如果没有启用缓存,就直接执行业务方法
        {
            await policy.ExecuteAsync(ctx => next(context), pollyCtx);
        }
    }
}

编写业务类Person.cs

public class Person//需要public类
{
    [HystrixCommand(nameof(Hello1FallBackAsync), MaxRetryTimes = 3, EnableCircuitBreater = true)]
    public virtual async Task<String> HelloAsync(string name)//需要是虚方法
    {
        Console.WriteLine("hello" + name);

        #region 抛错
        String s = null;
        s.ToString(); 
        #endregion

        return "ok" + name;
    }

    [HystrixCommand(nameof(Hello2FallBackAsync))]
    public virtual async Task<string> Hello1FallBackAsync(string name)
    {
        Console.WriteLine("Hello降级1" + name);
        String s = null;
        s.ToString();
        return "fail_1";
    }

    public virtual async Task<string> Hello2FallBackAsync(string name)
    {
        Console.WriteLine("Hello降级2" + name);

        return "fail_2";
    }

    [HystrixCommand(nameof(AddFall))]
    public virtual int Add(int i, int j)
    {
        String s = null;
        //s.ToString();
        return i + j;
    }
    public int AddFall(int i, int j)
    {
        return 0;
    }

    [HystrixCommand(nameof(TestFallBack), CacheTTLMilliseconds = 3000)]
    public virtual void Test(int i)
    {
        Console.WriteLine("Test" + i);
    }

    public virtual void TestFallBack(int i)
    {
        Console.WriteLine("Test" + i);
    }
}

创建代理对象

ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder();
using (IProxyGenerator proxyGenerator=proxyGeneratorBuilder.Build())
{
    Person p = proxyGenerator.CreateClassProxy<Person>();
    Console.WriteLine(p.HelloAsync("Hello World").Result);
    Console.WriteLine(p.Add(1, 2));
    while (true)
    {
        Console.WriteLine(p.HelloAsync("Hello World").Result);
        Thread.Sleep(100);
    }
}

测试结果:

正常:

pig微服务快速开发框架搭建 微服务开发首选框架,pig微服务快速开发框架搭建 微服务开发首选框架_AOP_04,第4张

一级熔断

pig微服务快速开发框架搭建 微服务开发首选框架,pig微服务快速开发框架搭建 微服务开发首选框架_Core_05,第5张

二级熔断

pig微服务快速开发框架搭建 微服务开发首选框架,pig微服务快速开发框架搭建 微服务开发首选框架_缓存_06,第6张

结合asp.net core依赖注入

新建WebAPI项目aspnetcorehystrix,

并添加AspectCore.Core、Polly包引用

Install-Package AspectCore.Core
Install-Package Polly
Install-Package Microsoft.Extensions.Caching.Memory

编写HystrixCommandAttribute.cs

/// <summary>
/// 熔断框架
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
public class HystrixCommandAttribute : AbstractInterceptorAttribute
{
    #region 属性
    /// <summary>
    /// 最多重试几次,如果为0则不重试
    /// </summary>
    public int MaxRetryTimes { get; set; } = 0;

    /// <summary>
    /// 重试间隔的毫秒数
    /// </summary>
    public int RetryIntervalMilliseconds { get; set; } = 100;

    /// <summary>
    /// 是否启用熔断
    /// </summary>
    public bool EnableCircuitBreater { get; set; } = false;

    /// <summary>
    /// 熔断前出现允许错误几次
    /// </summary>
    public int ExceptionAllowedBeforeBreaking { get; set; } = 3;

    /// <summary>
    /// 熔断多长时间(毫秒 )
    /// </summary>
    public int MillisecondsOfBreak { get; set; } = 1000;

    /// <summary>
    /// 执行超过多少毫秒则认为超时(0表示不检测超时)
    /// </summary>
    public int TimeOutMilliseconds { get; set; } = 0;

    /// <summary>
    /// 缓存多少毫秒(0表示不缓存),用“类名+方法名+所有参数ToString拼接”做缓存Key
    /// </summary>
    public int CacheTTLMilliseconds { get; set; } = 0;

    private Policy policy;

    //缓存
    private static readonly Microsoft.Extensions.Caching.Memory.IMemoryCache memoryCache = new Microsoft.Extensions.Caching.Memory.MemoryCache(new Microsoft.Extensions.Caching.Memory.MemoryCacheOptions());

    /// <summary>
    /// 降级方法名
    /// </summary>
    public string FallBackMethod { get; set; }
    #endregion

    #region 构造函数
    /// <summary>
    /// 熔断框架
    /// </summary>
    /// <param name="fallBackMethod">降级方法名</param>
    public HystrixCommandAttribute(string fallBackMethod)
    {
        this.FallBackMethod = fallBackMethod;
    }
    #endregion



    public override async Task Invoke(AspectContext context, AspectDelegate next)
    {
        //一个HystrixCommand中保持一个policy对象即可
        //其实主要是CircuitBreaker要求对于同一段代码要共享一个policy对象
        //根据反射原理,同一个方法就对应一个HystrixCommandAttribute,无论几次调用,
        //而不同方法对应不同的HystrixCommandAttribute对象,天然的一个policy对象共享
        //因为同一个方法共享一个policy,因此这个CircuitBreaker是针对所有请求的。
        //Attribute也不会在运行时再去改变属性的值,共享同一个policy对象也没问题
        lock (this)
        {
            if (policy==null)
            {
                policy = Policy.Handle<Exception>()
                    .FallbackAsync(async (ctx, t) =>
                    {
                        AspectContext aspectContext = (AspectContext)ctx["aspectContext"];
                        var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod);
                        Object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters);
                        //不能如下这样,因为这是闭包相关,如果这样写第二次调用Invoke的时候context指向的
                        //还是第一次的对象,所以要通过Polly的上下文来传递AspectContext
                        //context.ReturnValue = fallBackResult;
                        aspectContext.ReturnValue = fallBackResult;
                    }, async (ex, t) => { });

                if (MaxRetryTimes>0)//重试
                {
                    policy = policy.WrapAsync(Policy.Handle<Exception>().WaitAndRetryAsync(MaxRetryTimes, i => TimeSpan.FromMilliseconds(RetryIntervalMilliseconds)));
                }

                if (EnableCircuitBreater)//熔断
                {
                    policy = policy.WrapAsync(Policy.Handle<Exception>().CircuitBreakerAsync(ExceptionAllowedBeforeBreaking, TimeSpan.FromMilliseconds(MillisecondsOfBreak)));
                }

                if (TimeOutMilliseconds>0)//超时
                {
                    policy = policy.WrapAsync(Policy.TimeoutAsync(() => TimeSpan.FromMilliseconds(TimeOutMilliseconds), Polly.Timeout.TimeoutStrategy.Pessimistic));
                }

            }
        }

        //把本地调用的AspectContext传递给Polly,主要给FallBackMethod中使用,避免闭包的坑
        Context pollyCtx = new Context();
        pollyCtx["aspectContext"] = context;

        if (CacheTTLMilliseconds>0)
        {
            //用类名+方法名+参数的下划线连接起来作为缓存key
            string cacheKey = "HystrixMethodCacheManager_Key_" + context.ServiceMethod.DeclaringType + "." + context.ServiceMethod + string.Join("_", context.Parameters);

            //尝试去缓存中获取。如果找到了,则直接用缓存中的值做返回值
            if (memoryCache.TryGetValue(cacheKey,out var cacheValue))
            {
                context.ReturnValue = cacheValue;
            }
            else
            {
                //如果缓存中没有,则执行实际被拦截的方法
                await policy.ExecuteAsync(ctx => next(context), pollyCtx);
                //存入缓存中
                using (var cacheEntry=memoryCache.CreateEntry(cacheKey))
                {
                    cacheEntry.Value = context.ReturnValue;
                    cacheEntry.AbsoluteExpiration = DateTime.Now + TimeSpan.FromMilliseconds(CacheTTLMilliseconds);
                }
            }
        }
        else//如果没有启用缓存,就直接执行业务方法
        {
            await policy.ExecuteAsync(ctx => next(context), pollyCtx);
        }
    }
}

编写业务类Person.cs

public class Person//需要public类
{
    [HystrixCommand(nameof(HelloFallBackAsync))]
    public virtual async Task<string> HelloAsync(string name)//需要是虚方法
    {
        Console.WriteLine("hello" + name);
        String s = null;
        s.ToString();
        return "ok";
    }
    public async Task<string> HelloFallBackAsync(string name)
    {
        Console.WriteLine("执行失败" + name);
        return "fail";
    }

    [HystrixCommand(nameof(AddFall))]
    public virtual int Add(int i, int j)
    {
        String s = null;
        //  s.ToArray();
        return i + j;
    }
    public int AddFall(int i, int j)
    {
        return 0;
    }
}

在asp.net core项目中,可以借助于asp.net core的依赖注入,简化代理类对象的注入,不用再自己调用ProxyGeneratorBuilder 进行代理类对象的注入了。

Install-Package AspectCore.Extensions.DependencyInjection

修改Startup.cs的ConfigureServices方法,把返回值从void改为IServiceProvider

public IServiceProvider ConfigureServices(IServiceCollection services)
{
    services.AddMvc();
    services.AddScoped<Person>();
    return services.BuildAspectCoreServiceProvider();
}

其中services.AddSingleton();表 示 把Person注 入 。

BuildAspectCoreServiceProvider是让aspectcore接管注入。

在Controller中就可以通过构造函数进行依赖注入了:

升级一波

当然要通过反射扫描所有Service类,只要类中有标记了CustomInterceptorAttribute的方法都算作服务实现类。为了避免一下子扫描所有类,所以RegisterServices还是手动指定从哪个程序集中加载。

/// <summary>
/// 根据特性批量注入
/// </summary>
private static void RegisterServices(Assembly assembly, IServiceCollection services)
{
   //遍历程序集中的所有public类型
   foreach (Type type in assembly.GetExportedTypes())
   {
       //判断类中是否有标注了CustomInterceptorAttribute的方法
       bool hasHystrixCommandAttr= type.GetMethods().Any(m => m.GetCustomAttribute(typeof(HystrixCommandAttribute)) != null);
       if (hasHystrixCommandAttr)
       {
           services.AddSingleton(type);
       }
   }
}
public IServiceProvider ConfigureServices(IServiceCollection services)
{
    services.AddMvc();
    RegisterServices(this.GetType().Assembly, services);
    return services.BuildAspectCoreServiceProvider();
}



https://www.xamrdz.com/web/2su1942506.html

相关文章: