From c1ba5b4744d4c27735018b990201d42b9c44bf9f Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Fri, 6 Feb 2026 00:32:20 -0700 Subject: [PATCH] feat: add cmd/ssechat as ServerSentEvents demo --- cmd/ssechat/go.mod | 3 + cmd/ssechat/index.html | 154 +++++++++++++++++++++++++++++++++++++++++ cmd/ssechat/main.go | 118 +++++++++++++++++++++++++++++++ cmd/ssechat/sse.go | 92 ++++++++++++++++++++++++ 4 files changed, 367 insertions(+) create mode 100644 cmd/ssechat/go.mod create mode 100644 cmd/ssechat/index.html create mode 100644 cmd/ssechat/main.go create mode 100644 cmd/ssechat/sse.go diff --git a/cmd/ssechat/go.mod b/cmd/ssechat/go.mod new file mode 100644 index 0000000..06de64a --- /dev/null +++ b/cmd/ssechat/go.mod @@ -0,0 +1,3 @@ +module github.com/therootcompany/golib/cmd/ssechat + +go 1.25.4 diff --git a/cmd/ssechat/index.html b/cmd/ssechat/index.html new file mode 100644 index 0000000..67f6f4b --- /dev/null +++ b/cmd/ssechat/index.html @@ -0,0 +1,154 @@ + + + + + + SSE Chat Room + + + +
+

Simple SSE Chat Room

+

Everyone sees the same messages in real time

+
+ +
+ +
+ + + +
+ + + + diff --git a/cmd/ssechat/main.go b/cmd/ssechat/main.go new file mode 100644 index 0000000..962a530 --- /dev/null +++ b/cmd/ssechat/main.go @@ -0,0 +1,118 @@ +package main + +import ( + _ "embed" + "encoding/json" + "fmt" + "log" + "net/http" + "time" +) + +//go:embed index.html +var indexHTML []byte + +type Message struct { + Time string `json:"time"` + Nick string `json:"nick"` + Text string `json:"text"` +} + +var sse = NewSSEChannel() + +func main() { + mux := http.NewServeMux() + // Serve static HTML + //mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static")))) + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + + w.Header().Set("Content-Type", "text/html") + w.Write(indexHTML) + }) + + // SSE endpoint + mux.HandleFunc("GET /api/events", handleSSE) + + // POST endpoint to send messages + mux.HandleFunc("POST /api/send", handleSend) + + log.Println("Server running on :8080") + log.Fatal(http.ListenAndServe(":8080", mux)) +} + +func handleSSE(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") // helps nginx / some proxies + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported", http.StatusInternalServerError) + return + } + + sse.Subscribe(AddrStr(r.RemoteAddr)) + defer sse.Unsubscribe(AddrStr(r.RemoteAddr)) + + fmt.Fprintf(w, "event: system\ndata: {\"time\":\"%s\",\"text\":\"You joined the room\"}\n\n", time.Now().Format("15:04")) + flusher.Flush() + + // Forward messages to this client + for { + select { + case <-r.Context().Done(): + return + case msg, ok := <-sse.Member(AddrStr(r.RemoteAddr)).C: + if !ok { + return + } + + // SSE format: one data: line with the full JSON + if msg.Event != "" { + fmt.Fprintf(w, "event: %s\n", msg.Event) + } + fmt.Fprintf(w, "%s: %s\n\n", msg.Type, msg.Data) + flusher.Flush() + } + } +} + +func handleSend(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + if err := r.ParseForm(); err != nil { + http.Error(w, "Bad form", http.StatusBadRequest) + return + } + + nick := r.FormValue("nick") + text := r.FormValue("text") + if text == "" { + http.Error(w, "Message required", http.StatusBadRequest) + return + } + + if nick == "" { + nick = "Anonymous" + } + + // In the broadcast loop (handleSend) + msg := Message{ + Time: time.Now().Format("15:04"), + Nick: nick, + Text: text, + } + payload, _ := json.Marshal(msg) + + sse.Broadcast(payload) + + w.WriteHeader(http.StatusAccepted) +} diff --git a/cmd/ssechat/sse.go b/cmd/ssechat/sse.go new file mode 100644 index 0000000..5e1649a --- /dev/null +++ b/cmd/ssechat/sse.go @@ -0,0 +1,92 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +type AddrStr string + +type ServerSentEvent struct { + Type string + Event string + ID string + Data []byte +} + +type SSEMember struct { + C chan ServerSentEvent + ticker *time.Ticker +} + +type SSEChannel struct { + clients map[AddrStr]SSEMember + clientsMu sync.Mutex +} + +func NewSSEChannel() *SSEChannel { + return &SSEChannel{ + clients: make(map[AddrStr]SSEMember), + } +} + +func (c *SSEChannel) Subscribe(addr AddrStr) { + ch := make(chan ServerSentEvent, 16) + m := SSEMember{ + ch, + time.NewTicker(15 * time.Second), + } + + c.clientsMu.Lock() + c.clients[addr] = m + c.clientsMu.Unlock() + + go func() { + for range m.ticker.C { + m.C <- ServerSentEvent{ + "", + "", + "", + []byte("heartbeat"), + } + } + }() +} + +func (c *SSEChannel) Member(addr AddrStr) SSEMember { + c.clientsMu.Lock() + defer c.clientsMu.Unlock() + m := c.clients[addr] + return m +} + +func (c *SSEChannel) Broadcast(payload []byte) { + c.clientsMu.Lock() + defer c.clientsMu.Unlock() + for _, m := range c.clients { + // drop-on-full (best effort delivery) + select { + case m.C <- ServerSentEvent{ + Type: "data", + Event: "message", + ID: fmt.Sprintf("%d", time.Now().UnixMilli()), // or use atomic counter + Data: payload, + }: + // client was able to receive + default: + // client is backed up + } + } +} + +func (c *SSEChannel) Unsubscribe(addr AddrStr) { + c.clientsMu.Lock() + defer c.clientsMu.Unlock() + m := c.clients[addr] + delete(c.clients, addr) + + // ticker must be stopped BEFORE closing channel + m.ticker.Stop() + close(m.C) +}