-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwrite.go
More file actions
81 lines (67 loc) · 1.48 KB
/
write.go
File metadata and controls
81 lines (67 loc) · 1.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package stomp
import (
"bytes"
"errors"
"fmt"
"strings"
"time"
)
// frame is the unit of work that safeWrite sends over Conn.writeC: one
// outgoing frame, the options supplied with it, and a channel on which the
// outcome of the write is reported back to the caller.
type frame struct {
	// body is the frame to be written.
	body *Frame
	// options are the Option values the caller passed to safeWrite.
	// NOTE(review): presumably applied by the receiver of writeC before the
	// frame is encoded; confirm against the write loop.
	options []Option
	// ch carries the write result. safeWrite blocks receiving from it once
	// the frame has been accepted on writeC.
	ch chan error
}
// safeWrite hands f and its options to whoever receives from c.writeC and
// then blocks until the result of the write is delivered on a reply channel
// created for this call. If c.closeC becomes ready before the frame has been
// accepted, it returns a "connection closed" error without queuing anything.
//
// NOTE(review): once the frame has been accepted, closeC is no longer
// consulted, so this relies on the receiver always answering on the reply
// channel.
func (c *Conn) safeWrite(f *Frame, options ...Option) error {
	reply := make(chan error)

	select {
	case c.writeC <- frame{body: f, options: options, ch: reply}:
		// Accepted by the receiver; wait for the outcome below.
	case <-c.closeC:
		return errors.New("connection closed")
	}

	return <-reply
}
// unsafeWrite encodes f (after applying options) and writes it to c.conn in a
// single Write call, returning that call's error.
// This function is not thread safe!
//
// Before writing, the write deadline is set to now + 2*c.whb when c.whb is
// positive; otherwise the zero time.Time is passed, which clears the deadline
// (assuming c.conn follows net.Conn semantics). The error returned by
// SetWriteDeadline is not checked.
func (c *Conn) unsafeWrite(f *Frame, options ...Option) error {
	var deadline time.Time // zero value: no deadline
	if c.whb > 0 {
		deadline = time.Now().Add(2 * c.whb)
	}
	c.conn.SetWriteDeadline(deadline)

	_, err := c.conn.Write(encodeFrame(f, options...))
	return err
}
// headerEscaper escapes the characters that may not appear literally in a
// header key or value: backslash, carriage return, line feed and colon become
// `\\`, `\r`, `\n` and `\c` respectively. It works in a single pass, so the
// backslashes it emits are never escaped a second time.
var headerEscaper = strings.NewReplacer(
	"\\", "\\\\",
	"\r", "\\r",
	"\n", "\\n",
	":", "\\c",
)

// encodeHeader returns the header line "key:value" (without a trailing
// newline), with both key and value escaped by headerEscaper.
func encodeHeader(key, value string) string {
	return fmt.Sprintf("%s:%s", headerEscaper.Replace(key), headerEscaper.Replace(value))
}
// encodeFrame serialises f into its wire representation and returns the
// bytes. Every option is first called with f, in argument order, so any
// changes the options make to the frame are included in the output.
//
// Layout: the command followed by a newline, one "key:value" line per header
// entry (escaped by encodeHeader), an empty line, the body written verbatim,
// and finally a NUL byte followed by a newline.
//
// Header lines are emitted in the order that ranging over f.Header yields
// them. NOTE(review): if Header is a map, that order is unspecified.
func encodeFrame(f *Frame, options ...Option) []byte {
	for i := range options {
		options[i](f)
	}

	var out bytes.Buffer

	// Command line.
	out.WriteString(f.Command)
	out.WriteByte('\n')

	// Header lines, then the blank line that separates them from the body.
	for name, val := range f.Header {
		out.WriteString(encodeHeader(name, val))
		out.WriteByte('\n')
	}
	out.WriteByte('\n')

	// Body, followed by the frame terminator.
	out.Write(f.Body)
	out.WriteString("\x00\n")

	return out.Bytes()
}