Files
YMhut-box-C-/server/unified-management/internal/sources/sources.go
T
QWQLwToo 2513eb2903
build-winui / winui (push) Has been cancelled
继续更新 update 门户站点界面和功能
2026-06-26 20:17:48 +08:00

534 lines
14 KiB
Go

package sources
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"time"
"ymhut-box/server/unified-management/internal/config"
"ymhut-box/server/unified-management/internal/db"
)
// Service manages the source catalog: it imports legacy definitions, runs
// health checks against sources, tracks bulk check jobs and fans events out
// to subscribers.
type Service struct {
// cfg supplies SourceCheckSeconds and UpdatePublicDir.
cfg *config.Config
// store persists sources and their check results.
store *db.Store
// client is the base HTTP client; CheckOneStatus copies it for each check.
client *http.Client
// stop is closed by Stop to end the background loop.
stop chan struct{}
// once makes Start launch the background loop only one time.
once sync.Once
// mu guards jobs and subscribers.
mu sync.RWMutex
// jobs holds check jobs keyed by job ID.
jobs map[string]CheckJob
// subscribers is the set of channels that receive emitted events.
subscribers map[chan Event]struct{}
}
// Event is a notification delivered to subscribers registered through
// SubscribeEvents.
type Event struct {
// Type is the event name, e.g. "source_check.progress".
Type string `json:"type"`
// Data carries the event payload.
Data map[string]any `json:"data"`
}
// CheckJob is the JSON-serialisable progress record of one bulk source check
// started by QueueCheckAll.
type CheckJob struct {
// ID is the job identifier produced by newJobID.
ID string `json:"id"`
// Status is "running", "completed" or "failed".
Status string `json:"status"`
// StartedAt is the UTC start time formatted as RFC 3339.
StartedAt string `json:"startedAt"`
// FinishedAt is the UTC finish time formatted as RFC 3339; empty while running.
FinishedAt string `json:"finishedAt"`
// Total is the number of enabled sources queued for the job.
Total int `json:"total"`
// Checked is the number of sources processed so far.
Checked int `json:"checked"`
// Stats counts processed sources per resulting status.
Stats map[string]int `json:"stats"`
// LastError is the text of the most recent error seen by the job.
LastError string `json:"lastError"`
}
// legacyMedia is the top-level document decoded from media-types.json.
type legacyMedia struct {
Categories []legacyCategory `json:"categories"`
}
// legacyCategory is one category entry of the legacy media-types.json file.
type legacyCategory struct {
ID string `json:"id"`
Name string `json:"name"`
// Enabled is a pointer so that an absent value can be told apart from
// false; legacyEnabled treats nil as enabled.
Enabled *bool `json:"enabled"`
Subcategories []legacySubcategory `json:"subcategories"`
}
// legacySubcategory is one source entry nested under a legacyCategory.
type legacySubcategory struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
// APIURL is the source endpoint; entries with a blank value are skipped
// during import.
APIURL string `json:"api_url"`
ThumbnailURL string `json:"thumbnail_url"`
// RefreshInterval becomes the source's check interval (seconds) on import
// when it is positive.
RefreshInterval int `json:"refresh_interval"`
SupportedFormats []string `json:"supported_formats"`
// Downloadable is decoded but not read by the import code in this file.
Downloadable bool `json:"downloadable"`
}
// NewService builds a Service around cfg and store. The returned service has
// an HTTP client with a 10 second timeout, an open stop channel and empty job
// and subscriber registries; the background loop is not started until Start.
func NewService(cfg *config.Config, store *db.Store) *Service {
	svc := new(Service)
	svc.cfg = cfg
	svc.store = store
	svc.client = &http.Client{Timeout: 10 * time.Second}
	svc.stop = make(chan struct{})
	svc.jobs = make(map[string]CheckJob)
	svc.subscribers = make(map[chan Event]struct{})
	return svc
}
// Start launches the background check loop in its own goroutine. Repeated
// calls are no-ops because the launch is guarded by s.once.
//
// ctx is currently not used: the loop is ended by Stop, not by cancellation.
func (s *Service) Start(ctx context.Context) {
	launch := func() {
		go s.loop()
	}
	s.once.Do(launch)
}
// Stop signals the background loop to exit by closing the stop channel.
//
// It is safe to call more than once: closing an already closed channel would
// panic, so the close is guarded by s.mu and skipped when the channel is
// already closed.
func (s *Service) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	select {
	case <-s.stop:
		// Already closed by an earlier Stop call.
	default:
		close(s.stop)
	}
}
// loop runs one CheckDue pass immediately and then another on every tick
// until the stop channel is closed.
//
// The tick interval comes from cfg.SourceCheckSeconds. time.NewTicker panics
// on a non-positive duration, so a missing or invalid setting falls back to
// one minute instead of crashing the process.
func (s *Service) loop() {
	const fallbackInterval = time.Minute

	interval := time.Duration(s.cfg.SourceCheckSeconds) * time.Second
	if interval <= 0 {
		interval = fallbackInterval
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	s.CheckDue(context.Background())
	for {
		select {
		case <-ticker.C:
			s.CheckDue(context.Background())
		case <-s.stop:
			return
		}
	}
}
// ImportLegacyMediaTypesIfEmpty runs the legacy import only when the store
// reports zero sources. A failure to count sources is returned as-is.
func (s *Service) ImportLegacyMediaTypesIfEmpty(ctx context.Context) error {
	switch existing, err := s.store.CountSources(); {
	case err != nil:
		return err
	case existing > 0:
		// Store already populated; nothing to import.
		return nil
	default:
		return s.ImportLegacyMediaTypes(ctx)
	}
}
// ImportLegacyMediaTypes reads media-types.json from cfg.UpdatePublicDir and
// upserts one source per subcategory that has a non-blank api_url.
//
// Defaults applied on import: category ID "media", GET method, client_direct
// proxy mode, 8000 ms timeout, one retry, a 300 second check interval when
// refresh_interval is not positive, and enabled unless the category says
// otherwise. The category name and the generated source ID fall back to the
// resolved category ID, so they are never derived from an empty ID.
//
// ctx is accepted for interface symmetry but is not used by the visible code.
// The first read, decode or upsert error aborts the import.
func (s *Service) ImportLegacyMediaTypes(ctx context.Context) error {
	path := filepath.Join(s.cfg.UpdatePublicDir, "media-types.json")
	// The read error is returned unwrapped so callers can keep using
	// os.IsNotExist on it.
	data, err := os.ReadFile(path)
	if err != nil {
		return err
	}
	var legacy legacyMedia
	if err := json.Unmarshal(data, &legacy); err != nil {
		return fmt.Errorf("decoding %q: %w", path, err)
	}
	for _, category := range legacy.Categories {
		// Resolve the category identity once so every fallback below agrees.
		categoryID := defaultString(category.ID, "media")
		categoryName := defaultString(category.Name, categoryID)
		enabled := legacyEnabled(category.Enabled)
		for _, sub := range category.Subcategories {
			if strings.TrimSpace(sub.APIURL) == "" {
				continue
			}
			sourceID := defaultString(sub.ID, categoryID+"-"+sub.Name)
			formats, err := json.Marshal(sub.SupportedFormats)
			if err != nil {
				return fmt.Errorf("encoding supported formats of source %q: %w", sourceID, err)
			}
			_, err = s.store.UpsertSource(db.Source{
				CategoryID:       categoryID,
				CategoryName:     categoryName,
				SourceID:         sourceID,
				Name:             defaultString(sub.Name, sub.ID),
				Description:      sub.Description,
				Method:           "GET",
				APIURL:           sub.APIURL,
				ThumbnailURL:     sub.ThumbnailURL,
				ProxyMode:        "client_direct",
				TimeoutMS:        8000,
				RetryCount:       1,
				CheckIntervalSec: maxInt(sub.RefreshInterval, 300),
				Enabled:          enabled,
				ClientVisible:    true,
				SupportedFormats: string(formats),
			})
			if err != nil {
				return fmt.Errorf("importing source %q: %w", sourceID, err)
			}
		}
	}
	return nil
}
// Catalog returns the source catalog grouped by category in the legacy
// "layout 2.0.0" shape: a map with layout_version, last_updated (current UTC
// time, RFC 3339) and categories.
//
// includeHidden is passed through to the store's ListSources. Categories are
// emitted in the order in which their first source is returned by the store,
// so the output order is stable between calls instead of following Go's
// randomised map iteration.
func (s *Service) Catalog(includeHidden bool) (map[string]any, error) {
	items, err := s.store.ListSources(includeHidden)
	if err != nil {
		return nil, err
	}
	categories := map[string]map[string]any{}
	// order remembers category IDs in first-seen order.
	order := []string{}
	for _, item := range items {
		cat, ok := categories[item.CategoryID]
		if !ok {
			cat = map[string]any{
				"id":            item.CategoryID,
				"name":          item.CategoryName,
				"enabled":       true,
				"subcategories": []map[string]any{},
			}
			categories[item.CategoryID] = cat
			order = append(order, item.CategoryID)
		}
		// Best effort: if the stored value does not decode, formats stays nil.
		var formats []string
		_ = json.Unmarshal([]byte(item.SupportedFormats), &formats)
		sub := map[string]any{
			"id":                item.SourceID,
			"name":              item.Name,
			"description":       item.Description,
			"api_url":           item.APIURL,
			"urlTemplate":       firstNonEmpty(item.URLTemplate, item.APIURL),
			"thumbnail_url":     item.ThumbnailURL,
			"method":            item.Method,
			"proxy_mode":        item.ProxyMode,
			"proxyMode":         item.ProxyMode,
			"refresh_interval":  item.CheckIntervalSec,
			"cacheSeconds":      item.CacheSeconds,
			"supported_formats": formats,
			"downloadable":      true,
			"health": map[string]any{
				"status":             item.LastStatus,
				"latency_ms":         item.LastLatencyMS,
				"last_checked_at":    item.LastCheckedAt,
				"last_error":         item.LastError,
				"consecutiveFailure": item.ConsecutiveFailure,
				"meta":               parseHealthMeta(item.LastError),
			},
		}
		cat["subcategories"] = append(cat["subcategories"].([]map[string]any), sub)
	}
	out := make([]map[string]any, 0, len(order))
	for _, id := range order {
		out = append(out, categories[id])
	}
	return map[string]any{
		"layout_version": "2.0.0",
		"last_updated":   time.Now().UTC().Format(time.RFC3339),
		"categories":     out,
	}, nil
}
// Endpoints returns a flat list with one entry per source, using camelCase
// keys and including the latest health information. includeHidden is passed
// through to the store's ListSources. The result is never nil on success.
func (s *Service) Endpoints(includeHidden bool) ([]map[string]any, error) {
	sources, err := s.store.ListSources(includeHidden)
	if err != nil {
		return nil, err
	}
	endpoints := make([]map[string]any, 0, len(sources))
	for _, src := range sources {
		// Best effort: an undecodable value leaves supportedFormats nil.
		var supported []string
		_ = json.Unmarshal([]byte(src.SupportedFormats), &supported)

		health := map[string]any{
			"status":             src.LastStatus,
			"latencyMs":          src.LastLatencyMS,
			"lastCheckedAt":      src.LastCheckedAt,
			"lastError":          src.LastError,
			"consecutiveFailure": src.ConsecutiveFailure,
			"meta":               parseHealthMeta(src.LastError),
		}
		entry := map[string]any{
			"id":               src.SourceID,
			"category":         src.CategoryID,
			"name":             src.Name,
			"method":           src.Method,
			"urlTemplate":      firstNonEmpty(src.URLTemplate, src.APIURL),
			"proxyMode":        src.ProxyMode,
			"clientVisible":    src.ClientVisible,
			"enabled":          src.Enabled,
			"cacheSeconds":     src.CacheSeconds,
			"supportedFormats": supported,
			"health":           health,
		}
		endpoints = append(endpoints, entry)
	}
	return endpoints, nil
}
// CheckDue checks, one after another, every enabled source whose last check
// is older than its CheckIntervalSec (or that has never been checked, or
// whose LastCheckedAt does not parse as RFC 3339).
//
// The pass is best effort: a failure to list sources ends it silently and
// individual check errors are ignored here because CheckOne already records
// the outcome in the store. The pass stops as soon as ctx is done, so a
// cancelled context does not cause every remaining source to be recorded as
// failed.
func (s *Service) CheckDue(ctx context.Context) {
	items, err := s.store.ListSources(true)
	if err != nil {
		return
	}
	now := time.Now()
	for _, item := range items {
		if ctx.Err() != nil {
			return
		}
		if !item.Enabled {
			continue
		}
		if item.LastCheckedAt != "" {
			last, err := time.Parse(time.RFC3339, item.LastCheckedAt)
			if err == nil && now.Sub(last) < time.Duration(item.CheckIntervalSec)*time.Second {
				// Checked recently enough; not due yet.
				continue
			}
		}
		// Outcome is persisted by CheckOne; the error is intentionally dropped.
		_ = s.CheckOne(ctx, item)
	}
}
// QueueCheckAll registers a new check job covering every enabled source and
// starts it in a background goroutine, returning the job as initially saved.
// If the sources cannot be listed, a job with status "failed" is saved and
// returned instead and nothing is started.
func (s *Service) QueueCheckAll() CheckJob {
	stamp := func() string {
		return time.Now().UTC().Format(time.RFC3339)
	}

	all, err := s.store.ListSources(true)
	if err != nil {
		failed := CheckJob{
			ID:         newJobID(),
			Status:     "failed",
			StartedAt:  stamp(),
			FinishedAt: stamp(),
			Stats:      map[string]int{},
			LastError:  err.Error(),
		}
		s.saveJob(failed)
		return failed
	}

	enabled := make([]db.Source, 0, len(all))
	for i := range all {
		if !all[i].Enabled {
			continue
		}
		enabled = append(enabled, all[i])
	}

	queued := CheckJob{
		ID:        newJobID(),
		Status:    "running",
		StartedAt: stamp(),
		Total:     len(enabled),
		Stats:     map[string]int{"ok": 0, "redirected": 0, "degraded": 0, "error": 0},
	}
	s.saveJob(queued)
	go s.runCheckJob(context.Background(), queued.ID, enabled)
	return queued
}
// CheckJobs returns a snapshot of all known check jobs, ordered by sortJobs
// (most recently started first). The result is never nil.
func (s *Service) CheckJobs() []CheckJob {
	s.mu.RLock()
	defer s.mu.RUnlock()

	snapshot := make([]CheckJob, 0, len(s.jobs))
	for id := range s.jobs {
		snapshot = append(snapshot, s.jobs[id])
	}
	sortJobs(snapshot)
	return snapshot
}
// CheckJob looks up a single check job by ID. The boolean reports whether a
// job with that ID exists; when it is false the returned job is the zero value.
func (s *Service) CheckJob(id string) (CheckJob, bool) {
	s.mu.RLock()
	job, found := s.jobs[id]
	s.mu.RUnlock()
	return job, found
}
// SubscribeEvents registers a new subscriber and returns its event channel
// (buffer size 16) together with an unsubscribe function. Unsubscribing
// removes the channel from the registry and closes it; calling it again is a
// no-op.
func (s *Service) SubscribeEvents() (<-chan Event, func()) {
	events := make(chan Event, 16)

	s.mu.Lock()
	s.subscribers[events] = struct{}{}
	s.mu.Unlock()

	return events, func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		if _, registered := s.subscribers[events]; !registered {
			return
		}
		delete(s.subscribers, events)
		// Closed while holding the write lock, so emit (which sends under the
		// read lock) can never send on a closed channel.
		close(events)
	}
}
// runCheckJob checks all items for job id using four concurrent workers.
// After each source it bumps the job's Checked counter and per-status stats,
// records the latest error, and emits "source_check.progress". When all
// items are done (or there were none) the job is marked "completed" and
// "source_check.completed" is emitted.
func (s *Service) runCheckJob(ctx context.Context, id string, items []db.Source) {
	finish := func() {
		s.updateJob(id, func(job *CheckJob) {
			job.Status = "completed"
			job.FinishedAt = time.Now().UTC().Format(time.RFC3339)
		})
		s.emit("source_check.completed", map[string]any{"jobId": id})
	}
	if len(items) == 0 {
		finish()
		return
	}

	const workers = 4
	queue := make(chan db.Source)
	var pending sync.WaitGroup

	worker := func() {
		defer pending.Done()
		for src := range queue {
			outcome, checkErr := s.CheckOneStatus(ctx, src)
			if outcome == "" && checkErr != nil {
				outcome = "error"
			}
			s.updateJob(id, func(job *CheckJob) {
				job.Checked++
				if job.Stats == nil {
					job.Stats = map[string]int{}
				}
				job.Stats[outcome]++
				if checkErr != nil {
					job.LastError = checkErr.Error()
				}
			})
			s.emit("source_check.progress", map[string]any{"jobId": id, "sourceId": src.SourceID, "status": outcome})
		}
	}

	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go worker()
	}
	for i := range items {
		queue <- items[i]
	}
	close(queue)
	pending.Wait()
	finish()
}
// CheckSourceID runs a health check for the source identified by sourceID
// and returns the source together with the check error, if any.
//
// The record is reloaded after the check so the returned source carries the
// freshly recorded health fields instead of the values read before the
// check. If the reload fails, the record loaded before the check is returned.
// A lookup failure before the check returns a zero Source and that error.
func (s *Service) CheckSourceID(ctx context.Context, sourceID string) (db.Source, error) {
	item, err := s.store.GetSourceBySourceID(sourceID)
	if err != nil {
		return db.Source{}, err
	}
	_, checkErr := s.CheckOneStatus(ctx, item)
	if refreshed, reloadErr := s.store.GetSourceBySourceID(sourceID); reloadErr == nil {
		item = refreshed
	}
	return item, checkErr
}
// CheckOne runs CheckOneStatus for item and returns only its error,
// discarding the resulting status string.
func (s *Service) CheckOne(ctx context.Context, item db.Source) error {
	if _, err := s.CheckOneStatus(ctx, item); err != nil {
		return err
	}
	return nil
}
// CheckOneStatus issues one HTTP request to item.APIURL and records the
// outcome in the store. It returns the resulting status and an error:
//
//   - "error" when the URL is blank (not recorded), the request cannot be
//     built, or the request fails;
//   - "degraded" when the final response status is 400 or above;
//   - "redirected" when at least one redirect was followed;
//   - "ok" otherwise.
//
// For "redirected" and "degraded" a JSON metadata message is stored alongside
// the status. The effective timeout is never below 15 seconds, whatever
// item.TimeoutMS says. Redirects to non-HTTP(S) targets are refused and the
// chain is cut once five requests have already been made. On success a
// "source_check.item" event is emitted; if recording the result fails, the
// status is returned together with the store error and no event is emitted.
func (s *Service) CheckOneStatus(ctx context.Context, item db.Source) (string, error) {
	if strings.TrimSpace(item.APIURL) == "" {
		return "error", errors.New("source api_url is empty")
	}

	// A non-positive or short configured timeout is raised to the floor.
	const floor = 15 * time.Second
	timeout := time.Duration(item.TimeoutMS) * time.Millisecond
	if timeout < floor {
		timeout = floor
	}
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	method := http.MethodGet
	if trimmed := strings.TrimSpace(item.Method); trimmed != "" {
		method = trimmed
	}
	req, err := http.NewRequestWithContext(ctx, method, item.APIURL, nil)
	if err != nil {
		_ = s.store.RecordSourceCheck(item.ID, "error", 0, err.Error())
		return "error", err
	}

	// Work on a copy of the shared client so the per-check timeout and
	// redirect policy do not leak into other checks.
	var hops []string
	probe := *s.client
	probe.Timeout = timeout
	probe.CheckRedirect = func(next *http.Request, via []*http.Request) error {
		if next.URL == nil || !isHTTPURL(next.URL) {
			return errors.New("redirect target must be http or https")
		}
		hops = append(hops, next.URL.String())
		if len(via) >= 5 {
			return errors.New("too many redirects")
		}
		return nil
	}

	began := time.Now()
	resp, err := probe.Do(req)
	latency := int(time.Since(began).Milliseconds())
	if err != nil {
		_ = s.store.RecordSourceCheck(item.ID, "error", latency, err.Error())
		return "error", err
	}
	defer resp.Body.Close()

	status, message := "ok", ""
	switch redirected := len(hops) > 0; {
	case resp.StatusCode >= 400:
		// A failing final status wins over a mere redirect.
		status = "degraded"
		message = healthMetaMessage(map[string]any{
			"redirected":    redirected,
			"redirectCount": len(hops),
			"finalUrl":      resp.Request.URL.String(),
			"finalStatus":   resp.StatusCode,
			"error":         resp.Status,
		})
	case redirected:
		status = "redirected"
		message = healthMetaMessage(map[string]any{
			"redirected":    true,
			"redirectCount": len(hops),
			"finalUrl":      resp.Request.URL.String(),
			"finalStatus":   resp.StatusCode,
		})
	}

	if err := s.store.RecordSourceCheck(item.ID, status, latency, message); err != nil {
		return status, err
	}
	s.emit("source_check.item", map[string]any{"sourceId": item.SourceID, "status": status, "latencyMs": latency})
	return status, nil
}
// saveJob stores job in the registry under job.ID, replacing any existing
// entry with the same ID. The write is done while holding s.mu.
func (s *Service) saveJob(job CheckJob) {
	key := job.ID

	s.mu.Lock()
	defer s.mu.Unlock()
	s.jobs[key] = job
}
// updateJob applies mutate to the job stored under id while holding s.mu and
// writes the result back.
//
// CheckJob values are handed out by value, but their Stats map is a
// reference. To keep previously returned jobs safe to read while a job is
// still running, the Stats map is cloned before mutate runs, so a map that
// has already been handed out is never written to again. An unknown id is
// ignored rather than creating an empty job entry.
func (s *Service) updateJob(id string, mutate func(*CheckJob)) {
	s.mu.Lock()
	defer s.mu.Unlock()

	job, ok := s.jobs[id]
	if !ok {
		return
	}
	if job.Stats != nil {
		stats := make(map[string]int, len(job.Stats))
		for status, count := range job.Stats {
			stats[status] = count
		}
		job.Stats = stats
	}
	mutate(&job)
	s.jobs[id] = job
}
// emit delivers an event of the given kind to every current subscriber. The
// send is non-blocking: a subscriber whose channel buffer is full simply
// misses the event.
func (s *Service) emit(kind string, data map[string]any) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	evt := Event{Type: kind, Data: data}
	for sub := range s.subscribers {
		select {
		case sub <- evt:
		default:
			// Buffer full: drop instead of stalling the caller.
		}
	}
}
// newJobID returns a job identifier of the form "check-<unix nanoseconds>",
// derived from the current time.
func newJobID() string {
	nanos := time.Now().UnixNano()
	return fmt.Sprintf("check-%d", nanos)
}
// sortJobs orders items in place, most recently started first.
//
// StartedAt is an RFC 3339 string, so comparing the strings orders same-offset
// timestamps chronologically. Because that format only has one-second
// resolution, jobs started within the same second are further ordered by ID
// (descending), which makes the result independent of the input order.
func sortJobs(items []CheckJob) {
	newer := func(a, b *CheckJob) bool {
		if a.StartedAt != b.StartedAt {
			return a.StartedAt > b.StartedAt
		}
		return a.ID > b.ID
	}
	// Insertion sort: job lists are short and this needs no extra imports.
	for i := 1; i < len(items); i++ {
		for j := i; j > 0 && newer(&items[j], &items[j-1]); j-- {
			items[j], items[j-1] = items[j-1], items[j]
		}
	}
}
// isHTTPURL reports whether value's scheme is "http" or "https", ignoring
// letter case. value must not be nil.
func isHTTPURL(value *url.URL) bool {
	switch strings.ToLower(value.Scheme) {
	case "http", "https":
		return true
	default:
		return false
	}
}
// healthMetaMessage encodes meta as a JSON string. It returns the empty
// string when meta cannot be marshalled.
func healthMetaMessage(meta map[string]any) string {
	if encoded, err := json.Marshal(meta); err == nil {
		return string(encoded)
	}
	return ""
}
// parseHealthMeta decodes message as a JSON object and returns it. It always
// returns a non-nil map: a blank message, text that is not a JSON object
// (such as a plain error string) and the JSON literal null all yield an
// empty map.
func parseHealthMeta(message string) map[string]any {
	if strings.TrimSpace(message) == "" {
		return map[string]any{}
	}
	var meta map[string]any
	// JSON null decodes without error but leaves meta nil, so check both.
	if err := json.Unmarshal([]byte(message), &meta); err != nil || meta == nil {
		return map[string]any{}
	}
	return meta
}
// defaultString returns value unchanged unless it is empty or whitespace
// only, in which case fallback is returned.
func defaultString(value, fallback string) string {
	if blank := strings.TrimSpace(value) == ""; blank {
		return fallback
	}
	return value
}
// legacyEnabled interprets an optional "enabled" flag: a nil pointer counts
// as enabled, otherwise the pointed-to value is returned.
func legacyEnabled(value *bool) bool {
	return value == nil || *value
}
// maxInt returns value when it is positive and fallback otherwise.
//
// Despite the name this is not a maximum: a positive value smaller than
// fallback is returned as-is.
func maxInt(value, fallback int) int {
	if value <= 0 {
		return fallback
	}
	return value
}
// firstNonEmpty returns the first argument that is not blank, with
// surrounding whitespace trimmed. It returns "" when every argument is blank
// or no arguments are given.
func firstNonEmpty(values ...string) string {
	for i := range values {
		if trimmed := strings.TrimSpace(values[i]); trimmed != "" {
			return trimmed
		}
	}
	return ""
}