diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..64262b0 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/redix.iml b/.idea/redix.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/redix.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/README.md b/README.md index 52499f4..ee71b3a 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,8 @@ Core Commands 6) "USER_3" ## in the hgetall response, redix removed the prefix you specified `/users/` ``` +- `PUBLISH ` +- `SUBSCRIBE ` Configurations ============== diff --git a/internals/datastore/contract/engine.go b/internals/datastore/contract/engine.go index 47a09fe..eb98d0d 100644 --- a/internals/datastore/contract/engine.go +++ b/internals/datastore/contract/engine.go @@ -12,6 +12,8 @@ type Engine interface { Write(*WriteInput) (*WriteOutput, error) Read(*ReadInput) (*ReadOutput, error) Iterate(*IteratorOpts) error + Publish([]byte, []byte) error + Subscribe([]byte, func([]byte) error) error } // WriteInput represents a PUT request diff --git a/internals/datastore/engines/filesystem/filesystem.go b/internals/datastore/engines/filesystem/filesystem.go index b0969c2..74905d2 100644 --- a/internals/datastore/engines/filesystem/filesystem.go +++ b/internals/datastore/engines/filesystem/filesystem.go @@ -185,3 +185,13 @@ func (e *Engine) Iterate(opts *contract.IteratorOpts) error { func (e *Engine) Close() error { return nil } + +// Publish not supported in filesystem mode +func (e *Engine) Publish(channel []byte, payload []byte) error { + return fmt.Errorf("the %s driver doesn't support publish/subscribe", Name) +} + +// Subscribe not supported in filesystem mode +func (e *Engine) Subscribe(channel []byte, cb func([]byte) error) error { + return fmt.Errorf("the %s driver doesn't support publish/subscribe", Name) +} diff --git a/internals/datastore/engines/postgresql/postgres.go b/internals/datastore/engines/postgresql/postgresql.go similarity index 83% rename from internals/datastore/engines/postgresql/postgres.go rename to internals/datastore/engines/postgresql/postgresql.go index 5d05191..be5729d 100644 --- a/internals/datastore/engines/postgresql/postgres.go +++ b/internals/datastore/engines/postgresql/postgresql.go @@ -2,6 +2,7 @@ package postgresql import ( "context" + "crypto/md5" "encoding/json" "fmt" "strconv" @@ -268,3 +269,41 @@ func (e *Engine) Close() error { e.conn.Close() return nil } + +// Publish submits the payload to the specified channel +func (e *Engine) Publish(channel []byte, payload []byte) error { + newChannel := fmt.Sprintf("%x", md5.Sum(channel)) + if _, err := e.conn.Exec(context.Background(), "SELECT pg_notify($1, $2)", newChannel, payload); err != nil { + return err + } + + return nil +} + +// Subscribe listens for the incoming payloads on the specified channel +func (e *Engine) Subscribe(channel []byte, cb func([]byte) error) error { + if cb == nil { + return fmt.Errorf("you must specify a callback (cb)") + } + + conn, err := e.conn.Acquire(context.Background()) + if err != nil { + return err + } + + newChannel := fmt.Sprintf("\"%x\"", md5.Sum(channel)) + if _, err := conn.Exec(context.Background(), "LISTEN "+newChannel); err != nil { + return fmt.Errorf("database::listen::err %s", err.Error()) + } + + for { + notification, err := conn.Conn().WaitForNotification(context.Background()) + if err != nil { + return fmt.Errorf("database::notification::err %s", err.Error()) + } + + if err := cb([]byte(notification.Payload)); err != nil { + return fmt.Errorf("unable to process notification due to: %s", err.Error()) + } + } +} diff --git a/internals/redis/commands/core_commands.go b/internals/redis/commands/core_commands.go index d75d9a6..1e0aea4 100644 --- a/internals/redis/commands/core_commands.go +++ b/internals/redis/commands/core_commands.go @@ -308,4 +308,49 @@ func init() { c.Conn.WriteString("OK") }) + + // PUBLISH + HandleFunc("publish", func(c *Context) { + if c.Argc < 2 { + c.Conn.WriteError("ERR wrong number of arguments for 'publish' command") + return + } + + if err := c.Engine.Publish(c.AbsoluteKeyPath([]byte("redix"), c.Argv[0]), c.Argv[1]); err != nil { + c.Conn.WriteError("ERR %s " + err.Error()) + return + } + + c.Conn.WriteInt(0) + }) + + HandleFunc("subscribe", func(c *Context) { + if c.Argc != 1 { + c.Conn.WriteError("ERR wrong number of arguments for 'subscribe' command") + return + } + + conn := c.Conn.Detach() + defer conn.Close() + + conn.WriteArray(3) + conn.WriteBulkString("subscribe") + conn.WriteBulk(c.Argv[0]) + conn.WriteInt(1) + conn.Flush() + + err := c.Engine.Subscribe(c.AbsoluteKeyPath([]byte("redix"), c.Argv[0]), func(msg []byte) error { + conn.WriteArray(3) + conn.WriteBulkString("message") + conn.WriteBulk(c.Argv[0]) + conn.WriteBulk(msg) + conn.Flush() + return nil + }) + + if err != nil { + c.Conn.WriteError("ERR " + err.Error()) + return + } + }) } diff --git a/main.go b/main.go index 87a1463..62ab554 100644 --- a/main.go +++ b/main.go @@ -9,8 +9,10 @@ import ( "github.com/alash3al/redix/internals/config" "github.com/alash3al/redix/internals/datastore/contract" - _ "github.com/alash3al/redix/internals/datastore/engines/postgresql" "github.com/alash3al/redix/internals/redis" + + _ "github.com/alash3al/redix/internals/datastore/engines/filesystem" + _ "github.com/alash3al/redix/internals/datastore/engines/postgresql" ) var ( diff --git a/redix.hcl b/redix.hcl index 8db12de..4a08b84 100644 --- a/redix.hcl +++ b/redix.hcl @@ -6,10 +6,10 @@ server { } } -// engine "postgresql" { -// dsn = "postgresql://postgres@localhost/redix" -// } - -engine "filesystem" { - dsn = "./data/" +engine "postgresql" { + dsn = "postgresql://postgres@localhost/redix" } + +#engine "filesystem" { +# dsn = "./data/" +#}