feat: implement real-time updates and enhance monitoring system
- Add structured logging with slog throughout application - Implement real-time updates using Server-Sent Events and HTMX - Add broadcaster system for instant UI updates when agents report stats - Replace meta refresh with HTMX-powered seamless updates - Add new API endpoints for HTMX fragments and SSE events - Update templates to use HTMX for instant data refresh - Enhance README with real-time features and updated documentation - Remove obsolete template generation file
This commit is contained in:
@@ -4,12 +4,57 @@ import (
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"nerd-monitor/internal/stats"
|
||||
"nerd-monitor/internal/store"
|
||||
)
|
||||
|
||||
// Broadcaster manages Server-Sent Events clients.
|
||||
type Broadcaster struct {
|
||||
clients map[chan string]bool
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
var apiBroadcaster = &Broadcaster{
|
||||
clients: make(map[chan string]bool),
|
||||
}
|
||||
|
||||
// Broadcast sends a message to all connected SSE clients.
|
||||
func (b *Broadcaster) Broadcast(message string) {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
|
||||
for clientChan := range b.clients {
|
||||
select {
|
||||
case clientChan <- message:
|
||||
default:
|
||||
// Client channel is full, skip
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AddClient adds a new SSE client.
|
||||
func (b *Broadcaster) AddClient(clientChan chan string) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
b.clients[clientChan] = true
|
||||
}
|
||||
|
||||
// RemoveClient removes an SSE client.
|
||||
func (b *Broadcaster) RemoveClient(clientChan chan string) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
delete(b.clients, clientChan)
|
||||
close(clientChan)
|
||||
}
|
||||
|
||||
// GetAPIBroadcaster returns the API broadcaster instance.
|
||||
func GetAPIBroadcaster() *Broadcaster {
|
||||
return apiBroadcaster
|
||||
}
|
||||
|
||||
// Handler manages HTTP requests.
|
||||
type Handler struct {
|
||||
store *store.Store
|
||||
@@ -40,6 +85,12 @@ func (h *Handler) ReportStats(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
slog.Debug("Updating agent stats", "agentID", agentID, "hostname", stat.Hostname, "cpu", stat.CPUUsage)
|
||||
h.store.UpdateAgent(agentID, &stat)
|
||||
|
||||
// Broadcast update to all connected SSE clients
|
||||
message := "event: stats-update\ndata: {\"type\": \"stats-update\", \"agentId\": \"" + agentID + "\"}\n\n"
|
||||
slog.Info("Broadcasting stats update", "agentID", agentID)
|
||||
apiBroadcaster.Broadcast(message)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
|
||||
slog.Debug("Stats report processed successfully", "agentID", agentID)
|
||||
|
||||
@@ -2,11 +2,13 @@ package ui
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"nerd-monitor/internal/api"
|
||||
"nerd-monitor/internal/auth"
|
||||
"nerd-monitor/internal/store"
|
||||
"nerd-monitor/views"
|
||||
@@ -14,15 +16,20 @@ import (
|
||||
|
||||
// Handler serves UI pages.
|
||||
type Handler struct {
|
||||
store *store.Store
|
||||
auth *auth.Manager
|
||||
store *store.Store
|
||||
auth *auth.Manager
|
||||
broadcaster interface {
|
||||
AddClient(chan string)
|
||||
RemoveClient(chan string)
|
||||
}
|
||||
}
|
||||
|
||||
// New creates a new UI handler.
|
||||
func New(s *store.Store, a *auth.Manager) *Handler {
|
||||
return &Handler{
|
||||
store: s,
|
||||
auth: a,
|
||||
store: s,
|
||||
auth: a,
|
||||
broadcaster: api.GetAPIBroadcaster(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,3 +219,177 @@ func (h *Handler) getStaleAgents() []*store.AgentStats {
|
||||
|
||||
return stale
|
||||
}
|
||||
|
||||
// GetDashboardTable returns HTML fragment for the agent table.
|
||||
func (h *Handler) GetDashboardTable(w http.ResponseWriter, r *http.Request) {
|
||||
slog.Debug("HTMX dashboard table request", "method", r.Method, "path", r.URL.Path, "remoteAddr", r.RemoteAddr)
|
||||
|
||||
agents := h.store.GetAllAgents()
|
||||
|
||||
w.Header().Set("Content-Type", "text/html")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
|
||||
if len(agents) == 0 {
|
||||
w.Write([]byte(`<div class="alert alert-info">
|
||||
<strong>ℹ️ No agents connected</strong>
|
||||
<p style="margin-top: 0.5rem; font-size: 0.875rem;">
|
||||
Start an agent to see its statistics appear here.
|
||||
</p>
|
||||
</div>`))
|
||||
return
|
||||
}
|
||||
|
||||
w.Write([]byte(`<div class="card">
|
||||
<div class="card-title">Active Agents</div>
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Hostname</th>
|
||||
<th>CPU Usage</th>
|
||||
<th>Memory</th>
|
||||
<th>Disk</th>
|
||||
<th>Last Seen</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>`))
|
||||
|
||||
for _, agent := range agents {
|
||||
// Determine online status (agent is online if reported within last 15 seconds)
|
||||
isOnline := time.Since(agent.LastSeen) < 15*time.Second
|
||||
statusClass := "status-red"
|
||||
statusText := "Offline"
|
||||
if isOnline {
|
||||
statusClass = "status-green"
|
||||
statusText = "Online"
|
||||
}
|
||||
|
||||
row := fmt.Sprintf(`<tr>
|
||||
<td>
|
||||
<div style="display: flex; align-items: center; gap: 0.5rem;">
|
||||
<span class="status-badge %s" style="margin: 0;">%s</span>
|
||||
<a href="/agents/%s" style="color: #3b82f6; text-decoration: none;">%s</a>
|
||||
</div>
|
||||
</td>
|
||||
<td>%.1f%%</td>
|
||||
<td>%s / %s</td>
|
||||
<td>%s / %s</td>
|
||||
<td class="timestamp">%s</td>
|
||||
</tr>`,
|
||||
statusClass,
|
||||
statusText,
|
||||
agent.ID,
|
||||
agent.Hostname,
|
||||
agent.CPUUsage,
|
||||
formatBytes(agent.RAMUsage),
|
||||
formatBytes(agent.RAMTotal),
|
||||
formatBytes(agent.DiskUsage),
|
||||
formatBytes(agent.DiskTotal),
|
||||
agent.LastSeen.Format("2006-01-02 15:04:05"))
|
||||
w.Write([]byte(row))
|
||||
}
|
||||
|
||||
w.Write([]byte(`</tbody></table></div>`))
|
||||
}
|
||||
|
||||
// GetAgentStats returns HTML fragment for agent statistics.
|
||||
func (h *Handler) GetAgentStats(w http.ResponseWriter, r *http.Request) {
|
||||
slog.Debug("HTMX agent stats request", "method", r.Method, "path", r.URL.Path, "remoteAddr", r.RemoteAddr)
|
||||
|
||||
agentID := chi.URLParam(r, "id")
|
||||
if agentID == "" {
|
||||
slog.Warn("Missing agent ID in stats request", "remoteAddr", r.RemoteAddr)
|
||||
http.Error(w, "missing agent id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
agent := h.store.GetAgent(agentID)
|
||||
if agent == nil {
|
||||
slog.Warn("Agent not found for stats request", "agentID", agentID)
|
||||
http.Error(w, "agent not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "text/html")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
|
||||
// Return just the stats content that should be updated
|
||||
statsHTML := fmt.Sprintf(`<div style="font-size: 2rem; margin: 1rem 0;">%.1f%%</div>
|
||||
<div class="progress-bar">
|
||||
<div class="progress-fill" style="width: %.1f%%"></div>
|
||||
</div>
|
||||
<div style="margin-top: 1rem; font-size: 0.875rem; color: #94a3b8;">
|
||||
Last updated: %s
|
||||
</div>`,
|
||||
agent.CPUUsage,
|
||||
agent.CPUUsage,
|
||||
agent.LastSeen.Format("2006-01-02 15:04:05"))
|
||||
|
||||
w.Write([]byte(statsHTML))
|
||||
}
|
||||
|
||||
// Events provides Server-Sent Events for real-time updates.
|
||||
func (h *Handler) Events(w http.ResponseWriter, r *http.Request) {
|
||||
slog.Info("SSE connection established", "remoteAddr", r.RemoteAddr)
|
||||
|
||||
// Set headers for Server-Sent Events
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
|
||||
// Create a channel for this client
|
||||
clientChan := make(chan string, 10)
|
||||
h.broadcaster.AddClient(clientChan)
|
||||
|
||||
// Clean up when client disconnects
|
||||
go func() {
|
||||
<-r.Context().Done()
|
||||
slog.Debug("SSE client disconnected", "remoteAddr", r.RemoteAddr)
|
||||
h.broadcaster.RemoveClient(clientChan)
|
||||
}()
|
||||
|
||||
// Send initial connection event
|
||||
fmt.Fprintf(w, "event: connected\ndata: {}\n\n")
|
||||
w.(http.Flusher).Flush()
|
||||
|
||||
// Send a test event after 2 seconds
|
||||
go func() {
|
||||
time.Sleep(2 * time.Second)
|
||||
select {
|
||||
case clientChan <- "event: test\ndata: {\"message\": \"SSE working\"}\n\n":
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
// Listen for broadcast messages
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case message := <-clientChan:
|
||||
slog.Debug("Sending SSE message to client", "remoteAddr", r.RemoteAddr, "message", message)
|
||||
fmt.Fprintf(w, "%s\n", message)
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// formatBytes converts bytes to human-readable format.
|
||||
func formatBytes(bytes uint64) string {
|
||||
const (
|
||||
kb = 1024
|
||||
mb = kb * 1024
|
||||
gb = mb * 1024
|
||||
)
|
||||
|
||||
switch {
|
||||
case bytes >= gb:
|
||||
return fmt.Sprintf("%.1f GB", float64(bytes)/float64(gb))
|
||||
case bytes >= mb:
|
||||
return fmt.Sprintf("%.1f MB", float64(bytes)/float64(mb))
|
||||
case bytes >= kb:
|
||||
return fmt.Sprintf("%.1f KB", float64(bytes)/float64(kb))
|
||||
default:
|
||||
return fmt.Sprintf("%d B", bytes)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user