package sources import ( "context" "encoding/json" "errors" "fmt" "io" "net/http" "net/url" "os" "path/filepath" "regexp" "strings" "sync" "time" "ymhut-box/server/unified-management/internal/config" "ymhut-box/server/unified-management/internal/db" ) type Service struct { cfg *config.Config store *db.Store client *http.Client stop chan struct{} once sync.Once mu sync.RWMutex jobs map[string]CheckJob subscribers map[chan Event]struct{} } type Event struct { Type string `json:"type"` Data map[string]any `json:"data"` } type CheckJob struct { ID string `json:"id"` Status string `json:"status"` StartedAt string `json:"startedAt"` FinishedAt string `json:"finishedAt"` Total int `json:"total"` Checked int `json:"checked"` Stats map[string]int `json:"stats"` LastError string `json:"lastError"` } type mediaResolution struct { URL string Key string MediaType string Direct bool } type mediaCandidate struct { Resolution mediaResolution Score int Depth int Order int } type legacyMedia struct { Categories []legacyCategory `json:"categories"` } const maxSourceProbeBytes int64 = 2 * 1024 * 1024 var absoluteURLPattern = regexp.MustCompile(`https?://[^\s"'<>\\]+`) type legacyCategory struct { ID string `json:"id"` Name string `json:"name"` Enabled *bool `json:"enabled"` Subcategories []legacySubcategory `json:"subcategories"` } type legacySubcategory struct { ID string `json:"id"` Name string `json:"name"` Description string `json:"description"` APIURL string `json:"api_url"` ThumbnailURL string `json:"thumbnail_url"` RefreshInterval int `json:"refresh_interval"` SupportedFormats []string `json:"supported_formats"` Downloadable bool `json:"downloadable"` } func NewService(cfg *config.Config, store *db.Store) *Service { return &Service{ cfg: cfg, store: store, client: &http.Client{Timeout: 10 * time.Second}, stop: make(chan struct{}), jobs: map[string]CheckJob{}, subscribers: map[chan Event]struct{}{}, } } func (s *Service) Start(ctx context.Context) { s.once.Do(func() { go s.loop() }) } func (s *Service) Stop() { close(s.stop) } func (s *Service) loop() { ticker := time.NewTicker(time.Duration(s.cfg.SourceCheckSeconds) * time.Second) defer ticker.Stop() s.CheckDue(context.Background()) for { select { case <-ticker.C: s.CheckDue(context.Background()) case <-s.stop: return } } } func (s *Service) ImportLegacyMediaTypesIfEmpty(ctx context.Context) error { count, err := s.store.CountSources() if err != nil { return err } if count > 0 { return nil } return s.ImportLegacyMediaTypes(ctx) } func (s *Service) ImportLegacyMediaTypes(ctx context.Context) error { data, err := os.ReadFile(filepath.Join(s.cfg.UpdatePublicDir, "media-types.json")) if err != nil { return err } var legacy legacyMedia if err := json.Unmarshal(data, &legacy); err != nil { return err } for _, category := range legacy.Categories { for _, sub := range category.Subcategories { if strings.TrimSpace(sub.APIURL) == "" { continue } formats, _ := json.Marshal(sub.SupportedFormats) _, err := s.store.UpsertSource(db.Source{ CategoryID: defaultString(category.ID, "media"), CategoryName: defaultString(category.Name, category.ID), SourceID: defaultString(sub.ID, category.ID+"-"+sub.Name), Name: defaultString(sub.Name, sub.ID), Description: sub.Description, Method: "GET", APIURL: sub.APIURL, ThumbnailURL: sub.ThumbnailURL, ProxyMode: "client_direct", TimeoutMS: 8000, RetryCount: 1, CheckIntervalSec: maxInt(sub.RefreshInterval, 300), Enabled: legacyEnabled(category.Enabled), ClientVisible: true, SupportedFormats: string(formats), }) if err != nil { return err } } } return nil } func (s *Service) DeleteSourceAndPublishCompatibility(ctx context.Context, sourceID, actor string) error { sourceID = strings.TrimSpace(sourceID) if sourceID == "" || strings.ContainsAny(sourceID, `/\`) || strings.Contains(sourceID, "..") { return errors.New("invalid source id") } if _, err := s.store.GetSourceBySourceID(sourceID); err != nil { return err } if err := s.store.DeleteSource(sourceID); err != nil { return err } if err := s.PublishLegacyMediaTypes(ctx, actor); err != nil { return err } _ = s.store.InsertAudit(db.AuditLog{Actor: firstNonEmpty(actor, "admin"), Type: "source.deleted", Target: sourceID, Message: "客户端接口已删除并同步兼容 media-types.json"}) return nil } func (s *Service) PublishLegacyMediaTypes(ctx context.Context, actor string) error { catalog, err := s.Catalog(false) if err != nil { return err } data, err := json.MarshalIndent(catalog, "", " ") if err != nil { return err } formatted := append(data, '\n') path := filepath.Join(s.cfg.UpdatePublicDir, "media-types.json") if err := atomicWrite(path, formatted); err != nil { return err } _, _ = s.store.SaveLegacyRevision("media-types", string(formatted), "generated from source database", firstNonEmpty(actor, "system")) return nil } func (s *Service) Catalog(includeHidden bool) (map[string]any, error) { items, err := s.store.ListSources(includeHidden) if err != nil { return nil, err } categories := map[string]map[string]any{} for _, item := range items { cat, ok := categories[item.CategoryID] if !ok { cat = map[string]any{ "id": item.CategoryID, "name": item.CategoryName, "enabled": true, "subcategories": []map[string]any{}, } categories[item.CategoryID] = cat } var formats []string _ = json.Unmarshal([]byte(item.SupportedFormats), &formats) sub := map[string]any{ "id": item.SourceID, "name": item.Name, "description": item.Description, "api_url": item.APIURL, "urlTemplate": firstNonEmpty(item.URLTemplate, item.APIURL), "thumbnail_url": item.ThumbnailURL, "method": item.Method, "proxy_mode": item.ProxyMode, "proxyMode": item.ProxyMode, "refresh_interval": item.CheckIntervalSec, "cacheSeconds": item.CacheSeconds, "supported_formats": formats, "downloadable": true, "health": map[string]any{ "status": item.LastStatus, "latency_ms": item.LastLatencyMS, "last_checked_at": item.LastCheckedAt, "last_error": item.LastError, "consecutiveFailure": item.ConsecutiveFailure, "meta": parseHealthMeta(item.LastError), }, } applyResolvedFields(sub, item.LastError) cat["subcategories"] = append(cat["subcategories"].([]map[string]any), sub) } out := []map[string]any{} for _, cat := range categories { out = append(out, cat) } return map[string]any{ "layout_version": "2.0.0", "last_updated": time.Now().UTC().Format(time.RFC3339), "categories": out, }, nil } func (s *Service) Endpoints(includeHidden bool) ([]map[string]any, error) { items, err := s.store.ListSources(includeHidden) if err != nil { return nil, err } out := []map[string]any{} for _, item := range items { var formats []string _ = json.Unmarshal([]byte(item.SupportedFormats), &formats) endpoint := map[string]any{ "id": item.SourceID, "category": item.CategoryID, "name": item.Name, "method": item.Method, "urlTemplate": firstNonEmpty(item.URLTemplate, item.APIURL), "proxyMode": item.ProxyMode, "clientVisible": item.ClientVisible, "enabled": item.Enabled, "cacheSeconds": item.CacheSeconds, "supportedFormats": formats, "health": map[string]any{ "status": item.LastStatus, "latencyMs": item.LastLatencyMS, "lastCheckedAt": item.LastCheckedAt, "lastError": item.LastError, "consecutiveFailure": item.ConsecutiveFailure, "meta": parseHealthMeta(item.LastError), }, } applyResolvedFields(endpoint, item.LastError) out = append(out, endpoint) } return out, nil } func (s *Service) CheckDue(ctx context.Context) { items, err := s.store.ListSources(true) if err != nil { return } now := time.Now() for _, item := range items { if !item.Enabled { continue } if item.LastCheckedAt != "" { if last, err := time.Parse(time.RFC3339, item.LastCheckedAt); err == nil && now.Sub(last) < time.Duration(item.CheckIntervalSec)*time.Second { continue } } _ = s.CheckOne(ctx, item) } } func (s *Service) QueueCheckAll() CheckJob { items, err := s.store.ListSources(true) if err != nil { job := CheckJob{ID: newJobID(), Status: "failed", StartedAt: time.Now().UTC().Format(time.RFC3339), FinishedAt: time.Now().UTC().Format(time.RFC3339), Stats: map[string]int{}, LastError: err.Error()} s.saveJob(job) return job } filtered := make([]db.Source, 0, len(items)) for _, item := range items { if item.Enabled { filtered = append(filtered, item) } } job := CheckJob{ID: newJobID(), Status: "running", StartedAt: time.Now().UTC().Format(time.RFC3339), Total: len(filtered), Stats: map[string]int{"ok": 0, "redirected": 0, "degraded": 0, "error": 0}} s.saveJob(job) go s.runCheckJob(context.Background(), job.ID, filtered) return job } func (s *Service) CheckJobs() []CheckJob { s.mu.RLock() defer s.mu.RUnlock() items := make([]CheckJob, 0, len(s.jobs)) for _, item := range s.jobs { items = append(items, item) } sortJobs(items) return items } func (s *Service) CheckJob(id string) (CheckJob, bool) { s.mu.RLock() defer s.mu.RUnlock() item, ok := s.jobs[id] return item, ok } func (s *Service) SubscribeEvents() (<-chan Event, func()) { ch := make(chan Event, 16) s.mu.Lock() s.subscribers[ch] = struct{}{} s.mu.Unlock() unsubscribe := func() { s.mu.Lock() if _, ok := s.subscribers[ch]; ok { delete(s.subscribers, ch) close(ch) } s.mu.Unlock() } return ch, unsubscribe } func (s *Service) runCheckJob(ctx context.Context, id string, items []db.Source) { if len(items) == 0 { s.updateJob(id, func(job *CheckJob) { job.Status = "completed" job.FinishedAt = time.Now().UTC().Format(time.RFC3339) }) s.emit("source_check.completed", map[string]any{"jobId": id}) return } const concurrency = 4 work := make(chan db.Source) var wg sync.WaitGroup for i := 0; i < concurrency; i++ { wg.Add(1) go func() { defer wg.Done() for item := range work { status, err := s.CheckOneStatus(ctx, item) if err != nil && status == "" { status = "error" } s.updateJob(id, func(job *CheckJob) { job.Checked++ if job.Stats == nil { job.Stats = map[string]int{} } job.Stats[status]++ if err != nil { job.LastError = err.Error() } }) s.emit("source_check.progress", map[string]any{"jobId": id, "sourceId": item.SourceID, "status": status}) } }() } for _, item := range items { work <- item } close(work) wg.Wait() s.updateJob(id, func(job *CheckJob) { job.Status = "completed" job.FinishedAt = time.Now().UTC().Format(time.RFC3339) }) s.emit("source_check.completed", map[string]any{"jobId": id}) } func (s *Service) CheckSourceID(ctx context.Context, sourceID string) (db.Source, error) { item, err := s.store.GetSourceBySourceID(sourceID) if err != nil { return db.Source{}, err } _, err = s.CheckOneStatus(ctx, item) return item, err } func (s *Service) CheckOne(ctx context.Context, item db.Source) error { _, err := s.CheckOneStatus(ctx, item) return err } func (s *Service) CheckOneStatus(ctx context.Context, item db.Source) (string, error) { if strings.TrimSpace(item.APIURL) == "" { return "error", errors.New("source api_url is empty") } timeout := time.Duration(item.TimeoutMS) * time.Millisecond if timeout <= 0 || timeout < 15*time.Second { timeout = 15 * time.Second } ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() method := strings.TrimSpace(item.Method) if method == "" { method = http.MethodGet } req, err := http.NewRequestWithContext(ctx, method, item.APIURL, nil) if err != nil { _ = s.store.RecordSourceCheck(item.ID, "error", 0, err.Error()) return "error", err } redirects := []string{} client := *s.client client.Timeout = timeout client.CheckRedirect = func(next *http.Request, via []*http.Request) error { if next.URL == nil || !isHTTPURL(next.URL) { return errors.New("redirect target must be http or https") } redirects = append(redirects, next.URL.String()) if len(via) >= 5 { return errors.New("too many redirects") } return nil } start := time.Now() resp, err := client.Do(req) latency := int(time.Since(start).Milliseconds()) if err != nil { _ = s.store.RecordSourceCheck(item.ID, "error", latency, err.Error()) return "error", err } defer resp.Body.Close() status := "ok" message := "" if len(redirects) > 0 { status = "redirected" message = healthMetaMessage(map[string]any{ "redirected": true, "redirectCount": len(redirects), "finalUrl": resp.Request.URL.String(), "finalStatus": resp.StatusCode, }) } if resp.StatusCode >= 400 { status = "degraded" message = healthMetaMessage(map[string]any{ "redirected": len(redirects) > 0, "redirectCount": len(redirects), "finalUrl": resp.Request.URL.String(), "finalStatus": resp.StatusCode, "error": resp.Status, }) } if resp.StatusCode < 400 { meta := parseHealthMeta(message) meta["finalUrl"] = resp.Request.URL.String() meta["finalStatus"] = resp.StatusCode if resolution := resolveMediaFromResponse(resp); resolution.URL != "" { meta["resolvedUrl"] = resolution.URL meta["resolvedKey"] = resolution.Key meta["mediaType"] = resolution.MediaType meta["directMedia"] = resolution.Direct } message = healthMetaMessage(meta) } if err := s.store.RecordSourceCheck(item.ID, status, latency, message); err != nil { return status, err } s.emit("source_check.item", map[string]any{"sourceId": item.SourceID, "status": status, "latencyMs": latency}) return status, nil } func (s *Service) saveJob(job CheckJob) { s.mu.Lock() defer s.mu.Unlock() s.jobs[job.ID] = job } func (s *Service) updateJob(id string, mutate func(*CheckJob)) { s.mu.Lock() defer s.mu.Unlock() job := s.jobs[id] mutate(&job) s.jobs[id] = job } func (s *Service) emit(kind string, data map[string]any) { event := Event{Type: kind, Data: data} s.mu.RLock() defer s.mu.RUnlock() for ch := range s.subscribers { select { case ch <- event: default: } } } func newJobID() string { return fmt.Sprintf("check-%d", time.Now().UnixNano()) } func sortJobs(items []CheckJob) { for i := 0; i < len(items); i++ { for j := i + 1; j < len(items); j++ { if items[j].StartedAt > items[i].StartedAt { items[i], items[j] = items[j], items[i] } } } } func isHTTPURL(value *url.URL) bool { scheme := strings.ToLower(value.Scheme) return scheme == "http" || scheme == "https" } func resolveMediaFromResponse(resp *http.Response) mediaResolution { if resp == nil || resp.Request == nil || resp.Request.URL == nil { return mediaResolution{} } finalURL := resp.Request.URL contentType := strings.ToLower(strings.TrimSpace(strings.Split(resp.Header.Get("Content-Type"), ";")[0])) if mediaType := mediaTypeFromContentType(contentType); mediaType != "" || looksLikeMediaURL(finalURL) { return mediaResolution{URL: finalURL.String(), Key: "response", MediaType: firstNonEmpty(mediaType, mediaTypeFromURL(finalURL)), Direct: true} } if !canProbeText(contentType, resp.ContentLength) { return mediaResolution{} } reader := io.LimitReader(resp.Body, maxSourceProbeBytes+1) data, err := io.ReadAll(reader) if err != nil || int64(len(data)) > maxSourceProbeBytes { return mediaResolution{} } text := strings.TrimSpace(string(data)) if text == "" { return mediaResolution{} } var decoded any if json.Unmarshal(data, &decoded) == nil { if candidate, ok := bestJSONMediaCandidate(decoded, finalURL); ok { return candidate.Resolution } } if candidate, ok := bestTextMediaCandidate(text, finalURL); ok { return candidate.Resolution } return mediaResolution{} } func canProbeText(contentType string, length int64) bool { if length > maxSourceProbeBytes { return false } if contentType == "" || strings.Contains(contentType, "json") { return true } return strings.HasPrefix(contentType, "text/") || strings.Contains(contentType, "javascript") || strings.Contains(contentType, "xml") || strings.Contains(contentType, "form") } func bestJSONMediaCandidate(value any, base *url.URL) (mediaCandidate, bool) { candidates := []mediaCandidate{} order := 0 collectJSONMediaCandidates(value, "", base, 0, &order, &candidates) return bestCandidate(candidates) } func collectJSONMediaCandidates(value any, key string, base *url.URL, depth int, order *int, candidates *[]mediaCandidate) { switch typed := value.(type) { case map[string]any: for childKey, childValue := range typed { nextKey := childKey if key != "" { nextKey = key + "." + childKey } collectJSONMediaCandidates(childValue, nextKey, base, depth+1, order, candidates) } case []any: for _, childValue := range typed { collectJSONMediaCandidates(childValue, key, base, depth+1, order, candidates) } case string: *order = *order + 1 if candidate, ok := candidateFromString(key, typed, base, depth, *order); ok { *candidates = append(*candidates, candidate) } } } func bestTextMediaCandidate(text string, base *url.URL) (mediaCandidate, bool) { candidates := []mediaCandidate{} matches := absoluteURLPattern.FindAllString(text, 30) for index, match := range matches { if candidate, ok := candidateFromString("text", strings.TrimRight(match, ".,);]}'\""), base, 0, index+1); ok { candidates = append(candidates, candidate) } } return bestCandidate(candidates) } func candidateFromString(key, value string, base *url.URL, depth, order int) (mediaCandidate, bool) { raw := strings.TrimSpace(value) if raw == "" { return mediaCandidate{}, false } urls := []string{raw} if !strings.Contains(raw, "://") { urls = append(urls, absoluteURLPattern.FindAllString(raw, 10)...) } for _, candidate := range urls { resolved, ok := resolveCandidateURL(candidate, base) if !ok { continue } mediaType := mediaTypeFromURL(resolved) if mediaType == "" { continue } keyScore := mediaKeyScore(key) score := 100 + keyScore - depth return mediaCandidate{ Resolution: mediaResolution{ URL: resolved.String(), Key: key, MediaType: mediaType, }, Score: score, Depth: depth, Order: order, }, true } return mediaCandidate{}, false } func resolveCandidateURL(value string, base *url.URL) (*url.URL, bool) { value = strings.TrimSpace(strings.Trim(value, `"'`)) if value == "" { return nil, false } parsed, err := url.Parse(value) if err != nil { return nil, false } if !parsed.IsAbs() { if base == nil { return nil, false } parsed = base.ResolveReference(parsed) } if !isHTTPURL(parsed) { return nil, false } return parsed, true } func bestCandidate(candidates []mediaCandidate) (mediaCandidate, bool) { if len(candidates) == 0 { return mediaCandidate{}, false } best := candidates[0] for _, candidate := range candidates[1:] { if candidate.Score > best.Score || (candidate.Score == best.Score && candidate.Depth < best.Depth) || (candidate.Score == best.Score && candidate.Depth == best.Depth && candidate.Order < best.Order) { best = candidate } } return best, true } func mediaKeyScore(key string) int { last := key if index := strings.LastIndex(last, "."); index >= 0 { last = last[index+1:] } normalized := strings.ToLower(strings.TrimSpace(last)) switch normalized { case "url", "src", "image", "img", "pic", "cover", "thumbnail", "video", "file", "media": return 80 case "href", "poster", "preview", "download", "play", "audio": return 60 } for _, token := range []string{"url", "src", "image", "img", "pic", "cover", "thumb", "video", "file", "media"} { if strings.Contains(normalized, token) { return 40 } } return 0 } func looksLikeMediaURL(value *url.URL) bool { return mediaTypeFromURL(value) != "" } func mediaTypeFromURL(value *url.URL) string { if value == nil { return "" } extension := strings.ToLower(strings.TrimPrefix(filepath.Ext(value.Path), ".")) switch extension { case "jpg", "jpeg", "png", "webp", "gif", "bmp", "tif", "tiff": return "image" case "mp4", "webm", "m3u8", "mkv", "mov", "m4v", "avi", "wmv": return "video" case "mp3", "wav", "flac", "aac", "m4a", "ogg", "wma": return "audio" default: return "" } } func mediaTypeFromContentType(value string) string { switch { case strings.HasPrefix(value, "image/"): return "image" case strings.HasPrefix(value, "video/") || value == "application/vnd.apple.mpegurl" || value == "application/x-mpegurl": return "video" case strings.HasPrefix(value, "audio/"): return "audio" default: return "" } } func applyResolvedFields(target map[string]any, message string) { meta := parseHealthMeta(message) resolvedURL, _ := meta["resolvedUrl"].(string) resolvedKey, _ := meta["resolvedKey"].(string) mediaType, _ := meta["mediaType"].(string) if strings.TrimSpace(resolvedURL) != "" { target["resolvedUrl"] = resolvedURL target["resolved_url"] = resolvedURL } if strings.TrimSpace(resolvedKey) != "" { target["resolvedKey"] = resolvedKey target["resolved_key"] = resolvedKey } if strings.TrimSpace(mediaType) != "" { target["mediaType"] = mediaType target["media_type"] = mediaType } } func atomicWrite(path string, data []byte) error { dir := filepath.Dir(path) if err := os.MkdirAll(dir, 0o750); err != nil { return err } tmp, err := os.CreateTemp(dir, "."+filepath.Base(path)+".*.tmp") if err != nil { return err } tmpName := tmp.Name() defer os.Remove(tmpName) if _, err := tmp.Write(data); err != nil { _ = tmp.Close() return err } if err := tmp.Close(); err != nil { return err } if err := os.Chmod(tmpName, 0o640); err != nil { return err } return os.Rename(tmpName, path) } func healthMetaMessage(meta map[string]any) string { data, err := json.Marshal(meta) if err != nil { return "" } return string(data) } func parseHealthMeta(message string) map[string]any { var meta map[string]any if strings.TrimSpace(message) == "" || json.Unmarshal([]byte(message), &meta) != nil { return map[string]any{} } return meta } func defaultString(value, fallback string) string { if strings.TrimSpace(value) == "" { return fallback } return value } func legacyEnabled(value *bool) bool { if value == nil { return true } return *value } func maxInt(value, fallback int) int { if value > 0 { return value } return fallback } func firstNonEmpty(values ...string) string { for _, value := range values { if strings.TrimSpace(value) != "" { return strings.TrimSpace(value) } } return "" }