diff --git a/src/controller/jobmonitor/monitor.go b/src/controller/jobmonitor/monitor.go
index ed2b004f62b..3149634bbc6 100644
--- a/src/controller/jobmonitor/monitor.go
+++ b/src/controller/jobmonitor/monitor.go
@@ -17,7 +17,11 @@ package jobmonitor
 import (
 	"context"
 	"fmt"
+	"github.com/gocraft/work"
+	"github.com/goharbor/harbor/src/common/job"
+	libRedis "github.com/goharbor/harbor/src/lib/redis"
 	"strings"
+	"time"
 
 	jobSvc "github.com/goharbor/harbor/src/jobservice/job"
 	"github.com/goharbor/harbor/src/lib/orm"
@@ -87,12 +91,29 @@ func NewMonitorController() MonitorController {
 		taskManager:           task.NewManager(),
 		queueManager:          jm.NewQueueClient(),
 		queueStatusManager:    queuestatus.Mgr,
-		monitorClient:         jm.GetJobServiceMonitorClient,
+		monitorClient:         jobServiceMonitorClient,
 		jobServiceRedisClient: jm.JobServiceRedisClient,
 		executionDAO:          taskDao.NewExecutionDAO(),
 	}
 }
 
+func jobServiceMonitorClient() (jm.JobServiceMonitorClient, error) {
+	cfg, err := job.GlobalClient.GetJobServiceConfig()
+	if err != nil {
+		return nil, err
+	}
+	config := cfg.RedisPoolConfig
+	pool, err := libRedis.GetRedisPool(jm.JobServicePool, config.RedisURL, &libRedis.PoolParam{
+		PoolMaxIdle:     0,
+		PoolIdleTimeout: time.Duration(config.IdleTimeoutSecond) * time.Second,
+	})
+	if err != nil {
+		log.Errorf("failed to get redis pool: %v", err)
+		return nil, err
+	}
+	return work.NewClient(fmt.Sprintf("{%s}", config.Namespace), pool), nil
+}
+
 func (w *monitorController) ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error) {
 	mClient, err := w.monitorClient()
 	if err != nil {
diff --git a/src/controller/replication/execution.go b/src/controller/replication/execution.go
index 9b090b5de7b..6e841639a73 100644
--- a/src/controller/replication/execution.go
+++ b/src/controller/replication/execution.go
@@ -19,8 +19,6 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/goharbor/harbor/src/pkg/jobmonitor"
-
 	"github.com/goharbor/harbor/src/controller/event/operator"
 	"github.com/goharbor/harbor/src/controller/replication/flow"
 	replicationmodel "github.com/goharbor/harbor/src/controller/replication/model"
@@ -78,28 +76,26 @@ type Controller interface {
 // NewController creates a new instance of the replication controller
 func NewController() Controller {
 	return &controller{
-		repMgr:         replication.Mgr,
-		execMgr:        task.ExecMgr,
-		taskMgr:        task.Mgr,
-		regMgr:         reg.Mgr,
-		scheduler:      scheduler.Sched,
-		flowCtl:        flow.NewController(),
-		ormCreator:     orm.Crt,
-		wp:             lib.NewWorkerPool(10),
-		observationMgr: jobmonitor.NewObservationManagerImpl(),
+		repMgr:     replication.Mgr,
+		execMgr:    task.ExecMgr,
+		taskMgr:    task.Mgr,
+		regMgr:     reg.Mgr,
+		scheduler:  scheduler.Sched,
+		flowCtl:    flow.NewController(),
+		ormCreator: orm.Crt,
+		wp:         lib.NewWorkerPool(10),
 	}
 }
 
 type controller struct {
-	repMgr         replication.Manager
-	execMgr        task.ExecutionManager
-	taskMgr        task.Manager
-	regMgr         reg.Manager
-	scheduler      scheduler.Scheduler
-	flowCtl        flow.Controller
-	ormCreator     orm.Creator
-	wp             *lib.WorkerPool
-	observationMgr jobmonitor.ObservationManager
+	repMgr     replication.Manager
+	execMgr    task.ExecutionManager
+	taskMgr    task.Manager
+	regMgr     reg.Manager
+	scheduler  scheduler.Scheduler
+	flowCtl    flow.Controller
+	ormCreator orm.Creator
+	wp         *lib.WorkerPool
 }
 
 func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy, resource *model.Resource, trigger string) (int64, error) {
@@ -118,16 +114,23 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
 	if err != nil {
 		return 0, err
 	}
+
 	if policy.SingleActiveReplication {
-		o, err := c.observationMgr.ObservationByJobNameAndPolicyID(ctx, job.ReplicationVendorType, policy.ID)
+		count, err := c.execMgr.Count(ctx, &q.Query{
+			Keywords: map[string]interface{}{
+				"VendorType": job.ReplicationVendorType,
+				"VendorID":   policy.ID,
+				"Status":     job.RunningStatus.String(),
+			},
+		})
 		if err != nil {
 			return 0, err
 		}
-		if o != nil {
+
+		if count > 1 {
 			if err = c.execMgr.MarkSkipped(ctx, id, "Execution skipped: active replication still in progress."); err != nil {
 				return 0, err
 			}
-			return id, nil
 		}
 	}
 
diff --git a/src/controller/replication/flow/copy.go b/src/controller/replication/flow/copy.go
index a44a2ad6982..5c845663bc9 100644
--- a/src/controller/replication/flow/copy.go
+++ b/src/controller/replication/flow/copy.go
@@ -17,8 +17,6 @@ package flow
 import (
 	"context"
 	"encoding/json"
-	"github.com/goharbor/harbor/src/pkg/jobmonitor"
-
 	repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
 	"github.com/goharbor/harbor/src/jobservice/job"
 	"github.com/goharbor/harbor/src/jobservice/logger"
@@ -28,12 +26,11 @@ import (
 )
 
 type copyFlow struct {
-	executionID    int64
-	resources      []*model.Resource
-	policy         *repctlmodel.Policy
-	executionMgr   task.ExecutionManager
-	taskMgr        task.Manager
-	observationMgr jobmonitor.ObservationManager
+	executionID  int64
+	resources    []*model.Resource
+	policy       *repctlmodel.Policy
+	executionMgr task.ExecutionManager
+	taskMgr      task.Manager
 }
 
 // NewCopyFlow returns an instance of the copy flow which replicates the resources from
@@ -41,12 +38,11 @@ type copyFlow struct {
 // will fetch the resources first
 func NewCopyFlow(executionID int64, policy *repctlmodel.Policy, resources ...*model.Resource) Flow {
 	return &copyFlow{
-		executionMgr:   task.ExecMgr,
-		taskMgr:        task.Mgr,
-		executionID:    executionID,
-		policy:         policy,
-		resources:      resources,
-		observationMgr: jobmonitor.NewObservationManagerImpl(),
+		executionMgr: task.ExecMgr,
+		taskMgr:      task.Mgr,
+		executionID:  executionID,
+		policy:       policy,
+		resources:    resources,
 	}
 }
 
@@ -144,7 +140,6 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [
 				"dst_resource":  string(dest),
 				"speed":         c.policy.Speed,
 				"copy_by_chunk": c.policy.CopyByChunk,
-				"policy_id":     c.policy.ID,
 			},
 		}
 
diff --git a/src/jobservice/common/rds/scripts.go b/src/jobservice/common/rds/scripts.go
index ca43ec489f7..fd176312988 100644
--- a/src/jobservice/common/rds/scripts.go
+++ b/src/jobservice/common/rds/scripts.go
@@ -42,7 +42,7 @@ local function compare(status, revision)
   local aCode = stCode(ARGV[1])
   local aRev = tonumber(ARGV[2]) or 0
   local aCheckInT = tonumber(ARGV[3]) or 0
-  if revision < aRev or 
+  if revision < aRev or
     ( revision == aRev and sCode <= aCode ) or
     ( revision == aRev and aCheckInT ~= 0 )
   then
@@ -96,7 +96,7 @@ if res then
         redis.call('persist', KEYS[1])
       end
     end
-    
+
     return 'ok'
   end
 end
diff --git a/src/pkg/jobmonitor/observations.go b/src/pkg/jobmonitor/observations.go
deleted file mode 100644
index f52c5190c0c..00000000000
--- a/src/pkg/jobmonitor/observations.go
+++ /dev/null
@@ -1,87 +0,0 @@
-// Copyright Project Harbor Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package jobmonitor
-
-import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"time"
-
-	"github.com/gocraft/work"
-
-	"github.com/goharbor/harbor/src/common/job"
-	"github.com/goharbor/harbor/src/lib/errors"
-	"github.com/goharbor/harbor/src/lib/log"
-	libRedis "github.com/goharbor/harbor/src/lib/redis"
-)
-
-type ObservationManager interface {
-	// ObservationByJobNameAndPolicyID scans and filters active observations
-	ObservationByJobNameAndPolicyID(ctx context.Context, jobName string, policyID int64) (observation *work.WorkerObservation, err error)
-}
-
-type ObservationManagerImpl struct {
-}
-
-func NewObservationManagerImpl() *ObservationManagerImpl {
-	return &ObservationManagerImpl{}
-}
-
-func (m *ObservationManagerImpl) ObservationByJobNameAndPolicyID(_ context.Context, jobName string, policyID int64) (observation *work.WorkerObservation, err error) {
-	monitorClient, err := GetJobServiceMonitorClient()
-	if err != nil {
-		return nil, errors.New(nil).WithCode(errors.PreconditionCode).WithMessagef("unable to get job monitor's client: %v", err)
-	}
-	observations, err := monitorClient.WorkerObservations()
-	if err != nil {
-		return nil, errors.New(nil).WithCode(errors.PreconditionCode).WithMessagef("unable to get jobs observations: %v", err)
-	}
-	for _, o := range observations {
-		if observationMatch(o, jobName, policyID) {
-			return o, nil
-		}
-	}
-	return nil, nil
-}
-
-func observationMatch(o *work.WorkerObservation, jobName string, policyID int64) bool {
-	if o.JobName != jobName {
-		return false
-	}
-	args := map[string]interface{}{}
-	if err := json.Unmarshal([]byte(o.ArgsJSON), &args); err != nil {
-		return false
-	}
-	policyIDFromArgs, ok := args["policy_id"].(float64)
-	return ok && int64(policyIDFromArgs) == policyID
-}
-
-func GetJobServiceMonitorClient() (JobServiceMonitorClient, error) {
-	cfg, err := job.GlobalClient.GetJobServiceConfig()
-	if err != nil {
-		return nil, err
-	}
-	config := cfg.RedisPoolConfig
-	pool, err := libRedis.GetRedisPool(JobServicePool, config.RedisURL, &libRedis.PoolParam{
-		PoolMaxIdle:     0,
-		PoolIdleTimeout: time.Duration(config.IdleTimeoutSecond) * time.Second,
-	})
-	if err != nil {
-		log.Errorf("failed to get redis pool: %v", err)
-		return nil, err
-	}
-	return work.NewClient(fmt.Sprintf("{%s}", config.Namespace), pool), nil
-}