diff --git a/server.go b/server.go index 317db96..4468381 100644 --- a/server.go +++ b/server.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/sirupsen/logrus" "github.com/zyxar/socketio/engine" ) @@ -15,12 +16,14 @@ type Server struct { sockLock sync.RWMutex onError func(err error) nsps map[string]*namespace + rooms map[string]map[string]*socket } // NewServer creates a socket.io server instance upon underlying engine.io transport func NewServer(interval, timeout time.Duration, parser Parser) (server *Server, err error) { e, err := engine.NewServer(interval, timeout, func(ß *engine.Socket) { socket := newSocket(ß, parser) + socket.server = server socket.attachnsp("/") nsp := server.creatensp("/") if err := socket.emitPacket(&Packet{ @@ -108,6 +111,7 @@ func (s *Server) OnError(fn func(err error)) { s.onError = fn } // process is the Packet process handle on server side func (s *Server) process(sock *socket, p *Packet) { + sock.namespace = p.Namespace nsp, ok := s.getnsp(p.Namespace) if !ok { if p.Type > PacketTypeDisconnect { @@ -189,6 +193,22 @@ func (s *Server) process(sock *socket, p *Packet) { } } +func (s *Server) BroadcastToRoom(room string, event string, args ...interface{}) { + for sid, so := range s.rooms[room] { + if sid == "" || so == nil { + continue + } + + if err := so.Emit(event, args...); err != nil { + logrus.Error("[BroadcastToRoom] sid="+sid+", ", err, " ,args=", args) + if err == ErrorNamespaceUnavaialble { + so.LeaveAll() + so.Close() + } + } + } +} + var ( WebsocketTransport = engine.WebsocketTransport ) diff --git a/socket.go b/socket.go index 2c0743c..bc605c8 100644 --- a/socket.go +++ b/socket.go @@ -25,6 +25,11 @@ type Socket interface { GetHeader(key string) string Sid() string io.Closer + + Join(room string) + Leave(room string) + LeaveAll() + BroadcastToRoom(room string, event string, args ...interface{}) } type nspSock struct { @@ -46,11 +51,48 @@ func (n *nspSock) EmitError(arg interface{}) (err error) { } type socket struct { - ß *engine.Socket - encoder Encoder - decoder Decoder - acks map[string]*ackHandle - mutex sync.RWMutex + ß *engine.Socket + encoder Encoder + decoder Decoder + acks map[string]*ackHandle + mutex sync.RWMutex + server *Server + namespace string +} + +func (s *socket) Join(room string) { + server := s.server + server.sockLock.Lock() + if server.rooms == nil { + server.rooms = map[string]map[string]*socket{} + } + if server.rooms[room] == nil { + server.rooms[room] = map[string]*socket{} + } + server.rooms[room][s.Sid()] = s + server.sockLock.Unlock() +} + +func (s *socket) Leave(room string) { + server := s.server + server.sockLock.Lock() + if server.rooms[room] != nil { + delete(server.rooms[room], s.Sid()) + } + server.sockLock.Unlock() +} + +func (s *socket) LeaveAll() { + server := s.server + server.sockLock.Lock() + for _, room := range server.rooms { + delete(room, s.Sid()) + } + server.sockLock.Unlock() +} + +func (s *socket) BroadcastToRoom(room string, event string, args ...interface{}) { + s.server.BroadcastToRoom(room, event, args...) } func newSocket(ß *engine.Socket, parser Parser) *socket { @@ -106,14 +148,14 @@ func (s *socket) fireAck(nsp string, id uint64, data []byte, buffer [][]byte, au // Emit implements Socket.Emit func (s *socket) Emit(event string, args ...interface{}) (err error) { - return s.emit("/", event, args...) + return s.emit(s.namespace, event, args...) } // EmitError implements Socket.EmitError func (s *socket) EmitError(arg interface{}) (err error) { return s.emitError("/", arg) } // Namespace implements Socket.Namespace -func (*socket) Namespace() string { return "/" } +func (s *socket) Namespace() string { return s.namespace } func (s *socket) emit(nsp string, event string, args ...interface{}) (err error) { s.mutex.RLock()