Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ linters-settings:
- name: empty-block
- name: unreachable-code
- name: redefines-builtin-id
govet:
enable:
- fieldalignment

linters:
disable-all: true
Expand Down
10 changes: 5 additions & 5 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ import (

// App is main application structure
type App struct {
dcs dcs.DCS
critical atomic.Value
ctx context.Context
mode appMode
aofMode aofMode
nodeFailTime map[string]time.Time
splitTime map[string]time.Time
state appState
critical atomic.Value
logger *slog.Logger
config *config.Config
dcs dcs.DCS
shard *valkey.Shard
cache *valkey.SentiCacheNode
daemonLock *flock.Flock
mode appMode
aofMode aofMode
state appState
}

func baseContext() context.Context {
Expand Down
10 changes: 5 additions & 5 deletions internal/app/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (

func getHostStatesInParallel(hosts []string, getter func(string) (*HostState, error)) (map[string]*HostState, error) {
type result struct {
name string
state *HostState
err error
state *HostState
name string
}
results := make(chan result, len(hosts))
for _, host := range hosts {
go func(host string) {
state, err := getter(host)
results <- result{host, state, err}
results <- result{err, state, host}
}(host)
}
shardState := make(map[string]*HostState)
Expand All @@ -35,13 +35,13 @@ func getHostStatesInParallel(hosts []string, getter func(string) (*HostState, er

func runParallel(f func(string) error, arguments []string) map[string]error {
type pair struct {
key string
err error
key string
}
errs := make(chan pair, len(arguments))
for _, argValue := range arguments {
go func(host string) {
errs <- pair{host, f(host)}
errs <- pair{f(host), host}
}(argValue)
}
result := make(map[string]error)
Expand Down
11 changes: 11 additions & 0 deletions internal/app/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ func (app *App) getHostState(fqdn string) *HostState {
app.setStateError(&state, fqdn, err.Error())
return &state
}
} else if rs.MasterLinkState && !rs.MasterSyncInProgress {
lastIOSeconds, ok := info["master_last_io_seconds_ago"]
if !ok {
app.setStateError(&state, fqdn, "Replica with link up but no master_last_io_seconds_ago in info")
return &state
}
rs.MasterLastIOSeconds, err = strconv.ParseInt(lastIOSeconds, 10, 64)
if err != nil {
app.setStateError(&state, fqdn, err.Error())
return &state
}
}
replicaOffset, ok := info["slave_repl_offset"]
if !ok {
Expand Down
55 changes: 28 additions & 27 deletions internal/app/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,27 +131,27 @@ const (

// HostState contains status check performed by some rdsync process
type HostState struct {
CheckBy string `json:"check_by"`
CheckAt time.Time `json:"check_at"`
PingOk bool `json:"ping_ok"`
PingStable bool `json:"ping_stable"`
ReplicaState *ReplicaState `json:"replica_state"`
SentiCacheState *SentiCacheState `json:"senticache_state"`
ReplicationID string `json:"replication_id"`
IP string `json:"ip"`
RunID string `json:"runid"`
IsMaster bool `json:"is_master"`
IsOffline bool `json:"is_offline"`
IsReadOnly bool `json:"is_read_only"`
IsReplPaused bool `json:"is_repl_paused"`
MasterReplicationOffset int64 `json:"master_replication_offset"`
SecondReplicationOffset int64 `json:"second_replication_offset"`
Error string `json:"error"`
ReplicationID2 string `json:"replication_id2"`
CheckBy string `json:"check_by"`
ConnectedReplicas []string `json:"connected_replicas"`
ReplicationBacklogStart int64 `json:"replication_backlog_start"`
SecondReplicationOffset int64 `json:"second_replication_offset"`
MasterReplicationOffset int64 `json:"master_replication_offset"`
ReplicationBacklogSize int64 `json:"replication_backlog_size"`
MinReplicasToWrite int64 `json:"min_replicas_to_write"`
ReplicationID string `json:"replication_id"`
ReplicationID2 string `json:"replication_id2"`
Error string `json:"error"`
ConnectedReplicas []string `json:"connected_replicas"`
ReplicaState *ReplicaState `json:"replica_state"`
SentiCacheState *SentiCacheState `json:"senticache_state"`
IsReplPaused bool `json:"is_repl_paused"`
IsReadOnly bool `json:"is_read_only"`
IsOffline bool `json:"is_offline"`
IsMaster bool `json:"is_master"`
PingStable bool `json:"ping_stable"`
PingOk bool `json:"ping_ok"`
}

func (hs *HostState) String() string {
Expand Down Expand Up @@ -180,10 +180,11 @@ func (hs *HostState) String() string {
// Master always has this state empty
type ReplicaState struct {
MasterHost string `json:"master_host"`
MasterLinkState bool `json:"master_link_state"`
MasterLinkDownTime int64 `json:"master_link_down_time"`
MasterSyncInProgress bool `json:"master_sync_in_progress"`
ReplicationOffset int64 `json:"replication_offset"`
MasterLastIOSeconds int64 `json:"master_last_io_seconds"`
MasterLinkState bool `json:"master_link_state"`
MasterSyncInProgress bool `json:"master_sync_in_progress"`
}

func (rs *ReplicaState) String() string {
Expand Down Expand Up @@ -212,15 +213,15 @@ const (

// Switchover contains info about currently running or scheduled switchover/failover process
type Switchover struct {
InitiatedAt time.Time `json:"initiated_at"`
StartedAt time.Time `json:"started_at"`
Result *SwitchoverResult `json:"result"`
Progress *SwitchoverProgress `json:"progress"`
From string `json:"from"`
To string `json:"to"`
Cause string `json:"cause"`
InitiatedBy string `json:"initiated_by"`
InitiatedAt time.Time `json:"initiated_at"`
StartedBy string `json:"started_by"`
StartedAt time.Time `json:"started_at"`
Result *SwitchoverResult `json:"result"`
Progress *SwitchoverProgress `json:"progress"`
RunCount int `json:"run_count"`
}

Expand Down Expand Up @@ -250,23 +251,23 @@ func (sw *Switchover) String() string {

// SwitchoverResult contains results of finished/failed switchover
type SwitchoverResult struct {
Ok bool `json:"ok"`
Error string `json:"error"`
FinishedAt time.Time `json:"finished_at"`
Error string `json:"error"`
Ok bool `json:"ok"`
}

// SwitchoverProgress contains intents and status of running switchover
type SwitchoverProgress struct {
Version int `json:"version"`
Phase int `json:"phase"`
NewMaster string `json:"new_master"`
MostRecent string `json:"most_recent"`
Version int `json:"version"`
Phase int `json:"phase"`
}

// Maintenance struct presence means that cluster under manual control
type Maintenance struct {
InitiatedBy string `json:"initiated_by"`
InitiatedAt time.Time `json:"initiated_at"`
InitiatedBy string `json:"initiated_by"`
RdSyncPaused bool `json:"rdsync_paused"`
ShouldLeave bool `json:"should_leave"`
}
Expand All @@ -283,11 +284,11 @@ func (m *Maintenance) String() string {
}

type PoisonPill struct {
Applied bool `json:"applied"`
InitiatedAt time.Time `json:"initiated_at"`
InitiatedBy string `json:"initiated_by"`
TargetHost string `json:"target_host"`
Cause string `json:"cause"`
Applied bool `json:"applied"`
}

func (pp *PoisonPill) String() string {
Expand Down
46 changes: 23 additions & 23 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,64 +15,64 @@ import (

// ValkeyConfig contains valkey connection info and params
type ValkeyConfig struct {
Port int `yaml:"port"`
ClusterBusPort int `yaml:"cluster_bus_port"`
UseTLS bool `yaml:"use_tls"`
AuthPassword string `yaml:"auth_password"`
AofPath string `yaml:"aof_path"`
RestartCommand string `yaml:"restart_command"`
TLSCAPath string `yaml:"tls_ca_path"`
AuthUser string `yaml:"auth_user"`
AuthPassword string `yaml:"auth_password"`
DialTimeout time.Duration `yaml:"dial_timeout"`
FailoverCooldown time.Duration `yaml:"failover_cooldown"`
WaitPromoteTimeout time.Duration `yaml:"wait_promote_timeout"`
WriteTimeout time.Duration `yaml:"write_timeout"`
DNSTTL time.Duration `yaml:"dns_ttl"`
FailoverTimeout time.Duration `yaml:"failover_timeout"`
FailoverCooldown time.Duration `yaml:"failover_cooldown"`
Port int `yaml:"port"`
RestartTimeout time.Duration `yaml:"restart_timeout"`
WaitReplicationTimeout time.Duration `yaml:"wait_replication_timeout"`
WaitCatchupTimeout time.Duration `yaml:"wait_catchup_timeout"`
WaitPromoteTimeout time.Duration `yaml:"wait_promote_timeout"`
DialTimeout time.Duration `yaml:"dial_timeout"`
WaitPromoteForceTimeout time.Duration `yaml:"wait_promote_force_timeout"`
WaitPoisonPillTimeout time.Duration `yaml:"wait_poison_pill_timeout"`
MaxParallelSyncs int `yaml:"max_parallel_syncs"`
AllowDataLoss bool `yaml:"allow_data_loss"`
ClusterBusPort int `yaml:"cluster_bus_port"`
TurnBeforeSwitchover bool `yaml:"turn_before_switchover"`
RestartCommand string `yaml:"restart_command"`
AofPath string `yaml:"aof_path"`
UseTLS bool `yaml:"use_tls"`
AllowDataLoss bool `yaml:"allow_data_loss"`
}

// SentinelModeConfig contains sentinel-mode specific configuration
type SentinelModeConfig struct {
AnnounceHostname bool `yaml:"announce_hostname"`
Name string `yaml:"name"`
RunID string `yaml:"run_id"`
ClusterName string `yaml:"cluster_name"`
CacheAuthUser string `yaml:"cache_auth_user"`
CacheAuthPassword string `yaml:"cache_auth_password"`
CachePort int `yaml:"cache_port"`
CacheRestartCommand string `yaml:"cache_restart_command"`
CacheUpdateSecret string `yaml:"cache_update_secret"`
UseTLS bool `yaml:"use_tls"`
TLSCAPath string `yaml:"tls_ca_path"`
CachePort int `yaml:"cache_port"`
AnnounceHostname bool `yaml:"announce_hostname"`
UseTLS bool `yaml:"use_tls"`
}

// Config contains rdsync application configuration
type Config struct {
LogLevel string `yaml:"loglevel"`
Hostname string `yaml:"hostname"`
Mode string `yaml:"mode"`
AofMode string `yaml:"aof_mode"`
InfoFile string `yaml:"info_file"`
Hostname string `yaml:"hostname"`
LogLevel string `yaml:"loglevel"`
AofMode string `yaml:"aof_mode"`
MaintenanceFile string `yaml:"maintenance_file"`
DaemonLockFile string `yaml:"daemon_lock_file"`
PingStable int `yaml:"ping_stable"`
TickInterval time.Duration `yaml:"tick_interval"`
InactivationDelay time.Duration `yaml:"inactivation_delay"`
HealthCheckInterval time.Duration `yaml:"healthcheck_interval"`
InfoFileHandlerInterval time.Duration `yaml:"info_file_handler_interval"`
PprofAddr string `yaml:"pprof_addr"`
SentinelMode SentinelModeConfig `yaml:"sentinel_mode"`
Zookeeper dcs.ZookeeperConfig `yaml:"zookeeper"`
DcsWaitTimeout time.Duration `yaml:"dcs_wait_timeout"`
Valkey ValkeyConfig `yaml:"valkey"`
SentinelMode SentinelModeConfig `yaml:"sentinel_mode"`
HealthCheckInterval time.Duration `yaml:"healthcheck_interval"`
InfoFileHandlerInterval time.Duration `yaml:"info_file_handler_interval"`
InactivationDelay time.Duration `yaml:"inactivation_delay"`
DcsWaitTimeout time.Duration `yaml:"dcs_wait_timeout"`
TickInterval time.Duration `yaml:"tick_interval"`
PingStable int `yaml:"ping_stable"`
}

// DefaultValkeyConfig returns default configuration for valkey connection info and params
Expand Down
24 changes: 12 additions & 12 deletions internal/dcs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@ import (

// ZookeeperConfig contains Zookeeper connection info
type ZookeeperConfig struct {
Hostname string `config:"hostname" yaml:"hostname"`
SessionTimeout time.Duration `config:"session_timeout" yaml:"session_timeout"`
CACert string `config:"ca_cert" yaml:"ca_cert"`
Namespace string `config:"namespace,required"`
Hostname string `config:"hostname" yaml:"hostname"`
CertFile string `config:"certfile" yaml:"certfile"`
KeyFile string `config:"keyfile" yaml:"keyfile"`
Password string `config:"password" yaml:"password"`
Username string `config:"username" yaml:"username"`
Hosts []string `config:"hosts,required"`
RandomHostProvider RandomHostProviderConfig `config:"random_host_provider" yaml:"random_host_provider"`
BackoffInterval time.Duration `config:"backoff_interval" yaml:"backoff_interval"`
BackoffRandFactor float64 `config:"backoff_rand_factor" yaml:"backoff_rand_factor"`
BackoffMultiplier float64 `config:"backoff_multiplier" yaml:"backoff_multiplier"`
BackoffMaxInterval time.Duration `config:"backoff_max_interval" yaml:"backoff_max_interval"`
BackoffMaxElapsedTime time.Duration `config:"backoff_max_elapsed_time" yaml:"backoff_max_elapsed_time"`
BackoffMaxRetries uint64 `config:"backoff_max_retries" yaml:"backoff_max_retries"`
RandomHostProvider RandomHostProviderConfig `config:"random_host_provider" yaml:"random_host_provider"`
BackoffMaxElapsedTime time.Duration `config:"backoff_max_elapsed_time" yaml:"backoff_max_elapsed_time"`
BackoffMaxInterval time.Duration `config:"backoff_max_interval" yaml:"backoff_max_interval"`
BackoffMultiplier float64 `config:"backoff_multiplier" yaml:"backoff_multiplier"`
BackoffRandFactor float64 `config:"backoff_rand_factor" yaml:"backoff_rand_factor"`
SessionTimeout time.Duration `config:"session_timeout" yaml:"session_timeout"`
Auth bool `config:"auth" yaml:"auth"`
Username string `config:"username" yaml:"username"`
Password string `config:"password" yaml:"password"`
UseSSL bool `config:"use_ssl" yaml:"use_ssl"`
KeyFile string `config:"keyfile" yaml:"keyfile"`
CertFile string `config:"certfile" yaml:"certfile"`
CACert string `config:"ca_cert" yaml:"ca_cert"`
VerifyCerts bool `config:"verify_certs" yaml:"verify_certs"`
}

Expand Down
6 changes: 3 additions & 3 deletions internal/dcs/zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ type zkDCS struct {
conn *zk.Conn
eventsChan <-chan zk.Event
disconnectCallback func() error
isConnected bool
connectedChans []chan struct{}
connectedLock sync.Mutex
closeTimer *time.Timer
connectedChans []chan struct{}
acl []zk.ACL
connectedLock sync.Mutex
isConnected bool
}

type zkLoggerProxy struct{ *slog.Logger }
Expand Down
10 changes: 5 additions & 5 deletions internal/dcs/zk_host_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ import (
)

type zkhost struct {
resolved []string
lastLookup time.Time
resolved []string
}

type RandomHostProvider struct {
ctx context.Context
logger *slog.Logger
resolver *net.Resolver
tried map[string]struct{}
hosts sync.Map
useAddrs bool
hostsKeys []string
tried map[string]struct{}
logger *slog.Logger
lookupTTL time.Duration
lookupTimeout time.Duration
lookupTickInterval time.Duration
resolver *net.Resolver
useAddrs bool
}

func NewRandomHostProvider(ctx context.Context, config *RandomHostProviderConfig, useAddrs bool, logger *slog.Logger) *RandomHostProvider {
Expand Down
Loading
Loading