Files
YMhut-box-C-/server/unified-management/internal/db/database_sync.go
T
QWQLwToo 7745e7a2d4
build-winui / winui (push) Has been cancelled
服务端媒体源导入/保存/客户端输出链路修复:支持 snake/camel、subcategories/sources,默认客户端可见,保存后发布兼容 media-types.json。
新增数据库同步 Job API、持久化状态、实时输出、最新任务恢复,以及系统日志聚合接口。
管理端优化:日志中心、运维实时状态框、同步输出自动滚动、仪表盘“输出”列、真实延迟空态、本地 favicon/avatar。
新增 server/unified-management/assets/favicon.ico 和 developer-avatar.png,并接好 /favicon.ico、/admin/favicon.ico、/setup/favicon.ico、/assets/*。
WinUI 随机放映室卡片优先显示子接口原始 Description。
Inno 安装器输出框改为选区末尾 + SendMessage 滚动到底部。
2026-06-29 22:28:58 +08:00

334 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package db
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"time"
)
func (s *Store) CopySQLiteToRemote() (string, error) {
result, err := s.ImportSQLiteToRemote()
return result.FinishedAt, err
}
func (s *Store) CopyRemoteToSQLite() (string, error) {
result, err := s.SyncNow()
return result.FinishedAt, err
}
func (s *Store) ImportSQLiteToRemote() (SyncResult, error) {
return s.RunDatabaseSync("sqlite_to_remote")
}
func (s *Store) SyncNow() (SyncResult, error) {
return s.RunDatabaseSync("remote_to_sqlite")
}
func (s *Store) QueueDatabaseSync(direction string) (DatabaseSyncJob, error) {
normalized, err := normalizeSyncDirection(direction)
if err != nil {
return DatabaseSyncJob{}, err
}
if !s.trySyncLock() {
job, insertErr := s.insertDatabaseSyncJob(normalized, "skipped", []string{"同步任务未启动:已有数据库同步正在执行。"}, []string{"database sync is already running"}, nil, nil, Now())
if insertErr != nil {
return DatabaseSyncJob{}, insertErr
}
return job, errors.New("database sync is already running")
}
job, err := s.insertDatabaseSyncJob(normalized, "running", []string{"同步任务已创建,正在准备数据库连接。"}, nil, nil, map[string]int{}, "")
if err != nil {
s.syncMu.Unlock()
return DatabaseSyncJob{}, err
}
s.setCurrentSyncJob(&job)
go func(jobID int64, jobDirection string) {
defer s.syncMu.Unlock()
_, _ = s.runDatabaseSyncLocked(jobID, jobDirection)
}(job.ID, normalized)
return job, nil
}
func (s *Store) RunDatabaseSync(direction string) (SyncResult, error) {
normalized, err := normalizeSyncDirection(direction)
if err != nil {
return SyncResult{}, err
}
if !s.trySyncLock() {
result := SyncResult{
Direction: normalized,
Status: "skipped",
Skipped: true,
StartedAt: Now(),
FinishedAt: Now(),
Tables: map[string]int{},
Warnings: []string{"database sync is already running"},
Output: []string{"同步任务未启动:已有数据库同步正在执行。"},
}
_, _ = s.insertDatabaseSyncJob(normalized, result.Status, result.Output, result.Warnings, result.Errors, result.Tables, result.FinishedAt)
s.setSyncStatus(result, errors.New("database sync is already running"))
return result, errors.New("database sync is already running")
}
defer s.syncMu.Unlock()
job, err := s.insertDatabaseSyncJob(normalized, "running", []string{"同步任务已创建,正在准备数据库连接。"}, nil, nil, map[string]int{}, "")
if err != nil {
return SyncResult{}, err
}
s.setCurrentSyncJob(&job)
return s.runDatabaseSyncLocked(job.ID, normalized)
}
func (s *Store) runDatabaseSyncLocked(jobID int64, direction string) (SyncResult, error) {
started := Now()
if job, err := s.GetDatabaseSyncJob(jobID); err == nil && job.StartedAt != "" {
started = job.StartedAt
}
result := SyncResult{
Direction: direction,
Status: "completed",
StartedAt: started,
Tables: map[string]int{},
Output: []string{"同步开始:" + directionLabel(direction)},
}
_ = s.updateDatabaseSyncJob(jobID, "running", result.Output, result.Warnings, result.Errors, result.Tables, "")
s.mu.RLock()
remote := s.remoteDB
remoteDialect := s.remoteDialect
local := s.localDB
localDialect := s.localDialect
s.mu.RUnlock()
if remote == nil {
result.Status = "skipped"
result.Skipped = true
result.Warnings = append(result.Warnings, "remote database is not configured")
result.Output = append(result.Output, "远程 MySQL 未配置或不可用,未执行数据复制。")
result.FinishedAt = Now()
_ = s.updateDatabaseSyncJob(jobID, result.Status, result.Output, result.Warnings, result.Errors, result.Tables, result.FinishedAt)
s.setSyncStatus(result, nil)
return result, nil
}
src, srcDialect, dst, dstDialect := local, localDialect, remote, remoteDialect
if direction == "remote_to_sqlite" {
src, srcDialect, dst, dstDialect = remote, remoteDialect, local, localDialect
}
err := copyAllTablesWithProgress(src, srcDialect, dst, dstDialect, direction, func(table string, count int) {
result.Tables[table] = count
result.Output = append(result.Output, fmt.Sprintf("%s%d 条", table, count))
_ = s.updateDatabaseSyncJob(jobID, "running", result.Output, result.Warnings, result.Errors, result.Tables, "")
})
result.FinishedAt = Now()
if err != nil {
result.Status = "failed"
result.Errors = append(result.Errors, err.Error())
result.Output = append(result.Output, "同步失败:"+err.Error())
_ = s.updateDatabaseSyncJob(jobID, result.Status, result.Output, result.Warnings, result.Errors, result.Tables, result.FinishedAt)
s.setSyncStatus(result, err)
return result, err
}
result.Output = append(result.Output, "同步完成:"+directionLabel(direction))
_ = s.updateDatabaseSyncJob(jobID, result.Status, result.Output, result.Warnings, result.Errors, result.Tables, result.FinishedAt)
s.setSyncStatus(result, nil)
return result, nil
}
func (s *Store) setSyncStatus(result SyncResult, err error) {
s.mu.Lock()
defer s.mu.Unlock()
s.status.CurrentSyncJob = nil
if err != nil {
s.status.LastSyncAt = result.FinishedAt
s.status.LastSyncError = err.Error()
return
}
s.status.LastSyncAt = result.FinishedAt
s.status.LastSyncError = ""
}
func (s *Store) trySyncLock() bool {
return s.syncMu.TryLock()
}
func (s *Store) setCurrentSyncJob(job *DatabaseSyncJob) {
s.mu.Lock()
defer s.mu.Unlock()
s.status.CurrentSyncJob = job
}
func normalizeSyncDirection(value string) (string, error) {
switch strings.ToLower(strings.TrimSpace(value)) {
case "sqlite_to_remote", "import", "sqlite_to_mysql":
return "sqlite_to_remote", nil
case "remote_to_sqlite", "sync", "mysql_to_sqlite":
return "remote_to_sqlite", nil
default:
return "", errors.New("unsupported database sync direction")
}
}
func directionLabel(direction string) string {
if direction == "sqlite_to_remote" {
return "SQLite -> MySQL"
}
if direction == "remote_to_sqlite" {
return "MySQL -> SQLite"
}
return direction
}
type tableSpec struct {
Name string
Columns []string
Conflict []string
}
var syncTables = []tableSpec{
{"schema_migrations", []string{"version", "applied_at", "description"}, []string{"version"}},
{"admin_users", []string{"id", "username", "password_hash", "password_changed", "created_at", "updated_at"}, []string{"id"}},
{"release_packages", []string{"id", "product", "version", "platform", "arch", "file_name", "url", "sha256", "size_bytes", "enabled", "created_at", "updated_at"}, []string{"id"}},
{"release_notices", []string{"id", "version", "build", "channel", "title", "message", "release_notes", "message_md", "release_notes_md", "download_url", "notice_file", "raw_json", "published_at", "created_at", "updated_at"}, []string{"id"}},
{"release_notice_revisions", []string{"id", "version", "raw_json", "note", "created_by", "created_at"}, []string{"id"}},
{"feedback_tickets", []string{"code", "title", "type", "severity", "category", "priority", "contact", "body", "status", "status_detail", "public_reply", "note", "assignee", "handled_by", "due_at", "resolved_at", "archived_at", "sla_level", "source_channel", "risk_score", "resolution", "attachment", "package_path", "encrypted_package_path", "package_sha256", "plain_package_sha256", "summary_text", "included_files", "mail_sent", "remote_addr", "tags", "created_at", "updated_at", "last_activity_at"}, []string{"code"}},
{"feedback_comments", []string{"id", "feedback_code", "author", "body", "internal", "created_at"}, []string{"id"}},
{"feedback_attachments", []string{"id", "feedback_code", "kind", "path", "file_name", "sha256", "size_bytes", "created_at"}, []string{"id"}},
{"feedback_events", []string{"id", "feedback_code", "event_type", "actor", "from_value", "to_value", "message", "created_at"}, []string{"id"}},
{"feedback_tags", []string{"feedback_code", "tag", "created_at"}, []string{"feedback_code", "tag"}},
{"mail_records", []string{"id", "feedback_code", "kind", "status", "to_address", "subject", "plain_body", "html_body", "attachment_path", "attachment_name", "error_message", "created_at", "sent_at"}, []string{"id"}},
{"source_categories", []string{"id", "category_id", "name", "enabled", "ui_config", "created_at", "updated_at"}, []string{"id"}},
{"source_endpoints", []string{"id", "category_id", "category_name", "source_id", "name", "description", "method", "api_url", "url_template", "thumbnail_url", "proxy_mode", "timeout_ms", "retry_count", "cache_seconds", "check_interval_sec", "enabled", "client_visible", "supported_formats", "last_status", "last_latency_ms", "last_checked_at", "last_error", "consecutive_failure", "created_at", "updated_at"}, []string{"id"}},
{"endpoint_health_checks", []string{"id", "source_db_id", "status", "latency_ms", "error", "checked_at"}, []string{"id"}},
{"endpoint_call_logs", []string{"id", "source_id", "status", "latency_ms", "error", "client", "created_at"}, []string{"id"}},
{"database_sync_jobs", []string{"id", "direction", "status", "message", "tables_json", "started_at", "finished_at"}, []string{"id"}},
{"system_settings", []string{"key", "value", "updated_at"}, []string{"key"}},
{"audit_logs", []string{"id", "actor", "type", "target", "message", "ip", "user_agent", "created_at"}, []string{"id"}},
{"legacy_json_revisions", []string{"id", "name", "raw", "note", "created_by", "created_at"}, []string{"id"}},
{"webhook_deliveries", []string{"id", "webhook_name", "event", "status", "attempts", "response_code", "error_message", "payload_sha256", "created_at", "finished_at"}, []string{"id"}},
{"legacy_sync_jobs", []string{"id", "status", "summary", "stats_json", "started_at", "finished_at"}, []string{"id"}},
}
func copyAllTables(src *sql.DB, srcDialect dialect, dst *sql.DB, dstDialect dialect, direction string) (SyncResult, error) {
result := SyncResult{Direction: direction, Status: "completed", Tables: map[string]int{}, FinishedAt: Now()}
err := copyAllTablesWithProgress(src, srcDialect, dst, dstDialect, direction, func(table string, count int) {
result.Tables[table] = count
})
if err != nil {
result.Status = "failed"
result.FinishedAt = Now()
return result, err
}
result.FinishedAt = Now()
return result, nil
}
func copyAllTablesWithProgress(src *sql.DB, srcDialect dialect, dst *sql.DB, dstDialect dialect, direction string, progress func(table string, count int)) error {
for _, table := range syncTables {
count, err := copyTable(src, srcDialect, dst, dstDialect, table)
if err != nil {
return err
}
if progress != nil {
progress(table.Name, count)
}
}
return nil
}
func copyTable(src *sql.DB, srcDialect dialect, dst *sql.DB, dstDialect dialect, spec tableSpec) (int, error) {
rows, err := src.Query(srcDialect.rebind("SELECT " + srcDialect.columnList(spec.Columns) + " FROM " + spec.Name))
if err != nil {
return 0, err
}
defer rows.Close()
insertSQL := dstDialect.rebind(dstDialect.upsert(spec.Name, spec.Columns, spec.Conflict))
count := 0
for rows.Next() {
values := make([]any, len(spec.Columns))
ptrs := make([]any, len(spec.Columns))
for index := range values {
ptrs[index] = &values[index]
}
if err := rows.Scan(ptrs...); err != nil {
return count, err
}
for index, value := range values {
if bytes, ok := value.([]byte); ok {
values[index] = string(bytes)
}
}
if _, err := dst.Exec(insertSQL, values...); err != nil {
return count, err
}
count++
}
return count, rows.Err()
}
func readPrototypeState(path string) (*state, error) {
data, err := os.ReadFile(path)
if errors.Is(err, os.ErrNotExist) || len(data) == 0 {
return nil, nil
}
if err != nil {
return nil, err
}
trimmed := strings.TrimSpace(string(data))
if !strings.HasPrefix(trimmed, "{") {
return nil, nil
}
var prototype state
if err := json.Unmarshal(data, &prototype); err != nil {
return nil, fmt.Errorf("existing sqlite path is not a valid sqlite database or JSON prototype: %w", err)
}
return &prototype, nil
}
func backupPrototypeFile(path string) error {
data, err := os.ReadFile(path)
if errors.Is(err, os.ErrNotExist) || len(data) == 0 {
return nil
}
if err != nil {
return err
}
if !strings.HasPrefix(strings.TrimSpace(string(data)), "{") {
return nil
}
backup := path + ".json-prototype-" + time.Now().UTC().Format("20060102-150405") + ".bak"
if err := os.WriteFile(backup, data, 0o640); err != nil {
return err
}
return os.Remove(path)
}
func (s *Store) importPrototype(prototype state) error {
for _, admin := range prototype.Admins {
if admin.CreatedAt == "" {
admin.CreatedAt = Now()
}
if admin.UpdatedAt == "" {
admin.UpdatedAt = admin.CreatedAt
}
_, _ = s.exec(`INSERT INTO admin_users (id, username, password_hash, password_changed, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)`,
admin.ID, admin.Username, admin.PasswordHash, boolInt(admin.PasswordChanged), admin.CreatedAt, admin.UpdatedAt)
}
for _, item := range prototype.Feedbacks {
_ = s.InsertFeedback(item)
}
for _, item := range prototype.Sources {
_, _ = s.UpsertSource(item)
}
for _, item := range prototype.SourceChecks {
_ = s.RecordSourceCheck(item.SourceID, item.Status, item.LatencyMS, item.Error)
}
for _, item := range prototype.SourceCalls {
_ = s.RecordSourceCall(item)
}
for _, item := range prototype.AuditLogs {
_ = s.InsertAudit(item)
}
return nil
}