教程集 www.jiaochengji.com
教程集 >  Golang编程  >  golang教程  >  正文 electron窗口自适应_Go 限流器系列(3)自适应限流

electron窗口自适应_Go 限流器系列(3)自适应限流

发布时间:2022-01-09   编辑:jiaochengji.com
教程集为您提供electron窗口自适应,Go 限流器系列(3)自适应限流等资源,欢迎您收藏本站,我们将为您提供最新的electron窗口自适应,Go 限流器系列(3)自适应限流资源

漏斗桶/令牌桶确实能够保护系统不被拖垮, 但不管漏斗桶还是令牌桶, 其防护思路都是设定一个指标, 当超过该指标后就阻止或减少流量的继续进入,当系统负载降低到某一水平后则恢复流量的进入。但其通常都是被动的,其实际效果取决于限流阈值设置是否合理,但往往设置合理不是一件容易的事情.

项目日常维护中, 经常能够看到某某同学在群里说:xx系统429了, 然后经过一番查找后发现是一波突然的活动流量, 只能申请再新增几台机器. 过了几天 OP 发现该集群的流量达不到预期又下掉了几台机器, 然后又开始一轮新的循环.

这里先不讨论集群自动伸缩的问题. 这里提出一些问题

<ol><li>集群增加机器或者减少机器限流阈值是否要重新设置?</li><li>设置限流阈值的依据是什么?</li><li>人力运维成本是否过高?</li><li>当调用方反馈429时, 这个时候重新设置限流, 其实流量高峰已经过了重新评估限流是否有意义?</li></ol>

这些其实都是采用漏斗桶/令牌桶的缺点, 总体来说就是太被动, 不能快速适应流量变化

<h2><span style="font-weight:bold;"/><span style="font-weight:bold;">自适应限流</span><span style="font-weight:bold;"> </span></h2>

对于自适应限流来说, 一般都是结合系统的 Load、CPU 使用率以及应用的入口 QPS、平均响应时间和并发量等几个维度的监控指标,通过自适应的流控策略, 让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。

比较出名的自适应限流的实现是 Alibaba Sentinel. 不过由于提前没有发现 Sentinel 有个 golang 版本的实现, 本篇文章就以 Kratos 的 BBR 实现探讨自适应限流的原理.

<h2><span style="font-weight:bold;"/><span style="font-weight:bold;">Kratos 自适应限流</span><span style="font-weight:bold;"> </span></h2>

借鉴了 Sentinel 项目的自适应限流系统, 通过综合分析服务的 cpu 使用率、请求成功的 qps 和请求成功的 rt 来做自适应限流保护。

<ul><li>cpu: 最近 1s 的 CPU 使用率均值,使用滑动平均计算,采样周期是 250ms</li><li>inflight: 当前处理中正在处理的请求数量</li><li>pass: 请求处理成功的量</li><li>rt: 请求成功的响应耗时</li></ul><h3><span style="font-weight:bold;"/>限流公式<span style="font-weight:bold;"/></h3>

cpu > 800 AND (Now - PrevDrop) < 1s AND (MaxPass * MinRt * windows / 1000) < InFlight

<ul><li>MaxPass 表示最近 5s 内,单个采样窗口中最大的请求数</li><li>MinRt 表示最近 5s 内,单个采样窗口中最小的响应时间</li><li>windows 表示一秒内采样窗口的数量,默认配置中是 5s 50 个采样,那么 windows 的值为 10</li></ul><h3><span style="font-weight:bold;"/>kratos 中间件实现<span style="font-weight:bold;"/></h3> <pre class="has"><code>func (b *RateLimiter) Limit() HandlerFunc {
 return func(c *Context) {
  uri := fmt.Sprintf("%s://%s%s", c.Request.URL.Scheme, c.Request.Host, c.Request.URL.Path)
  limiter := b.group.Get(uri)
  done, err := limiter.Allow(c)
  if err != nil {
   _metricServerBBR.Inc(uri, c.Request.Method)
   c.JSON(nil, err)
   c.Abort()
   return
  }
  defer func() {
   done(limit.DoneInfo{Op: limit.Success})
   b.printStats(uri, limiter)
  }()
  c.Next()
 }
}
</code></pre>

