diff --git a/server/unified-management/internal/sources/sources.go b/server/unified-management/internal/sources/sources.go index e104025..6aed5ea 100644 --- a/server/unified-management/internal/sources/sources.go +++ b/server/unified-management/internal/sources/sources.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "net/http" "net/url" "os" @@ -22,6 +23,25 @@ type Service struct { client *http.Client stop chan struct{} once sync.Once + mu sync.RWMutex + jobs map[string]CheckJob + events chan Event +} + +type Event struct { + Type string `json:"type"` + Data map[string]any `json:"data"` +} + +type CheckJob struct { + ID string `json:"id"` + Status string `json:"status"` + StartedAt string `json:"startedAt"` + FinishedAt string `json:"finishedAt"` + Total int `json:"total"` + Checked int `json:"checked"` + Stats map[string]int `json:"stats"` + LastError string `json:"lastError"` } type legacyMedia struct { @@ -52,6 +72,8 @@ func NewService(cfg *config.Config, store *db.Store) *Service { store: store, client: &http.Client{Timeout: 10 * time.Second}, stop: make(chan struct{}), + jobs: map[string]CheckJob{}, + events: make(chan Event, 32), } } @@ -237,21 +259,115 @@ func (s *Service) CheckDue(ctx context.Context) { } } +func (s *Service) QueueCheckAll() CheckJob { + items, err := s.store.ListSources(true) + if err != nil { + job := CheckJob{ID: newJobID(), Status: "failed", StartedAt: time.Now().UTC().Format(time.RFC3339), FinishedAt: time.Now().UTC().Format(time.RFC3339), Stats: map[string]int{}, LastError: err.Error()} + s.saveJob(job) + return job + } + filtered := make([]db.Source, 0, len(items)) + for _, item := range items { + if item.Enabled { + filtered = append(filtered, item) + } + } + job := CheckJob{ID: newJobID(), Status: "running", StartedAt: time.Now().UTC().Format(time.RFC3339), Total: len(filtered), Stats: map[string]int{"ok": 0, "redirected": 0, "degraded": 0, "error": 0}} + s.saveJob(job) + go s.runCheckJob(context.Background(), job.ID, filtered) + return job +} + +func (s *Service) CheckJobs() []CheckJob { + s.mu.RLock() + defer s.mu.RUnlock() + items := make([]CheckJob, 0, len(s.jobs)) + for _, item := range s.jobs { + items = append(items, item) + } + sortJobs(items) + return items +} + +func (s *Service) CheckJob(id string) (CheckJob, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + item, ok := s.jobs[id] + return item, ok +} + +func (s *Service) Events() <-chan Event { + return s.events +} + +func (s *Service) runCheckJob(ctx context.Context, id string, items []db.Source) { + if len(items) == 0 { + s.updateJob(id, func(job *CheckJob) { + job.Status = "completed" + job.FinishedAt = time.Now().UTC().Format(time.RFC3339) + }) + s.emit("source_check.completed", map[string]any{"jobId": id}) + return + } + const concurrency = 4 + work := make(chan db.Source) + var wg sync.WaitGroup + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for item := range work { + status, err := s.CheckOneStatus(ctx, item) + if err != nil && status == "" { + status = "error" + } + s.updateJob(id, func(job *CheckJob) { + job.Checked++ + if job.Stats == nil { + job.Stats = map[string]int{} + } + job.Stats[status]++ + if err != nil { + job.LastError = err.Error() + } + }) + s.emit("source_check.progress", map[string]any{"jobId": id, "sourceId": item.SourceID, "status": status}) + } + }() + } + for _, item := range items { + work <- item + } + close(work) + wg.Wait() + s.updateJob(id, func(job *CheckJob) { + job.Status = "completed" + job.FinishedAt = time.Now().UTC().Format(time.RFC3339) + }) + s.emit("source_check.completed", map[string]any{"jobId": id}) +} + func (s *Service) CheckSourceID(ctx context.Context, sourceID string) (db.Source, error) { item, err := s.store.GetSourceBySourceID(sourceID) if err != nil { return db.Source{}, err } - return item, s.CheckOne(ctx, item) + _, err = s.CheckOneStatus(ctx, item) + return item, err } func (s *Service) CheckOne(ctx context.Context, item db.Source) error { + _, err := s.CheckOneStatus(ctx, item) + return err +} + +func (s *Service) CheckOneStatus(ctx context.Context, item db.Source) (string, error) { if strings.TrimSpace(item.APIURL) == "" { - return errors.New("source api_url is empty") + return "error", errors.New("source api_url is empty") } timeout := time.Duration(item.TimeoutMS) * time.Millisecond - if timeout <= 0 { - timeout = 8 * time.Second + if timeout <= 0 || timeout < 15*time.Second { + timeout = 15 * time.Second } ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -262,7 +378,7 @@ func (s *Service) CheckOne(ctx context.Context, item db.Source) error { req, err := http.NewRequestWithContext(ctx, method, item.APIURL, nil) if err != nil { _ = s.store.RecordSourceCheck(item.ID, "error", 0, err.Error()) - return err + return "error", err } redirects := []string{} client := *s.client @@ -282,7 +398,7 @@ func (s *Service) CheckOne(ctx context.Context, item db.Source) error { latency := int(time.Since(start).Milliseconds()) if err != nil { _ = s.store.RecordSourceCheck(item.ID, "error", latency, err.Error()) - return err + return "error", err } defer resp.Body.Close() status := "ok" @@ -306,7 +422,47 @@ func (s *Service) CheckOne(ctx context.Context, item db.Source) error { "error": resp.Status, }) } - return s.store.RecordSourceCheck(item.ID, status, latency, message) + if err := s.store.RecordSourceCheck(item.ID, status, latency, message); err != nil { + return status, err + } + s.emit("source_check.item", map[string]any{"sourceId": item.SourceID, "status": status, "latencyMs": latency}) + return status, nil +} + +func (s *Service) saveJob(job CheckJob) { + s.mu.Lock() + defer s.mu.Unlock() + s.jobs[job.ID] = job +} + +func (s *Service) updateJob(id string, mutate func(*CheckJob)) { + s.mu.Lock() + defer s.mu.Unlock() + job := s.jobs[id] + mutate(&job) + s.jobs[id] = job +} + +func (s *Service) emit(kind string, data map[string]any) { + event := Event{Type: kind, Data: data} + select { + case s.events <- event: + default: + } +} + +func newJobID() string { + return fmt.Sprintf("check-%d", time.Now().UnixNano()) +} + +func sortJobs(items []CheckJob) { + for i := 0; i < len(items); i++ { + for j := i + 1; j < len(items); j++ { + if items[j].StartedAt > items[i].StartedAt { + items[i], items[j] = items[j], items[i] + } + } + } } func isHTTPURL(value *url.URL) bool { diff --git a/server/unified-management/internal/web/router.go b/server/unified-management/internal/web/router.go index 2f766ed..4cc347d 100644 --- a/server/unified-management/internal/web/router.go +++ b/server/unified-management/internal/web/router.go @@ -116,6 +116,8 @@ func (r *router) ServeHTTP(w http.ResponseWriter, req *http.Request) { r.auth.Require(http.HandlerFunc(r.handleAdminSources)).ServeHTTP(w, req) case strings.HasPrefix(path, "/api/admin/endpoints"): r.auth.Require(http.HandlerFunc(r.handleAdminEndpoints)).ServeHTTP(w, req) + case path == "/api/admin/events": + r.auth.Require(http.HandlerFunc(r.handleAdminEvents)).ServeHTTP(w, req) case strings.HasPrefix(path, "/api/admin/legacy"): r.auth.Require(http.HandlerFunc(r.handleAdminLegacy)).ServeHTTP(w, req) case strings.HasPrefix(path, "/api/admin/database"): @@ -637,6 +639,38 @@ func (r *router) handleAdminEndpoints(w http.ResponseWriter, req *http.Request) writeJSON(w, http.StatusOK, map[string]any{"ok": true, "items": items}) } +func (r *router) handleAdminEvents(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "METHOD_NOT_ALLOWED", errors.New("GET required")) + return + } + flusher, ok := w.(http.Flusher) + if !ok { + writeError(w, http.StatusInternalServerError, "SSE_UNSUPPORTED", errors.New("streaming is not supported")) + return + } + w.Header().Set("Content-Type", "text/event-stream; charset=utf-8") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + events := r.sources.Events() + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + writeSSE(w, "ready", map[string]any{"ok": true, "time": time.Now().UTC().Format(time.RFC3339)}) + flusher.Flush() + for { + select { + case event := <-events: + writeSSE(w, event.Type, event.Data) + flusher.Flush() + case <-ticker.C: + writeSSE(w, "heartbeat", map[string]any{"time": time.Now().UTC().Format(time.RFC3339)}) + flusher.Flush() + case <-req.Context().Done(): + return + } + } +} + func (r *router) handleAdminReleases(w http.ResponseWriter, req *http.Request) { path := cleanPath(req.URL.Path) if strings.HasPrefix(path, "/api/admin/releases/notices") { @@ -786,8 +820,17 @@ func (r *router) handleAdminSources(w http.ResponseWriter, req *http.Request) { } writeJSON(w, http.StatusOK, map[string]any{"ok": true}) case req.Method == http.MethodPost && path == "/api/admin/sources/check": - go r.sources.CheckDue(req.Context()) - writeJSON(w, http.StatusOK, map[string]any{"ok": true, "queued": true}) + job := r.sources.QueueCheckAll() + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "queued": true, "jobId": job.ID, "job": job}) + case req.Method == http.MethodGet && path == "/api/admin/sources/check/status": + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "items": r.sources.CheckJobs()}) + case req.Method == http.MethodGet && strings.HasPrefix(path, "/api/admin/sources/check/status/"): + jobID := strings.TrimPrefix(path, "/api/admin/sources/check/status/") + if job, ok := r.sources.CheckJob(jobID); ok { + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "job": job}) + return + } + writeError(w, http.StatusNotFound, "CHECK_JOB_NOT_FOUND", errors.New("check job not found")) case req.Method == http.MethodPost && strings.HasPrefix(path, "/api/admin/sources/") && strings.HasSuffix(path, "/check"): sourceID := strings.TrimSuffix(strings.TrimPrefix(path, "/api/admin/sources/"), "/check") item, err := r.sources.CheckSourceID(req.Context(), sourceID) @@ -967,6 +1010,12 @@ func writeJSON(w http.ResponseWriter, status int, payload any) { _ = json.NewEncoder(w).Encode(payload) } +func writeSSE(w http.ResponseWriter, event string, payload any) { + data, _ := json.Marshal(payload) + _, _ = w.Write([]byte("event: " + event + "\n")) + _, _ = w.Write([]byte("data: " + string(data) + "\n\n")) +} + func writeError(w http.ResponseWriter, status int, code string, err error) { message := "" if err != nil { diff --git a/server/unified-management/web/admin/src/App.vue b/server/unified-management/web/admin/src/App.vue index af05169..69bc28b 100644 --- a/server/unified-management/web/admin/src/App.vue +++ b/server/unified-management/web/admin/src/App.vue @@ -59,10 +59,12 @@ const toast = ref(null); const autoRefreshPaused = ref(false); let refreshTimer: number | undefined; let toastTimer: number | undefined; +let events: EventSource | null = null; const captcha = ref(null); const authBootstrap = ref(null); const dashboard = ref({}); +const sourceCheckJobs = ref([]); const feedbackPage = ref({ items: [], total: 0, page: 1, perPage: 20 }); const selectedFeedback = ref(null); const releases = ref(null); @@ -303,6 +305,7 @@ const viewContext = computed(() => ({ selectedFeedback: selectedFeedback.value, selectedNotice: selectedNotice.value, sourceCategories: sourceCategories.value, + sourceCheckJobs: sourceCheckJobs.value, sourceDraft, statusTone, syncDatabase, @@ -385,6 +388,7 @@ async function login() { }); csrf.value = data.csrfToken; localStorage.setItem("ymhut.csrf", csrf.value); + connectAdminEvents(); navigate("/admin/dashboard"); }); } @@ -393,6 +397,8 @@ async function logout() { await api("/api/admin/auth/logout", { method: "POST", body: "{}" }).catch(() => undefined); csrf.value = ""; localStorage.removeItem("ymhut.csrf"); + events?.close(); + events = null; navigate("/admin/login"); } @@ -716,13 +722,19 @@ async function saveSource() { async function checkSources() { await guarded(async () => { - await api("/api/admin/sources/check", { method: "POST", body: "{}" }); - setToast("接口心跳检测已进入队列"); + const data = await api<{ jobId: string; job: any }>("/api/admin/sources/check", { method: "POST", body: "{}" }); + if (data.job) sourceCheckJobs.value = [data.job, ...sourceCheckJobs.value.filter((item) => item.id !== data.job.id)].slice(0, 5); + setToast(`接口心跳检测已进入队列:${data.jobId}`); if (currentPath.value === "/admin/dashboard") await loadDashboard(); if (currentPath.value === "/admin/sources") await loadSources(); }); } +async function loadSourceCheckJobs() { + const data = await api<{ items: any[] }>("/api/admin/sources/check/status"); + sourceCheckJobs.value = data.items || []; +} + async function loadEndpoints() { const data = await api<{ items: any[] }>("/api/admin/endpoints"); endpoints.value = data.items || []; @@ -795,10 +807,11 @@ async function loadAudit() { async function changePassword() { await guarded(async () => { - await api("/api/admin/auth/password", { method: "POST", body: JSON.stringify(passwordForm) }); + const data = await api<{ isDefaultPassword: boolean; warning?: string }>("/api/admin/auth/password", { method: "POST", body: JSON.stringify(passwordForm) }); passwordForm.currentPassword = ""; passwordForm.newPassword = ""; - setToast("后台密码已修改,登录页将不再提示默认密码"); + if (authBootstrap.value) authBootstrap.value.isDefaultPassword = data.isDefaultPassword; + setToast(data.warning || "后台密码已修改,登录页将不再提示默认密码", data.warning ? "warn" : "success"); }); } @@ -909,6 +922,7 @@ function splitList(value: string) { onMounted(() => { void load(); + connectAdminEvents(); refreshTimer = window.setInterval(() => { if (!autoRefreshPaused.value && currentPath.value === "/admin/dashboard" && csrf.value) void loadDashboard(); }, 15000); @@ -916,7 +930,30 @@ onMounted(() => { onUnmounted(() => { if (refreshTimer) window.clearInterval(refreshTimer); + events?.close(); + events = null; }); + +function connectAdminEvents() { + if (!csrf.value || events) return; + events = new EventSource("/api/admin/events", { withCredentials: true }); + const refreshCurrent = () => { + if (autoRefreshPaused.value) return; + if (currentPath.value === "/admin/dashboard") void Promise.all([loadDashboard(), loadSourceCheckJobs().catch(() => undefined)]); + if (currentPath.value === "/admin/sources") void Promise.all([loadSources(), loadSourceCheckJobs().catch(() => undefined)]); + if (currentPath.value === "/admin/endpoints") void loadEndpoints(); + if (currentPath.value === "/admin/audit") void loadAudit(); + if (currentPath.value === "/admin/database") void loadDatabase(); + }; + for (const name of ["source_check.item", "source_check.progress", "source_check.completed", "heartbeat"]) { + events.addEventListener(name, refreshCurrent); + } + events.onerror = () => { + events?.close(); + events = null; + window.setTimeout(connectAdminEvents, 5000); + }; +}