Skip to content

Commit

Permalink
added support for basic pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
alash3al committed Sep 16, 2022
1 parent aab400d commit ebf8f14
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 7 deletions.
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/redix.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ Core Commands
6) "USER_3"
## in the hgetall response, redix removed the prefix you specified `/users/`
```
- `PUBLISH <channel|topic|anyword> <message here>`
- `SUBSCRIBE <channel|topic|anyword>`

Configurations
==============
Expand Down
2 changes: 2 additions & 0 deletions internals/datastore/contract/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ type Engine interface {
Write(*WriteInput) (*WriteOutput, error)
Read(*ReadInput) (*ReadOutput, error)
Iterate(*IteratorOpts) error
Publish([]byte, []byte) error
Subscribe([]byte, func([]byte) error) error
}

// WriteInput represents a PUT request
Expand Down
10 changes: 10 additions & 0 deletions internals/datastore/engines/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,13 @@ func (e *Engine) Iterate(opts *contract.IteratorOpts) error {
func (e *Engine) Close() error {
return nil
}

// Publish not supported in filesystem mode
func (e *Engine) Publish(channel []byte, payload []byte) error {
return fmt.Errorf("the %s driver doesn't support publish/subscribe", Name)
}

// Subscribe not supported in filesystem mode
func (e *Engine) Subscribe(channel []byte, cb func([]byte) error) error {
return fmt.Errorf("the %s driver doesn't support publish/subscribe", Name)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package postgresql

import (
"context"
"crypto/md5"
"encoding/json"
"fmt"
"strconv"
Expand Down Expand Up @@ -268,3 +269,41 @@ func (e *Engine) Close() error {
e.conn.Close()
return nil
}

// Publish submits the payload to the specified channel
func (e *Engine) Publish(channel []byte, payload []byte) error {
newChannel := fmt.Sprintf("%x", md5.Sum(channel))
if _, err := e.conn.Exec(context.Background(), "SELECT pg_notify($1, $2)", newChannel, payload); err != nil {
return err
}

return nil
}

// Subscribe listens for the incoming payloads on the specified channel
func (e *Engine) Subscribe(channel []byte, cb func([]byte) error) error {
if cb == nil {
return fmt.Errorf("you must specify a callback (cb)")
}

conn, err := e.conn.Acquire(context.Background())
if err != nil {
return err
}

newChannel := fmt.Sprintf("\"%x\"", md5.Sum(channel))
if _, err := conn.Exec(context.Background(), "LISTEN "+newChannel); err != nil {
return fmt.Errorf("database::listen::err %s", err.Error())
}

for {
notification, err := conn.Conn().WaitForNotification(context.Background())
if err != nil {
return fmt.Errorf("database::notification::err %s", err.Error())
}

if err := cb([]byte(notification.Payload)); err != nil {
return fmt.Errorf("unable to process notification due to: %s", err.Error())
}
}
}
45 changes: 45 additions & 0 deletions internals/redis/commands/core_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,49 @@ func init() {

c.Conn.WriteString("OK")
})

// PUBLISH
HandleFunc("publish", func(c *Context) {
if c.Argc < 2 {
c.Conn.WriteError("ERR wrong number of arguments for 'publish' command")
return
}

if err := c.Engine.Publish(c.AbsoluteKeyPath([]byte("redix"), c.Argv[0]), c.Argv[1]); err != nil {
c.Conn.WriteError("ERR %s " + err.Error())
return
}

c.Conn.WriteInt(0)
})

HandleFunc("subscribe", func(c *Context) {
if c.Argc != 1 {
c.Conn.WriteError("ERR wrong number of arguments for 'subscribe' command")
return
}

conn := c.Conn.Detach()
defer conn.Close()

conn.WriteArray(3)
conn.WriteBulkString("subscribe")
conn.WriteBulk(c.Argv[0])
conn.WriteInt(1)
conn.Flush()

err := c.Engine.Subscribe(c.AbsoluteKeyPath([]byte("redix"), c.Argv[0]), func(msg []byte) error {
conn.WriteArray(3)
conn.WriteBulkString("message")
conn.WriteBulk(c.Argv[0])
conn.WriteBulk(msg)
conn.Flush()
return nil
})

if err != nil {
c.Conn.WriteError("ERR " + err.Error())
return
}
})
}
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (

"github.com/alash3al/redix/internals/config"
"github.com/alash3al/redix/internals/datastore/contract"
_ "github.com/alash3al/redix/internals/datastore/engines/postgresql"
"github.com/alash3al/redix/internals/redis"

_ "github.com/alash3al/redix/internals/datastore/engines/filesystem"
_ "github.com/alash3al/redix/internals/datastore/engines/postgresql"
)

var (
Expand Down
12 changes: 6 additions & 6 deletions redix.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ server {
}
}

// engine "postgresql" {
// dsn = "postgresql://postgres@localhost/redix"
// }

engine "filesystem" {
dsn = "./data/"
engine "postgresql" {
dsn = "postgresql://postgres@localhost/redix"
}

#engine "filesystem" {
# dsn = "./data/"
#}

0 comments on commit ebf8f14

Please sign in to comment.