Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

working redis client with pool requesting comment #16

Open
kolinfluence opened this issue Mar 25, 2024 · 0 comments
Open

working redis client with pool requesting comment #16

kolinfluence opened this issue Mar 25, 2024 · 0 comments

Comments

@kolinfluence
Copy link

kolinfluence commented Mar 25, 2024

@lesismal I got it working. Could you please give your comments?

I put the client pool's Put call in the OnClose handler, but I still need to work out how to do keep-alive. Could you please check and comment? Thanks.

        defer client.pool.Put(tlsConn) //if i comment this line it will work
package main

import (
        "sync"
        "fmt"
        "github.com/lesismal/nbio"
        "github.com/lesismal/nbio/extension/tls"
        "log"
        "strings"
        "sync/atomic"
        "time"
)

// RedisClient is a minimal Redis client that issues commands over TLS
// connections driven by an nbio Gopher.
//
// NOTE(review): responseCh and errorCh are shared by every connection the
// client registers, so a reply is not tied to the Do call that sent the
// command — confirm Do is only called from one goroutine at a time.
type RedisClient struct {
        g          *nbio.Gopher // event loop; its handlers are registered in setupHandlers
        addr       string       // server address stored by NewRedisClient
        tlsConfig  *tls.Config  // passed to tls.WrapOpen in setupHandlers
        responseCh chan []byte  // data delivered by the OnData handler, received by Do
        errorCh    chan error   // close errors delivered by the OnClose handler, received by Do
        pool       *ConnPool    // source of the connections used by Do
}


// qps counts the commands issued through RedisClient.Do. In the code shown it
// is only ever incremented, so despite the name it is a running total, not a
// per-second rate.
var qps int64

// tlsConfig is the TLS configuration main hands to both the connection pool
// and the client.
//
// NOTE(review): InsecureSkipVerify is enabled; assuming this Config follows
// crypto/tls semantics, the server certificate is not verified, which is only
// acceptable against a local test server — confirm before using elsewhere.
var tlsConfig = &tls.Config{
	InsecureSkipVerify: true,
}


// ConnPool is a simple pool of idle TLS connections to a single address.
type ConnPool struct {
        pool    chan *tls.Conn // idle connections; NewConnPool buffers it with capacity maxConn
        addr    string         // address dialed by DialNew
        config  *tls.Config    // TLS configuration DialNew passes to tls.Dial
        maxConn int            // capacity requested when the pool was created
        mu      sync.Mutex     // NOTE(review): not used by any method shown here — confirm it is still needed
}


// NewRedisClient builds a RedisClient for addr that draws its connections from
// pool and uses tlsConfig for the TLS open hook. The nbio handlers are
// registered before the client is returned; the event loop itself is not
// started here.
func NewRedisClient(addr string, tlsConfig *tls.Config, pool *ConnPool) *RedisClient {
	rc := new(RedisClient)
	rc.g = nbio.NewGopher(nbio.Config{})
	rc.addr = addr
	rc.tlsConfig = tlsConfig
	// Each channel has a single buffer slot, so one pending result can be
	// handed over without a receiver being ready.
	rc.responseCh = make(chan []byte, 1)
	rc.errorCh = make(chan error, 1)
	rc.pool = pool

	rc.setupHandlers()
	return rc
}

// NewConnPool returns an empty pool that dials addr with config on demand and
// can hold up to maxConn idle connections.
func NewConnPool(addr string, config *tls.Config, maxConn int) *ConnPool {
	p := new(ConnPool)
	p.addr = addr
	p.config = config
	p.maxConn = maxConn
	// The channel's capacity is what bounds the number of idle connections.
	p.pool = make(chan *tls.Conn, maxConn)
	return p
}

// Get returns an idle connection when one is immediately available and dials
// a fresh one otherwise. It never waits for a connection to be returned.
func (p *ConnPool) Get() (*tls.Conn, error) {
	select {
	case idle := <-p.pool:
		return idle, nil
	default:
		// Nothing idle right now; fall through and dial.
	}
	return p.DialNew()
}

// DialNew opens a new TLS connection to the pool's address over TCP using the
// pool's TLS configuration. On failure the returned connection is nil.
func (p *ConnPool) DialNew() (*tls.Conn, error) {
	dialed, dialErr := tls.Dial("tcp", p.addr, p.config)
	if dialErr == nil {
		return dialed, nil
	}
	return nil, dialErr
}


// Put hands conn back to the pool for reuse. When the pool already holds its
// maximum number of idle connections, conn is closed instead. A nil conn is
// ignored.
func (p *ConnPool) Put(conn *tls.Conn) {
	if conn == nil {
		// Pooling nil would hand a nil connection to a later Get, and the
		// full-pool branch would call Close on it.
		return
	}

	select {
	case p.pool <- conn:
		// Connection returned to the pool.
		log.Printf("pool returned")
	default:
		// Pool is full, close the connection.
		log.Printf("clossssss")
		if err := conn.Close(); err != nil {
			log.Printf("closing surplus connection: %v", err)
		}
	}
}

