Skip to content

Commit

Permalink
skip executions based on database lookup
Browse files Browse the repository at this point in the history
Signed-off-by: Maksym Trofimenko <[email protected]>
  • Loading branch information
Maksym Trofimenko committed Jan 30, 2025
1 parent 6c81e0f commit 207a7a0
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 128 deletions.
23 changes: 22 additions & 1 deletion src/controller/jobmonitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ package jobmonitor
import (
"context"
"fmt"
"github.com/gocraft/work"
"github.com/goharbor/harbor/src/common/job"
libRedis "github.com/goharbor/harbor/src/lib/redis"
"strings"
"time"

jobSvc "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/orm"
Expand Down Expand Up @@ -87,12 +91,29 @@ func NewMonitorController() MonitorController {
taskManager: task.NewManager(),
queueManager: jm.NewQueueClient(),
queueStatusManager: queuestatus.Mgr,
monitorClient: jm.GetJobServiceMonitorClient,
monitorClient: jobServiceMonitorClient,
jobServiceRedisClient: jm.JobServiceRedisClient,
executionDAO: taskDao.NewExecutionDAO(),
}
}

func jobServiceMonitorClient() (jm.JobServiceMonitorClient, error) {
cfg, err := job.GlobalClient.GetJobServiceConfig()
if err != nil {
return nil, err
}
config := cfg.RedisPoolConfig
pool, err := libRedis.GetRedisPool(jm.JobServicePool, config.RedisURL, &libRedis.PoolParam{
PoolMaxIdle: 0,
PoolIdleTimeout: time.Duration(config.IdleTimeoutSecond) * time.Second,
})
if err != nil {
log.Errorf("failed to get redis pool: %v", err)
return nil, err
}
return work.NewClient(fmt.Sprintf("{%s}", config.Namespace), pool), nil
}

func (w *monitorController) ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error) {
mClient, err := w.monitorClient()
if err != nil {
Expand Down
49 changes: 26 additions & 23 deletions src/controller/replication/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"fmt"
"time"

"github.com/goharbor/harbor/src/pkg/jobmonitor"

"github.com/goharbor/harbor/src/controller/event/operator"
"github.com/goharbor/harbor/src/controller/replication/flow"
replicationmodel "github.com/goharbor/harbor/src/controller/replication/model"
Expand Down Expand Up @@ -78,28 +76,26 @@ type Controller interface {
// NewController creates a new instance of the replication controller
func NewController() Controller {
return &controller{
repMgr: replication.Mgr,
execMgr: task.ExecMgr,
taskMgr: task.Mgr,
regMgr: reg.Mgr,
scheduler: scheduler.Sched,
flowCtl: flow.NewController(),
ormCreator: orm.Crt,
wp: lib.NewWorkerPool(10),
observationMgr: jobmonitor.NewObservationManagerImpl(),
repMgr: replication.Mgr,
execMgr: task.ExecMgr,
taskMgr: task.Mgr,
regMgr: reg.Mgr,
scheduler: scheduler.Sched,
flowCtl: flow.NewController(),
ormCreator: orm.Crt,
wp: lib.NewWorkerPool(10),
}
}

type controller struct {
repMgr replication.Manager
execMgr task.ExecutionManager
taskMgr task.Manager
regMgr reg.Manager
scheduler scheduler.Scheduler
flowCtl flow.Controller
ormCreator orm.Creator
wp *lib.WorkerPool
observationMgr jobmonitor.ObservationManager
repMgr replication.Manager
execMgr task.ExecutionManager
taskMgr task.Manager
regMgr reg.Manager
scheduler scheduler.Scheduler
flowCtl flow.Controller
ormCreator orm.Creator
wp *lib.WorkerPool
}

func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy, resource *model.Resource, trigger string) (int64, error) {
Expand All @@ -118,16 +114,23 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
if err != nil {
return 0, err
}

if policy.SingleActiveReplication {
o, err := c.observationMgr.ObservationByJobNameAndPolicyID(ctx, job.ReplicationVendorType, policy.ID)
count, err := c.execMgr.Count(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": job.ReplicationVendorType,
"VendorID": policy.ID,
"Status": job.RunningStatus.String(),
},
})
if err != nil {
return 0, err
}
if o != nil {

if count > 1 {
if err = c.execMgr.MarkSkipped(ctx, id, "Execution skipped: active replication still in progress."); err != nil {
return 0, err
}
return id, nil
}
}

Expand Down
25 changes: 10 additions & 15 deletions src/controller/replication/flow/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ package flow
import (
"context"
"encoding/json"
"github.com/goharbor/harbor/src/pkg/jobmonitor"

repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
Expand All @@ -28,25 +26,23 @@ import (
)

type copyFlow struct {
executionID int64
resources []*model.Resource
policy *repctlmodel.Policy
executionMgr task.ExecutionManager
taskMgr task.Manager
observationMgr jobmonitor.ObservationManager
executionID int64
resources []*model.Resource
policy *repctlmodel.Policy
executionMgr task.ExecutionManager
taskMgr task.Manager
}

// NewCopyFlow returns an instance of the copy flow which replicates the resources from
// the source registry to the destination registry. If the parameter "resources" isn't provided,
// will fetch the resources first
func NewCopyFlow(executionID int64, policy *repctlmodel.Policy, resources ...*model.Resource) Flow {
return &copyFlow{
executionMgr: task.ExecMgr,
taskMgr: task.Mgr,
executionID: executionID,
policy: policy,
resources: resources,
observationMgr: jobmonitor.NewObservationManagerImpl(),
executionMgr: task.ExecMgr,
taskMgr: task.Mgr,
executionID: executionID,
policy: policy,
resources: resources,
}
}

Expand Down Expand Up @@ -144,7 +140,6 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [
"dst_resource": string(dest),
"speed": c.policy.Speed,
"copy_by_chunk": c.policy.CopyByChunk,
"policy_id": c.policy.ID,
},
}

Expand Down
4 changes: 2 additions & 2 deletions src/jobservice/common/rds/scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ local function compare(status, revision)
local aCode = stCode(ARGV[1])
local aRev = tonumber(ARGV[2]) or 0
local aCheckInT = tonumber(ARGV[3]) or 0
if revision < aRev or
if revision < aRev or
( revision == aRev and sCode <= aCode ) or
( revision == aRev and aCheckInT ~= 0 )
then
Expand Down Expand Up @@ -96,7 +96,7 @@ if res then
redis.call('persist', KEYS[1])
end
end
return 'ok'
end
end
Expand Down
87 changes: 0 additions & 87 deletions src/pkg/jobmonitor/observations.go

This file was deleted.

0 comments on commit 207a7a0

Please sign in to comment.