From 8efc245f0bbb2e7a9144258bec7c79f7c9045811 Mon Sep 17 00:00:00 2001 From: Yuvraj Date: Thu, 17 Oct 2024 02:19:53 +0530 Subject: [PATCH] added workflow and execution db entity --- idl/cloud/v1/cloud.proto | 94 +++++++++++--------- server/repository/gormimpl/execution.go | 102 ++++++++++++++++++++++ server/repository/gormimpl/workflow.go | 102 ++++++++++++++++++++++ server/repository/interface/execution.go | 23 +++++ server/repository/interface/repo.go | 2 + server/repository/interface/workflow.go | 25 ++++++ server/repository/model/task/execution.go | 51 +++++++++++ server/repository/model/task/task.go | 9 +- server/repository/model/task/workflow.go | 33 +++++++ server/repository/postgres.go | 20 ++++- server/route/task.go | 4 + 11 files changed, 412 insertions(+), 53 deletions(-) create mode 100644 server/repository/gormimpl/execution.go create mode 100644 server/repository/gormimpl/workflow.go create mode 100644 server/repository/interface/execution.go create mode 100644 server/repository/interface/workflow.go create mode 100644 server/repository/model/task/execution.go create mode 100644 server/repository/model/task/workflow.go diff --git a/idl/cloud/v1/cloud.proto b/idl/cloud/v1/cloud.proto index 47988b2..cd053ca 100644 --- a/idl/cloud/v1/cloud.proto +++ b/idl/cloud/v1/cloud.proto @@ -6,7 +6,6 @@ import "validate/validate.proto"; import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; - // Enum for Task statuses enum TaskStatusEnum { QUEUED = 0; // Task is in the queue, waiting to be processed @@ -101,49 +100,59 @@ message Task { string description = 9 [(validate.rules).string = { max_len: 5000 }]; - repeated string dependencies = 10; // IDs of tasks that must complete before this task + + // IDs of tasks that must complete before this task. + repeated string dependencies = 10; + + // Base image for the task execution environment. string base_image = 11; + + // Entrypoint for the task execution. string entrypoint = 12; + + // Arguments for the task execution. repeated string args = 13; + + // Environment variables for the task execution. map env = 14; } -// Workflow represents a collection of tasks organized in a DAG +// Workflow represents a collection of tasks organized in a Directed Acyclic Graph (DAG). message Workflow { - string id = 1; - string name = 2; - string description = 3; - repeated Task tasks = 4; - map metadata = 5; - } - - // ExecutionStatus represents the current state of a task or workflow execution + string id = 1; // Unique identifier for the workflow. + string name = 2; // Name of the workflow. + string description = 3; // Description of the workflow. + repeated Task tasks = 4; // List of tasks in the workflow. + map metadata = 5; // Additional metadata for the workflow. +} + +// ExecutionStatus represents the current state of a task or workflow execution. enum ExecutionStatus { - EXECUTION_STATUS_UNSPECIFIED = 0; - EXECUTION_STATUS_PENDING = 1; - EXECUTION_STATUS_RUNNING = 2; - EXECUTION_STATUS_COMPLETED = 3; - EXECUTION_STATUS_FAILED = 4; - } - - // TaskExecution represents the execution of a task + EXECUTION_STATUS_UNSPECIFIED = 0; // Status is not specified. + EXECUTION_STATUS_PENDING = 1; // Task or workflow is pending execution. + EXECUTION_STATUS_RUNNING = 2; // Task or workflow is currently running. + EXECUTION_STATUS_COMPLETED = 3; // Task or workflow has completed execution. + EXECUTION_STATUS_FAILED = 4; // Task or workflow has failed. +} + +// TaskExecution represents the execution of a task. 
message TaskExecution { - string task_id = 1; - ExecutionStatus status = 2; - google.protobuf.Timestamp created_at = 3; - google.protobuf.Timestamp updated_at = 4; - map execution_metadata = 5; - } - - // WorkflowExecution represents the execution of a workflow + string task_id = 1; // Unique identifier for the task being executed. + ExecutionStatus status = 2; // Current execution status of the task. + google.protobuf.Timestamp created_at = 3; // Timestamp of when the task execution started. + google.protobuf.Timestamp updated_at = 4; // Timestamp of the last update to the task execution. + map execution_metadata = 5; // Metadata related to the task execution. +} + +// WorkflowExecution represents the execution of a workflow. message WorkflowExecution { - string workflow_id = 1; - ExecutionStatus status = 2; - repeated TaskExecution task_executions = 3; - google.protobuf.Timestamp created_at = 4; - google.protobuf.Timestamp updated_at = 5; - map execution_metadata = 6; - } + string workflow_id = 1; // Unique identifier for the workflow being executed. + ExecutionStatus status = 2; // Current execution status of the workflow. + repeated TaskExecution task_executions = 3; // List of task executions within the workflow. + google.protobuf.Timestamp created_at = 4; // Timestamp of when the workflow execution started. + google.protobuf.Timestamp updated_at = 5; // Timestamp of the last update to the workflow execution. + map execution_metadata = 6; // Metadata related to the workflow execution. +} // Message for Task history message TaskHistory { @@ -197,7 +206,6 @@ message UpdateTaskStatusRequest { string message = 3 [(validate.rules).string = {max_len: 2000}]; } - // Task Management service definition service TaskManagementService { // Creates a new task based on the provided request. @@ -224,11 +232,14 @@ service TaskManagementService { // Returns a GetStatusResponse containing a map of status counts. rpc GetStatus(GetStatusRequest) returns (GetStatusResponse) {} + // Sends a heartbeat signal to indicate the service is alive. rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {} + // Pulls events related to task execution. rpc PullEvents(PullEventsRequest) returns (stream PullEventsResponse) {} } +// Message for heartbeat request message HeartbeatRequest { // Timestamp of the heartbeat, in ISO 8601 format (UTC). // This timestamp indicates when the heartbeat was sent. @@ -243,6 +254,7 @@ message HeartbeatRequest { }]; } +// Message for heartbeat response message HeartbeatResponse { // Response message for the heartbeat request. // Currently, this message is empty, indicating successful receipt of the heartbeat. @@ -262,16 +274,13 @@ message PullEventsResponse { // Message for work assignments message WorkAssignment { - // Unique identifier for the assignment - int64 assignment_id = 1 ; + // Unique identifier for the assignment. + int64 assignment_id = 1; - // The task to be executed + // The task to be executed. Task task = 2 [(validate.rules).message.required = true]; } - - - // Message for GetStatus request (empty) message GetStatusRequest {} @@ -283,7 +292,8 @@ message GetStatusResponse { // Message for Task List message TaskList { - repeated Task tasks = 1; // List of tasks in the system. + // List of tasks in the system. 
+  repeated Task tasks = 1;
 }
 
 // Message for Task List request
diff --git a/server/repository/gormimpl/execution.go b/server/repository/gormimpl/execution.go
new file mode 100644
index 0000000..a751429
--- /dev/null
+++ b/server/repository/gormimpl/execution.go
@@ -0,0 +1,102 @@
+package gormimpl
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"gorm.io/gorm"
+
+	interfaces "task/server/repository/interface"
+	models "task/server/repository/model/task"
+)
+
+var (
+	executionOperations = promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "execution_repository_operations_total",
+			Help: "The total number of execution repository operations",
+		},
+		[]string{"operation", "status"},
+	)
+	executionLatency = promauto.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Name:    "execution_repository_operation_duration_seconds",
+			Help:    "Duration of execution repository operations in seconds",
+			Buckets: prometheus.DefBuckets,
+		},
+		[]string{"operation"},
+	)
+)
+
+// ExecutionRepo implements the ExecutionRepo interface, using GORM for all
+// database operations on execution records.
+type ExecutionRepo struct {
+	db *gorm.DB
+}
+
+// CreateExecution creates a new execution record in the database.
+// It returns the created execution with its assigned ID or an error if the operation fails.
+func (s *ExecutionRepo) CreateExecution(ctx context.Context, execution models.Execution) (models.Execution, error) {
+	timer := prometheus.NewTimer(executionLatency.WithLabelValues("create"))
+	defer timer.ObserveDuration()
+
+	result := s.db.Create(&execution)
+	if result.Error != nil {
+		executionOperations.WithLabelValues("create", "error").Inc()
+		return models.Execution{}, fmt.Errorf("failed to create execution: %w", result.Error)
+	}
+
+	if execution.ID == 0 {
+		executionOperations.WithLabelValues("create", "error").Inc()
+		return models.Execution{}, fmt.Errorf("failed to get execution ID after creation")
+	}
+
+	executionOperations.WithLabelValues("create", "success").Inc()
+	return execution, nil
+}
+
+// GetExecution retrieves an execution from the database by its ID.
+// It returns a pointer to the execution if found, or an error if the execution doesn't exist or if the operation fails.
+func (s *ExecutionRepo) GetExecution(ctx context.Context, executionID uint) (*models.Execution, error) {
+	timer := prometheus.NewTimer(executionLatency.WithLabelValues("get"))
+	defer timer.ObserveDuration()
+
+	var execution models.Execution
+	if err := s.db.First(&execution, executionID).Error; err != nil {
+		executionOperations.WithLabelValues("get", "error").Inc()
+		return nil, fmt.Errorf("failed to retrieve execution by ID: %w", err)
+	}
+	executionOperations.WithLabelValues("get", "success").Inc()
+	return &execution, nil
+}
+
+// ListExecution retrieves all executions from the database; pagination and
+// filtering by status or execution type are not implemented at this layer yet.
+// It returns a slice of executions and an error if the operation fails.
+func (s *ExecutionRepo) ListExecution(ctx context.Context) ([]models.Execution, error) {
+	timer := prometheus.NewTimer(executionLatency.WithLabelValues("list"))
+	defer timer.ObserveDuration()
+
+	var executions []models.Execution
+
+	// Execute the query
+	if err := s.db.Find(&executions).Error; err != nil {
+		executionOperations.WithLabelValues("list", "error").Inc()
+		return nil, fmt.Errorf("failed to retrieve executions: %w", err)
+	}
+
+	executionOperations.WithLabelValues("list", "success").Inc()
+	return executions, nil
+}
+
+// NewExecutionRepo creates and returns a new instance of ExecutionRepo.
+// It requires a GORM database connection.
+func NewExecutionRepo(db *gorm.DB) interfaces.ExecutionRepo {
+	return &ExecutionRepo{
+		db: db,
+	}
+}
diff --git a/server/repository/gormimpl/workflow.go b/server/repository/gormimpl/workflow.go
new file mode 100644
index 0000000..4c5f67f
--- /dev/null
+++ b/server/repository/gormimpl/workflow.go
@@ -0,0 +1,102 @@
+package gormimpl
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"gorm.io/gorm"
+
+	interfaces "task/server/repository/interface"
+	models "task/server/repository/model/task"
+)
+
+var (
+	workflowOperations = promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "workflow_repository_operations_total",
+			Help: "The total number of workflow repository operations",
+		},
+		[]string{"operation", "status"},
+	)
+	workflowLatency = promauto.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Name:    "workflow_repository_operation_duration_seconds",
+			Help:    "Duration of workflow repository operations in seconds",
+			Buckets: prometheus.DefBuckets,
+		},
+		[]string{"operation"},
+	)
+)
+
+// WorkflowRepo implements the WorkflowRepo interface, using GORM for all
+// database operations on workflow records.
+type WorkflowRepo struct {
+	db *gorm.DB
+}
+
+// CreateWorkflow creates a new workflow record in the database.
+// It returns the created workflow with its assigned ID or an error if the operation fails.
+func (s *WorkflowRepo) CreateWorkflow(ctx context.Context, workflow models.Workflow) (models.Workflow, error) {
+	timer := prometheus.NewTimer(workflowLatency.WithLabelValues("create"))
+	defer timer.ObserveDuration()
+
+	result := s.db.Create(&workflow)
+	if result.Error != nil {
+		workflowOperations.WithLabelValues("create", "error").Inc()
+		return models.Workflow{}, fmt.Errorf("failed to create workflow: %w", result.Error)
+	}
+
+	if workflow.ID == 0 {
+		workflowOperations.WithLabelValues("create", "error").Inc()
+		return models.Workflow{}, fmt.Errorf("failed to get workflow ID after creation")
+	}
+
+	workflowOperations.WithLabelValues("create", "success").Inc()
+	return workflow, nil
+}
+
+// GetWorkflow retrieves a workflow from the database by its ID.
+// It returns a pointer to the workflow if found, or an error if the workflow doesn't exist or if the operation fails.
+func (s *WorkflowRepo) GetWorkflow(ctx context.Context, workflowID uint) (*models.Workflow, error) {
+	timer := prometheus.NewTimer(workflowLatency.WithLabelValues("get"))
+	defer timer.ObserveDuration()
+
+	var workflow models.Workflow
+	if err := s.db.First(&workflow, workflowID).Error; err != nil {
+		workflowOperations.WithLabelValues("get", "error").Inc()
+		return nil, fmt.Errorf("failed to retrieve workflow by ID: %w", err)
+	}
+	workflowOperations.WithLabelValues("get", "success").Inc()
+	return &workflow, nil
+}
+
+// ListWorkflow retrieves all workflows from the database; pagination and
+// filtering by status or workflow type are not implemented at this layer yet.
+// It returns a slice of workflows and an error if the operation fails.
+func (s *WorkflowRepo) ListWorkflow(ctx context.Context) ([]models.Workflow, error) {
+	timer := prometheus.NewTimer(workflowLatency.WithLabelValues("list"))
+	defer timer.ObserveDuration()
+
+	var workflows []models.Workflow
+
+	// Execute the query
+	if err := s.db.Find(&workflows).Error; err != nil {
+		workflowOperations.WithLabelValues("list", "error").Inc()
+		return nil, fmt.Errorf("failed to retrieve workflows: %w", err)
+	}
+
+	workflowOperations.WithLabelValues("list", "success").Inc()
+	return workflows, nil
+}
+
+// NewWorkflowRepo creates and returns a new instance of WorkflowRepo.
+// It requires a GORM database connection.
+func NewWorkflowRepo(db *gorm.DB) interfaces.WorkflowRepo {
+	return &WorkflowRepo{
+		db: db,
+	}
+}
diff --git a/server/repository/interface/execution.go b/server/repository/interface/execution.go
new file mode 100644
index 0000000..99ed5f5
--- /dev/null
+++ b/server/repository/interface/execution.go
@@ -0,0 +1,23 @@
+package interfaces
+
+import (
+	"context"
+
+	model "task/server/repository/model/task"
+)
+
+// ExecutionRepo defines the interface for the execution repository.
+// It handles operations related to task execution records.
+//
+//go:generate mockery --output=../mocks --case=underscore --all --with-expecter
+type ExecutionRepo interface {
+	// CreateExecution creates an execution record for a task.
+	// It takes a context.Context parameter for handling request-scoped values and deadlines.
+	CreateExecution(ctx context.Context, execution model.Execution) (model.Execution, error)
+
+	// GetExecution retrieves an execution by its ID.
+	// Returns the execution, or an error if it is not found.
+	GetExecution(ctx context.Context, executionID uint) (*model.Execution, error)
+
+	// ListExecution lists all execution records.
+	ListExecution(ctx context.Context) ([]model.Execution, error)
+}
diff --git a/server/repository/interface/repo.go b/server/repository/interface/repo.go
index f4e5c35..014a708 100644
--- a/server/repository/interface/repo.go
+++ b/server/repository/interface/repo.go
@@ -4,4 +4,6 @@ package interfaces
 type TaskManagmentInterface interface {
 	TaskRepo() TaskRepo
 	TaskHistoryRepo() TaskHistoryRepo
+	WorkflowRepo() WorkflowRepo
+	ExecutionRepo() ExecutionRepo
 }
diff --git a/server/repository/interface/workflow.go b/server/repository/interface/workflow.go
new file mode 100644
index 0000000..300f2a9
--- /dev/null
+++ b/server/repository/interface/workflow.go
@@ -0,0 +1,25 @@
+package interfaces
+
+import (
+	"context"
+
+	model "task/server/repository/model/task"
+)
+
+// WorkflowRepo defines the interface for the workflow repository.
+// It handles operations related to workflow management.
+//
+//go:generate mockery --output=../mocks --case=underscore --all --with-expecter
+type WorkflowRepo interface {
+	// CreateWorkflow creates a workflow record.
+	// It takes a context.Context parameter for handling request-scoped values and deadlines.
+	CreateWorkflow(ctx context.Context, workflow model.Workflow) (model.Workflow, error)
+
+	// GetWorkflow retrieves a workflow by its ID.
+	// Returns the workflow, or an error if it is not found.
+	GetWorkflow(ctx context.Context, workflowID uint) (*model.Workflow, error)
+
+	// ListWorkflow lists all workflows.
+	// Returns a slice of workflows, or an error if the operation fails.
+	ListWorkflow(ctx context.Context) ([]model.Workflow, error)
+}
diff --git a/server/repository/model/task/execution.go b/server/repository/model/task/execution.go
new file mode 100644
index 0000000..a3b02b8
--- /dev/null
+++ b/server/repository/model/task/execution.go
@@ -0,0 +1,51 @@
+package task
+
+import (
+	"errors"
+	"time"
+
+	"gorm.io/gorm"
+)
+
+// Execution represents a single execution of a task, with its attributes.
+type Execution struct {
+	gorm.Model
+	TaskID    uint      `json:"task_id" gorm:"not null"`            // Foreign key for Task
+	Status    int       `json:"status" gorm:"type:int;not null"`    // Refers to the custom PostgreSQL enum
+	Payload   string    `json:"payload" gorm:"type:jsonb;not null"` // Storing JSON as a string in PostgreSQL
+	Retries   int       `json:"retries" gorm:"default:0;check:retries >= 0 AND retries <= 10"`
+	Priority  int       `json:"priority" gorm:"default:0;check:priority >= 0"`
+	CreatedAt time.Time `json:"created_at" gorm:"autoCreateTime; not null"`
+	UpdatedAt time.Time `json:"updated_at" gorm:"autoUpdateTime; not null"`
+}
+
+// TableName returns the custom table name for the Execution model.
+func (*Execution) TableName() string {
+	return "executions"
+}
+
+// BeforeCreate sets CreatedAt and ensures the execution status is valid before the record is created.
+func (t *Execution) BeforeCreate(tx *gorm.DB) (err error) {
+	// Set CreatedAt to current time
+	t.CreatedAt = time.Now()
+
+	// Ensure the execution status is valid
+	if t.Status > 4 {
+		return errors.New("invalid task status")
+	}
+
+	return nil
+}
+
+// BeforeUpdate refreshes UpdatedAt and ensures the execution status is valid before the record is updated.
+func (t *Execution) BeforeUpdate(tx *gorm.DB) (err error) {
+	// Set UpdatedAt to current time
+	t.UpdatedAt = time.Now()
+
+	// Ensure the execution status is valid
+	if t.Status > 4 {
+		return errors.New("invalid task status")
+	}
+
+	return nil
+}
diff --git a/server/repository/model/task/task.go b/server/repository/model/task/task.go
index beba8de..0926ec9 100644
--- a/server/repository/model/task/task.go
+++ b/server/repository/model/task/task.go
@@ -1,7 +1,6 @@
 package task
 
 import (
-	"encoding/json"
 	"errors"
 	"time"
 
@@ -16,9 +15,11 @@ type Task struct {
 	Type      string    `json:"type" gorm:"type:varchar(255);not null"` // Refers to the custom PostgreSQL enum
 	Status    int       `json:"status" gorm:"type:int;not null"`        // Refers to the custom PostgreSQL enum
 	Payload   string    `json:"payload" gorm:"type:jsonb;not null"`     // Storing JSON as a string in PostgreSQL
+	Spec      []byte    `json:"spec" gorm:"type:bytea;not null"`        // Storing binary data in PostgreSQL
 	Retries   int       `json:"retries" gorm:"default:0;check:retries >= 0 AND retries <= 10"`
 	Priority  int       `json:"priority" gorm:"default:0;check:priority >= 0"`
 	CreatedAt time.Time `json:"created_at" gorm:"autoCreateTime; not null"`
+	UpdatedAt time.Time `json:"updated_at" gorm:"autoUpdateTime; not null"`
 }
 
 // TableName returns the custom table name for the Task model.
@@ -41,11 +42,5 @@ func (t *Task) BeforeCreate(tx *gorm.DB) (err error) {
 		return errors.New("invalid task status")
 	}
 
-	// Ensure payload is valid JSON
-	var js json.RawMessage
-	if err := json.Unmarshal([]byte(t.Payload), &js); err != nil {
-		return errors.New("invalid JSON payload")
-	}
-
 	return nil
 }
diff --git a/server/repository/model/task/workflow.go b/server/repository/model/task/workflow.go
new file mode 100644
index 0000000..97b961a
--- /dev/null
+++ b/server/repository/model/task/workflow.go
@@ -0,0 +1,33 @@
+package task
+
+import (
+	"time"
+
+	"gorm.io/gorm"
+)
+
+// Workflow represents a workflow with its attributes.
+type Workflow struct {
+	gorm.Model
+	Name        string    `json:"name" gorm:"type:varchar(255);not null"`
+	Description string    `json:"description" gorm:"type:text;not null"`
+	Payload     string    `json:"payload" gorm:"type:jsonb;not null"` // Storing JSON as a string in PostgreSQL
+	Spec        []byte    `json:"spec" gorm:"type:bytea;not null"`    // Storing binary data in PostgreSQL
+	Retries     int       `json:"retries" gorm:"default:0;check:retries >= 0 AND retries <= 10"`
+	Priority    int       `json:"priority" gorm:"default:0;check:priority >= 0"`
+	CreatedAt   time.Time `json:"created_at" gorm:"autoCreateTime; not null"`
+	UpdatedAt   time.Time `json:"updated_at" gorm:"autoUpdateTime; not null"`
+}
+
+// TableName returns the custom table name for the Workflow model.
+func (*Workflow) TableName() string {
+	return "workflows"
+}
+
+// BeforeCreate sets the CreatedAt timestamp before the workflow is created.
+func (t *Workflow) BeforeCreate(tx *gorm.DB) (err error) { + // Set CreatedAt to current time + t.CreatedAt = time.Now() + + return nil +} diff --git a/server/repository/postgres.go b/server/repository/postgres.go index 63c8158..392ae9b 100644 --- a/server/repository/postgres.go +++ b/server/repository/postgres.go @@ -9,8 +9,10 @@ import ( ) type Postgres struct { - task interfaces.TaskRepo - history interfaces.TaskHistoryRepo + task interfaces.TaskRepo + history interfaces.TaskHistoryRepo + workflow interfaces.WorkflowRepo + execution interfaces.ExecutionRepo } func (r Postgres) TaskRepo() interfaces.TaskRepo { @@ -22,9 +24,19 @@ func (r Postgres) TaskHistoryRepo() interfaces.TaskHistoryRepo { return r.history } +func (r Postgres) WorkflowRepo() interfaces.WorkflowRepo { + return r.workflow +} + +func (r Postgres) ExecutionRepo() interfaces.ExecutionRepo { + return r.execution +} + func NewPostgresRepo(db *gorm.DB) interfaces.TaskManagmentInterface { return &Postgres{ - task: gormimpl.NewTaskRepo(db), - history: gormimpl.NewTaskHistoryRepo(db), + task: gormimpl.NewTaskRepo(db), + history: gormimpl.NewTaskHistoryRepo(db), + workflow: gormimpl.NewWorkflowRepo(db), + execution: gormimpl.NewExecutionRepo(db), } } diff --git a/server/route/task.go b/server/route/task.go index 218ff3a..013c875 100644 --- a/server/route/task.go +++ b/server/route/task.go @@ -38,6 +38,8 @@ type TaskServer struct { logger *log.Logger validator *protovalidate.Validator metrics *taskMetrics + workflowRepo interfaces.WorkflowRepo + executionRepo interfaces.ExecutionRepo channel chan task.Task maxWorkers int clientHeartbeats sync.Map @@ -102,6 +104,8 @@ func NewTaskServer(repo interfaces.TaskManagmentInterface) cloudv1connect.TaskMa server := &TaskServer{ taskRepo: repo.TaskRepo(), historyRepo: repo.TaskHistoryRepo(), + workflowRepo: repo.WorkflowRepo(), + executionRepo: repo.ExecutionRepo(), logger: log.New(os.Stdout, logPrefix, log.LstdFlags|log.Lshortfile), validator: validator, metrics: newTaskMetrics(),
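
Reviewer note (not part of the patch): a minimal sketch of how the new WorkflowRepo and
ExecutionRepo could be exercised through the existing NewPostgresRepo constructor. It assumes
the module path is "task", that postgres.go declares package repository, and that migrations
for the new executions/workflows tables are handled elsewhere; the DSN, task ID, and field
values below are placeholders.

package main

import (
	"context"
	"log"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"

	repository "task/server/repository"
	models "task/server/repository/model/task"
)

func main() {
	// Hypothetical DSN; replace with the real connection string.
	db, err := gorm.Open(postgres.Open("postgres://user:pass@localhost:5432/tasks"), &gorm.Config{})
	if err != nil {
		log.Fatalf("connect: %v", err)
	}

	repo := repository.NewPostgresRepo(db)
	ctx := context.Background()

	// Create a workflow, then record a pending execution that references a task.
	wf, err := repo.WorkflowRepo().CreateWorkflow(ctx, models.Workflow{
		Name:        "nightly-report",
		Description: "example workflow",
		Payload:     `{"tasks": []}`,
		Spec:        []byte("{}"),
	})
	if err != nil {
		log.Fatalf("create workflow: %v", err)
	}

	exec, err := repo.ExecutionRepo().CreateExecution(ctx, models.Execution{
		TaskID:  1, // hypothetical task ID
		Status:  1, // EXECUTION_STATUS_PENDING
		Payload: `{}`,
	})
	if err != nil {
		log.Fatalf("create execution: %v", err)
	}

	log.Printf("created workflow %d and execution %d", wf.ID, exec.ID)
}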