-
Notifications
You must be signed in to change notification settings - Fork 0
/
event_converter.go
83 lines (66 loc) · 1.95 KB
/
event_converter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package hevent
import (
"context"
"errors"
"github.com/kamva/hexa"
"github.com/kamva/tracer"
)
// Header keys that the raw message converter writes to and reads from a
// raw message's headers.
const (
	// HeaderKeyReplyChannel is the header key under which the event's reply
	// channel is stored.
	HeaderKeyReplyChannel = "_reply_channel"
	// HeaderKeyPayloadEncoder is the header key under which the name of the
	// encoder used for the payload is stored. The same name is used to look up
	// the encoder again when a raw message is converted back to a message.
	HeaderKeyPayloadEncoder = "_payload_encoder"
)
// RawMessageConverter converts events to raw messages and raw messages back
// to messages.
type RawMessageConverter interface {
	// EventToRaw converts the event e to a raw message.
	EventToRaw(c context.Context, e *Event) (*RawMessage, error)
	// RawMsgToMessage converts the raw message to a message.
	// primary is the primary driver's message that its receiver will get.
	// It also returns a context; the implementation in this file returns the
	// context produced by its propagator's Extract call.
	RawMsgToMessage(c context.Context, raw *RawMessage, primary interface{}) (context.Context, Message, error)
}
// rawMessageConverter is the RawMessageConverter implementation returned by
// NewRawMessageConverter.
type rawMessageConverter struct {
	// p injects context values into message headers and extracts them back.
	p hexa.ContextPropagator
	// e encodes outgoing event payloads; its name is written to the headers.
	e Encoder
}
// NewRawMessageConverter builds a RawMessageConverter that uses p to carry
// context values through message headers and e to encode event payloads.
func NewRawMessageConverter(p hexa.ContextPropagator, e Encoder) RawMessageConverter {
	converter := new(rawMessageConverter)
	converter.p = p
	converter.e = e
	return converter
}
// EventToRaw converts event into a RawMessage.
//
// The payload is encoded with the converter's encoder, and the headers are
// the values the propagator injects from ctx plus two extra entries: the
// event's reply channel (HeaderKeyReplyChannel) and the encoder's name
// (HeaderKeyPayloadEncoder).
//
// It returns an error when event is nil, when encoding the payload fails, or
// when the propagator fails to inject the context. No raw message is returned
// alongside an error.
func (m *rawMessageConverter) EventToRaw(ctx context.Context, event *Event) (*RawMessage, error) {
	// Guard against a nil event instead of panicking on the dereference below.
	if event == nil {
		return nil, tracer.Trace(errors.New("can not convert a nil event to a raw message"))
	}

	payload, err := m.e.Encode(event.Payload)
	if err != nil {
		return nil, tracer.Trace(err)
	}

	headers, err := m.p.Inject(ctx)
	if err != nil {
		return nil, tracer.Trace(err)
	}

	// NOTE(review): assumes Inject returns a non-nil map on success; writing
	// to a nil map would panic — confirm against the propagator implementation.
	headers[HeaderKeyReplyChannel] = []byte(event.ReplyChannel)
	headers[HeaderKeyPayloadEncoder] = []byte(m.e.Name())

	// Every failure has already returned, so the success path returns an
	// explicit nil error.
	return &RawMessage{
		Headers: headers,
		Payload: payload,
	}, nil
}
// RawMsgToMessage converts rawMsg into a Message.
//
// The propagator extracts context values from the raw message's headers into
// a new context derived from c. The payload encoder is then looked up by the
// name stored under HeaderKeyPayloadEncoder, and the message is built with
// primary, the raw headers, the correlation id read from the extracted
// context, the reply channel header and the payload decoder.
//
// It returns an error when rawMsg is nil, when extraction fails, or when no
// encoder is registered under the name found in the headers. On error the
// returned Message is the zero value.
func (m *rawMessageConverter) RawMsgToMessage(c context.Context, rawMsg *RawMessage, primary interface{}) (
	ctx context.Context, msg Message, err error) {
	// Guard against a nil raw message instead of panicking on the dereference below.
	if rawMsg == nil {
		return c, Message{}, tracer.Trace(errors.New("can not convert a nil raw message to a message"))
	}

	ctx, err = m.p.Extract(c, rawMsg.Headers)
	if err != nil {
		return ctx, Message{}, tracer.Trace(err)
	}

	// The sender records which encoder produced the payload; find the same one.
	encoderName := string(rawMsg.Headers[HeaderKeyPayloadEncoder])
	encoder, ok := encoders[encoderName]
	if !ok {
		// Wrapped with tracer.Trace like every other error this converter returns.
		return ctx, Message{}, tracer.Trace(errors.New("can not find message payload's encoder/decoder"))
	}

	msg = Message{
		Primary:       primary,
		Headers:       rawMsg.Headers,
		CorrelationId: hexa.CtxCorrelationId(ctx),
		ReplyChannel:  string(rawMsg.Headers[HeaderKeyReplyChannel]),
		Payload:       encoder.Decoder(rawMsg.Payload),
	}

	return ctx, msg, nil
}
// Compile-time check that *rawMessageConverter implements RawMessageConverter.
var _ RawMessageConverter = (*rawMessageConverter)(nil)