使用方式

<pre class="has"><code>e := bm.DefaultServer(nil)
limiter := bm.NewRateLimiter(nil)
e.Use(limiter.Limit())
e.GET("/api", myHandler)
</code></pre> <h3><span style="font-weight:bold;"/>源码实现<span style="font-weight:bold;"/></h3> <h4><span style="font-weight:bold;"/>Allow<span style="font-weight:bold;"/></h4> <pre class="has"><code>func (l *BBR) Allow(ctx context.Context, opts ...limit.AllowOption) (func(info limit.DoneInfo), error) {
 allowOpts := limit.DefaultAllowOpts()
 for _, opt := range opts {
  opt.Apply(&allowOpts)
 }
 if l.shouldDrop() { // 判断是否触发限流
  return nil, ecode.LimitExceed
 }
 atomic.AddInt64(&l.inFlight, 1) // 增加正在处理请求数
 stime := time.Since(initTime) // 记录请求到来的时间
 return func(do limit.DoneInfo) {
  rt := int64((time.Since(initTime) - stime) / time.Millisecond) // 请求处理成功的响应时长
  l.rtStat.Add(rt) // 增加rtStat响应耗时的统计
  atomic.AddInt64(&l.inFlight, -1) // 请求处理成功后, 减少正在处理的请求数
  switch do.Op {
  case limit.Success:
   l.passStat.Add(1) // 处理成功后增加成功处理请求数的统计
   return
  default:
   return
  }
 }, nil
}
</code></pre> <h4><span style="font-weight:bold;"/>shouldDrop<span style="font-weight:bold;"/></h4> <pre class="has"><code>func (l *BBR) shouldDrop() bool {
 // 判断目前cpu的使用率是否达到设置的CPU的限制, 默认值800
 if l.cpu()   // 如果上一次舍弃请求的时间是0, 那么说明没有限流的需求, 直接返回
  prevDrop, _ := l.prevDrop.Load().(time.Duration)
  if prevDrop == 0 {
   return false
  }
  // 如果上一次请求的时间与当前的请求时间小于1s, 那么说明有限流的需求
  if time.Since(initTime)-prevDrop <= time.Second {
   if atomic.LoadInt32(&l.prevDropHit) == 0 {
    atomic.StoreInt32(&l.prevDropHit, 1)
   }
   // 增加正在处理的请求的数量
   inFlight := atomic.LoadInt64(&l.inFlight)
   // 判断正在处理的请求数是否达到系统的最大的请求数量
   return inFlight > 1 && inFlight > l.maxFlight()
  }
  // 清空当前的prevDrop
  l.prevDrop.Store(time.Duration(0))
  return false
 }
 // 增加正在处理的请求的数量
 inFlight := atomic.LoadInt64(&l.inFlight)
 // 判断正在处理的请求数是否达到系统的最大的请求数量
 drop := inFlight > 1 && inFlight > l.maxFlight()
 if drop {
  prevDrop, _ := l.prevDrop.Load().(time.Duration)
  // 如果判断达到了最大请求数量, 并且当前有限流需求
  if prevDrop != 0 {
   return drop
  }
  l.prevDrop.Store(time.Since(initTime))
 }
 return drop
}
</code></pre> <h4><span style="font-weight:bold;"/>maxFlight<span style="font-weight:bold;"/></h4>

该函数是核心函数. 其计算公式: MaxPass * MinRt * windows / 1000. maxPASS/minRT都是基于<code>metric.RollingCounter</code>来实现的, 限于篇幅原因这里就不再具体看其实现(想看的可以去看rolling_counter_test.go还是蛮容易理解的)

