Skip to content

Commit

Permalink
API change to ListenTable giving the option to set custom channel, fu…
Browse files Browse the repository at this point in the history
…nction names and a list of tables with event types to watch
  • Loading branch information
kataras committed Nov 4, 2023
1 parent f89cf60 commit d579977
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 39 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func main() {
│ - UpsertSingle(ctx context.Context, uniqueIndex string, value T, destIdPtr any) error │
│ │
│ - Delete(ctx context.Context, values ...T) (int64, error) │
│ │
│ - ListenTable(ctx context.Context, cb func(notification, error) error) (Closer, error) │
└────────────────────────────────────────────────────────────────────────────────────────────┘
```

Expand Down
2 changes: 1 addition & 1 deletion _examples/live-table/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@

<body>
<div class="container">
<h1>Futuristic Minimalist Template</h1>
<h1>PG Real-Time Data</h1>
<table>
<thead>
<tr>
Expand Down
2 changes: 1 addition & 1 deletion _examples/live-table/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Example of real-time database table to html5 table view using Go, Postgres, HTML
1. Modify the connectionString variable to match your database connection string.
2. Execute `go run main.go`
3. Open a browser window at http://localhost:8080
3.1 Open your database client and execute `INSERT INTO users (email, name, username) VALUES ('kataras2006@hotmail.com', 'Iris', 'iris');`
3.1 Open your database client and execute `INSERT INTO users (email, name, username) VALUES ('john.doe@example.com', 'John Doe', 'johndoe');`
3.2 Make any changes to the users table, e.g. insert new row, update and delete,
4. See the changes in real-time in your browser window (HTML Table).
*/
Expand Down
10 changes: 10 additions & 0 deletions db_information_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"strings"

"github.com/kataras/pg/desc"
)
Expand Down Expand Up @@ -56,6 +57,15 @@ func ExampleDB_ListColumns() {
`[customers.username] pg:"name=username,type=varchar(255),default=''::character varying"`, // before columns convert from struct field should match this.
}

if len(columns) != len(expectedTags) {
fmt.Printf("expected %d columns but got %d\n%s", len(expectedTags), len(columns), strings.Join(expectedTags, "\n"))
fmt.Println("\n=========")
for _, c := range columns {
fmt.Println(c.Name)
}
return
}

for i, column := range columns {
expected := expectedTags[i]
got := fmt.Sprintf("[%s.%s] %s", column.TableName, column.Name, column.FieldTagString(true))
Expand Down
152 changes: 130 additions & 22 deletions db_table_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"errors"
"fmt"
"io"
"strings"
"sync/atomic"

"github.com/kataras/pg/desc"
)

// TableChangeType is the type of the table change.
Expand All @@ -22,6 +25,22 @@ const (
TableChangeTypeDelete TableChangeType = "DELETE"
)

func changesToString(changes []TableChangeType) string {
if len(changes) == 0 {
return ""
}

var b strings.Builder
for i, change := range changes {
b.WriteString(string(change))
if i < len(changes)-1 {
b.WriteString(" OR ")
}
}

return b.String()
}

type (
// TableNotification is the notification message sent by the postgresql server
// when a table change occurs.
Expand Down Expand Up @@ -50,22 +69,97 @@ func (tn TableNotification[T]) GetPayload() string {
return tn.payload
}

// ListenTable registers a function which notifies on the given "table" changes (INSERT, UPDATE, DELETE),
// the subscribed postgres channel is named 'table_change_notifications'.
//
// The callback function can return ErrStop to stop the listener without actual error.
// The callback function can return any other error to stop the listener and return the error.
// The callback function can return nil to continue listening.
//
// TableNotification's New and Old fields are raw json values, use the "json.Unmarshal" to decode them
// to the actual type.
func (db *DB) ListenTable(ctx context.Context, table string, callback func(TableNotificationJSON, error) error) (Closer, error) {
channelName := "table_change_notifications"
// ListenTableOptions is the options for the "DB.ListenTable" method.
type ListenTableOptions struct {
// Tables map of table name and changes to listen for.
//
// Key is the table to listen on for changes.
// Value is changes is the list of table changes to listen for.
// Defaults to {"*": ["INSERT", "UPDATE", "DELETE"] }.
Tables map[string][]TableChangeType

// Channel is the name of the postgres channel to listen on.
// Default: "table_change_notifications".
Channel string

// Function is the name of the postgres function
// which is used to notify on table changes, the
// trigger name is <table_name>_<Function>.
// Defaults to "table_change_notify".
Function string
}

var defaultChangesToWatch = []TableChangeType{TableChangeTypeInsert, TableChangeTypeUpdate, TableChangeTypeDelete}

func (opts *ListenTableOptions) setDefaults() {
if opts.Channel == "" {
opts.Channel = "table_change_notifications"
}

if opts.Function == "" {
opts.Function = "table_change_notify"
}

if len(opts.Tables) == 0 {
opts.Tables = map[string][]TableChangeType{wildcardTableStr: defaultChangesToWatch}
}
}

const wildcardTableStr = "*"

// PrepareListenTable prepares the table for listening for live table updates.
// See "db.ListenTable" method for more.
func (db *DB) PrepareListenTable(ctx context.Context, opts *ListenTableOptions) error {
opts.setDefaults()

isWildcard := false
for table := range opts.Tables {
if table == wildcardTableStr {
isWildcard = true
break
}
}

if isWildcard {
changesToWatch := opts.Tables[wildcardTableStr]
if len(changesToWatch) == 0 {
return nil
}

delete(opts.Tables, wildcardTableStr) // remove the wildcard entry and replace with table names in registered schema.
for _, table := range db.schema.TableNames(desc.TableTypeBase) {
opts.Tables[table] = changesToWatch
}
}

if len(opts.Tables) == 0 {
return nil
}

for table, changes := range opts.Tables {
if err := db.prepareListenTable(ctx, opts.Channel, opts.Function, table, changes); err != nil {
return err
}
}

return nil
}

// PrepareListenTable prepares the table for listening for live table updates.
// See "db.ListenTable" method for more.
func (db *DB) prepareListenTable(ctx context.Context, channel, function, table string, changes []TableChangeType) error {
if table == "" {
return errors.New("empty table name")
}

if len(changes) == 0 {
return nil
}

if atomic.LoadUint32(db.tableChangeNotifyFunctionOnce) == 0 {
// First, check and create the trigger for all tables.
query := fmt.Sprintf(`
CREATE OR REPLACE FUNCTION table_change_notify() RETURNS trigger AS $$
CREATE OR REPLACE FUNCTION %s() RETURNS trigger AS $$
DECLARE
payload text;
channel text := '%s';
Expand All @@ -81,11 +175,11 @@ func (db *DB) ListenTable(ctx context.Context, table string, callback func(Table
END IF;
END;
$$
LANGUAGE plpgsql;`, channelName)
LANGUAGE plpgsql;`, function, channel)

_, err := db.Exec(ctx, query)
if err != nil {
return nil, fmt.Errorf("create or replace function table_change_notify: %w", err)
return fmt.Errorf("create or replace function table_change_notify: %w", err)
}

atomic.StoreUint32(db.tableChangeNotifyFunctionOnce, 1)
Expand All @@ -95,25 +189,39 @@ func (db *DB) ListenTable(ctx context.Context, table string, callback func(Table
_, triggerCreated := db.tableChangeNotifyTriggerOnce[table]
db.tableChangeNotifyOnceMutex.RUnlock()
if !triggerCreated {
query := `CREATE OR REPLACE TRIGGER ` + table + `_table_change_notify
BEFORE INSERT OR
UPDATE OR
DELETE
ON ` + table + `
query := fmt.Sprintf(`CREATE OR REPLACE TRIGGER %s_%s
AFTER %s
ON %s
FOR EACH ROW
EXECUTE FUNCTION table_change_notify();`
EXECUTE FUNCTION table_change_notify();`, table, function, changesToString(changes), table)

_, err := db.Exec(ctx, query)
if err != nil {
return nil, fmt.Errorf("create trigger %s_table_change_notify: %w", table, err)
return fmt.Errorf("create trigger %s_table_change_notify: %w", table, err)
}

db.tableChangeNotifyOnceMutex.Lock()
db.tableChangeNotifyTriggerOnce[table] = struct{}{}
db.tableChangeNotifyOnceMutex.Unlock()
}

conn, err := db.Listen(ctx, channelName)
return nil
}

// ListenTable registers a function which notifies on the given "table" changes (INSERT, UPDATE, DELETE),
// the subscribed postgres channel is named 'table_change_notifications'.
//
// The callback function can return any other error to stop the listener.
// The callback function can return nil to continue listening.
//
// TableNotification's New and Old fields are raw json values, use the "json.Unmarshal" to decode them
// to the actual type.
func (db *DB) ListenTable(ctx context.Context, opts *ListenTableOptions, callback func(TableNotificationJSON, error) error) (Closer, error) {
if err := db.PrepareListenTable(ctx, opts); err != nil {
return nil, err
}

conn, err := db.Listen(ctx, opts.Channel)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion db_table_listener_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ func ExampleDB_ListenTable() {
}
defer db.Close()

closer, err := db.ListenTable(context.Background(), "customers", func(evt TableNotificationJSON, err error) error {
opts := &ListenTableOptions{
Tables: map[string][]TableChangeType{"customers": defaultChangesToWatch},
}
closer, err := db.ListenTable(context.Background(), opts, func(evt TableNotificationJSON, err error) error {
if err != nil {
fmt.Printf("received error: %v\n", err)
return err
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ go 1.21

require (
github.com/gertd/go-pluralize v0.2.1
github.com/jackc/pgx/v5 v5.4.3
golang.org/x/mod v0.13.0
github.com/jackc/pgx/v5 v5.5.0
golang.org/x/mod v0.14.0
)

require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/text v0.13.0 // indirect
)
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/pgx/v5 v5.5.0 h1:NxstgwndsTRy7eq9/kqYc/BZh5w2hHJV86wjvO+1xPw=
github.com/jackc/pgx/v5 v5.5.0/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -20,10 +20,10 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY=
golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
17 changes: 12 additions & 5 deletions repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,21 @@ func (repo *Repository[T]) Duplicate(ctx context.Context, id any, newIDPtr any)
// The callback function can return any other error to stop the listener and return the error.
// The callback function can return nil to continue listening.
func (repo *Repository[T]) ListenTable(ctx context.Context, callback func(TableNotification[T], error) error) (Closer, error) {
return repo.db.ListenTable(ctx, repo.td.Name, func(tableEvt TableNotificationJSON, err error) error {
opts := &ListenTableOptions{
Tables: map[string][]TableChangeType{repo.td.Name: defaultChangesToWatch},
}
return repo.db.ListenTable(ctx, opts, func(tableEvt TableNotificationJSON, err error) error {
if err != nil {
failEvt := TableNotification[T]{
Table: repo.td.Name,
Change: tableEvt.Change, // may empty.
if tableEvt.Table == repo.td.Name {
failEvt := TableNotification[T]{
Table: repo.td.Name,
Change: tableEvt.Change, // may empty.
}

return callback(failEvt, err)
}

return callback(failEvt, err)
return err
}

evt := TableNotification[T]{
Expand Down

0 comments on commit d579977

Please sign in to comment.