// CloseAll closes every connection that is idle in the pool at the time of the
// call and returns as soon as the pool is found empty. Connections currently
// checked out are not affected.
func (p *ConnPool) CloseAll() {
	for {
		select {
		case idle := <-p.pool:
			idle.Close()
			continue
		default:
		}
		// The pool had nothing left to hand out.
		return
	}
}

/*
func NewConnPool(addr string, config *tls.Config, maxConn int) *ConnPool {
        client := &RedisClient{
                g:         nbio.NewGopher(nbio.Config{}),
                addr:      addr,
                tlsConfig: tlsConfig,
                responseCh: make(chan []byte, 1), // Buffer of 1 for non-blocking operation
                errorCh:    make(chan error, 1),
        }
        client.setupHandlers()
        return client
}
*/

// setupHandlers registers the client's open, close and data callbacks on its
// Gopher, wrapped by the nbio TLS helpers so each callback also receives the
// TLS session.
//
//   - OnClose reports the close error on errorCh.
//   - OnData forwards the received bytes on responseCh.
func (client *RedisClient) setupHandlers() {
	isClient := true

	client.g.OnOpen(tls.WrapOpen(client.tlsConfig, isClient, func(c *nbio.Conn, tlsConn *tls.Conn) {
		log.Printf("connection open")
		// Initialize connection setup if necessary
	}))
	client.g.OnClose(tls.WrapClose(func(c *nbio.Conn, tlsConn *tls.Conn, err error) {
		log.Printf("connection closed")

		// A closed connection can never carry another command, so it is
		// deliberately NOT returned to the pool: doing so would make a later
		// Get hand out a dead connection.

		if err == nil {
			// Do returns whatever arrives on errorCh; a nil here would look
			// like a successful call with a nil reply.
			err = fmt.Errorf("connection closed")
		}

		// Never block the callback: if no Do call is waiting and the buffer
		// slot is taken, the error is dropped instead of stalling the caller
		// of this handler.
		select {
		case client.errorCh <- err:
		default:
		}
	}))
	client.g.OnData(tls.WrapData(func(c *nbio.Conn, tlsConn *tls.Conn, data []byte) {
		log.Printf("data = %s", data)
		// Hand a private copy to the consumer.
		// NOTE(review): presumably nbio may reuse the read buffer once this
		// callback returns — confirm against the nbio version in use.
		reply := append([]byte(nil), data...)
		// Handle incoming data, Redis protocol
		client.responseCh <- reply
	}))
}

// Start starts the client's Gopher and reports any error it returns.
func (rc *RedisClient) Start() error {
	startErr := rc.g.Start()
	return startErr
}

// Stop stops the Gopher and then closes the response and error channels, in
// that order. Because the channels are closed, Stop must not be called twice.
func (rc *RedisClient) Stop() {
	rc.g.Stop()

	// A receive on a closed channel returns immediately, so a Do call still
	// waiting in its select is released once these are closed.
	responses, failures := rc.responseCh, rc.errorCh
	close(responses)
	close(failures)
}

// Do sends one Redis command and waits for the first reply or connection
// error delivered by the client's handlers.
//
// cmd is the command name and args its arguments; together they are encoded
// as a RESP array of bulk strings. The returned value is whatever
// parseResponse produces for the reply. An error is returned when no
// connection can be obtained, when the connection cannot be registered with
// nbio, when the write fails, or when a close error arrives before a reply.
//
// NOTE(review): there is no timeout; if neither a reply nor a close event
// arrives, Do blocks indefinitely.
func (client *RedisClient) Do(cmd string, args ...string) (interface{}, error) {
	fullCmd := encodeCommand(cmd, args)

	atomic.AddInt64(&qps, 1)

	// Use the connection pool to get a TLS connection.
	tlsConn, err := client.pool.Get()
	if err != nil {
		return nil, fmt.Errorf("failed to get connection: %w", err)
	}

	// NOTE(review): the connection is not handed back to the pool after the
	// reply. A pooled connection would go through NBConn/AddConn again on its
	// next use — confirm how nbio treats an already registered connection
	// before enabling reuse here.

	// Convert the connection underneath the TLS session to an *nbio.Conn so
	// the Gopher can drive it.
	nbConn, err := nbio.NBConn(tlsConn.Conn())
	if err != nil {
		// The connection was never registered with the Gopher, so nothing
		// else will release it. Best effort: the conversion error is the one
		// worth reporting.
		_ = tlsConn.Close()
		return nil, fmt.Errorf("failed to convert connection: %w", err)
	}

	// Link the TLS session and the nbio connection to each other, then add
	// the nbio connection to the Gopher.
	isNonblock := true
	nbConn.SetSession(tlsConn)
	tlsConn.ResetConn(nbConn, isNonblock)
	client.g.AddConn(nbConn)

	// Send the encoded command through the TLS session.
	if _, err = tlsConn.Write([]byte(fullCmd)); err != nil {
		return nil, err
	}

	// Whichever handler fires first decides the outcome.
	select {
	case response := <-client.responseCh:
		return parseResponse(response)
	case err := <-client.errorCh:
		return nil, err
	}
}

