Files
YMhut-box-C-/server/unified-management/internal/db/store.go
T
admin_gitea f00124c1c0
build-winui / winui (push) Waiting to run
更新客户端适配性问题
2026-06-28 08:56:45 +08:00

333 lines
7.4 KiB
Go

package db
import (
"context"
"database/sql"
"os"
"path/filepath"
"strings"
"sync"
"time"
"ymhut-box/server/unified-management/internal/config"
)
type Store struct {
mu sync.RWMutex
syncMu sync.Mutex
cfg *config.Config
path string
db *sql.DB
dialect dialect
localDB *sql.DB
localDialect dialect
remoteDB *sql.DB
remoteDialect dialect
status DatabaseStatus
stop chan struct{}
}
func Open(cfg *config.Config) (*Store, error) {
path := cfg.Database.SQLitePath
if strings.TrimSpace(path) == "" {
path = filepath.Join(cfg.StorageDir, "unified.sqlite")
}
if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil {
return nil, err
}
if err := os.MkdirAll(cfg.StorageDir, 0o750); err != nil {
return nil, err
}
prototype, err := readPrototypeState(path)
if err != nil {
return nil, err
}
if prototype != nil {
if err := backupPrototypeFile(path); err != nil {
return nil, err
}
}
localCfg := cfg.Database
localCfg.Provider = "sqlite"
localCfg.SQLitePath = path
local, localDialect, err := openSQLDatabase(localCfg)
if err != nil {
return nil, err
}
local.SetMaxOpenConns(1)
store := &Store{
cfg: cfg,
path: path,
db: local,
dialect: localDialect,
localDB: local,
localDialect: localDialect,
stop: make(chan struct{}),
status: DatabaseStatus{
ActiveProvider: "sqlite",
ConfigProvider: cfg.Database.Provider,
SchemaVersion: CurrentSchemaVersion,
SQLiteReady: true,
LastRecoveredAt: Now(),
},
}
if err := store.migrate(local, localDialect); err != nil {
_ = local.Close()
return nil, err
}
if prototype != nil {
if err := store.importPrototype(*prototype); err != nil {
_ = local.Close()
return nil, err
}
}
if strings.EqualFold(cfg.Database.Provider, "mysql") {
if err := store.openRemote(); err != nil {
store.markFailover(err)
}
}
go store.maintain()
return store, nil
}
func (s *Store) Close() error {
close(s.stop)
s.mu.Lock()
defer s.mu.Unlock()
var err error
if s.remoteDB != nil && s.remoteDB != s.localDB {
err = s.remoteDB.Close()
}
if s.localDB != nil {
if closeErr := s.localDB.Close(); err == nil {
err = closeErr
}
}
return err
}
func (s *Store) Status() DatabaseStatus {
s.mu.RLock()
defer s.mu.RUnlock()
return s.status
}
func (s *Store) Path() string {
return s.path
}
func (s *Store) ReconfigureDatabase(cfg *config.Config) error {
if cfg == nil {
return nil
}
path := cfg.Database.SQLitePath
if strings.TrimSpace(path) == "" {
path = filepath.Join(cfg.StorageDir, "unified.sqlite")
}
if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil {
return err
}
if err := os.MkdirAll(cfg.StorageDir, 0o750); err != nil {
return err
}
localCfg := cfg.Database
localCfg.Provider = "sqlite"
localCfg.SQLitePath = path
local, localDialect, err := openSQLDatabase(localCfg)
if err != nil {
return err
}
local.SetMaxOpenConns(1)
if err := s.migrate(local, localDialect); err != nil {
_ = local.Close()
return err
}
var remote *sql.DB
var remoteDialect dialect
if strings.EqualFold(cfg.Database.Provider, "mysql") {
remote, remoteDialect, err = openSQLDatabase(cfg.Database)
if err != nil {
_ = local.Close()
return err
}
if err := s.migrate(remote, remoteDialect); err != nil {
_ = remote.Close()
_ = local.Close()
return err
}
}
s.mu.Lock()
oldLocal := s.localDB
oldRemote := s.remoteDB
s.cfg.Database = cfg.Database
s.path = path
s.localDB = local
s.localDialect = localDialect
s.remoteDB = remote
s.remoteDialect = remoteDialect
s.status.ConfigProvider = cfg.Database.Provider
s.status.SchemaVersion = CurrentSchemaVersion
s.status.SQLiteReady = true
s.status.RemoteReady = remote != nil
s.status.LastError = ""
s.status.FailoverActive = false
if remote != nil {
s.db = remote
s.dialect = remoteDialect
s.status.ActiveProvider = "mysql"
} else {
s.db = local
s.dialect = localDialect
s.status.ActiveProvider = "sqlite"
}
s.status.LastRecoveredAt = Now()
s.mu.Unlock()
if oldRemote != nil && oldRemote != oldLocal && oldRemote != remote {
_ = oldRemote.Close()
}
if oldLocal != nil && oldLocal != local {
_ = oldLocal.Close()
}
return nil
}
func (s *Store) active() (*sql.DB, dialect) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.db, s.dialect
}
func (s *Store) exec(query string, args ...any) (sql.Result, error) {
conn, d := s.active()
result, err := conn.Exec(d.rebind(query), args...)
if err != nil {
s.markFailover(err)
conn, d = s.active()
if conn != nil {
result, err = conn.Exec(d.rebind(query), args...)
}
}
return result, err
}
func (s *Store) query(query string, args ...any) (*sql.Rows, error) {
conn, d := s.active()
rows, err := conn.Query(d.rebind(query), args...)
if err != nil {
s.markFailover(err)
conn, d = s.active()
if conn != nil {
rows, err = conn.Query(d.rebind(query), args...)
}
}
return rows, err
}
func (s *Store) queryRow(query string, args ...any) *sql.Row {
conn, d := s.active()
return conn.QueryRow(d.rebind(query), args...)
}
func (s *Store) insertID(query string, args ...any) (int64, error) {
result, err := s.exec(query, args...)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
func (s *Store) maintain() {
ticker := time.NewTicker(time.Duration(s.cfg.Database.HealthIntervalSec) * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.checkRemote()
case <-s.stop:
return
}
}
}
func (s *Store) openRemote() error {
if !strings.EqualFold(s.cfg.Database.Provider, "mysql") {
return nil
}
remote, remoteDialect, err := openSQLDatabase(s.cfg.Database)
if err != nil {
return err
}
if err := s.migrate(remote, remoteDialect); err != nil {
_ = remote.Close()
return err
}
s.mu.Lock()
if s.remoteDB != nil && s.remoteDB != s.localDB && s.remoteDB != remote {
_ = s.remoteDB.Close()
}
s.remoteDB = remote
s.remoteDialect = remoteDialect
s.db = remote
s.dialect = remoteDialect
s.status.ActiveProvider = "mysql"
s.status.ConfigProvider = "mysql"
s.status.SchemaVersion = CurrentSchemaVersion
s.status.RemoteReady = true
s.status.FailoverActive = false
s.status.LastError = ""
s.status.LastRecoveredAt = Now()
s.mu.Unlock()
return nil
}
func (s *Store) checkRemote() {
if !strings.EqualFold(s.cfg.Database.Provider, "mysql") {
return
}
s.mu.RLock()
remote := s.remoteDB
s.mu.RUnlock()
if remote == nil {
if err := s.openRemote(); err != nil {
s.markFailover(err)
}
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := remote.PingContext(ctx)
cancel()
if err != nil {
s.markFailover(err)
return
}
s.mu.Lock()
if s.db == s.localDB {
s.db = s.remoteDB
s.dialect = s.remoteDialect
}
s.status.ActiveProvider = "mysql"
s.status.RemoteReady = true
s.status.SchemaVersion = CurrentSchemaVersion
s.status.FailoverActive = false
s.status.LastError = ""
s.status.LastRecoveredAt = Now()
s.mu.Unlock()
}
func (s *Store) markFailover(err error) {
if err == nil || !s.cfg.Database.FailoverEnabled {
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.db = s.localDB
s.dialect = s.localDialect
s.status.ActiveProvider = "sqlite"
s.status.ConfigProvider = s.cfg.Database.Provider
s.status.SchemaVersion = CurrentSchemaVersion
s.status.RemoteReady = false
s.status.FailoverActive = !strings.EqualFold(s.cfg.Database.Provider, "sqlite")
s.status.LastError = err.Error()
s.status.LastFailoverAt = Now()
}