Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/sirupsen/logrus"
"github.com/zyxar/socketio/engine"
)

Expand All @@ -15,12 +16,14 @@ type Server struct {
sockLock sync.RWMutex
onError func(err error)
nsps map[string]*namespace
rooms map[string]map[string]*socket
}

// NewServer creates a socket.io server instance upon underlying engine.io transport
func NewServer(interval, timeout time.Duration, parser Parser) (server *Server, err error) {
e, err := engine.NewServer(interval, timeout, func(ß *engine.Socket) {
socket := newSocket(ß, parser)
socket.server = server
socket.attachnsp("/")
nsp := server.creatensp("/")
if err := socket.emitPacket(&Packet{
Expand Down Expand Up @@ -108,6 +111,7 @@ func (s *Server) OnError(fn func(err error)) { s.onError = fn }

// process is the Packet process handle on server side
func (s *Server) process(sock *socket, p *Packet) {
sock.namespace = p.Namespace
nsp, ok := s.getnsp(p.Namespace)
if !ok {
if p.Type > PacketTypeDisconnect {
Expand Down Expand Up @@ -189,6 +193,22 @@ func (s *Server) process(sock *socket, p *Packet) {
}
}

func (s *Server) BroadcastToRoom(room string, event string, args ...interface{}) {
for sid, so := range s.rooms[room] {
if sid == "" || so == nil {
continue
}

if err := so.Emit(event, args...); err != nil {
logrus.Error("[BroadcastToRoom] sid="+sid+", ", err, " ,args=", args)
if err == ErrorNamespaceUnavaialble {
so.LeaveAll()
so.Close()
}
}
}
}

var (
WebsocketTransport = engine.WebsocketTransport
)
56 changes: 49 additions & 7 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type Socket interface {
GetHeader(key string) string
Sid() string
io.Closer

Join(room string)
Leave(room string)
LeaveAll()
BroadcastToRoom(room string, event string, args ...interface{})
}

type nspSock struct {
Expand All @@ -46,11 +51,48 @@ func (n *nspSock) EmitError(arg interface{}) (err error) {
}

type socket struct {
ß *engine.Socket
encoder Encoder
decoder Decoder
acks map[string]*ackHandle
mutex sync.RWMutex
ß *engine.Socket
encoder Encoder
decoder Decoder
acks map[string]*ackHandle
mutex sync.RWMutex
server *Server
namespace string
}

func (s *socket) Join(room string) {
server := s.server
server.sockLock.Lock()
if server.rooms == nil {
server.rooms = map[string]map[string]*socket{}
}
if server.rooms[room] == nil {
server.rooms[room] = map[string]*socket{}
}
server.rooms[room][s.Sid()] = s
server.sockLock.Unlock()
}

func (s *socket) Leave(room string) {
server := s.server
server.sockLock.Lock()
if server.rooms[room] != nil {
delete(server.rooms[room], s.Sid())
}
server.sockLock.Unlock()
}

func (s *socket) LeaveAll() {
server := s.server
server.sockLock.Lock()
for _, room := range server.rooms {
delete(room, s.Sid())
}
server.sockLock.Unlock()
}

func (s *socket) BroadcastToRoom(room string, event string, args ...interface{}) {
s.server.BroadcastToRoom(room, event, args...)
}

func newSocket(ß *engine.Socket, parser Parser) *socket {
Expand Down Expand Up @@ -106,14 +148,14 @@ func (s *socket) fireAck(nsp string, id uint64, data []byte, buffer [][]byte, au

// Emit implements Socket.Emit
func (s *socket) Emit(event string, args ...interface{}) (err error) {
return s.emit("/", event, args...)
return s.emit(s.namespace, event, args...)
}

// EmitError implements Socket.EmitError
func (s *socket) EmitError(arg interface{}) (err error) { return s.emitError("/", arg) }

// Namespace implements Socket.Namespace
func (*socket) Namespace() string { return "/" }
func (s *socket) Namespace() string { return s.namespace }

func (s *socket) emit(nsp string, event string, args ...interface{}) (err error) {
s.mutex.RLock()
Expand Down