-
Notifications
You must be signed in to change notification settings - Fork 61
/
writer.go
136 lines (118 loc) · 2.63 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package qrpc
import (
"context"
"net"
"time"
)
// Writer writes data to a network connection. Every write attempt is bounded
// by a write deadline so that cancellation of ctx can be noticed between
// attempts even when the connection is not accepting data.
type Writer struct {
	// ctx is polled after each write attempt; once it is done, writing stops
	// and its error is returned.
	ctx context.Context
	// conn is the connection the data is written to.
	conn net.Conn
	// timeout is the overall limit for a single write call, in seconds.
	// Values that are not positive (such as WriteNoTimeout) disable the limit.
	timeout int
}
const (
	// CtxCheckMaxInterval is the longest write deadline applied to a single
	// write attempt, and therefore the longest a blocked write can delay
	// noticing that the context is done.
	CtxCheckMaxInterval = 3 * time.Second
	// WriteNoTimeout is the timeout value that means a write has no overall
	// time limit.
	WriteNoTimeout = -1
)
// NewWriter returns a Writer that writes to conn and observes ctx, with no
// overall write timeout (WriteNoTimeout).
func NewWriter(ctx context.Context, conn net.Conn) *Writer {
	w := new(Writer)
	w.ctx = ctx
	w.conn = conn
	w.timeout = WriteNoTimeout
	return w
}
// NewWriterWithTimeout returns a Writer that writes to conn and observes ctx,
// limiting each write call to timeout seconds overall. A timeout that is not
// positive disables the limit.
func NewWriterWithTimeout(ctx context.Context, conn net.Conn, timeout int) *Writer {
	writer := &Writer{
		ctx:     ctx,
		conn:    conn,
		timeout: timeout,
	}
	return writer
}
// Write writes all of bytes to the underlying connection.
//
// The data is written in attempts, each bounded by a write deadline of at most
// CtxCheckMaxInterval. An attempt that ends with a timeout error is retried
// with the remaining data, so the loop ends only when everything has been
// written, a non-timeout error occurs, the Writer's overall timeout (when
// positive, in seconds) has elapsed, or the Writer's context is done.
//
// It returns the number of bytes written and, when that is fewer than
// len(bytes), the error that stopped the write (the context's error if the
// context was done).
func (w *Writer) Write(bytes []byte) (int, error) {
	var endTime time.Time
	hasTimeout := w.timeout > 0
	if hasTimeout {
		endTime = time.Now().Add(time.Duration(w.timeout) * time.Second)
	}

	offset := 0
	for {
		// Bound this attempt by the polling interval, but never let it run
		// past the end of the overall timeout.
		deadline := time.Now().Add(CtxCheckMaxInterval)
		if hasTimeout && endTime.Before(deadline) {
			deadline = endTime
		}
		// Without a deadline the write below could block indefinitely and the
		// context would never be polled, so a failure here is fatal.
		if err := w.conn.SetWriteDeadline(deadline); err != nil {
			return offset, err
		}

		n, err := w.conn.Write(bytes[offset:])
		offset += n
		if err != nil {
			// Any error that reports itself as a timeout through net.Error
			// (which includes *net.OpError) is the per-attempt deadline
			// firing; everything else is a real failure.
			netErr, ok := err.(net.Error)
			if !ok || !netErr.Timeout() {
				return offset, err
			}
			if hasTimeout && time.Now().After(endTime) {
				return offset, err
			}
		}
		if offset >= len(bytes) {
			return offset, nil
		}

		// Non-blocking cancellation check between attempts.
		select {
		case <-w.ctx.Done():
			return offset, w.ctx.Err()
		default:
		}
	}
}
// writeBuffers writes every buffer in buffs to the underlying connection.
//
// The data is written in attempts, each bounded by a write deadline of at most
// CtxCheckMaxInterval. An attempt that ends with a timeout error is retried,
// so the loop ends only when everything has been written, a non-timeout error
// occurs, the Writer's overall timeout (when positive, in seconds) has
// elapsed, or the Writer's context is done.
//
// buffs is consumed: net.Buffers.WriteTo advances it past the data it has
// written, which is what lets a retry resume with only the remaining data.
//
// It returns the total number of bytes written and, when that is fewer than
// the combined length of the buffers, the error that stopped the write (the
// context's error if the context was done).
func (w *Writer) writeBuffers(buffs *net.Buffers) (int64, error) {
	var endTime time.Time
	hasTimeout := w.timeout > 0
	if hasTimeout {
		endTime = time.Now().Add(time.Duration(w.timeout) * time.Second)
	}

	// The total must be taken up front because WriteTo shrinks buffs as it
	// makes progress.
	var total int64
	for _, b := range *buffs {
		total += int64(len(b))
	}

	var written int64
	for {
		// Bound this attempt by the polling interval, but never let it run
		// past the end of the overall timeout.
		deadline := time.Now().Add(CtxCheckMaxInterval)
		if hasTimeout && endTime.Before(deadline) {
			deadline = endTime
		}
		// Without a deadline the write below could block indefinitely and the
		// context would never be polled, so a failure here is fatal.
		if err := w.conn.SetWriteDeadline(deadline); err != nil {
			return written, err
		}

		n, err := buffs.WriteTo(w.conn)
		written += n
		if err != nil {
			// Any error that reports itself as a timeout through net.Error
			// (which includes *net.OpError) is the per-attempt deadline
			// firing; everything else is a real failure.
			netErr, ok := err.(net.Error)
			if !ok || !netErr.Timeout() {
				return written, err
			}
			if hasTimeout && time.Now().After(endTime) {
				return written, err
			}
		}
		if written >= total {
			return written, nil
		}

		// Non-blocking cancellation check between attempts.
		select {
		case <-w.ctx.Done():
			return written, w.ctx.Err()
		default:
		}
	}
}