-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
143 lines (119 loc) · 2.89 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package rpc_stream
import (
"errors"
"fmt"
"net"
"strings"
"sync"
"github.com/stackengine/selog"
)
var (
sLog = selog.Register("rpc_stream", 0)
ErrMissingServFunc = errors.New("Proto struct is missing serv function")
ErrMissingName = errors.New("Proto needs a name")
ErrAlreadyExists = errors.New("Proto already exists")
ErrBadVersions = errors.New("Mux version unsupported")
ErrProtoUnsupported = errors.New("Proto not supported")
ErrNoProto = errors.New("Add must have a proto")
)
type MuxVersion uint8
const (
UnknownVersion MuxVersion = iota
Mux_v1
Mux_v2
)
var MuxVersionName = map[MuxVersion]string{
UnknownVersion: "unknown",
Mux_v1: "Mux_v1",
Mux_v2: "Mux_v2",
}
func (mv MuxVersion) String() string {
str := MuxVersionName[mv]
if len(str) < 1 {
return MuxVersionName[UnknownVersion]
}
return str
}
// known stream types.
const (
RpcTLS = "RPCTLS"
Raft = "RAFT"
Mesh = "MESH"
Registered = "REG"
)
func Nameify(name string) string {
return fmt.Sprintf("%s\n", strings.ToUpper(name))
}
var (
SprotoSw = make(map[MuxVersion]map[string]*Sproto)
proto_lck sync.Mutex
)
func init() {
SprotoSw[Mux_v2] = make(map[string]*Sproto)
// lame but works
SprotoSw[Mux_v2][RpcTLS] = &Sproto{name: RpcTLS}
SprotoSw[Mux_v2][Raft] = &Sproto{name: Raft}
SprotoSw[Mux_v2][Mesh] = &Sproto{name: Mesh}
SprotoSw[Mux_v2][Registered] = &Sproto{name: Registered}
}
type Sproto struct {
name string
serv func(conn net.Conn) error
}
func (p *Sproto) String() string {
return fmt.Sprintf("%-12.12s %p", p.name, p.serv)
}
func NewProto(name string, serv func(net.Conn) error) (*Sproto, error) {
if len(name) < 1 {
return nil, ErrMissingName
}
if serv == nil {
return nil, ErrMissingServFunc
}
return &Sproto{name: strings.ToUpper(name), serv: serv}, nil
}
func Lookup(ver MuxVersion, s string) (func(conn net.Conn) error, error) {
vSw := SprotoSw[ver]
if vSw == nil {
return nil, ErrBadVersions
}
// sLog.Printf("looking for v: %d '%s' in SprotoSw: %#v", ver, s, vSw)
proto := vSw[s]
if proto == nil || proto.serv == nil {
return nil, ErrProtoUnsupported
}
return proto.serv, nil
}
// add a new stream type to the 'proto-switch'
// or override default handlers.
func Add(ver MuxVersion, proto *Sproto) error {
// validate that proto is saneish
if proto == nil {
return ErrNoProto
}
if proto.serv == nil {
return ErrMissingServFunc
}
if len(proto.name) < 1 {
return ErrMissingName
}
proto.name = strings.ToUpper(proto.name)
// these are hard coded and can not be overridden
if proto.name == RpcTLS ||
proto.name == Registered {
return ErrAlreadyExists
}
proto_lck.Lock()
defer proto_lck.Unlock()
ver_proto := SprotoSw[ver]
for i, p := range ver_proto {
// if we find it replace it
if p.name == proto.name {
ver_proto[i] = proto
return nil
}
}
// else add it
ver_proto[proto.name] = proto
return nil
}