Golang 实现Thrift客户端连接池
Golang Thrift客户端连接池实现
- 1 前言
- 1.1 运行环境
- 1.2 .thrift文件
- 1.3 对照实验说明
- 2 Thrift客户端连接池实现
- 2.1 连接池的功能
- 2.2 获取连接
- 2.3 释放连接
- 2.4 超时管理
- 2.5 重连机制
- 3 对照实验
- 3.1 实验一:未使用连接池
- 3.2 实验二:使用连接池
- 4 总结
1 前言
阅读文章之前,请先了解一下thrift相关知识。thrift官方并没有提供客户端连接池的实现方案,而我们在实际使用时,thrift客户端必须复用,来保证较为可观的吞吐量,并避免在高QPS调用情况下,不断的创建、释放客户端所带来的机器端口耗尽问题。本文会详细讲解如何实现一个简单可靠的thrift客户端连接池,并通过对照实验来说明thrift客户端连接池所带来的好处。由于篇幅的原因,本文只粘出关键代码,源代码请查看Thrift Client Pool Demo
1.1 运行环境
- Golang版本: go1.14.3 darwin/amd64
- Thrift Golang库版本: 0.13.0
- Thrift IDL编辑器版本: 0.13.0
1.2 .thrift文件
namespace java com.czl.api.thrift.model
namespace cpp com.czl.api
namespace php com.czl.api
namespace py com.czl.api
namespace js com.czl.apixianz
namespace go com.czl.api
struct ApiRequest {
1: required i16 id;
}
struct ApiResponse{
1:required string name;
}
// service1
service ApiService1{
ApiResponse query(1:ApiRequest request)
}
// service2
service ApiService2{
ApiResponse query(1:ApiRequest request)
}
注:请通过安装Thrift IDL编译器,并生成客户端、服务端代码。
1.3 对照实验说明
通过脚本开启100个协程并发调用rpc服务10分钟,统计这段时间内,未使用thrift客户端连接池与使用客户端连接池服务的平均吞吐量、Thrift API调用平均延迟、机器端口消耗等数据进行性能对比。
- 实验一: 未使用thrift客户端连接池
- 实验二: 使用thrift客户端连接池
2 Thrift客户端连接池实现
2.1 连接池的功能
首先,我们要明确一下连接池的职责,这里我简单的总结一下,连接池主要功能是维护连接的创建、释放,通过缓存连接来复用连接,减少创建连接所带来的开销,提高系统的吞吐量,一般连接池还会有连接断开的重连机制、超时机制等。这里我们可以先定义出大部分连接池都会有的功能,只是定义,可以先不管每个功能的具体实现。每一个空闲Thrift客户端其实底层都维护着一条空闲TCP连接,空闲Thrift客户端与空闲连接在这里其实是同一个概念。
......
// Thrift客户端创建方法,留给业务去实现
type ThriftDial func(addr string, connTimeout time.Duration) (*IdleClient, error)
// 关闭Thrift客户端,留给业务实现
type ThriftClientClose func(c *IdleClient) error
// Thrift客户端连接池
type ThriftPool struct {
// Thrift客户端创建逻辑,业务自己实现
Dial ThriftDial
// Thrift客户端关闭逻辑,业务自己实现
Close ThriftClientClose
// 空闲客户端,用双端队列存储
idle list.List
// 同步锁,确保count、status、idle等公共数据并发操作安全
lock *sync.Mutex
// 记录当前已经创建的Thrift客户端,确保MaxConn配置
count int32
// Thrift客户端连接池状态,目前就open和stop两种
status uint32
// Thrift客户端连接池相关配置
config *ThriftPoolConfig
}
// 连接池配置
type ThriftPoolConfig struct {
// Thrfit Server端地址
Addr string
// 最大连接数
MaxConn int32
// 创建连接超时时间
ConnTimeout time.Duration
// 空闲客户端超时时间,超时主动释放连接,关闭客户端
IdleTimeout time.Duration
// 获取Thrift客户端超时时间
Timeout time.Duration
// 获取Thrift客户端失败重试间隔
interval time.Duration
}
// Thrift客户端
type IdleClient struct {
// Thrift传输层,封装了底层连接建立、维护、关闭、数据读写等细节
Transport thrift.TTransport
// 真正的Thrift客户端,业务创建传入
RawClient interface{}
}
// 封装了Thrift客户端
type idleConn struct {
// 空闲Thrift客户端
c *IdleClient
// 最近一次放入空闲队列的时间
t time.Time
}
// 获取Thrift空闲客户端
func (p *ThriftPool) Get() (*IdleClient, error) {
// 1. 从空闲池中获取空闲客户端,获取到更新数据,返回,否则执行第2步
// 2. 创建新到Thrift客户端,更新数据,返回Thrift客户端
......
}
// 归还Thrift客户端
func (p *ThriftPool) Put(client *IdleCLient) error {
// 1. 如果客户端已经断开,更新数据,返回,否则执行第2步
// 2. 将Thrift客户端丢进空闲连接池,更新数据,返回
......
}
// 超时管理,定期释放空闲太久的连接
func (p *ThriftPool) CheckTimeout() {
// 扫描空闲连接池,将空闲太久的连接主动释放掉,并更新数据
......
}
// 异常连接重连
func (p *ThriftPool) Reconnect(client *IdleClient) (newClient *IdleClient, err error) {
// 1. 关闭旧客户端
// 2. 创建新的客户端,并返回
......
}
// 其他方法
......
这里有两个关键的数据结构,ThriftPool和IdleClient,ThriftPool负责实现整个连接池的功能,IdleClient封装了真正的Thrift客户端。先看一下ThriftPool的定义:
// Thrift客户端创建方法,留给业务去实现
type ThriftDial func(addr string, connTimeout time.Duration) (*IdleClient, error)
// 关闭Thrift客户端,留给业务实现
type ThriftClientClose func(c *IdleClient) error
// Thrift客户端连接池
type ThriftPool struct {
// Thrift客户端创建逻辑,业务自己实现
Dial ThriftDial
// Thrift客户端关闭逻辑,业务自己实现
Close ThriftClientClose
// 空闲客户端,用双端队列存储
idle list.List
// 同步锁,确保count、status、idle等公共数据并发操作安全
lock *sync.Mutex
// 记录当前已经创建的Thrift客户端,确保MaxConn配置
count int32
// Thrift客户端连接池状态,目前就open和stop两种
status uint32
// Thrift客户端连接池相关配置
config *ThriftPoolConfig
}
// 连接池配置
type ThriftPoolConfig struct {
// Thrfit Server端地址
Addr string
// 最大连接数
MaxConn int32
// 创建连接超时时间
ConnTimeout time.Duration
// 空闲客户端超时时间,超时主动释放连接,关闭客户端
IdleTimeout time.Duration
// 获取Thrift客户端超时时间
Timeout time.Duration
// 获取Thrift客户端失败重试间隔
interval time.Duration
}
- Thrift客户端创建与关闭,涉及到业务细节,这里抽离成Dial方法和Close方法。
- 连接池需要维护空闲客户端,这里用双端队列来存储。
- 一般的连接池,都应该支持最大连接数配置,MaxConn可以配置连接池最大连接数,同时我们用count来记录连接池当前已经创建的连接。
- 为了实现连接池的超时管理,当然也得有相关超时配置。
- 连接池的状态、当前连接数等这些属性,是多协程并发操作的,这里用同步锁lock来确保并发操作安全。
在看一下IdleClient实现:
// Thrift客户端
type IdleClient struct {
// Thrift传输层,封装了底层连接建立、维护、关闭、数据读写等细节
Transport thrift.TTransport
// 真正的Thrift客户端,业务创建传入
RawClient interface{}
}
// 封装了Thrift客户端
type idleConn struct {
// 空闲Thrift客户端
c *IdleClient
// 最近一次放入空闲队列的时间
t time.Time
}
- RawClient是真正的Thrift客户端,与实际逻辑相关。
- Transport Thrift传输层,Thrift传输层,封装了底层连接建立、维护、关闭、数据读写等细节。
- idleConn封装了IdleClient,用来实现空闲连接超时管理,idleConn记录一个时间,这个时间是Thrift客户端最近一次被放入空闲队列的时间。
2.2 获取连接
......
var nowFunc = time.Now
......
// 获取Thrift空闲客户端
func (p *ThriftPool) Get() (*IdleClient, error) {
return p.get(nowFunc().Add(p.config.Timeout))
}
// 获取连接的逻辑实现
// expire设定了一个超时时间点,当没有可用连接时,程序会休眠一小段时间后重试
// 如果一直获取不到连接,一旦到达超时时间点,则报ErrOverMax错误
func (p *ThriftPool) get(expire time.Time) (*IdleClient, error) {
if atomic.LoadUint32(&p.status) == poolStop {
return nil, ErrPoolClosed
}
// 判断是否超额
p.lock.Lock()
if p.idle.Len() == 0 && atomic.LoadInt32(&p.count) >= p.config.MaxConn {
p.lock.Unlock()
// 不采用递归的方式来实现重试机制,防止栈溢出,这里改用循环方式来实现重试
for {
// 休眠一段时间再重试
time.Sleep(p.config.interval)
// 超时退出
if nowFunc().After(expire) {
return nil, ErrOverMax
}
p.lock.Lock()
if p.idle.Len() == 0 && atomic.LoadInt32(&p.count) >= p.config.MaxConn {
p.lock.Unlock()
} else { // 有可用链接,退出for循环
break
}
}
}
if p.idle.Len() == 0 {
// 先加1,防止首次创建连接时,TCP握手太久,导致p.count未能及时 1,而新的请求已经到来
// 从而导致短暂性实际连接数大于p.count(大部分链接由于无法进入空闲链接队列,而被关闭,处于TIME_WATI状态)
atomic.AddInt32(&p.count, 1)
p.lock.Unlock()
client, err := p.Dial(p.config.Addr, p.config.ConnTimeout)
if err != nil {
atomic.AddInt32(&p.count, -1)
return nil, err
}
// 检查连接是否有效
if !client.Check() {
atomic.AddInt32(&p.count, -1)
return nil, ErrSocketDisconnect
}
return client, nil
}
// 从队头中获取空闲连接
ele := p.idle.Front()
idlec := ele.Value.(*idleConn)
p.idle.Remove(ele)
p.lock.Unlock()
// 连接从空闲队列获取,可能已经关闭了,这里再重新检查一遍
if !idlec.c.Check() {
atomic.AddInt32(&p.count, -1)
return nil, ErrSocketDisconnect
}
return idlec.c, nil
}
p.Get()
的逻辑比较清晰:如果空闲队列没有连接,且当前连接已经到达p.config.MaxConn
,就休眠等待重试;当满足获取连接条件时p.idle.Len() != 0 || atomic.LoadInt32(&p.count) < p.config.MaxConn
,有空闲连接,则返回空闲连接,减少创建连接的开销,没有的话,再重新创建一条新的连接。这里有两个关键的地方需要注意:
-
等待重试的逻辑,不要用递归的方式来实现,防止运行栈溢出。
// 递归的方法实现等待重试逻辑 func (p *ThriftPool) get(expire time.Time) (*IdleClient, error) { // 超时退出 if nowFunc().After(expire) { return nil, ErrOverMax } if atomic.LoadUint32(&p.status) == poolStop { return nil, ErrPoolClosed } // 判断是否超额 p.lock.Lock() if p.idle.Len() == 0 && atomic.LoadInt32(&p.count) >= p.config.MaxConn { p.lock.Unlock() // 休眠递归重试 time.Sleep(p.config.interval) p.get(expire) } ....... }
-
注意
p.lock.Lock()
的和p.lock.UnLock()
调用时机,确保公共数据并发操作安全。
2.3 释放连接
// 归还Thrift客户端
func (p *ThriftPool) Put(client *IdleClient) error {
if client == nil {
return nil
}
if atomic.LoadUint32(&p.status) == poolStop {
err := p.Close(client)
client = nil
return err
}
if atomic.LoadInt32(&p.count) > p.config.MaxConn || !client.Check() {
atomic.AddInt32(&p.count, -1)
err := p.Close(client)
client = nil
return err
}
p.lock.Lock()
p.idle.PushFront(&idleConn{
c: client,
t: nowFunc(),
})
p.lock.Unlock()
return nil
}
p.Put()
逻辑也比较简单,如果连接已经失效,p.count
需要-1,并进行连接关闭操作。否则丢到空闲队列里,这里还是丢到队头,没错,还是丢到队头,p.Get()
和p.Put()
都是从队头操作,有点像堆操作,为啥这么处理,等下面说到空闲连接超时管理就清楚了,这里先记住丢回空闲队列的时候,会更新空闲连接的时间。
2.4 超时管理
获取连接超时管理p.Get()
方法已经讲过了,创建连接超时管理由p.Dial()
去实现,下面说的是空闲连接的超时管理,空闲队列的连接,如果一直没有使用,超过一定时间,需要主动关闭掉,服务端的资源有限,不需要用的连接就主动关掉,而且连接放太久,服务端也会主动关掉。
// 超时管理,定期释放空闲太久的连接
func (p *ThriftPool) CheckTimeout() {
p.lock.Lock()
for p.idle.Len() != 0 {
ele := p.idle.Back()
if ele == nil {
break
}
v := ele.Value.(*idleConn)
if v.t.Add(p.config.IdleTimeout).After(nowFunc()) {
break
}
//timeout && clear
p.idle.Remove(ele)
p.lock.Unlock()
p.Close(v.c) //close client connection
atomic.AddInt32(&p.count, -1)
p.lock.Lock()
}
p.lock.Unlock()
return
}
清理超时空闲连接的时候,是从队尾开始清理掉超时或者无效的连接,直到找到第一个可用的连接或者队列为空。p.Get()
和p.Put()
都从队头操作队列,保证了活跃的连接都在队头,如果一开始创建的连接太多,后面业务操作变少,不需要那么多连接的时候,那多余的连接就会沉到队尾,被超时管理所清理掉。另外,这样设计也可以优化操作的时间复杂度<O(n)。
2.5 重连机制
事实上,thrift的transport层并没有提供一个检查连接是否有效的方法,一开始实现连接池的时候,检测方法是调用thrift.TTransport.IsOpen()
来判断
// 检测连接是否有效
func (c *IdleClient) Check() bool {
if c.Transport == nil || c.RawClient == nil {
return false
}
return c.Transport.IsOpen()
}
可在测试阶段发现当底层当TCP连接被异常断开的时候(服务端重启、服务端宕机等),c.Transport.IsOpen()
并不能如期的返回false
,如果我们查看thrift的源码,可以发现,其实c.Transport.IsOpen()
只和我们是否调用了c.Transport.Open()
方法有关。为了能实现断开重连机制,我们只能在使用阶段发现异常连接时,重连连接。这里我在ThriftPool
上封装了一层代理ThriftPoolAgent
,来实现断开重连逻辑,具体请参考代码实现。
package pool
import (
"fmt"
"github.com/apache/thrift/lib/go/thrift"
"log"
"net"
)
type ThriftPoolAgent struct {
pool *ThriftPool
}
func NewThriftPoolAgent() *ThriftPoolAgent {
return &ThriftPoolAgent{}
}
func (a *ThriftPoolAgent) Init(pool *ThriftPool) {
a.pool = pool
}
// 真正的业务逻辑放到do方法做,ThriftPoolAgent只要保证获取到可用的Thrift客户端,然后传给do方法就行了
func (a *ThriftPoolAgent) Do(do func(rawClient interface{}) error) error {
var (
client *IdleClient
err error
)
defer func() {
if client != nil {
if err == nil {
if rErr := a.releaseClient(client); rErr != nil {
log.Println(fmt.Sprintf("releaseClient error: %v", rErr))
}
} else if _, ok := err.(net.Error); ok {
a.closeClient(client)
} else if _, ok = err.(thrift.TTransportException); ok {
a.closeClient(client)
} else {
if rErr := a.releaseClient(client); rErr != nil {
log.Println(fmt.Sprintf("releaseClient error: %v", rErr))
}
}
}
}()
// 从连接池里获取链接
client, err = a.getClient()
if err != nil {
return err
}
if err = do(client.RawClient); err != nil {
if _, ok := err.(net.Error); ok {
log.Println(fmt.Sprintf("err: retry tcp, %T, %s", err, err.Error()))
// 网络错误,重建连接
client, err = a.reconnect(client)
if err != nil {
return err
}
return do(client.RawClient)
}
if _, ok := err.(thrift.TTransportException); ok {
log.Println(fmt.Sprintf("err: retry tcp, %T, %s", err, err.Error()))
// thrift传输层错误,也重建连接
client, err = a.reconnect(client)
if err != nil {
return err
}
return do(client.RawClient)
}
return err
}
return nil
}
// 获取连接
func (a *ThriftPoolAgent) getClient() (*IdleClient, error) {
return a.pool.Get()
}
// 释放连接
func (a *ThriftPoolAgent) releaseClient(client *IdleClient) error {
return a.pool.Put(client)
}
// 关闭有问题的连接,并重新创建一个新的连接
func (a *ThriftPoolAgent) reconnect(client *IdleClient) (newClient *IdleClient, err error) {
return a.pool.Reconnect(client)
}
// 关闭连接
func (a *ThriftPoolAgent) closeClient(client *IdleClient) {
a.pool.CloseConn(client)
}
// 释放连接池
func (a *ThriftPoolAgent) Release() {
a.pool.Release()
}
func (a *ThriftPoolAgent) GetIdleCount() uint32 {
return a.pool.GetI您可能感兴趣的文章: