redigo 官网连接池 仅支持从队头拿连接。从队头放回已用连接,导致最热连接一直被使用,不轮询每一个连接,总会引起 Redis 的 Proxy 出现连接或是负载不均衡的问题。建议修改源代码中的 pool.go 文件,增加 pushBack 方法,将已使用的连接加入在队尾。
增加 pushBack 代码示例
// idle connect push list backfunc (l *idleList) pushBack(pc *poolConn) {if l.count == 0 {l.front = pcl.back = pcpc.prev = nilpc.next = nil} else {pc.prev = l.backl.back.next = pcl.back = pcpc.next = nil}l.count++}// idle connection in the pool method, True: pushBack, False: pushFront, default FalseLifo bool// fix add pushBackif p.Lifo == true {p.idle.pushBack(pc)} else {p.idle.pushFront(pc)}
完整代码示例
====================整个代码修改后pool.go=========================// Copyright 2012 Gary Burd//// Licensed under the Apache License, Version 2.0 (the "License"): you may// not use this file except in compliance with the License. You may obtain// a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the// License for the specific language governing permissions and limitations// under the License.package redisimport ("bytes""context""crypto/rand""crypto/sha1""errors""io""strconv""sync""time")var (_ ConnWithTimeout = (*activeConn)(nil)_ ConnWithTimeout = (*errorConn)(nil))var nowFunc = time.Now // for testing// ErrPoolExhausted is returned from a pool connection method (Do, Send,// Receive, Flush, Err) when the maximum number of database connections in the// pool has been reached.var ErrPoolExhausted = errors.New("redigo: connection pool exhausted")var (errConnClosed = errors.New("redigo: connection closed"))// Pool maintains a pool of connections. The application calls the Get method// to get a connection from the pool and the connection's Close method to// return the connection's resources to the pool.//// The following example shows how to use a pool in a web application. The// application creates a pool at application startup and makes it available to// request handlers using a package level variable. The pool configuration used// here is an example, not a recommendation.//// func newPool(addr string) *redis.Pool {// return &redis.Pool{// MaxIdle: 3,// IdleTimeout: 240 * time.Second,// // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.// Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },// }// }//// var (// pool *redis.Pool// redisServer = flag.String("redisServer", ":6379", "")// )//// func main() {// flag.Parse()// pool = newPool(*redisServer)// ...// }//// A request handler gets a connection from the pool and closes the connection// when the handler is done://// func serveHome(w http.ResponseWriter, r *http.Request) {// conn := pool.Get()// defer conn.Close()// ...// }//// Use the Dial function to authenticate connections with the AUTH command or// select a database with the SELECT command://// pool := &redis.Pool{// // Other pool configuration not shown in this example.// Dial: func () (redis.Conn, error) {// c, err := redis.Dial("tcp", server)// if err != nil {// return nil, err// }// if _, err := c.Do("AUTH", password); err != nil {// c.Close()// return nil, err// }// if _, err := c.Do("SELECT", db); err != nil {// c.Close()// return nil, err// }// return c, nil// },// }//// Use the TestOnBorrow function to check the health of an idle connection// before the connection is returned to the application. This example PINGs// connections that have been idle more than a minute://// pool := &redis.Pool{// // Other pool configuration not shown in this example.// TestOnBorrow: func(c redis.Conn, t time.Time) error {// if time.Since(t) < time.Minute {// return nil// }// _, err := c.Do("PING")// return err// },// }//type Pool struct {// Dial is an application supplied function for creating and configuring a// connection.//// The connection returned from Dial must not be in a special state// (subscribed to pubsub channel, transaction started, ...).Dial func() (Conn, error)// DialContext is an application supplied function for creating and configuring a// connection with the given context.//// The connection returned from Dial must not be in a special state// (subscribed to pubsub channel, transaction started, ...).DialContext func(ctx context.Context) (Conn, error)// TestOnBorrow is an optional application supplied function for checking// the health of an idle connection before the connection is used again by// the application. Argument t is the time that the connection was returned// to the pool. If the function returns an error, then the connection is// closed.TestOnBorrow func(c Conn, t time.Time) error// Maximum number of idle connections in the pool.MaxIdle int// idle connection in the pool method, True: pushBack, False: pushFront, default FalseLifo bool// Maximum number of connections allocated by the pool at a given time.// When zero, there is no limit on the number of connections in the pool.MaxActive int// Close connections after remaining idle for this duration. If the value// is zero, then idle connections are not closed. Applications should set// the timeout to a value less than the server's timeout.IdleTimeout time.Duration// If Wait is true and the pool is at the MaxActive limit, then Get() waits// for a connection to be returned to the pool before returning.Wait bool// Close connections older than this duration. If the value is zero, then// the pool does not close connections based on age.MaxConnLifetime time.Durationmu sync.Mutex // mu protects the following fieldsclosed bool // set to true when the pool is closed.active int // the number of open connections in the poolinitOnce sync.Once // the init ch once funcch chan struct{} // limits open connections when p.Wait is trueidle idleList // idle connectionswaitCount int64 // total number of connections waited for.waitDuration time.Duration // total time waited for new connections.}// NewPool creates a new pool.//// Deprecated: Initialize the Pool directly as shown in the example.func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {return &Pool{Dial: newFn, MaxIdle: maxIdle}}// Get gets a connection. The application must close the returned connection.// This method always returns a valid connection so that applications can defer// error handling to the first use of the connection. If there is an error// getting an underlying connection, then the connection Err, Do, Send, Flush// and Receive methods return that error.func (p *Pool) Get() Conn {// GetContext returns errorConn in the first argument when an error occurs.c, _ := p.GetContext(context.Background())return c}// GetContext gets a connection using the provided context.//// The provided Context must be non-nil. If the context expires before the// connection is complete, an error is returned. Any expiration on the context// will not affect the returned connection.//// If the function completes without error, then the application must close the// returned connection.func (p *Pool) GetContext(ctx context.Context) (Conn, error) {// Wait until there is a vacant connection in the pool.waited, err := p.waitVacantConn(ctx)if err != nil {return errorConn{err}, err}p.mu.Lock()if waited > 0 {p.waitCount++p.waitDuration += waited}// Prune stale connections at the back of the idle list.if p.IdleTimeout > 0 {n := p.idle.countfor i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {pc := p.idle.backp.idle.popBack()p.mu.Unlock()pc.c.Close()p.mu.Lock()p.active--}}// Get idle connection from the front of idle list.for p.idle.front != nil {pc := p.idle.frontp.idle.popFront()p.mu.Unlock()if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&(p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {return &activeConn{p: p, pc: pc}, nil}pc.c.Close()p.mu.Lock()p.active--}// Check for pool closed before dialing a new connection.if p.closed {p.mu.Unlock()err := errors.New("redigo: get on closed pool")return errorConn{err}, err}// Handle limit for p.Wait == false.if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {p.mu.Unlock()return errorConn{ErrPoolExhausted}, ErrPoolExhausted}p.active++p.mu.Unlock()c, err := p.dial(ctx)if err != nil {p.mu.Lock()p.active--if p.ch != nil && !p.closed {p.ch <- struct{}{}}p.mu.Unlock()return errorConn{err}, err}return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil}// PoolStats contains pool statistics.type PoolStats struct {// ActiveCount is the number of connections in the pool. The count includes// idle connections and connections in use.ActiveCount int// IdleCount is the number of idle connections in the pool.IdleCount int// WaitCount is the total number of connections waited for.// This value is currently not guaranteed to be 100% accurate.WaitCount int64// WaitDuration is the total time blocked waiting for a new connection.// This value is currently not guaranteed to be 100% accurate.WaitDuration time.Duration}// Stats returns pool's statistics.func (p *Pool) Stats() PoolStats {p.mu.Lock()stats := PoolStats{ActiveCount: p.active,IdleCount: p.idle.count,WaitCount: p.waitCount,WaitDuration: p.waitDuration,}p.mu.Unlock()return stats}// ActiveCount returns the number of connections in the pool. The count// includes idle connections and connections in use.func (p *Pool) ActiveCount() int {p.mu.Lock()active := p.activep.mu.Unlock()return active}// IdleCount returns the number of idle connections in the pool.func (p *Pool) IdleCount() int {p.mu.Lock()idle := p.idle.countp.mu.Unlock()return idle}// Close releases the resources used by the pool.func (p *Pool) Close() error {p.mu.Lock()if p.closed {p.mu.Unlock()return nil}p.closed = truep.active -= p.idle.countpc := p.idle.frontp.idle.count = 0p.idle.front, p.idle.back = nil, nilif p.ch != nil {close(p.ch)}p.mu.Unlock()for ; pc != nil; pc = pc.next {pc.c.Close()}return nil}func (p *Pool) lazyInit() {p.initOnce.Do(func() {p.ch = make(chan struct{}, p.MaxActive)if p.closed {close(p.ch)} else {for i := 0; i < p.MaxActive; i++ {p.ch <- struct{}{}}}})}// waitVacantConn waits for a vacant connection in pool if waiting// is enabled and pool size is limited, otherwise returns instantly.// If ctx expires before that, an error is returned.//// If there were no vacant connection in the pool right away it returns the time spent waiting// for that connection to appear in the pool.func (p *Pool) waitVacantConn(ctx context.Context) (waited time.Duration, err error) {if !p.Wait || p.MaxActive <= 0 {// No wait or no connection limit.return 0, nil}p.lazyInit()// wait indicates if we believe it will block so its not 100% accurate// however for stats it should be good enough.wait := len(p.ch) == 0var start time.Timeif wait {start = time.Now()}select {case <-p.ch:// Additionally check that context hasn't expired while we were waiting,// because `select` picks a random `case` if several of them are "ready".select {case <-ctx.Done():p.ch <- struct{}{}return 0, ctx.Err()default:}case <-ctx.Done():return 0, ctx.Err()}if wait {return time.Since(start), nil}return 0, nil}func (p *Pool) dial(ctx context.Context) (Conn, error) {if p.DialContext != nil {return p.DialContext(ctx)}if p.Dial != nil {return p.Dial()}return nil, errors.New("redigo: must pass Dial or DialContext to pool")}func (p *Pool) put(pc *poolConn, forceClose bool) error {p.mu.Lock()if !p.closed && !forceClose {pc.t = nowFunc()// fix add pushBackif p.Lifo == true {p.idle.pushBack(pc)} else {p.idle.pushFront(pc)}if p.idle.count > p.MaxIdle {pc = p.idle.backp.idle.popBack()} else {pc = nil}}if pc != nil {p.mu.Unlock()pc.c.Close()p.mu.Lock()p.active--}if p.ch != nil && !p.closed {p.ch <- struct{}{}}p.mu.Unlock()return nil}type activeConn struct {p *Poolpc *poolConnstate int}var (sentinel []bytesentinelOnce sync.Once)func initSentinel() {p := make([]byte, 64)if _, err := rand.Read(p); err == nil {sentinel = p} else {h := sha1.New()io.WriteString(h, "Oops, rand failed. Use time instead.") // nolint: errcheckio.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10)) // nolint: errchecksentinel = h.Sum(nil)}}func (ac *activeConn) firstError(errs ...error) error {for _, err := range errs[:len(errs)-1] {if err != nil {return err}}return errs[len(errs)-1]}func (ac *activeConn) Close() (err error) {pc := ac.pcif pc == nil {return nil}ac.pc = nilif ac.state&connectionMultiState != 0 {err = pc.c.Send("DISCARD")ac.state &^= (connectionMultiState | connectionWatchState)} else if ac.state&connectionWatchState != 0 {err = pc.c.Send("UNWATCH")ac.state &^= connectionWatchState}if ac.state&connectionSubscribeState != 0 {err = ac.firstError(err,pc.c.Send("UNSUBSCRIBE"),pc.c.Send("PUNSUBSCRIBE"),)// To detect the end of the message stream, ask the server to echo// a sentinel value and read until we see that value.sentinelOnce.Do(initSentinel)err = ac.firstError(err,pc.c.Send("ECHO", sentinel),pc.c.Flush(),)for {p, err2 := pc.c.Receive()if err2 != nil {err = ac.firstError(err, err2)break}if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {ac.state &^= connectionSubscribeStatebreak}}}_, err2 := pc.c.Do("")return ac.firstError(err,err2,ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil),)}func (ac *activeConn) Err() error {pc := ac.pcif pc == nil {return errConnClosed}return pc.c.Err()}func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {pc := ac.pcif pc == nil {return nil, errConnClosed}ci := lookupCommandInfo(commandName)ac.state = (ac.state | ci.Set) &^ ci.Clearreturn pc.c.Do(commandName, args...)}func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {pc := ac.pcif pc == nil {return nil, errConnClosed}cwt, ok := pc.c.(ConnWithTimeout)if !ok {return nil, errTimeoutNotSupported}ci := lookupCommandInfo(commandName)ac.state = (ac.state | ci.Set) &^ ci.Clearreturn cwt.DoWithTimeout(timeout, commandName, args...)}func (ac *activeConn) Send(commandName string, args ...interface{}) error {pc := ac.pcif pc == nil {return errConnClosed}ci := lookupCommandInfo(commandName)ac.state = (ac.state | ci.Set) &^ ci.Clearreturn pc.c.Send(commandName, args...)}func (ac *activeConn) Flush() error {pc := ac.pcif pc == nil {return errConnClosed}return pc.c.Flush()}func (ac *activeConn) Receive() (reply interface{}, err error) {pc := ac.pcif pc == nil {return nil, errConnClosed}return pc.c.Receive()}func (ac *activeConn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {pc := ac.pcif pc == nil {return nil, errConnClosed}cwt, ok := pc.c.(ConnWithTimeout)if !ok {return nil, errTimeoutNotSupported}return cwt.ReceiveWithTimeout(timeout)}type errorConn struct{ err error }func (ec errorConn) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err }func (ec errorConn) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) {return nil, ec.err}func (ec errorConn) Send(string, ...interface{}) error { return ec.err }func (ec errorConn) Err() error { return ec.err }func (ec errorConn) Close() error { return nil }func (ec errorConn) Flush() error { return ec.err }func (ec errorConn) Receive() (interface{}, error) { return nil, ec.err }func (ec errorConn) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err }type idleList struct {count intfront, back *poolConn}type poolConn struct {c Connt time.Timecreated time.Timenext, prev *poolConn}func (l *idleList) pushFront(pc *poolConn) {pc.next = l.frontpc.prev = nilif l.count == 0 {l.back = pc} else {l.front.prev = pc}l.front = pcl.count++}// idle connect push list backfunc (l *idleList) pushBack(pc *poolConn) {if l.count == 0 {l.front = pcl.back = pcpc.prev = nilpc.next = nil} else {pc.prev = l.backl.back.next = pcl.back = pcpc.next = nil}l.count++}func (l *idleList) popFront() {pc := l.frontl.count--if l.count == 0 {l.front, l.back = nil, nil} else {pc.next.prev = nill.front = pc.next}pc.next, pc.prev = nil, nil}func (l *idleList) popBack() {pc := l.backl.count--if l.count == 0 {l.front, l.back = nil, nil} else {pc.prev.next = nill.back = pc.prev}pc.next, pc.prev = nil, nil}==================================================初始方法:// 建立连接池redisClient = &redis.Pool{MaxIdle: maxIdle,MaxActive: maxActive,IdleTimeout: MaxIdleTimeout * time.Second,Wait: true,Lifo: true, # 设置为true,这是重点Dial: func() (redis.Conn, error) {con, err := redis.Dial("tcp", conf["Host"].(string),redis.DialPassword(conf["Password"].(string)),redis.DialDatabase(int(conf["Db"].(int64))),redis.DialConnectTimeout(timeout*time.Second),redis.DialReadTimeout(timeout*time.Second),redis.DialWriteTimeout(timeout*time.Second))if err != nil {return nil, err}return con, nil},}