继续更新 update 门户站点界面和功能

This commit is contained in:
QWQLwToo
2026-06-26 14:33:43 +08:00
parent cd2fd435a2
commit 2171b933eb
5 changed files with 286 additions and 19 deletions
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"os"
@@ -22,6 +23,25 @@ type Service struct {
client *http.Client
stop chan struct{}
once sync.Once
mu sync.RWMutex
jobs map[string]CheckJob
events chan Event
}
type Event struct {
Type string `json:"type"`
Data map[string]any `json:"data"`
}
type CheckJob struct {
ID string `json:"id"`
Status string `json:"status"`
StartedAt string `json:"startedAt"`
FinishedAt string `json:"finishedAt"`
Total int `json:"total"`
Checked int `json:"checked"`
Stats map[string]int `json:"stats"`
LastError string `json:"lastError"`
}
type legacyMedia struct {
@@ -52,6 +72,8 @@ func NewService(cfg *config.Config, store *db.Store) *Service {
store: store,
client: &http.Client{Timeout: 10 * time.Second},
stop: make(chan struct{}),
jobs: map[string]CheckJob{},
events: make(chan Event, 32),
}
}
@@ -237,21 +259,115 @@ func (s *Service) CheckDue(ctx context.Context) {
}
}
func (s *Service) QueueCheckAll() CheckJob {
items, err := s.store.ListSources(true)
if err != nil {
job := CheckJob{ID: newJobID(), Status: "failed", StartedAt: time.Now().UTC().Format(time.RFC3339), FinishedAt: time.Now().UTC().Format(time.RFC3339), Stats: map[string]int{}, LastError: err.Error()}
s.saveJob(job)
return job
}
filtered := make([]db.Source, 0, len(items))
for _, item := range items {
if item.Enabled {
filtered = append(filtered, item)
}
}
job := CheckJob{ID: newJobID(), Status: "running", StartedAt: time.Now().UTC().Format(time.RFC3339), Total: len(filtered), Stats: map[string]int{"ok": 0, "redirected": 0, "degraded": 0, "error": 0}}
s.saveJob(job)
go s.runCheckJob(context.Background(), job.ID, filtered)
return job
}
func (s *Service) CheckJobs() []CheckJob {
s.mu.RLock()
defer s.mu.RUnlock()
items := make([]CheckJob, 0, len(s.jobs))
for _, item := range s.jobs {
items = append(items, item)
}
sortJobs(items)
return items
}
func (s *Service) CheckJob(id string) (CheckJob, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
item, ok := s.jobs[id]
return item, ok
}
func (s *Service) Events() <-chan Event {
return s.events
}
func (s *Service) runCheckJob(ctx context.Context, id string, items []db.Source) {
if len(items) == 0 {
s.updateJob(id, func(job *CheckJob) {
job.Status = "completed"
job.FinishedAt = time.Now().UTC().Format(time.RFC3339)
})
s.emit("source_check.completed", map[string]any{"jobId": id})
return
}
const concurrency = 4
work := make(chan db.Source)
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range work {
status, err := s.CheckOneStatus(ctx, item)
if err != nil && status == "" {
status = "error"
}
s.updateJob(id, func(job *CheckJob) {
job.Checked++
if job.Stats == nil {
job.Stats = map[string]int{}
}
job.Stats[status]++
if err != nil {
job.LastError = err.Error()
}
})
s.emit("source_check.progress", map[string]any{"jobId": id, "sourceId": item.SourceID, "status": status})
}
}()
}
for _, item := range items {
work <- item
}
close(work)
wg.Wait()
s.updateJob(id, func(job *CheckJob) {
job.Status = "completed"
job.FinishedAt = time.Now().UTC().Format(time.RFC3339)
})
s.emit("source_check.completed", map[string]any{"jobId": id})
}
func (s *Service) CheckSourceID(ctx context.Context, sourceID string) (db.Source, error) {
item, err := s.store.GetSourceBySourceID(sourceID)
if err != nil {
return db.Source{}, err
}
return item, s.CheckOne(ctx, item)
_, err = s.CheckOneStatus(ctx, item)
return item, err
}
func (s *Service) CheckOne(ctx context.Context, item db.Source) error {
_, err := s.CheckOneStatus(ctx, item)
return err
}
func (s *Service) CheckOneStatus(ctx context.Context, item db.Source) (string, error) {
if strings.TrimSpace(item.APIURL) == "" {
return errors.New("source api_url is empty")
return "error", errors.New("source api_url is empty")
}
timeout := time.Duration(item.TimeoutMS) * time.Millisecond
if timeout <= 0 {
timeout = 8 * time.Second
if timeout <= 0 || timeout < 15*time.Second {
timeout = 15 * time.Second
}
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
@@ -262,7 +378,7 @@ func (s *Service) CheckOne(ctx context.Context, item db.Source) error {
req, err := http.NewRequestWithContext(ctx, method, item.APIURL, nil)
if err != nil {
_ = s.store.RecordSourceCheck(item.ID, "error", 0, err.Error())
return err
return "error", err
}
redirects := []string{}
client := *s.client
@@ -282,7 +398,7 @@ func (s *Service) CheckOne(ctx context.Context, item db.Source) error {
latency := int(time.Since(start).Milliseconds())
if err != nil {
_ = s.store.RecordSourceCheck(item.ID, "error", latency, err.Error())
return err
return "error", err
}
defer resp.Body.Close()
status := "ok"
@@ -306,7 +422,47 @@ func (s *Service) CheckOne(ctx context.Context, item db.Source) error {
"error": resp.Status,
})
}
return s.store.RecordSourceCheck(item.ID, status, latency, message)
if err := s.store.RecordSourceCheck(item.ID, status, latency, message); err != nil {
return status, err
}
s.emit("source_check.item", map[string]any{"sourceId": item.SourceID, "status": status, "latencyMs": latency})
return status, nil
}
func (s *Service) saveJob(job CheckJob) {
s.mu.Lock()
defer s.mu.Unlock()
s.jobs[job.ID] = job
}
func (s *Service) updateJob(id string, mutate func(*CheckJob)) {
s.mu.Lock()
defer s.mu.Unlock()
job := s.jobs[id]
mutate(&job)
s.jobs[id] = job
}
func (s *Service) emit(kind string, data map[string]any) {
event := Event{Type: kind, Data: data}
select {
case s.events <- event:
default:
}
}
func newJobID() string {
return fmt.Sprintf("check-%d", time.Now().UnixNano())
}
func sortJobs(items []CheckJob) {
for i := 0; i < len(items); i++ {
for j := i + 1; j < len(items); j++ {
if items[j].StartedAt > items[i].StartedAt {
items[i], items[j] = items[j], items[i]
}
}
}
}
func isHTTPURL(value *url.URL) bool {
@@ -116,6 +116,8 @@ func (r *router) ServeHTTP(w http.ResponseWriter, req *http.Request) {
r.auth.Require(http.HandlerFunc(r.handleAdminSources)).ServeHTTP(w, req)
case strings.HasPrefix(path, "/api/admin/endpoints"):
r.auth.Require(http.HandlerFunc(r.handleAdminEndpoints)).ServeHTTP(w, req)
case path == "/api/admin/events":
r.auth.Require(http.HandlerFunc(r.handleAdminEvents)).ServeHTTP(w, req)
case strings.HasPrefix(path, "/api/admin/legacy"):
r.auth.Require(http.HandlerFunc(r.handleAdminLegacy)).ServeHTTP(w, req)
case strings.HasPrefix(path, "/api/admin/database"):
@@ -637,6 +639,38 @@ func (r *router) handleAdminEndpoints(w http.ResponseWriter, req *http.Request)
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "items": items})
}
func (r *router) handleAdminEvents(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
writeError(w, http.StatusMethodNotAllowed, "METHOD_NOT_ALLOWED", errors.New("GET required"))
return
}
flusher, ok := w.(http.Flusher)
if !ok {
writeError(w, http.StatusInternalServerError, "SSE_UNSUPPORTED", errors.New("streaming is not supported"))
return
}
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
events := r.sources.Events()
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
writeSSE(w, "ready", map[string]any{"ok": true, "time": time.Now().UTC().Format(time.RFC3339)})
flusher.Flush()
for {
select {
case event := <-events:
writeSSE(w, event.Type, event.Data)
flusher.Flush()
case <-ticker.C:
writeSSE(w, "heartbeat", map[string]any{"time": time.Now().UTC().Format(time.RFC3339)})
flusher.Flush()
case <-req.Context().Done():
return
}
}
}
func (r *router) handleAdminReleases(w http.ResponseWriter, req *http.Request) {
path := cleanPath(req.URL.Path)
if strings.HasPrefix(path, "/api/admin/releases/notices") {
@@ -786,8 +820,17 @@ func (r *router) handleAdminSources(w http.ResponseWriter, req *http.Request) {
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true})
case req.Method == http.MethodPost && path == "/api/admin/sources/check":
go r.sources.CheckDue(req.Context())
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "queued": true})
job := r.sources.QueueCheckAll()
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "queued": true, "jobId": job.ID, "job": job})
case req.Method == http.MethodGet && path == "/api/admin/sources/check/status":
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "items": r.sources.CheckJobs()})
case req.Method == http.MethodGet && strings.HasPrefix(path, "/api/admin/sources/check/status/"):
jobID := strings.TrimPrefix(path, "/api/admin/sources/check/status/")
if job, ok := r.sources.CheckJob(jobID); ok {
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "job": job})
return
}
writeError(w, http.StatusNotFound, "CHECK_JOB_NOT_FOUND", errors.New("check job not found"))
case req.Method == http.MethodPost && strings.HasPrefix(path, "/api/admin/sources/") && strings.HasSuffix(path, "/check"):
sourceID := strings.TrimSuffix(strings.TrimPrefix(path, "/api/admin/sources/"), "/check")
item, err := r.sources.CheckSourceID(req.Context(), sourceID)
@@ -967,6 +1010,12 @@ func writeJSON(w http.ResponseWriter, status int, payload any) {
_ = json.NewEncoder(w).Encode(payload)
}
func writeSSE(w http.ResponseWriter, event string, payload any) {
data, _ := json.Marshal(payload)
_, _ = w.Write([]byte("event: " + event + "\n"))
_, _ = w.Write([]byte("data: " + string(data) + "\n\n"))
}
func writeError(w http.ResponseWriter, status int, code string, err error) {
message := ""
if err != nil {