Skip to content

Commit

Permalink
Update to v0.0.11
Browse files Browse the repository at this point in the history
fixes: #20 and more. Relative to kataras/neffos.js@b5a916e
  • Loading branch information
kataras committed Dec 10, 2019
1 parent 6583461 commit 7c3c769
Show file tree
Hide file tree
Showing 14 changed files with 139 additions and 80 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ os:
- linux
- osx
go:
- 1.12.x
- 1.13.x
go_import_path: github.com/kataras/neffos
install:
- go get ./...
Expand Down
22 changes: 19 additions & 3 deletions _examples/protobuf/browser/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ var port = document.location.port ? ":" + document.location.port : "";
var wsURL = scheme + "://" + document.location.hostname + port + "/echo";

var outputTxt = document.getElementById("output");

function addMessage(msg) {
outputTxt.innerHTML += msg + "\n";
}
Expand All @@ -28,7 +29,19 @@ function handleNamespaceConnectedConn(nsConn) {
const userMsg = new protos.UserMessage();
userMsg.setUsername(username);
userMsg.setText(input);
nsConn.emit("chat", userMsg.serializeBinary());

const body = userMsg.serializeBinary()
let msg = new neffos.Message();
msg.Namespace = "default";
msg.Event = "chat";
msg.Body = body;
msg.SetBinary = true;
nsConn.conn.write(msg);
//
// OR: javascript side will check if body is binary,
// and if it's it will convert it to valid utf-8 text before sending.
// To keep the data as they are, please prefer the above commented code (msg.SetBinary = true).
// nsConn.emit("chat", body);
addMessage("Me: " + input);
};
}
Expand All @@ -47,9 +60,12 @@ async function runExample() {
addMessage("disconnected from namespace: " + msg.Namespace);
},
chat: function (nsConn, msg) { // "chat" event.
var serialized = new Uint8Array(msg.Body.split(","));
const userMsg = protos.UserMessage.deserializeBinary(serialized);
console.log(msg);
const userMsg = protos.UserMessage.deserializeBinary(msg.Body);
addMessage(userMsg.getUsername() + ": " + userMsg.getText());
},
chat_test: function (nsConn, msg) {
addMessage(new TextDecoder("utf-8").decode(msg.Body));
}
}
});
Expand Down
4 changes: 2 additions & 2 deletions _examples/protobuf/browser/bundle.js

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions _examples/protobuf/browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
"scripts": {
"browserify": "browserify ./app.js -o ./bundle.js",
"minifyES6": "minify ./bundle.js --outFile ./bundle.js",
"build": "npm run-script browserify && npm run-script minifyES6"
"build": "npm run-script browserify && npm run-script minifyES6",
"build-fast": "npm run-script browserify"
},
"dependencies": {
"google-protobuf": "^3.8.0",
"neffos.js": "latest",
"google-protobuf": "^3.11.1",
"neffos.js": "^0.1.24",
"protobufjs": "~6.8.8"
},
"devDependencies": {
Expand Down
15 changes: 11 additions & 4 deletions _examples/protobuf/browser/user_message_pb.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// source: user_message.proto
/**
* @fileoverview
* @enhanceable
Expand Down Expand Up @@ -166,9 +167,12 @@ proto.main.UserMessage.prototype.getUsername = function() {
};


/** @param {string} value */
/**
* @param {string} value
* @return {!proto.main.UserMessage} returns this
*/
proto.main.UserMessage.prototype.setUsername = function(value) {
jspb.Message.setProto3StringField(this, 1, value);
return jspb.Message.setProto3StringField(this, 1, value);
};


Expand All @@ -181,9 +185,12 @@ proto.main.UserMessage.prototype.getText = function() {
};


/** @param {string} value */
/**
* @param {string} value
* @return {!proto.main.UserMessage} returns this
*/
proto.main.UserMessage.prototype.setText = function(value) {
jspb.Message.setProto3StringField(this, 2, value);
return jspb.Message.setProto3StringField(this, 2, value);
};


Expand Down
37 changes: 33 additions & 4 deletions _examples/protobuf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,20 @@ var serverAndClientEvents = neffos.Namespaces{
},
"chat": func(c *neffos.NSConn, msg neffos.Message) error {
if msg.Err != nil {
log.Printf("remote error: %v\n", msg.Err)
log.Printf("remote error: %v from [%s]\n", msg.Err, c)
return nil
}

if !c.Conn.IsClient() {
// fmt.Printf("[%s] sending message to everyone... [%v:%s]\n", c, msg.SetBinary, string(msg.Body))
// broadcast to all clients except this one, when first parameter is not nil.
c.Conn.Server().Broadcast(c, msg)
} else {
// client received from server's broadcast.
var userMsg UserMessage

if err := proto.Unmarshal(msg.Body, &userMsg); err != nil {
fmt.Printf("[example] error on proto.Unmarshal: %v\nFor msg.Body equals to: %s", err, string(msg.Body))
return err
}
fmt.Printf("[%s] says: %s\n", userMsg.Username, userMsg.Text)
Expand All @@ -84,6 +87,15 @@ var serverAndClientEvents = neffos.Namespaces{
// this error's text.
return nil
},
"chat_test": func(c *neffos.NSConn, msg neffos.Message) error {
if !c.Conn.IsClient() {
c.Conn.Server().Broadcast(c, msg)
return nil
}

fmt.Printf("[%s] says: %s\n", c, string(msg.Body))
return nil
},
},
}

Expand Down Expand Up @@ -123,9 +135,11 @@ func startServer() {
log.Fatal(http.ListenAndServe(addr, nil))
}

const testHelloBinaryWithSeps = false

func startClient() {
// init the websocket connection by dialing the server.
client, err := neffos.Dial(nil, gorilla.DefaultDialer, endpoint, serverAndClientEvents)
client, err := neffos.Dial(nil, gorilla.DefaultDialer, addr+endpoint, serverAndClientEvents)
if err != nil {
log.Fatal(err)
}
Expand All @@ -141,12 +155,21 @@ func startClient() {
log.Fatal(err)
}

if testHelloBinaryWithSeps {
c.Conn.Write(neffos.Message{
Namespace: namespace,
Event: "chat_test",
Body: []byte{';', ';', ';', ';', 'h', 'e', 'l', 'l', 'o', ';'},
SetBinary: true,
})
}

fmt.Fprintf(os.Stdout, "Please specify a username: ")
usernameBytes, _, _ := bufio.NewReader(os.Stdin).ReadLine()
userMsg := &UserMessage{
Username: string(usernameBytes),
// only `Text` field is dynamic, therefore we can reuse this instance value,
// the `Text` field can be filled right before the namespace's `Emit`, check below.
// the `Text` field can be filled right before the `Conn#Write`, check below.
}

fmt.Fprint(os.Stdout, ">> ")
Expand Down Expand Up @@ -175,7 +198,13 @@ func startClient() {
if err != nil {
log.Fatal(err)
}
c.Emit("chat", body)

c.Conn.Write(neffos.Message{
Namespace: namespace,
Event: "chat",
Body: body,
SetBinary: true,
})

fmt.Fprint(os.Stdout, ">> ")
}
Expand Down
2 changes: 1 addition & 1 deletion _examples/protobuf/user_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ syntax="proto3";
package main;

message UserMessage {
string Username =1;
string Username = 1;
string Text = 2;
}
52 changes: 33 additions & 19 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,22 @@ type (
// Request returns the http request value.
Request() *http.Request
// ReadData reads binary or text messages from the remote connection.
ReadData(timeout time.Duration) (body []byte, err error)
ReadData(timeout time.Duration) (body []byte, typ MessageType, err error)
// WriteBinary sends a binary message to the remote connection.
WriteBinary(body []byte, timeout time.Duration) error
// WriteText sends a text message to the remote connection.
WriteText(body []byte, timeout time.Duration) error
}

// MessageType is a type for readen and to-send data, helpful to set `msg.SetBinary`
// to the rest of the clients through a Broadcast, as SetBinary is not part of the deserialization.
MessageType uint8
)

// See `MessageType` definition for details.
const (
TextMessage = iota + 1
BinaryMessage
)

// Conn contains the websocket connection and the neffos communication functionality.
Expand Down Expand Up @@ -83,7 +93,7 @@ type Conn struct {
allowNativeMessages bool
shouldHandleOnlyNativeMessages bool

queue [][]byte
queue map[MessageType][][]byte
queueMutex sync.Mutex

// used to fire `conn#Close` once.
Expand Down Expand Up @@ -320,7 +330,7 @@ func (c *Conn) startReader() {
// CLIENT is ready when ACK done
// SERVER is ready when ACK is done AND `Server#OnConnected` returns with nil error.
for {
b, err := c.socket.ReadData(c.readTimeout)
b, msgTyp, err := c.socket.ReadData(c.readTimeout)
if err != nil {
c.readiness.unwait(err)
return
Expand All @@ -331,19 +341,19 @@ func (c *Conn) startReader() {
}

if !c.isAcknowledged() {
if !c.handleACK(b) {
if !c.handleACK(msgTyp, b) {
return
}
continue
}

atomic.StoreUint32(c.isInsideHandler, 1)
c.HandlePayload(b)
c.HandlePayload(msgTyp, b)
atomic.StoreUint32(c.isInsideHandler, 0)
}
}

func (c *Conn) handleACK(b []byte) bool {
func (c *Conn) handleACK(msgTyp MessageType, b []byte) bool {
switch typ := b[0]; typ {
case ackBinary:
// from client startup to server.
Expand Down Expand Up @@ -383,7 +393,10 @@ func (c *Conn) handleACK(b []byte) bool {
return false
default:
c.queueMutex.Lock()
c.queue = append(c.queue, b)
if c.queue == nil {
c.queue = make(map[MessageType][][]byte)
}
c.queue[msgTyp] = append(c.queue[msgTyp], b)
c.queueMutex.Unlock()
}

Expand All @@ -395,11 +408,13 @@ func (c *Conn) handleQueue() {
c.queueMutex.Lock()
defer c.queueMutex.Unlock()

for _, b := range c.queue {
c.HandlePayload(b)
}
for msgTyp, q := range c.queue {
for _, b := range q {
c.HandlePayload(msgTyp, b)
}

c.queue = c.queue[0:0]
delete(c.queue, msgTyp)
}
}

// ErrInvalidPayload can be returned by the internal `handleMessage`.
Expand Down Expand Up @@ -475,13 +490,13 @@ func (c *Conn) handleMessage(msg Message) error {
}

// DeserializeMessage returns a Message from the "payload".
func (c *Conn) DeserializeMessage(payload []byte) Message {
return DeserializeMessage(nil, payload, c.allowNativeMessages, c.shouldHandleOnlyNativeMessages)
func (c *Conn) DeserializeMessage(msgTyp MessageType, payload []byte) Message {
return DeserializeMessage(msgTyp, payload, c.allowNativeMessages, c.shouldHandleOnlyNativeMessages)
}

// HandlePayload fires manually a local event based on the "payload".
func (c *Conn) HandlePayload(payload []byte) error {
return c.handleMessage(c.DeserializeMessage(payload))
func (c *Conn) HandlePayload(msgTyp MessageType, payload []byte) error {
return c.handleMessage(c.DeserializeMessage(msgTyp, payload))
}

const syncWaitDur = 15 * time.Millisecond
Expand Down Expand Up @@ -902,8 +917,7 @@ func (c *Conn) Write(msg Message) bool {
}

msg.FromExplicit = ""
b := serializeMessage(nil, msg)
return c.write(b, msg.SetBinary)
return c.write(serializeMessage(msg), msg.SetBinary)
}

// used when `Ask` caller cares only for successful call and not the message, for performance reasons we just use raw bytes.
Expand Down Expand Up @@ -959,13 +973,13 @@ func (c *Conn) ask(ctx context.Context, msg Message, mustWaitOnlyTheNextMessage
// msg.wait is not required on this state
// but we still set it.
go func() {
b, err := c.Socket().ReadData(c.readTimeout)
b, msgTyp, err := c.Socket().ReadData(c.readTimeout)
if err != nil {
ch <- Message{Err: err, isError: true}
return
}

ch <- c.DeserializeMessage(b)
ch <- c.DeserializeMessage(msgTyp, b)
}()
} else {
c.waitingMessagesMutex.Lock()
Expand Down
Loading

0 comments on commit 7c3c769

Please sign in to comment.