Skip to content

Commit

Permalink
chore(release): [email protected]
Browse files Browse the repository at this point in the history
  • Loading branch information
zishang520 committed Sep 22, 2024
2 parents e2c3b95 + 97ddd02 commit bc66e41
Show file tree
Hide file tree
Showing 27 changed files with 1,753 additions and 222 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func main() {
client.Emit("auth", client.Handshake().Auth)

client.On("message-with-ack", func(args ...interface{}) {
ack := args[len(args)-1].(func([]any, error))
ack := args[len(args)-1].(socket.Ack)
ack(args[:len(args)-1], nil)
})
})
Expand Down Expand Up @@ -286,7 +286,7 @@ func main() {
client.Emit("auth", client.Handshake().Auth)

client.On("message-with-ack", func(args ...interface{}) {
ack := args[len(args)-1].(func([]any, error))
ack := args[len(args)-1].(socket.Ack)
ack(args[:len(args)-1], nil)
})
})
Expand Down
37 changes: 37 additions & 0 deletions adapter/adapter-type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package adapter

import (
"github.com/zishang520/socket.io/v2/socket"
)

type (
Adapter = socket.Adapter

SessionAwareAdapter = socket.SessionAwareAdapter

AdapterConstructor = socket.AdapterConstructor

// A cluster-ready adapter. Any extending interface must:
//
// - implement [ClusterAdapter.DoPublish] and [ClusterAdapter.DoPublishResponse]
//
// - call [ClusterAdapter.OnMessage] and [ClusterAdapter.OnResponse]
ClusterAdapter interface {
Adapter

Uid() ServerId
OnMessage(*ClusterMessage, Offset)
OnResponse(*ClusterResponse)
Publish(*ClusterMessage)
PublishAndReturnOffset(*ClusterMessage) (Offset, error)
DoPublish(*ClusterMessage) (Offset, error)
PublishResponse(ServerId, *ClusterResponse)
DoPublishResponse(ServerId, *ClusterResponse) error
}

ClusterAdapterWithHeartbeat interface {
ClusterAdapter

SetOpts(*ClusterAdapterOptions)
}
)
22 changes: 22 additions & 0 deletions adapter/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package adapter

import (
"github.com/zishang520/socket.io/v2/socket"
)

type (
AdapterBuilder struct {
}
)

func (*AdapterBuilder) New(nsp socket.Namespace) Adapter {
return NewAdapter(nsp)
}

func MakeAdapter() Adapter {
return socket.MakeAdapter()
}

func NewAdapter(nsp socket.Namespace) Adapter {
return socket.NewAdapter(nsp)
}
76 changes: 76 additions & 0 deletions adapter/cluster-adapter-options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package adapter

import (
"time"
)

type (
ClusterAdapterOptionsInterface interface {
SetHeartbeatInterval(time.Duration)
GetRawHeartbeatInterval() *time.Duration
HeartbeatInterval() time.Duration

SetHeartbeatTimeout(int64)
GetRawHeartbeatTimeout() *int64
HeartbeatTimeout() int64
}

ClusterAdapterOptions struct {
// The number of ms between two heartbeats.
//
// Default: 5_000 * time.Millisecond
heartbeatInterval *time.Duration

// The number of ms without heartbeat before we consider a node down.
//
// Default: 10_000
heartbeatTimeout *int64
}
)

func DefaultClusterAdapterOptions() *ClusterAdapterOptions {
return &ClusterAdapterOptions{}
}

func (s *ClusterAdapterOptions) Assign(data ClusterAdapterOptionsInterface) (ClusterAdapterOptionsInterface, error) {
if data == nil {
return s, nil
}
if s.GetRawHeartbeatInterval() == nil {
s.SetHeartbeatInterval(data.HeartbeatInterval())
}

if s.GetRawHeartbeatTimeout() == nil {
s.SetHeartbeatTimeout(data.HeartbeatTimeout())
}

return s, nil
}

func (s *ClusterAdapterOptions) SetHeartbeatInterval(heartbeatInterval time.Duration) {
s.heartbeatInterval = &heartbeatInterval
}
func (s *ClusterAdapterOptions) GetRawHeartbeatInterval() *time.Duration {
return s.heartbeatInterval
}
func (s *ClusterAdapterOptions) HeartbeatInterval() time.Duration {
if s.heartbeatInterval == nil {
return time.Duration(5_000 * time.Millisecond)
}

return *s.heartbeatInterval
}

func (s *ClusterAdapterOptions) SetHeartbeatTimeout(heartbeatTimeout int64) {
s.heartbeatTimeout = &heartbeatTimeout
}
func (s *ClusterAdapterOptions) GetRawHeartbeatTimeout() *int64 {
return s.heartbeatTimeout
}
func (s *ClusterAdapterOptions) HeartbeatTimeout() int64 {
if s.heartbeatTimeout == nil {
return 10_000
}

return *s.heartbeatTimeout
}
132 changes: 132 additions & 0 deletions adapter/cluster-adapter-type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package adapter

import (
"sync/atomic"
"time"

"github.com/zishang520/engine.io/v2/types"
"github.com/zishang520/engine.io/v2/utils"
"github.com/zishang520/socket.io-go-parser/v2/parser"
"github.com/zishang520/socket.io/v2/socket"
)