// encodeCommand renders cmd followed by args as a RESP array of bulk strings.
func encodeCommand(cmd string, args []string) string {
	var b strings.Builder
	fmt.Fprintf(&b, "*%d\r\n$%d\r\n%s\r\n", len(args)+1, len(cmd), cmd)
	for _, arg := range args {
		fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(arg), arg)
	}
	return b.String()
}

// parseResponse decodes the first RESP reply contained in data.
//
// Result by reply type:
//   - simple string ('+') and integer (':'): the text of the line, as a string
//   - error ('-'): the message as the value together with a non-nil error
//   - bulk string ('$'): the payload as a string, or nil for the null bulk
//     string ("$-1")
//   - array ('*'): a []interface{} holding one decoded value per element, or
//     nil when the declared length is zero or negative
//
// Line terminators are not part of the returned values. Empty, malformed or
// truncated input yields an error instead of a panic. An error reply nested
// inside an array fails the whole array.
func parseResponse(data []byte) (interface{}, error) {
	value, _, err := parseRESPValue(string(data))
	return value, err
}

// parseRESPValue decodes one RESP value from the front of s and returns it
// together with the text that follows it. The remainder is only meaningful
// when err is nil.
func parseRESPValue(s string) (interface{}, string, error) {
	if len(s) == 0 {
		return nil, "", fmt.Errorf("no response")
	}

	// Every reply starts with a type byte and a CRLF-terminated header line.
	line, rest := splitRESPLine(s[1:])

	switch s[0] {
	case '-':
		return line, rest, fmt.Errorf("error response: %s", line)
	case '+', ':':
		return line, rest, nil
	case '$':
		var size int
		if _, err := fmt.Sscanf(line, "%d", &size); err != nil {
			return nil, "", fmt.Errorf("invalid bulk string length %q: %w", line, err)
		}
		if size == -1 {
			return nil, rest, nil // Null bulk response
		}
		if size < 0 {
			return nil, "", fmt.Errorf("invalid bulk string length %d", size)
		}
		if size > len(rest) {
			return nil, "", fmt.Errorf("truncated bulk string: want %d bytes, have %d", size, len(rest))
		}
		// Cut by the declared length, not by CRLF: the payload is binary safe
		// and may itself contain line terminators.
		return rest[:size], strings.TrimPrefix(rest[size:], "\r\n"), nil
	case '*':
		var count int
		if _, err := fmt.Sscanf(line, "%d", &count); err != nil {
			return nil, "", err
		}
		if count <= 0 {
			return nil, rest, nil
		}
		// Each element needs at least one byte, so a larger count cannot be
		// satisfied; checking first also bounds the allocation below.
		if count > len(rest) {
			return nil, "", fmt.Errorf("truncated array: want %d elements, have %d bytes", count, len(rest))
		}
		results := make([]interface{}, count)
		for i := range results {
			elem, remaining, err := parseRESPValue(rest)
			if err != nil {
				return nil, "", fmt.Errorf("array element %d: %w", i, err)
			}
			results[i] = elem
			rest = remaining
		}
		return results, rest, nil
	default:
		return nil, "", fmt.Errorf("unknown response type: %c", s[0])
	}
}

// splitRESPLine splits s at the first CRLF into the line before it and the
// text after it. Without a CRLF the whole of s is the line.
func splitRESPLine(s string) (string, string) {
	if i := strings.Index(s, "\r\n"); i >= 0 {
		return s[:i], s[i+2:]
	}
	return s, ""
}

// main is a smoke test against a TLS-enabled Redis at 127.0.0.1:6380: it
// starts the client, issues four SET commands followed by one GET, and prints
// each reply.
func main() {
	// TLS configuration for secure Redis connection, adjust as necessary.
	const addr = "127.0.0.1:6380"

	// Pool with max 3 connections.
	pool := NewConnPool(addr, tlsConfig, 3)
	client := NewRedisClient(addr, tlsConfig, pool)

	if err := client.Start(); err != nil {
		log.Fatalf("Failed to start client: %v", err)
	}
	// Note: the log.Fatalf calls below exit the process without running this
	// deferred Stop.
	defer client.Stop()

	log.Printf("here1")

	// Use the client: the writes are issued one after another, in this order.
	writes := [][]string{
		{"key", "value"},
		{"key", "value"},
		{"key", "value"},
		{"a", "b"},
	}
	for _, kv := range writes {
		reply, err := client.Do("SET", kv...)
		if err != nil {
			log.Fatalf("Failed to execute command: %v", err)
		}
		fmt.Printf("SET Response: %v\n", reply)
	}

	log.Printf("here2")
	stored, err := client.Do("GET", "key")
	if err != nil {
		log.Fatalf("Failed to execute command: %v", err)
	}
	fmt.Printf("GET Response: %v\n", stored)

	log.Printf("here3")
	// Keep the main goroutine alive for a short time to ensure the response is processed
	time.Sleep(1 * time.Second)
}
@kolinfluence kolinfluence changed the title redis client that works but connection pool Put will show error working redis client with pool requesting comment Mar 25, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant