-
-
Notifications
You must be signed in to change notification settings - Fork 32
/
web_push_subscription.go
157 lines (126 loc) · 3.88 KB
/
web_push_subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package nakama
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/SherClockHolmes/webpush-go"
)
// Settings used when sending web push notifications.
const (
	// webPushNoticationContact is passed as the Subscriber option of every
	// outgoing web push request.
	// NOTE(review): the literal looks like an obfuscated placeholder rather
	// than a real address — confirm the intended contact value.
	webPushNoticationContact = "[email protected]"

	// webPushNoticationSendTimeout, expressed in whole seconds, is used as the
	// TTL option of every outgoing web push request.
	webPushNoticationSendTimeout = 30 * time.Second
)
// errWebPushSubscriptionGone signals that the push service no longer
// recognises a subscription. sendWebPushNotifications reacts to it by deleting
// the stored subscription instead of logging the failure.
var errWebPushSubscriptionGone = GoneError("web push subscription gone")
// AddWebPushSubscription stores sub as a web push subscription of the
// authenticated user.
//
// The user ID is read from ctx under KeyAuthUserID; ErrUnauthenticated is
// returned when it is missing or not a string. An insert rejected with a
// unique violation (as reported by isUniqueViolation) is treated as success,
// so adding the same subscription twice is not an error. Any other database
// failure is returned wrapped.
func (svc *Service) AddWebPushSubscription(ctx context.Context, sub webpush.Subscription) error {
	userID, authenticated := ctx.Value(KeyAuthUserID).(string)
	if !authenticated {
		return ErrUnauthenticated
	}

	const insertStmt = "INSERT INTO user_web_push_subscriptions (user_id, sub) VALUES ($1, $2)"

	// The subscription is wrapped in jsonValue before being handed to the driver.
	_, err := svc.DB.ExecContext(ctx, insertStmt, userID, jsonValue{sub})
	switch {
	case isUniqueViolation(err):
		// Already stored: nothing to do.
		return nil
	case err != nil:
		return fmt.Errorf("could not sql insert user web push subscription: %w", err)
	}

	return nil
}
// webPushSubscriptions returns every web push subscription stored for userID,
// most recently created first.
//
// It returns a nil slice (and a nil error) when the user has no subscriptions.
// Query, scan and iteration failures are returned wrapped; no partial result
// is returned alongside an error.
func (svc *Service) webPushSubscriptions(ctx context.Context, userID string) ([]webpush.Subscription, error) {
	query := "SELECT sub FROM user_web_push_subscriptions WHERE user_id = $1 ORDER BY created_at DESC"
	rows, err := svc.DB.QueryContext(ctx, query, userID)
	if err != nil {
		return nil, fmt.Errorf("could not sql query select user web push subscriptions: %w", err)
	}
	defer rows.Close()

	var subs []webpush.Subscription
	for rows.Next() {
		var sub webpush.Subscription
		// The column is decoded into sub through the jsonValue wrapper.
		if err := rows.Scan(&jsonValue{&sub}); err != nil {
			return nil, fmt.Errorf("could not sql scan user web push subscription: %w", err)
		}

		subs = append(subs, sub)
	}

	// rows.Next returning false may hide an iteration error; surface it.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("could not sql query iterate over user web push subscriptions: %w", err)
	}

	return subs, nil
}
// sendWebPushNotifications pushes n, JSON-encoded, to every web push
// subscription stored for n.UserID.
//
// One goroutine is started per subscription and the function blocks until all
// of them have finished. A subscription for which the send reports
// errWebPushSubscriptionGone is deleted from the database. Nothing is
// returned: every failure is written to svc.Logger instead.
func (svc *Service) sendWebPushNotifications(n Notification) {
	ctx := context.Background()

	subscriptions, err := svc.webPushSubscriptions(ctx, n.UserID)
	if err != nil {
		_ = svc.Logger.Log("err", err)
		return
	}

	if len(subscriptions) == 0 {
		return
	}

	// Topic can have only 32 characters.
	// By removing the dashes from the UUID we can go from 36 to 32 characters.
	topic := ""
	if n.PostID != nil {
		topic = strings.ReplaceAll(*n.PostID, "-", "")
	}

	payload, err := json.Marshal(n)
	if err != nil {
		_ = svc.Logger.Log("err", fmt.Errorf("could not json marshal web push notification message: %w", err))
		return
	}

	var pending sync.WaitGroup
	pending.Add(len(subscriptions))
	for _, subscription := range subscriptions {
		// The subscription is passed as an argument so each goroutine works on
		// its own copy.
		go func(target webpush.Subscription) {
			defer pending.Done()

			sendErr := svc.sendWebPushNotification(target, payload, topic)
			if errors.Is(sendErr, errWebPushSubscriptionGone) {
				// The subscription is no longer valid: drop it and report only
				// a failure of the delete itself.
				sendErr = svc.deleteWebPushSubscription(ctx, n.UserID, target)
			}

			if sendErr != nil {
				_ = svc.Logger.Log("err", sendErr)
			}
		}(subscription)
	}

	pending.Wait()
}
// sendWebPushNotification sends message to a single subscription using the
// service's VAPID key pair. topic, when non-empty, is forwarded as the Topic
// option.
//
// It returns:
//   - nil when the push service answers with a status code below 400;
//   - errWebPushSubscriptionGone when the push service answers 404 or 410,
//     the status codes RFC 8030 assigns to expired or removed subscriptions;
//   - a descriptive error (including the response body when it could be read)
//     for any other status code;
//   - a wrapped error when the request itself fails, including when it does
//     not complete within webPushNoticationSendTimeout.
func (svc *Service) sendWebPushNotification(sub webpush.Subscription, message []byte, topic string) error {
	// Upper bound on how much of an error response is copied into the error
	// message, so a misbehaving endpoint cannot make us buffer a huge body.
	const maxErrorBodyBytes = 4 << 10

	resp, err := webpush.SendNotification(message, &sub, &webpush.Options{
		// An explicit client with a timeout keeps a stalled push endpoint from
		// blocking the caller indefinitely. The zero-value Transport still
		// uses http.DefaultTransport, so connections remain pooled.
		HTTPClient:      &http.Client{Timeout: webPushNoticationSendTimeout},
		Subscriber:      webPushNoticationContact,
		Topic:           topic,
		VAPIDPrivateKey: svc.VAPIDPrivateKey,
		VAPIDPublicKey:  svc.VAPIDPublicKey,
		TTL:             int(webPushNoticationSendTimeout.Seconds()),
	})
	if err != nil {
		return fmt.Errorf("could not send web push notification: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 400 {
		return nil
	}

	// subscription has been removed or has expired.
	if resp.StatusCode == http.StatusGone || resp.StatusCode == http.StatusNotFound {
		return errWebPushSubscriptionGone
	}

	// Include the (bounded) response body when available; it usually carries
	// the push service's explanation of the failure.
	if b, err := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes)); err == nil {
		return fmt.Errorf("web push notification send failed with status code %d: %s", resp.StatusCode, string(b))
	}

	return fmt.Errorf("web push notification send failed with status code %d", resp.StatusCode)
}
// deleteWebPushSubscription removes the rows of user_web_push_subscriptions
// whose user_id equals userID and whose sub column equals sub (passed to the
// driver wrapped in jsonValue). Deleting a subscription that is not stored is
// not an error; only a failing statement is reported, wrapped.
func (svc *Service) deleteWebPushSubscription(ctx context.Context, userID string, sub webpush.Subscription) error {
	const deleteStmt = "DELETE FROM user_web_push_subscriptions WHERE user_id = $1 AND sub = $2"

	if _, err := svc.DB.ExecContext(ctx, deleteStmt, userID, jsonValue{sub}); err != nil {
		return fmt.Errorf("sql delete user web push subscription: %w", err)
	}

	return nil
}