type (
// The unique ID of a server
ServerId string

// The unique ID of a message (for the connection state recovery feature)
Offset string

MessageType int

// Common fields for all messages
ClusterMessage struct {
Uid ServerId `json:"uid,omitempty" msgpack:"uid,omitempty"`
Nsp string `json:"nsp,omitempty" msgpack:"nsp,omitempty"`
Type MessageType `json:"type,omitempty" msgpack:"type,omitempty"`
Data any `json:"data,omitempty" msgpack:"data,omitempty"` // Data will hold the specific message data for different types
}

// PacketOptions represents the options for broadcasting messages.
PacketOptions struct {
Rooms []socket.Room `json:"rooms,omitempty" msgpack:"rooms,omitempty"`
Except []socket.Room `json:"except,omitempty" msgpack:"except,omitempty"`
Flags *socket.BroadcastFlags `json:"flags,omitempty" msgpack:"flags,omitempty"`
}

// Message for BROADCAST
BroadcastMessage struct {
Opts *PacketOptions `json:"opts,omitempty" msgpack:"opts,omitempty"`
Packet *parser.Packet `json:"packet,omitempty" msgpack:"packet,omitempty"`
RequestId *string `json:"requestId,omitempty" msgpack:"requestId,omitempty"`
}

// Message for SOCKETS_JOIN, SOCKETS_LEAVE
SocketsJoinLeaveMessage struct {
Opts *PacketOptions `json:"opts,omitempty" msgpack:"opts,omitempty"`
Rooms []socket.Room `json:"rooms,omitempty" msgpack:"rooms,omitempty"`
}

// Message for DISCONNECT_SOCKETS
DisconnectSocketsMessage struct {
Opts *PacketOptions `json:"opts,omitempty" msgpack:"opts,omitempty"`
Close bool `json:"close,omitempty" msgpack:"close,omitempty"`
}

// Message for FETCH_SOCKETS
FetchSocketsMessage struct {
Opts *PacketOptions `json:"opts,omitempty" msgpack:"opts,omitempty"`
RequestId string `json:"requestId,omitempty" msgpack:"requestId,omitempty"`
}

// Message for SERVER_SIDE_EMIT
ServerSideEmitMessage struct {
RequestId *string `json:"requestId,omitempty" msgpack:"requestId,omitempty"`
Packet []any `json:"packet,omitempty" msgpack:"packet,omitempty"`
}

// ClusterRequest equivalent
ClusterRequest struct {
Type MessageType
Resolve func(*types.Slice[any])
Timeout *atomic.Pointer[utils.Timer]
Expected int64
Current *atomic.Int64
Responses *types.Slice[any]
}

ClusterResponse = ClusterMessage

SocketResponse struct {
Id socket.SocketId `json:"id,omitempty" msgpack:"id,omitempty"`
Handshake *socket.Handshake `json:"handshake,omitempty" msgpack:"handshake,omitempty"`
Rooms []socket.Room `json:"rooms,omitempty" msgpack:"rooms,omitempty"`
Data any `json:"data,omitempty" msgpack:"data,omitempty"`
}

FetchSocketsResponse struct {
RequestId string `json:"requestId,omitempty" msgpack:"requestId,omitempty"`
Sockets []*SocketResponse `json:"sockets,omitempty" msgpack:"sockets,omitempty"`
}

ServerSideEmitResponse struct {
RequestId string `json:"requestId,omitempty" msgpack:"requestId,omitempty"`
Packet []any `json:"packet,omitempty" msgpack:"packet,omitempty"`
}

BroadcastClientCount struct {
RequestId string `json:"requestId,omitempty" msgpack:"requestId,omitempty"`
ClientCount uint64 `json:"clientCount,omitempty" msgpack:"clientCount,omitempty"`
}

BroadcastAck struct {
RequestId string `json:"requestId,omitempty" msgpack:"requestId,omitempty"`
Packet []any `json:"packet,omitempty" msgpack:"packet,omitempty"`
}

ClusterAckRequest struct {
ClientCountCallback func(uint64)
Ack socket.Ack
}
)

const (
EMITTER_UID ServerId = "emitter"
DEFAULT_TIMEOUT time.Duration = 5_000 * time.Millisecond
)

const (
INITIAL_HEARTBEAT MessageType = iota + 1
HEARTBEAT
BROADCAST
SOCKETS_JOIN
SOCKETS_LEAVE
DISCONNECT_SOCKETS
FETCH_SOCKETS
FETCH_SOCKETS_RESPONSE
SERVER_SIDE_EMIT
SERVER_SIDE_EMIT_RESPONSE
BROADCAST_CLIENT_COUNT
BROADCAST_ACK
ADAPTER_CLOSE
)
18 changes: 18 additions & 0 deletions adapter/cluster-adapter-with-heartbeat-type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package adapter

import (
"sync/atomic"

"github.com/zishang520/engine.io/v2/types"
"github.com/zishang520/engine.io/v2/utils"
)

type (
CustomClusterRequest struct {
Type MessageType
Resolve func(*types.Slice[any])
Timeout *atomic.Pointer[utils.Timer]
MissingUids *types.Set[ServerId]
Responses *types.Slice[any]
}
)
Loading

0 comments on commit bc66e41

Please sign in to comment.