package db import ( "database/sql" "encoding/json" "errors" "fmt" "os" "strings" "time" ) func (s *Store) CopySQLiteToRemote() (string, error) { result, err := s.ImportSQLiteToRemote() return result.FinishedAt, err } func (s *Store) CopyRemoteToSQLite() (string, error) { result, err := s.SyncNow() return result.FinishedAt, err } func (s *Store) ImportSQLiteToRemote() (SyncResult, error) { return s.RunDatabaseSync("sqlite_to_remote") } func (s *Store) SyncNow() (SyncResult, error) { return s.RunDatabaseSync("remote_to_sqlite") } func (s *Store) QueueDatabaseSync(direction string) (DatabaseSyncJob, error) { normalized, err := normalizeSyncDirection(direction) if err != nil { return DatabaseSyncJob{}, err } if !s.trySyncLock() { job, insertErr := s.insertDatabaseSyncJob(normalized, "skipped", []string{"同步任务未启动:已有数据库同步正在执行。"}, []string{"database sync is already running"}, nil, nil, Now()) if insertErr != nil { return DatabaseSyncJob{}, insertErr } return job, errors.New("database sync is already running") } job, err := s.insertDatabaseSyncJob(normalized, "running", []string{"同步任务已创建,正在准备数据库连接。"}, nil, nil, map[string]int{}, "") if err != nil { s.syncMu.Unlock() return DatabaseSyncJob{}, err } s.setCurrentSyncJob(&job) go func(jobID int64, jobDirection string) { defer s.syncMu.Unlock() _, _ = s.runDatabaseSyncLocked(jobID, jobDirection) }(job.ID, normalized) return job, nil } func (s *Store) RunDatabaseSync(direction string) (SyncResult, error) { normalized, err := normalizeSyncDirection(direction) if err != nil { return SyncResult{}, err } if !s.trySyncLock() { result := SyncResult{ Direction: normalized, Status: "skipped", Skipped: true, StartedAt: Now(), FinishedAt: Now(), Tables: map[string]int{}, Warnings: []string{"database sync is already running"}, Output: []string{"同步任务未启动:已有数据库同步正在执行。"}, } _, _ = s.insertDatabaseSyncJob(normalized, result.Status, result.Output, result.Warnings, result.Errors, result.Tables, result.FinishedAt) s.setSyncStatus(result, errors.New("database sync is already running")) return result, errors.New("database sync is already running") } defer s.syncMu.Unlock() job, err := s.insertDatabaseSyncJob(normalized, "running", []string{"同步任务已创建,正在准备数据库连接。"}, nil, nil, map[string]int{}, "") if err != nil { return SyncResult{}, err } s.setCurrentSyncJob(&job) return s.runDatabaseSyncLocked(job.ID, normalized) } func (s *Store) runDatabaseSyncLocked(jobID int64, direction string) (SyncResult, error) { started := Now() if job, err := s.GetDatabaseSyncJob(jobID); err == nil && job.StartedAt != "" { started = job.StartedAt } result := SyncResult{ Direction: direction, Status: "completed", StartedAt: started, Tables: map[string]int{}, Output: []string{"同步开始:" + directionLabel(direction)}, } _ = s.updateDatabaseSyncJob(jobID, "running", result.Output, result.Warnings, result.Errors, result.Tables, "") s.mu.RLock() remote := s.remoteDB remoteDialect := s.remoteDialect local := s.localDB localDialect := s.localDialect s.mu.RUnlock() if remote == nil { result.Status = "skipped" result.Skipped = true result.Warnings = append(result.Warnings, "remote database is not configured") result.Output = append(result.Output, "远程 MySQL 未配置或不可用,未执行数据复制。") result.FinishedAt = Now() _ = s.updateDatabaseSyncJob(jobID, result.Status, result.Output, result.Warnings, result.Errors, result.Tables, result.FinishedAt) s.setSyncStatus(result, nil) return result, nil } src, srcDialect, dst, dstDialect := local, localDialect, remote, remoteDialect if direction == "remote_to_sqlite" { src, srcDialect, dst, dstDialect = remote, remoteDialect, local, localDialect } err := copyAllTablesWithProgress(src, srcDialect, dst, dstDialect, direction, func(table string, count int) { result.Tables[table] = count result.Output = append(result.Output, fmt.Sprintf("%s:%d 条", table, count)) _ = s.updateDatabaseSyncJob(jobID, "running", result.Output, result.Warnings, result.Errors, result.Tables, "") }) result.FinishedAt = Now() if err != nil { result.Status = "failed" result.Errors = append(result.Errors, err.Error()) result.Output = append(result.Output, "同步失败:"+err.Error()) _ = s.updateDatabaseSyncJob(jobID, result.Status, result.Output, result.Warnings, result.Errors, result.Tables, result.FinishedAt) s.setSyncStatus(result, err) return result, err } result.Output = append(result.Output, "同步完成:"+directionLabel(direction)) _ = s.updateDatabaseSyncJob(jobID, result.Status, result.Output, result.Warnings, result.Errors, result.Tables, result.FinishedAt) s.setSyncStatus(result, nil) return result, nil } func (s *Store) setSyncStatus(result SyncResult, err error) { s.mu.Lock() defer s.mu.Unlock() s.status.CurrentSyncJob = nil if err != nil { s.status.LastSyncAt = result.FinishedAt s.status.LastSyncError = err.Error() return } s.status.LastSyncAt = result.FinishedAt s.status.LastSyncError = "" } func (s *Store) trySyncLock() bool { return s.syncMu.TryLock() } func (s *Store) setCurrentSyncJob(job *DatabaseSyncJob) { s.mu.Lock() defer s.mu.Unlock() s.status.CurrentSyncJob = job } func normalizeSyncDirection(value string) (string, error) { switch strings.ToLower(strings.TrimSpace(value)) { case "sqlite_to_remote", "import", "sqlite_to_mysql": return "sqlite_to_remote", nil case "remote_to_sqlite", "sync", "mysql_to_sqlite": return "remote_to_sqlite", nil default: return "", errors.New("unsupported database sync direction") } } func directionLabel(direction string) string { if direction == "sqlite_to_remote" { return "SQLite -> MySQL" } if direction == "remote_to_sqlite" { return "MySQL -> SQLite" } return direction } type tableSpec struct { Name string Columns []string Conflict []string } var syncTables = []tableSpec{ {"schema_migrations", []string{"version", "applied_at", "description"}, []string{"version"}}, {"admin_users", []string{"id", "username", "password_hash", "password_changed", "created_at", "updated_at"}, []string{"id"}}, {"release_packages", []string{"id", "product", "version", "platform", "arch", "file_name", "url", "sha256", "size_bytes", "enabled", "created_at", "updated_at"}, []string{"id"}}, {"release_notices", []string{"id", "version", "build", "channel", "title", "message", "release_notes", "message_md", "release_notes_md", "download_url", "notice_file", "raw_json", "published_at", "created_at", "updated_at"}, []string{"id"}}, {"release_notice_revisions", []string{"id", "version", "raw_json", "note", "created_by", "created_at"}, []string{"id"}}, {"feedback_tickets", []string{"code", "title", "type", "severity", "category", "priority", "contact", "body", "status", "status_detail", "public_reply", "note", "assignee", "handled_by", "due_at", "resolved_at", "archived_at", "sla_level", "source_channel", "risk_score", "resolution", "attachment", "package_path", "encrypted_package_path", "package_sha256", "plain_package_sha256", "summary_text", "included_files", "mail_sent", "remote_addr", "tags", "created_at", "updated_at", "last_activity_at"}, []string{"code"}}, {"feedback_comments", []string{"id", "feedback_code", "author", "body", "internal", "created_at"}, []string{"id"}}, {"feedback_attachments", []string{"id", "feedback_code", "kind", "path", "file_name", "sha256", "size_bytes", "created_at"}, []string{"id"}}, {"feedback_events", []string{"id", "feedback_code", "event_type", "actor", "from_value", "to_value", "message", "created_at"}, []string{"id"}}, {"feedback_tags", []string{"feedback_code", "tag", "created_at"}, []string{"feedback_code", "tag"}}, {"mail_records", []string{"id", "feedback_code", "kind", "status", "to_address", "subject", "plain_body", "html_body", "attachment_path", "attachment_name", "error_message", "created_at", "sent_at"}, []string{"id"}}, {"source_categories", []string{"id", "category_id", "name", "enabled", "ui_config", "created_at", "updated_at"}, []string{"id"}}, {"source_endpoints", []string{"id", "category_id", "category_name", "source_id", "name", "description", "method", "api_url", "url_template", "thumbnail_url", "proxy_mode", "timeout_ms", "retry_count", "cache_seconds", "check_interval_sec", "enabled", "client_visible", "supported_formats", "last_status", "last_latency_ms", "last_checked_at", "last_error", "consecutive_failure", "created_at", "updated_at"}, []string{"id"}}, {"endpoint_health_checks", []string{"id", "source_db_id", "status", "latency_ms", "error", "checked_at"}, []string{"id"}}, {"endpoint_call_logs", []string{"id", "source_id", "status", "latency_ms", "error", "client", "created_at"}, []string{"id"}}, {"database_sync_jobs", []string{"id", "direction", "status", "message", "tables_json", "started_at", "finished_at"}, []string{"id"}}, {"system_settings", []string{"key", "value", "updated_at"}, []string{"key"}}, {"audit_logs", []string{"id", "actor", "type", "target", "message", "ip", "user_agent", "created_at"}, []string{"id"}}, {"legacy_json_revisions", []string{"id", "name", "raw", "note", "created_by", "created_at"}, []string{"id"}}, {"webhook_deliveries", []string{"id", "webhook_name", "event", "status", "attempts", "response_code", "error_message", "payload_sha256", "created_at", "finished_at"}, []string{"id"}}, {"legacy_sync_jobs", []string{"id", "status", "summary", "stats_json", "started_at", "finished_at"}, []string{"id"}}, } func copyAllTables(src *sql.DB, srcDialect dialect, dst *sql.DB, dstDialect dialect, direction string) (SyncResult, error) { result := SyncResult{Direction: direction, Status: "completed", Tables: map[string]int{}, FinishedAt: Now()} err := copyAllTablesWithProgress(src, srcDialect, dst, dstDialect, direction, func(table string, count int) { result.Tables[table] = count }) if err != nil { result.Status = "failed" result.FinishedAt = Now() return result, err } result.FinishedAt = Now() return result, nil } func copyAllTablesWithProgress(src *sql.DB, srcDialect dialect, dst *sql.DB, dstDialect dialect, direction string, progress func(table string, count int)) error { for _, table := range syncTables { count, err := copyTable(src, srcDialect, dst, dstDialect, table) if err != nil { return err } if progress != nil { progress(table.Name, count) } } return nil } func copyTable(src *sql.DB, srcDialect dialect, dst *sql.DB, dstDialect dialect, spec tableSpec) (int, error) { rows, err := src.Query(srcDialect.rebind("SELECT " + srcDialect.columnList(spec.Columns) + " FROM " + spec.Name)) if err != nil { return 0, err } defer rows.Close() insertSQL := dstDialect.rebind(dstDialect.upsert(spec.Name, spec.Columns, spec.Conflict)) count := 0 for rows.Next() { values := make([]any, len(spec.Columns)) ptrs := make([]any, len(spec.Columns)) for index := range values { ptrs[index] = &values[index] } if err := rows.Scan(ptrs...); err != nil { return count, err } for index, value := range values { if bytes, ok := value.([]byte); ok { values[index] = string(bytes) } } if _, err := dst.Exec(insertSQL, values...); err != nil { return count, err } count++ } return count, rows.Err() } func readPrototypeState(path string) (*state, error) { data, err := os.ReadFile(path) if errors.Is(err, os.ErrNotExist) || len(data) == 0 { return nil, nil } if err != nil { return nil, err } trimmed := strings.TrimSpace(string(data)) if !strings.HasPrefix(trimmed, "{") { return nil, nil } var prototype state if err := json.Unmarshal(data, &prototype); err != nil { return nil, fmt.Errorf("existing sqlite path is not a valid sqlite database or JSON prototype: %w", err) } return &prototype, nil } func backupPrototypeFile(path string) error { data, err := os.ReadFile(path) if errors.Is(err, os.ErrNotExist) || len(data) == 0 { return nil } if err != nil { return err } if !strings.HasPrefix(strings.TrimSpace(string(data)), "{") { return nil } backup := path + ".json-prototype-" + time.Now().UTC().Format("20060102-150405") + ".bak" if err := os.WriteFile(backup, data, 0o640); err != nil { return err } return os.Remove(path) } func (s *Store) importPrototype(prototype state) error { for _, admin := range prototype.Admins { if admin.CreatedAt == "" { admin.CreatedAt = Now() } if admin.UpdatedAt == "" { admin.UpdatedAt = admin.CreatedAt } _, _ = s.exec(`INSERT INTO admin_users (id, username, password_hash, password_changed, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)`, admin.ID, admin.Username, admin.PasswordHash, boolInt(admin.PasswordChanged), admin.CreatedAt, admin.UpdatedAt) } for _, item := range prototype.Feedbacks { _ = s.InsertFeedback(item) } for _, item := range prototype.Sources { _, _ = s.UpsertSource(item) } for _, item := range prototype.SourceChecks { _ = s.RecordSourceCheck(item.SourceID, item.Status, item.LatencyMS, item.Error) } for _, item := range prototype.SourceCalls { _ = s.RecordSourceCall(item) } for _, item := range prototype.AuditLogs { _ = s.InsertAudit(item) } return nil }