golib/cmd/ssechat/sse.go

93 lines
1.5 KiB
Go

package main
import (
"fmt"
"sync"
"time"
)
type AddrStr string
type ServerSentEvent struct {
Type string
Event string
ID string
Data []byte
}
type SSEMember struct {
C chan ServerSentEvent
ticker *time.Ticker
}
type SSEChannel struct {
clients map[AddrStr]SSEMember
clientsMu sync.Mutex
}
func NewSSEChannel() *SSEChannel {
return &SSEChannel{
clients: make(map[AddrStr]SSEMember),
}
}
func (c *SSEChannel) Subscribe(addr AddrStr) {
ch := make(chan ServerSentEvent, 16)
m := SSEMember{
ch,
time.NewTicker(15 * time.Second),
}
c.clientsMu.Lock()
c.clients[addr] = m
c.clientsMu.Unlock()
go func() {
for range m.ticker.C {
m.C <- ServerSentEvent{
"",
"",
"",
[]byte("heartbeat"),
}
}
}()
}
func (c *SSEChannel) Member(addr AddrStr) SSEMember {
c.clientsMu.Lock()
defer c.clientsMu.Unlock()
m := c.clients[addr]
return m
}
func (c *SSEChannel) Broadcast(payload []byte) {
c.clientsMu.Lock()
defer c.clientsMu.Unlock()
for _, m := range c.clients {
// drop-on-full (best effort delivery)
select {
case m.C <- ServerSentEvent{
Type: "data",
Event: "message",
ID: fmt.Sprintf("%d", time.Now().UnixMilli()), // or use atomic counter
Data: payload,
}:
// client was able to receive
default:
// client is backed up
}
}
}
func (c *SSEChannel) Unsubscribe(addr AddrStr) {
c.clientsMu.Lock()
defer c.clientsMu.Unlock()
m := c.clients[addr]
delete(c.clients, addr)
// ticker must be stopped BEFORE closing channel
m.ticker.Stop()
close(m.C)
}