Skip to content

Commit 76f552d

Browse files
authored
Merge pull request #46 from JPercivall/Adding_raw_socket_producer
Adding a producer to send over raw TCP and UDP
2 parents 08f064d + 793538d commit 76f552d

File tree

4 files changed

+165
-3
lines changed

4 files changed

+165
-3
lines changed

.gitignore

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
stress/
2+
vflow/
3+
.DS_Store
4+
5+
# Intellij
6+
.idea/
7+
*.iml
8+
*.iws
9+
*~

docs/config.md

+23
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,26 @@ The NATS configuration contains the following key
134134
|Key | Default | Environment variable | Description |
135135
|---------------------| ----------------------|--------------------------|------------------------------------------------------------------|
136136
|url | nats://localhost:4222 | NA | URL addresse
137+
138+
# Raw Socket Configuration
139+
140+
Note that for messages sent over TCP and UDP using this producer, the message deliminator is a new line character ("\n").
141+
142+
## Format
143+
A config file is a plain text file in [YAML](https://en.wikipedia.org/wiki/YAML) format.
144+
145+
```
146+
key: value
147+
```
148+
149+
The default configuration file is /etc/vflow/mq.conf, you can be able to change it through vFlow configuration.
150+
151+
152+
## Configuration Keys
153+
The NATS configuration contains the following key
154+
155+
|Key | Default | Environment variable | Description |
156+
|---------------------| ----------------------|--------------------------|----------------------------------------------------------------------|
157+
|url | localhost:9555 | NA | URL address to send to. Includes the hostname and port. |
158+
|protocol | tcp | NA | Protocol to use to send. Can be either "tcp" or "udp" |
159+
|retry-max | 2 | NA | The number of times a message will be retried before giving up on it |

producer/producer.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ type MQueue interface {
4848
// NewProducer constructs new Messaging Queue
4949
func NewProducer(mqName string) *Producer {
5050
var mqRegistered = map[string]MQueue{
51-
"kafka": new(Kafka),
52-
"nsq": new(NSQ),
53-
"nats": new(NATS),
51+
"kafka": new(Kafka),
52+
"nsq": new(NSQ),
53+
"nats": new(NATS),
54+
"rawSocket": new(RawSocket),
5455
}
5556

5657
return &Producer{

producer/rawSocket.go

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
//: ----------------------------------------------------------------------------
2+
//: Copyright (C) 2017 Verizon. All Rights Reserved.
3+
//: All Rights Reserved
4+
//:
5+
//: file: rawSocket.go
6+
//: details: vflow tcp/udp producer plugin
7+
//: author: Joe Percivall
8+
//: date: 12/18/2017
9+
//:
10+
//: Licensed under the Apache License, Version 2.0 (the "License");
11+
//: you may not use this file except in compliance with the License.
12+
//: You may obtain a copy of the License at
13+
//:
14+
//: http://www.apache.org/licenses/LICENSE-2.0
15+
//:
16+
//: Unless required by applicable law or agreed to in writing, software
17+
//: distributed under the License is distributed on an "AS IS" BASIS,
18+
//: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
//: See the License for the specific language governing permissions and
20+
//: limitations under the License.
21+
//: ----------------------------------------------------------------------------
22+
23+
package producer
24+
25+
import (
26+
"io/ioutil"
27+
"log"
28+
"strings"
29+
30+
"fmt"
31+
"gopkg.in/yaml.v2"
32+
"net"
33+
)
34+
35+
// RawSocket represents RawSocket producer
36+
type RawSocket struct {
37+
connection net.Conn
38+
config RawSocketConfig
39+
logger *log.Logger
40+
}
41+
42+
// RawSocketConfig is the struct that holds all configuation for RawSocketConfig connections
43+
type RawSocketConfig struct {
44+
URL string `yaml:"url"`
45+
Protocol string `yaml:"protocol"`
46+
MaxRetry int `yaml:"retry-max"`
47+
}
48+
49+
func (rs *RawSocket) setup(configFile string, logger *log.Logger) error {
50+
var err error
51+
rs.config = RawSocketConfig{
52+
URL: "localhost:9555",
53+
Protocol: "tcp",
54+
MaxRetry: 2,
55+
}
56+
57+
if err = rs.load(configFile); err != nil {
58+
logger.Println(err)
59+
return err
60+
}
61+
62+
rs.connection, err = net.Dial(rs.config.Protocol, rs.config.URL)
63+
if err != nil {
64+
logger.Println(err)
65+
return err
66+
}
67+
68+
rs.logger = logger
69+
70+
return nil
71+
}
72+
73+
func (rs *RawSocket) inputMsg(topic string, mCh chan []byte, ec *uint64) {
74+
var (
75+
msg []byte
76+
err error
77+
ok bool
78+
)
79+
80+
rs.logger.Printf("start producer: RawSocket, server: %+v, Protocol: %s\n",
81+
rs.config.URL, rs.config.Protocol)
82+
83+
for {
84+
msg, ok = <-mCh
85+
if !ok {
86+
break
87+
}
88+
89+
for i := 0; ; i++ {
90+
_, err = fmt.Fprintf(rs.connection, string(msg)+"\n")
91+
if err == nil {
92+
break
93+
}
94+
95+
*ec++
96+
97+
if strings.HasSuffix(err.Error(), "broken pipe") {
98+
var newConnection, err = net.Dial(rs.config.Protocol, rs.config.URL)
99+
if err != nil {
100+
rs.logger.Println("Error when attempting to fix the broken pipe", err)
101+
} else {
102+
rs.logger.Println("Successfully reconnected")
103+
rs.connection = newConnection
104+
}
105+
}
106+
107+
if i >= (rs.config.MaxRetry) {
108+
rs.logger.Println("message failed after the configured retry limit:", err)
109+
break
110+
} else {
111+
rs.logger.Println("retrying after error:", err)
112+
}
113+
}
114+
}
115+
}
116+
117+
func (rs *RawSocket) load(f string) error {
118+
b, err := ioutil.ReadFile(f)
119+
if err != nil {
120+
return err
121+
}
122+
123+
err = yaml.Unmarshal(b, &rs.config)
124+
if err != nil {
125+
return err
126+
}
127+
128+
return nil
129+
}

0 commit comments

Comments
 (0)