Skip to content

Commit

Permalink
added async/sync options
Browse files Browse the repository at this point in the history
  • Loading branch information
alash3al committed Jan 14, 2022
1 parent be8d7c2 commit 1a1a12c
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 87 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ Redix isn't
Redix is
==========
- Simple `key => value` storage that speaks redis protocol.
- A database, you can store any size of data till your postgres db be down, or till your disk free size is about to be zero.
- A database which means you can store any size of data till your storage free size is about to be zero.
- Ready to be abused.
- Nested Large Hash-table.
-
- `Async` (all writes happen in the background), or `Sync` it won't respond to the client before writing to the internal datastore

Core Commands
=============
Expand Down
14 changes: 0 additions & 14 deletions config.master.env

This file was deleted.

17 changes: 0 additions & 17 deletions config.replica.env

This file was deleted.

5 changes: 3 additions & 2 deletions internals/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
type Config struct {
Server struct {
Redis struct {
ListenAddr string `hcl:"listen"`
MaxConns int64 `hcl:"max_connections"`
ListenAddr string `hcl:"listen"`
AsyncWrites bool `hcl:"async"`
MaxConns int64 `hcl:"max_connections"`
} `hcl:"redis,block"`
} `hcl:"server,block"`

Expand Down
2 changes: 2 additions & 0 deletions internals/datastore/contract/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Engine interface {
Open(string) error
Close() error
Write(*WriteInput) (*WriteOutput, error)
// BatchWrite([]*WriteInput) error
Read(*ReadInput) (*ReadOutput, error)
Iterate(*IteratorOpts) error
}
Expand Down Expand Up @@ -44,6 +45,7 @@ type ReadOutput struct {
TTL time.Duration
}

// IteratorOpts represents the itrator options
type IteratorOpts struct {
Prefix []byte
Callback func(*ReadOutput) error
Expand Down
55 changes: 55 additions & 0 deletions internals/redis/commands/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package commands

import (
"bytes"
"strings"
"sync"

"github.com/alash3al/redix/internals/config"
"github.com/alash3al/redix/internals/datastore/contract"
"github.com/tidwall/redcon"
)

// Context represents the command context
type Context struct {
Conn redcon.Conn
Engine contract.Engine
Cfg *config.Config
Argv [][]byte
Argc int

sync.RWMutex
}

// Session fetches the current session map
func (c *Context) Session() map[string]interface{} {
c.RLock()
m := c.Conn.Context().(map[string]interface{})
c.RUnlock()

return m
}

// SessionSet set a k-v into the current session
func (c *Context) SessionSet(k string, v interface{}) {
c.Lock()

m := c.Conn.Context().(map[string]interface{})
m[k] = v
c.Conn.SetContext(m)

c.Unlock()
}

// SessionGet fetches a value from the current session
func (c *Context) SessionGet(k string) (interface{}, bool) {
val, ok := c.Session()[k]

return val, ok
}

// AbsoluteKeyPath returns the full key path relative to the namespace the namespace
func (c *Context) AbsoluteKeyPath(k ...[]byte) []byte {
ns, _ := c.SessionGet("namespace")
return []byte(ns.(string) + strings.TrimLeft(string(bytes.Join(k, []byte("/"))), "/"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commands
import (
"bytes"
"fmt"
"log"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -96,9 +97,17 @@ func init() {
}
}

if _, err := c.Engine.Write(&writeOpts); err != nil {
c.Conn.WriteError("Err " + err.Error())
return
if c.Cfg.Server.Redis.AsyncWrites {
go (func() {
if _, err := c.Engine.Write(&writeOpts); err != nil {
log.Println("[FATAL]", err.Error())
}
})()
} else {
if _, err := c.Engine.Write(&writeOpts); err != nil {
c.Conn.WriteError("Err " + err.Error())
return
}
}

c.Conn.WriteString("OK")
Expand Down Expand Up @@ -145,6 +154,21 @@ func init() {
delta = c.Argv[1]
}

if c.Cfg.Server.Redis.AsyncWrites {
go (func() {
if _, err := c.Engine.Write(&contract.WriteInput{
Key: c.AbsoluteKeyPath(c.Argv[0]),
Value: delta,
Increment: true,
}); err != nil {
log.Println("[FATAL]", err.Error())
}
})()

c.Conn.WriteNull()
return
}

ret, err := c.Engine.Write(&contract.WriteInput{
Key: c.AbsoluteKeyPath(c.Argv[0]),
Value: delta,
Expand All @@ -166,6 +190,25 @@ func init() {
return
}

if c.Cfg.Server.Redis.AsyncWrites {
go (func() {
for i := range c.Argv {
_, err := c.Engine.Write(&contract.WriteInput{
Key: c.AbsoluteKeyPath(c.Argv[i]),
Value: nil,
})

if err != nil {
log.Println("[FATAL]", err.Error())
return
}
}
})()

c.Conn.WriteString("OK")
return
}

for i := range c.Argv {
_, err := c.Engine.Write(&contract.WriteInput{
Key: c.AbsoluteKeyPath(c.Argv[i]),
Expand Down
49 changes: 0 additions & 49 deletions internals/redis/commands/registry.go
Original file line number Diff line number Diff line change
@@ -1,60 +1,11 @@
package commands

import (
"bytes"
"fmt"
"strings"
"sync"

"github.com/alash3al/redix/internals/config"
"github.com/alash3al/redix/internals/datastore/contract"
"github.com/tidwall/redcon"
)

// Context represents the command context
type Context struct {
Conn redcon.Conn
Engine contract.Engine
Cfg *config.Config
Argv [][]byte
Argc int

sync.RWMutex
}

// Session fetches the current session map
func (c *Context) Session() map[string]interface{} {
c.RLock()
m := c.Conn.Context().(map[string]interface{})
c.RUnlock()

return m
}

// SessionSet set a k-v into the current session
func (c *Context) SessionSet(k string, v interface{}) {
c.Lock()

m := c.Conn.Context().(map[string]interface{})
m[k] = v
c.Conn.SetContext(m)

c.Unlock()
}

// SessionGet fetches a value from the current session
func (c *Context) SessionGet(k string) (interface{}, bool) {
val, ok := c.Session()[k]

return val, ok
}

// AbsoluteKeyPath returns the full key path relative to the namespace the namespace
func (c *Context) AbsoluteKeyPath(k ...[]byte) []byte {
ns, _ := c.SessionGet("namespace")
return []byte(ns.(string) + string(bytes.Join(k, []byte("/"))))
}

// Handler a command handler func
type Handler func(*Context)

Expand Down
1 change: 1 addition & 0 deletions redix.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ server {
redis {
listen = ":6380"
max_connections = 100
async = true
}
}

Expand Down

0 comments on commit 1a1a12c

Please sign in to comment.