<pre class="has"><code>func (l *BBR) maxFlight() int64 {
 return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.winBucketPerSec)/1000.0   0.5))
}
</code></pre> <ul><li>winBucketPerSec: 每秒内的采样数量,其计算方式:int64(time.Second)/(int64(conf.Window)/int64(conf.WinBucket)), conf.Window默认值10s, conf.WinBucket默认值100. 简化下公式: 1/(10/100) = 10, 所以每秒内的采样数就是10</li></ul><pre class="has"><code>// 单个采样窗口在一个采样周期中的最大的请求数, 默认的采样窗口是10s, 采样bucket数量100
func (l *BBR) maxPASS() int64 {
 rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS)
 if rawMaxPass > 0 && l.passStat.Timespan() 1 {
  return rawMaxPass
 }
 // 遍历100个采样bucket, 找到采样bucket中最大的请求数
 rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 {
  var result = 1.0
  for i := 1; iterator.Next() && i    bucket := iterator.Bucket()
   count := 0.0
   for _, p := range bucket.Points {
    count  = p
   }
   result = math.Max(result, count)
  }
  return result
 }))
 if rawMaxPass == 0 {
  rawMaxPass = 1
 }
 atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass)
 return rawMaxPass
}

// 单个采样窗口中最小的响应时间
func (l *BBR) minRT() int64 {
 rawMinRT := atomic.LoadInt64(&l.rawMinRt)
 if rawMinRT > 0 && l.rtStat.Timespan() 1 {
  return rawMinRT
 }
 // 遍历100个采样bucket, 找到采样bucket中最小的响应时间
 rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 {
  var result = math.MaxFloat64
  for i := 1; iterator.Next() && i    bucket := iterator.Bucket()
   if len(bucket.Points) == 0 {
    continue
   }
   total := 0.0
   for _, p := range bucket.Points {
    total  = p
   }
   avg := total / float64(bucket.Count)
   result = math.Min(result, avg)
  }
  return result
 })))
 if rawMinRT <= 0 {
  rawMinRT = 1
 }
 atomic.StoreInt64(&l.rawMinRt, rawMinRT)
 return rawMinRT
}
</code></pre> <h2><span style="font-weight:bold;"/><span style="font-weight:bold;">参考</span></h2> <h3><span style="font-weight:bold;"/></h3> [1]

alibaba/Sentinel-系统自适应限流: <em>https://github.com/alibaba/Sentinel/wiki/系统自适应限流</em>

[2]

go-kratos/kratos-自适应限流保护: <em>https://github.com/go-kratos/kratos/blob/master/doc/wiki-cn/ratelimit.md</em>

[3]

alibaba/sentinel-golang-系统自适应流控: <em>https://github.com/alibaba/sentinel-golang/wiki/系统自适应流控</em>


推荐阅读

<ul><li>

Go 限流器实战系列(2) -- Token Bucket 令牌桶

</li></ul> 学习交流 Go 语言,扫码回复「进群」即可

站长 polarisxu

自己的原创文章

不限于 Go 技术

职场和创业经验

Go语言中文网

每天为你

分享 Go 知识

Go爱好者值得关注

到此这篇关于“electron窗口自适应_Go 限流器系列(3)自适应限流”的文章就介绍到这了,更多文章或继续浏览下面的相关文章,希望大家以后多多支持JQ教程网!

您可能感兴趣的文章:
electron窗口自适应_Go 限流器系列(3)自适应限流
还在担心服务挂掉?Sentinel Go 让服务稳如磐石
jBox 2.3基于jquery的最新多功能对话框插件 常见使用问题解答
Deno 来势汹汹?这些最新项目让你知道Node.js可不是吃素的
(三)go-kit服务接口限流
win2003 asp.net权限设置问题及解决方法
python有什么ide吗
Golang 限流器的使用和实现
jquery中防刷IP流量软件影响统计的一点对策
Java 框架 Netty 实现原理分析

[关闭]
~ ~