@@ -0,0 +1,120 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type SyncResult struct {
|
||||
Direction string `json:"direction"`
|
||||
Tables map[string]int `json:"tables"`
|
||||
FinishedAt string `json:"finishedAt"`
|
||||
}
|
||||
|
||||
type tableSpec struct {
|
||||
Name string
|
||||
Columns []string
|
||||
Conflict []string
|
||||
}
|
||||
|
||||
var syncTables = []tableSpec{
|
||||
{"feedbacks", []string{"code", "received_at", "title", "type", "severity", "category", "priority", "contact", "body", "status", "status_detail", "note", "public_reply", "handled_by", "assignee", "due_at", "resolved_at", "archived_at", "sla_level", "source_channel", "risk_score", "resolution", "package_path", "encrypted_package_path", "package_sha256", "plain_package_sha256", "remote_addr", "summary_text", "included_files", "mail_sent", "updated_at", "last_activity_at"}, []string{"code"}},
|
||||
{"mail_records", []string{"id", "feedback_code", "kind", "status", "to_address", "subject", "plain_body", "html_body", "attachment_path", "attachment_name", "error_message", "created_at", "sent_at"}, []string{"id"}},
|
||||
{"feedback_events", []string{"id", "feedback_code", "event_type", "actor", "from_value", "to_value", "message", "created_at"}, []string{"id"}},
|
||||
{"feedback_comments", []string{"id", "feedback_code", "author", "body", "internal", "created_at"}, []string{"id"}},
|
||||
{"feedback_tags", []string{"feedback_code", "tag", "created_at"}, []string{"feedback_code", "tag"}},
|
||||
{"audit_logs", []string{"id", "actor", "type", "target", "message", "ip", "user_agent", "created_at"}, []string{"id"}},
|
||||
{"webhook_deliveries", []string{"id", "webhook_name", "event", "status", "attempts", "response_code", "error_message", "payload_sha256", "created_at", "finished_at"}, []string{"id"}},
|
||||
}
|
||||
|
||||
func (s *Store) ImportSQLiteToRemote() (SyncResult, error) {
|
||||
s.mu.RLock()
|
||||
remote := s.remoteDB
|
||||
remoteDialect := s.remoteDialect
|
||||
local := s.localDB
|
||||
localDialect := s.localDialect
|
||||
s.mu.RUnlock()
|
||||
if remote == nil {
|
||||
return SyncResult{}, fmt.Errorf("remote database is not configured")
|
||||
}
|
||||
result, err := copyAllTables(local, localDialect, remote, remoteDialect, "sqlite_to_remote")
|
||||
s.setSyncStatus(result, err)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (s *Store) SyncNow() (SyncResult, error) {
|
||||
return s.syncRemoteToSQLite()
|
||||
}
|
||||
|
||||
func (s *Store) syncRemoteToSQLite() (SyncResult, error) {
|
||||
s.mu.RLock()
|
||||
remote := s.remoteDB
|
||||
remoteDialect := s.remoteDialect
|
||||
local := s.localDB
|
||||
localDialect := s.localDialect
|
||||
s.mu.RUnlock()
|
||||
if remote == nil {
|
||||
return SyncResult{}, nil
|
||||
}
|
||||
result, err := copyAllTables(remote, remoteDialect, local, localDialect, "remote_to_sqlite")
|
||||
s.setSyncStatus(result, err)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (s *Store) setSyncStatus(result SyncResult, err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if err != nil {
|
||||
s.status.LastSyncError = err.Error()
|
||||
return
|
||||
}
|
||||
if result.FinishedAt != "" {
|
||||
s.status.LastSyncAt = result.FinishedAt
|
||||
}
|
||||
s.status.LastSyncError = ""
|
||||
}
|
||||
|
||||
func copyAllTables(src *sql.DB, srcDialect dialect, dst *sql.DB, dstDialect dialect, direction string) (SyncResult, error) {
|
||||
result := SyncResult{Direction: direction, Tables: map[string]int{}, FinishedAt: Now()}
|
||||
for _, table := range syncTables {
|
||||
copied, err := copyTable(src, srcDialect, dst, dstDialect, table)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
result.Tables[table.Name] = copied
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func copyTable(src *sql.DB, srcDialect dialect, dst *sql.DB, dstDialect dialect, spec tableSpec) (int, error) {
|
||||
selectSQL := "SELECT " + strings.Join(spec.Columns, ", ") + " FROM " + spec.Name
|
||||
rows, err := src.Query(srcDialect.rebind(selectSQL))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
insertSQL := dstDialect.rebind(dstDialect.upsert(spec.Name, spec.Columns, spec.Conflict))
|
||||
count := 0
|
||||
for rows.Next() {
|
||||
values := make([]any, len(spec.Columns))
|
||||
ptrs := make([]any, len(spec.Columns))
|
||||
for index := range values {
|
||||
ptrs[index] = &values[index]
|
||||
}
|
||||
if err := rows.Scan(ptrs...); err != nil {
|
||||
return count, err
|
||||
}
|
||||
for index, value := range values {
|
||||
if bytes, ok := value.([]byte); ok {
|
||||
values[index] = string(bytes)
|
||||
}
|
||||
}
|
||||
if _, err := dst.Exec(insertSQL, values...); err != nil {
|
||||
return count, err
|
||||
}
|
||||
count++
|
||||
}
|
||||
return count, rows.Err()
|
||||
}
|
||||
Reference in New Issue
Block a user