From b9d87f0203709c508b4a62be665a20b4369f95da Mon Sep 17 00:00:00 2001 From: Walid Baruni Date: Wed, 9 Oct 2024 17:19:48 +0200 Subject: [PATCH] remove event emitter (#4601) --- pkg/eventhandler/chained_handlers.go | 61 ---------- pkg/eventhandler/chained_handlers_test.go | 111 ------------------ pkg/eventhandler/context_provider.go | 81 ------------- pkg/eventhandler/interfaces.go | 21 ---- .../mock_eventhandler/mock_contextprovider.go | 49 -------- .../mock_eventhandler/mock_handlers.go | 55 --------- pkg/eventhandler/tracer.go | 78 ------------ pkg/models/job_event_string.go | 36 ------ pkg/models/jobevent.go | 107 ----------------- pkg/node/requester.go | 31 +---- pkg/orchestrator/callback.go | 22 +--- pkg/orchestrator/endpoint.go | 10 -- pkg/orchestrator/event_emitter.go | 96 --------------- pkg/orchestrator/planner/event_emitter.go | 54 --------- 14 files changed, 8 insertions(+), 804 deletions(-) delete mode 100644 pkg/eventhandler/chained_handlers.go delete mode 100644 pkg/eventhandler/chained_handlers_test.go delete mode 100644 pkg/eventhandler/context_provider.go delete mode 100644 pkg/eventhandler/interfaces.go delete mode 100644 pkg/eventhandler/mock_eventhandler/mock_contextprovider.go delete mode 100644 pkg/eventhandler/mock_eventhandler/mock_handlers.go delete mode 100644 pkg/eventhandler/tracer.go delete mode 100644 pkg/models/job_event_string.go delete mode 100644 pkg/models/jobevent.go delete mode 100644 pkg/orchestrator/event_emitter.go delete mode 100644 pkg/orchestrator/planner/event_emitter.go diff --git a/pkg/eventhandler/chained_handlers.go b/pkg/eventhandler/chained_handlers.go deleted file mode 100644 index 93cbd95273..0000000000 --- a/pkg/eventhandler/chained_handlers.go +++ /dev/null @@ -1,61 +0,0 @@ -package eventhandler - -import ( - "context" - "fmt" - "time" - - "github.com/rs/zerolog/log" - - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// An event handler implementation that chains multiple event handlers, and accepts a context provider -// to setup up the context once for all handlers. -type ChainedJobEventHandler struct { - eventHandlers []JobEventHandler - contextProvider ContextProvider -} - -func NewChainedJobEventHandler(contextProvider ContextProvider) *ChainedJobEventHandler { - return &ChainedJobEventHandler{contextProvider: contextProvider} -} - -func (r *ChainedJobEventHandler) AddHandlers(handlers ...JobEventHandler) { - r.eventHandlers = append(r.eventHandlers, handlers...) -} - -func (r *ChainedJobEventHandler) HandleJobEvent(ctx context.Context, event models.JobEvent) (err error) { - startTime := time.Now() - defer logEvent(ctx, event, startTime)(&err) - - if r.eventHandlers == nil { - return fmt.Errorf("no event handlers registered") - } - - jobCtx := r.contextProvider.GetContext(ctx, event.JobID) - - // All handlers are called, unless one of them returns an error. - for _, handler := range r.eventHandlers { - if err = handler.HandleJobEvent(jobCtx, event); err != nil { //nolint:gocritic - return err - } - } - return nil -} - -func logEvent(ctx context.Context, event models.JobEvent, startTime time.Time) func(*error) { - return func(handlerError *error) { - logMsg := log.Ctx(ctx).Debug(). - Str("EventName", event.EventName.String()). - Str("JobID", event.JobID). - Str("NodeID", event.SourceNodeID). - Str("Status", event.Status). - Dur("HandleDuration", time.Since(startTime)) - if *handlerError != nil { - logMsg = logMsg.AnErr("HandlerError", *handlerError) - } - - logMsg.Msg("Handled event") - } -} diff --git a/pkg/eventhandler/chained_handlers_test.go b/pkg/eventhandler/chained_handlers_test.go deleted file mode 100644 index cf92117769..0000000000 --- a/pkg/eventhandler/chained_handlers_test.go +++ /dev/null @@ -1,111 +0,0 @@ -//go:build unit || !integration - -package eventhandler - -import ( - "context" - "fmt" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.uber.org/mock/gomock" - - "github.com/bacalhau-project/bacalhau/pkg/eventhandler/mock_eventhandler" - "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// In order for 'go test' to run this suite, we need to create -// a normal test function and pass our suite to suite.Run -func TestChainedHandlers(t *testing.T) { - suite.Run(t, new(jobEventHandlerSuite)) -} - -type jobEventHandlerSuite struct { - suite.Suite - ctrl *gomock.Controller - chainedHandler *ChainedJobEventHandler - handler1 *mock_eventhandler.MockJobEventHandler - handler2 *mock_eventhandler.MockJobEventHandler - contextProvider *mock_eventhandler.MockContextProvider - context context.Context - event models.JobEvent -} - -// Before each test -func (suite *jobEventHandlerSuite) SetupTest() { - suite.ctrl = gomock.NewController(suite.T()) - suite.handler1 = mock_eventhandler.NewMockJobEventHandler(suite.ctrl) - suite.handler2 = mock_eventhandler.NewMockJobEventHandler(suite.ctrl) - suite.contextProvider = mock_eventhandler.NewMockContextProvider(suite.ctrl) - suite.chainedHandler = NewChainedJobEventHandler(suite.contextProvider) - suite.context = context.WithValue(context.Background(), "test", "test") - suite.event = models.JobEvent{ - EventName: models.JobEventCreated, - JobID: uuid.NewString(), - SourceNodeID: "nodeA", - Status: "this is a test event", - } - logger.ConfigureTestLogging(suite.T()) -} - -func (suite *jobEventHandlerSuite) TearDownTest() { - suite.ctrl.Finish() -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEvent() { - suite.chainedHandler.AddHandlers(suite.handler1, suite.handler2) - ctx := context.Background() - - // assert context provider is called with the correct context and job id - suite.contextProvider.EXPECT().GetContext(ctx, suite.event.JobID).Return(suite.context) - - // assert both handlers are called with the context provider's context and event - gomock.InOrder( - suite.handler1.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - suite.handler2.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - ) - - // assert no error was returned - require.NoError(suite.T(), suite.chainedHandler.HandleJobEvent(ctx, suite.event)) -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEventLazilyAdded() { - suite.chainedHandler.AddHandlers(suite.handler1) - suite.chainedHandler.AddHandlers(suite.handler2) - ctx := context.Background() - - // assert context provider is called with the correct context and job id - suite.contextProvider.EXPECT().GetContext(ctx, suite.event.JobID).Return(suite.context) - - // assert both handlers are called with the context provider's context and event - gomock.InOrder( - suite.handler1.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - suite.handler2.EXPECT().HandleJobEvent(suite.context, suite.event).Return(nil), - ) - - // assert no error was returned - require.NoError(suite.T(), suite.chainedHandler.HandleJobEvent(ctx, suite.event)) -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEventError() { - suite.chainedHandler.AddHandlers(suite.handler1) - suite.chainedHandler.AddHandlers(suite.handler2) - ctx := context.Background() - mockError := fmt.Errorf("i am an error") - - // assert context provider is called with the correct context and job id - suite.contextProvider.EXPECT().GetContext(ctx, suite.event.JobID).Return(suite.context) - - // mock first handler to return an error, and don't expect the second handler to be called - suite.handler1.EXPECT().HandleJobEvent(suite.context, suite.event).Return(mockError) - - // assert no error was returned - require.Equal(suite.T(), mockError, suite.chainedHandler.HandleJobEvent(ctx, suite.event)) -} - -func (suite *jobEventHandlerSuite) TestChainedJobEventHandler_HandleJobEventEmptyHandlers() { - require.Error(suite.T(), suite.chainedHandler.HandleJobEvent(context.Background(), suite.event)) -} diff --git a/pkg/eventhandler/context_provider.go b/pkg/eventhandler/context_provider.go deleted file mode 100644 index 774878edcd..0000000000 --- a/pkg/eventhandler/context_provider.go +++ /dev/null @@ -1,81 +0,0 @@ -package eventhandler - -import ( - "context" - "sync" - - "go.opentelemetry.io/otel/attribute" - oteltrace "go.opentelemetry.io/otel/trace" - - "github.com/bacalhau-project/bacalhau/pkg/models" - "github.com/bacalhau-project/bacalhau/pkg/telemetry" -) - -// Interface for a context provider that can be used to generate a context to be used to handle -// job events. -type ContextProvider interface { - GetContext(ctx context.Context, jobID string) context.Context -} - -// TracerContextProvider is a context provider that generates a context along with tracing information. -// It also implements JobEventHandler to end the local lifecycle context for a job when it is completed. -type TracerContextProvider struct { - nodeID string - jobNodeContexts map[string]context.Context // per-node job lifecycle - contextMutex sync.RWMutex -} - -func NewTracerContextProvider(nodeID string) *TracerContextProvider { - return &TracerContextProvider{ - nodeID: nodeID, - jobNodeContexts: make(map[string]context.Context), - } -} - -func (t *TracerContextProvider) GetContext(ctx context.Context, jobID string) context.Context { - t.contextMutex.Lock() - defer t.contextMutex.Unlock() - - jobCtx, _ := telemetry.Span(ctx, "pkg/eventhandler/JobEventHandler.HandleJobEvent", - oteltrace.WithSpanKind(oteltrace.SpanKindInternal), - oteltrace.WithAttributes( - attribute.String(telemetry.TracerAttributeNameNodeID, t.nodeID), - attribute.String(telemetry.TracerAttributeNameJobID, jobID), - ), - ) - - // keep the latest context to clean it up during shutdown if necessary - t.jobNodeContexts[jobID] = jobCtx - return jobCtx -} - -func (t *TracerContextProvider) HandleJobEvent(ctx context.Context, event models.JobEvent) error { - // If the event is known to be terminal, end the local lifecycle context: - if event.EventName.IsTerminal() { - t.endJobNodeContext(ctx, event.JobID) - } - - return nil -} - -func (t *TracerContextProvider) Shutdown() error { - t.contextMutex.RLock() - defer t.contextMutex.RUnlock() - - for _, ctx := range t.jobNodeContexts { - oteltrace.SpanFromContext(ctx).End() - } - - // clear the maps - t.jobNodeContexts = make(map[string]context.Context) - - return nil -} - -// endJobNodeContext ends the local lifecycle context for a job. -func (t *TracerContextProvider) endJobNodeContext(ctx context.Context, jobID string) { - oteltrace.SpanFromContext(ctx).End() - t.contextMutex.Lock() - defer t.contextMutex.Unlock() - delete(t.jobNodeContexts, jobID) -} diff --git a/pkg/eventhandler/interfaces.go b/pkg/eventhandler/interfaces.go deleted file mode 100644 index 5c332daa28..0000000000 --- a/pkg/eventhandler/interfaces.go +++ /dev/null @@ -1,21 +0,0 @@ -package eventhandler - -//go:generate mockgen --source interfaces.go --destination mock_eventhandler/mock_handlers.go --package mock_eventhandler - -import ( - "context" - - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// A job event handler is a component that is notified of events related to jobs. -type JobEventHandler interface { - HandleJobEvent(ctx context.Context, event models.JobEvent) error -} - -// function that implements the JobEventHandler interface -type JobEventHandlerFunc func(ctx context.Context, event models.JobEvent) error - -func (f JobEventHandlerFunc) HandleJobEvent(ctx context.Context, event models.JobEvent) error { - return f(ctx, event) -} diff --git a/pkg/eventhandler/mock_eventhandler/mock_contextprovider.go b/pkg/eventhandler/mock_eventhandler/mock_contextprovider.go deleted file mode 100644 index 4bb1fc0721..0000000000 --- a/pkg/eventhandler/mock_eventhandler/mock_contextprovider.go +++ /dev/null @@ -1,49 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: system/context_provider.go - -// Package mock_system is a generated GoMock package. -package mock_eventhandler - -import ( - context "context" - reflect "reflect" - - gomock "go.uber.org/mock/gomock" -) - -// MockContextProvider is a mock of ContextProvider interface. -type MockContextProvider struct { - ctrl *gomock.Controller - recorder *MockContextProviderMockRecorder -} - -// MockContextProviderMockRecorder is the mock recorder for MockContextProvider. -type MockContextProviderMockRecorder struct { - mock *MockContextProvider -} - -// NewMockContextProvider creates a new mock instance. -func NewMockContextProvider(ctrl *gomock.Controller) *MockContextProvider { - mock := &MockContextProvider{ctrl: ctrl} - mock.recorder = &MockContextProviderMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockContextProvider) EXPECT() *MockContextProviderMockRecorder { - return m.recorder -} - -// GetContext mocks base method. -func (m *MockContextProvider) GetContext(ctx context.Context, jobID string) context.Context { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetContext", ctx, jobID) - ret0, _ := ret[0].(context.Context) - return ret0 -} - -// GetContext indicates an expected call of GetContext. -func (mr *MockContextProviderMockRecorder) GetContext(ctx, jobID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetContext", reflect.TypeOf((*MockContextProvider)(nil).GetContext), ctx, jobID) -} diff --git a/pkg/eventhandler/mock_eventhandler/mock_handlers.go b/pkg/eventhandler/mock_eventhandler/mock_handlers.go deleted file mode 100644 index 832e5914df..0000000000 --- a/pkg/eventhandler/mock_eventhandler/mock_handlers.go +++ /dev/null @@ -1,55 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: interfaces.go -// -// Generated by this command: -// -// mockgen --source interfaces.go --destination mock_eventhandler/mock_handlers.go --package mock_eventhandler -// - -// Package mock_eventhandler is a generated GoMock package. -package mock_eventhandler - -import ( - context "context" - reflect "reflect" - - models "github.com/bacalhau-project/bacalhau/pkg/models" - gomock "go.uber.org/mock/gomock" -) - -// MockJobEventHandler is a mock of JobEventHandler interface. -type MockJobEventHandler struct { - ctrl *gomock.Controller - recorder *MockJobEventHandlerMockRecorder -} - -// MockJobEventHandlerMockRecorder is the mock recorder for MockJobEventHandler. -type MockJobEventHandlerMockRecorder struct { - mock *MockJobEventHandler -} - -// NewMockJobEventHandler creates a new mock instance. -func NewMockJobEventHandler(ctrl *gomock.Controller) *MockJobEventHandler { - mock := &MockJobEventHandler{ctrl: ctrl} - mock.recorder = &MockJobEventHandlerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockJobEventHandler) EXPECT() *MockJobEventHandlerMockRecorder { - return m.recorder -} - -// HandleJobEvent mocks base method. -func (m *MockJobEventHandler) HandleJobEvent(ctx context.Context, event models.JobEvent) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "HandleJobEvent", ctx, event) - ret0, _ := ret[0].(error) - return ret0 -} - -// HandleJobEvent indicates an expected call of HandleJobEvent. -func (mr *MockJobEventHandlerMockRecorder) HandleJobEvent(ctx, event any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleJobEvent", reflect.TypeOf((*MockJobEventHandler)(nil).HandleJobEvent), ctx, event) -} diff --git a/pkg/eventhandler/tracer.go b/pkg/eventhandler/tracer.go deleted file mode 100644 index 4db8f63dcb..0000000000 --- a/pkg/eventhandler/tracer.go +++ /dev/null @@ -1,78 +0,0 @@ -package eventhandler - -import ( - "context" - "fmt" - "io/fs" - "os" - - "github.com/rs/zerolog" - - "github.com/bacalhau-project/bacalhau/pkg/lib/marshaller" - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// Tracer is a JobEventHandler that will marshal the received event to a -// file-based log. -// -// Note that we don't need any mutexes here because writing to an os.File is -// thread-safe (see https://github.com/rs/zerolog/blob/master/writer.go#L33) -type Tracer struct { - LogFile *os.File - Logger zerolog.Logger -} - -const eventTracerFilePerms fs.FileMode = 0644 - -// Returns an eventhandler.Tracer that writes to config.GetEventTracerPath(), or -// an error if the file can't be opened. -func NewTracer(path string) (*Tracer, error) { - return NewTracerToFile(path) -} - -// Returns an eventhandler.Tracer that writes to the specified filename, or an -// error if the file can't be opened. -func NewTracerToFile(filename string) (*Tracer, error) { - file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, eventTracerFilePerms) - if err != nil { - return nil, err - } - - return &Tracer{ - LogFile: file, - Logger: zerolog.New(file).With().Timestamp().Logger(), - }, nil -} - -// HandleJobEvent implements JobEventHandler -func (t *Tracer) HandleJobEvent(ctx context.Context, event models.JobEvent) error { - trace(t.Logger, event) - return nil -} - -func trace[Event any](log zerolog.Logger, event Event) { - log.Log(). - Str("Type", fmt.Sprintf("%T", event)). - Func(func(e *zerolog.Event) { - // TODO: #828 Potential hotspot - marshaling is expensive, and - // we do it for every event. - eventJSON, err := marshaller.JSONMarshalWithMax(event) - if err == nil { - e.RawJSON("Event", eventJSON) - } else { - e.AnErr("MarshalError", err) - } - }).Send() -} - -func (t *Tracer) Shutdown() error { - if t.LogFile != nil { - err := t.LogFile.Close() - t.LogFile = nil - t.Logger = zerolog.Nop() - return err - } - return nil -} - -var _ JobEventHandler = (*Tracer)(nil) diff --git a/pkg/models/job_event_string.go b/pkg/models/job_event_string.go deleted file mode 100644 index 1d345bf277..0000000000 --- a/pkg/models/job_event_string.go +++ /dev/null @@ -1,36 +0,0 @@ -// Code generated by "stringer -type=JobEventType --trimprefix=JobEvent --output job_event_string.go"; DO NOT EDIT. - -package models - -import "strconv" - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[jobEventUndefined-0] - _ = x[JobEventCreated-1] - _ = x[JobEventBid-2] - _ = x[JobEventBidAccepted-3] - _ = x[JobEventBidRejected-4] - _ = x[JobEventComputeError-5] - _ = x[JobEventResultsProposed-6] - _ = x[JobEventResultsAccepted-7] - _ = x[JobEventResultsRejected-8] - _ = x[JobEventResultsPublished-9] - _ = x[JobEventError-10] - _ = x[JobEventCanceled-11] - _ = x[JobEventCompleted-12] - _ = x[jobEventDone-13] -} - -const _JobEventType_name = "jobEventUndefinedCreatedBidBidAcceptedBidRejectedComputeErrorResultsProposedResultsAcceptedResultsRejectedResultsPublishedErrorCanceledCompletedjobEventDone" - -var _JobEventType_index = [...]uint8{0, 17, 24, 27, 38, 49, 61, 76, 91, 106, 122, 127, 135, 144, 156} - -func (i JobEventType) String() string { - if i < 0 || i >= JobEventType(len(_JobEventType_index)-1) { - return "JobEventType(" + strconv.FormatInt(int64(i), 10) + ")" - } - return _JobEventType_name[_JobEventType_index[i]:_JobEventType_index[i+1]] -} diff --git a/pkg/models/jobevent.go b/pkg/models/jobevent.go deleted file mode 100644 index 54de944a22..0000000000 --- a/pkg/models/jobevent.go +++ /dev/null @@ -1,107 +0,0 @@ -package models - -import ( - "fmt" - "time" -) - -//go:generate stringer -type=JobEventType --trimprefix=JobEvent --output job_event_string.go -type JobEventType int - -const ( - jobEventUndefined JobEventType = iota // must be first - - // Job has been created on the requester node - JobEventCreated - - // a compute node bid on a job - JobEventBid - - // a requester node accepted for rejected a job bid - JobEventBidAccepted - JobEventBidRejected - - // a compute node had an error running a job - JobEventComputeError - - // a compute node completed running a job - JobEventResultsProposed - - // a Requester node accepted the results from a node for a job - JobEventResultsAccepted - - // a Requester node rejected the results from a node for a job - JobEventResultsRejected - - // once the results have been accepted or rejected - // the compute node will publish them and issue this event - JobEventResultsPublished - - // a requester node declared an error running a job - JobEventError - - // a user canceled a job - JobEventCanceled - - // a job has been completed - JobEventCompleted - - jobEventDone // must be last -) - -func (je JobEventType) IsUndefined() bool { - return je == jobEventUndefined -} - -// IsTerminal returns true if the given event type signals the end of the -// lifecycle of a job. After this, all nodes can safely ignore the job. -func (je JobEventType) IsTerminal() bool { - return je == JobEventError || je == JobEventCompleted || je == JobEventCanceled -} - -func ParseJobEventType(str string) (JobEventType, error) { - for typ := jobEventUndefined + 1; typ < jobEventDone; typ++ { - if equal(typ.String(), str) { - return typ, nil - } - } - - return jobEventUndefined, fmt.Errorf( - "executor: unknown job event type '%s'", str) -} - -func JobEventTypes() []JobEventType { - var res []JobEventType - for typ := jobEventUndefined + 1; typ < jobEventDone; typ++ { - res = append(res, typ) - } - - return res -} - -func (je JobEventType) MarshalText() ([]byte, error) { - return []byte(je.String()), nil -} - -func (je *JobEventType) UnmarshalText(text []byte) (err error) { - name := string(text) - *je, err = ParseJobEventType(name) - return -} - -// TODO remove this https://github.com/bacalhau-project/bacalhau/issues/4185 -type JobEvent struct { - JobID string `json:"JobID,omitempty" example:"9304c616-291f-41ad-b862-54e133c0149e"` - // compute execution identifier - ExecutionID string `json:"ExecutionID,omitempty" example:"9304c616-291f-41ad-b862-54e133c0149e"` - // the node that emitted this event - SourceNodeID string `json:"SourceNodeID,omitempty" example:"QmXaXu9N5GNetatsvwnTfQqNtSeKAD6uCmarbh3LMRYAcF"` - // the node that this event is for - // e.g. "AcceptJobBid" was emitted by Requester but it targeting compute node - TargetNodeID string `json:"TargetNodeID,omitempty" example:"QmdZQ7ZbhnvWY1J12XYKGHApJ6aufKyLNSvf8jZBrBaAVL"` - - EventName JobEventType `json:"EventName,omitempty"` - Status string `json:"Status,omitempty" example:"Got results proposal of length: 0"` - - EventTime time.Time `json:"EventTime,omitempty" example:"2022-11-17T13:32:55.756658941Z"` -} diff --git a/pkg/node/requester.go b/pkg/node/requester.go index ee73b00b9c..96166b5285 100644 --- a/pkg/node/requester.go +++ b/pkg/node/requester.go @@ -36,7 +36,6 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/util" "github.com/bacalhau-project/bacalhau/pkg/compute" - "github.com/bacalhau-project/bacalhau/pkg/eventhandler" "github.com/bacalhau-project/bacalhau/pkg/jobstore" "github.com/bacalhau-project/bacalhau/pkg/orchestrator/selection/discovery" "github.com/bacalhau-project/bacalhau/pkg/orchestrator/selection/ranking" @@ -76,14 +75,6 @@ func NewRequesterNode( return nil, err } - // prepare event handlers - tracerContextProvider := eventhandler.NewTracerContextProvider(nodeID) - localJobEventConsumer := eventhandler.NewChainedJobEventHandler(tracerContextProvider) - - eventEmitter := orchestrator.NewEventEmitter(orchestrator.EventEmitterParams{ - EventConsumer: localJobEventConsumer, - }) - jobStore, err := createJobStore(ctx, cfg) if err != nil { return nil, err @@ -120,12 +111,6 @@ func NewRequesterNode( JobStore: jobStore, }), - // planner that publishes events on job completion or failure - planner.NewEventEmitter(planner.EventEmitterParams{ - ID: nodeID, - EventEmitter: eventEmitter, - }), - // logs job completion or failure planner.NewLoggingPlanner(), ) @@ -227,7 +212,6 @@ func NewRequesterNode( endpointV2 := orchestrator.NewBaseEndpoint(&orchestrator.BaseEndpointParams{ ID: nodeID, Store: jobStore, - EventEmitter: eventEmitter, ComputeProxy: computeProxy, JobTransformer: jobTransformers, TaskTranslator: translationProvider, @@ -268,12 +252,6 @@ func NewRequesterNode( ) auth_endpoint.BindEndpoint(ctx, apiServer.Router, authenticators) - // order of event handlers is important as triggering some handlers might depend on the state of others. - localJobEventConsumer.AddHandlers( - // ends the span for the job if received a terminal event - tracerContextProvider, - ) - // ncl subscriber, err := ncl.NewSubscriber(transportLayer.Client(), ncl.WithSubscriberMessageSerDeRegistry(messageSerDeRegistry), @@ -302,10 +280,6 @@ func NewRequesterNode( } evalBroker.SetEnabled(false) - cleanupErr = tracerContextProvider.Shutdown() - if cleanupErr != nil { - util.LogDebugIfContextCancelled(ctx, cleanupErr, "failed to shutdown tracer context provider") - } // Close the jobstore after the evaluation broker is disabled cleanupErr = jobStore.Close(ctx) if cleanupErr != nil { @@ -317,9 +291,8 @@ func NewRequesterNode( // It provides the compute call back endpoints for interacting with compute nodes. // e.g. bidding, job completions, cancellations, and failures callback := orchestrator.NewCallback(&orchestrator.CallbackParams{ - ID: nodeID, - EventEmitter: eventEmitter, - Store: jobStore, + ID: nodeID, + Store: jobStore, }) if err = transportLayer.RegisterComputeCallback(callback); err != nil { return nil, err diff --git a/pkg/orchestrator/callback.go b/pkg/orchestrator/callback.go index 4550bf8369..6be49b86b4 100644 --- a/pkg/orchestrator/callback.go +++ b/pkg/orchestrator/callback.go @@ -13,23 +13,20 @@ import ( ) type CallbackParams struct { - ID string - Store jobstore.Store - EventEmitter EventEmitter + ID string + Store jobstore.Store } // Callback base implementation of requester Endpoint type Callback struct { - id string - store jobstore.Store - eventEmitter EventEmitter + id string + store jobstore.Store } func NewCallback(params *CallbackParams) *Callback { return &Callback{ - id: params.ID, - store: params.Store, - eventEmitter: params.EventEmitter, + id: params.ID, + store: params.Store, } } @@ -96,16 +93,11 @@ func (e *Callback) OnBidComplete(ctx context.Context, response compute.BidResult log.Ctx(ctx).Error().Err(err).Msgf("[OnBidComplete] failed to commit transaction") return } - - if response.Accepted { - e.eventEmitter.EmitBidReceived(ctx, response) - } } func (e *Callback) OnRunComplete(ctx context.Context, result compute.RunResult) { log.Ctx(ctx).Debug().Msgf("Requester node %s received RunComplete for execution: %s from %s", e.id, result.ExecutionID, result.SourcePeerID) - e.eventEmitter.EmitRunComplete(ctx, result) txContext, err := e.store.BeginTx(ctx) if err != nil { @@ -223,8 +215,6 @@ func (e *Callback) OnComputeFailure(ctx context.Context, result compute.ComputeE log.Ctx(ctx).Error().Err(err).Msgf("[OnComputeFailure] failed to commit transaction") return } - - e.eventEmitter.EmitComputeFailure(ctx, result.ExecutionID, result) } // enqueueEvaluation enqueues an evaluation to allow the scheduler to either accept the bid, or find a new node diff --git a/pkg/orchestrator/endpoint.go b/pkg/orchestrator/endpoint.go index dfe1a5da90..caf28c0d78 100644 --- a/pkg/orchestrator/endpoint.go +++ b/pkg/orchestrator/endpoint.go @@ -24,7 +24,6 @@ import ( type BaseEndpointParams struct { ID string Store jobstore.Store - EventEmitter EventEmitter ComputeProxy compute.Endpoint JobTransformer transformer.JobTransformer TaskTranslator translation.TranslatorProvider @@ -34,7 +33,6 @@ type BaseEndpointParams struct { type BaseEndpoint struct { id string store jobstore.Store - eventEmitter EventEmitter computeProxy compute.Endpoint jobTransformer transformer.JobTransformer taskTranslator translation.TranslatorProvider @@ -45,7 +43,6 @@ func NewBaseEndpoint(params *BaseEndpointParams) *BaseEndpoint { return &BaseEndpoint{ id: params.ID, store: params.Store, - eventEmitter: params.EventEmitter, computeProxy: params.ComputeProxy, jobTransformer: params.JobTransformer, taskTranslator: params.TaskTranslator, @@ -145,7 +142,6 @@ func (e *BaseEndpoint) SubmitJob(ctx context.Context, request *SubmitJobRequest) return nil, err } - e.eventEmitter.EmitJobCreated(ctx, *job) return &SubmitJobResponse{ JobID: job.ID, EvaluationID: eval.ID, @@ -223,12 +219,6 @@ func (e *BaseEndpoint) StopJob(ctx context.Context, request *StopJobRequest) (St return StopJobResponse{}, err } - e.eventEmitter.EmitEventSilently(ctx, models.JobEvent{ - JobID: request.JobID, - EventName: models.JobEventCanceled, - Status: request.Reason, - EventTime: time.Now(), - }) return StopJobResponse{ EvaluationID: evalID, }, nil diff --git a/pkg/orchestrator/event_emitter.go b/pkg/orchestrator/event_emitter.go deleted file mode 100644 index 513caa25f5..0000000000 --- a/pkg/orchestrator/event_emitter.go +++ /dev/null @@ -1,96 +0,0 @@ -package orchestrator - -import ( - "context" - "time" - - "github.com/rs/zerolog/log" - - "github.com/bacalhau-project/bacalhau/pkg/compute" - "github.com/bacalhau-project/bacalhau/pkg/eventhandler" - "github.com/bacalhau-project/bacalhau/pkg/models" -) - -// A quick workaround to publish job events locally as we still have some types that rely -// on job events to update their states (e.g. localdb) and to take actions (e.g. websockets and logging) -// TODO: create a strongly typed local event emitter, and update localdb directly from -// -// requester instead of consuming events. -type EventEmitterParams struct { - EventConsumer eventhandler.JobEventHandler -} - -type EventEmitter struct { - eventConsumer eventhandler.JobEventHandler -} - -func NewEventEmitter(params EventEmitterParams) EventEmitter { - return EventEmitter{ - eventConsumer: params.EventConsumer, - } -} - -func (e EventEmitter) EmitJobCreated( - ctx context.Context, job models.Job) { - event := models.JobEvent{ - JobID: job.ID, - SourceNodeID: job.Meta[models.MetaRequesterID], - EventName: models.JobEventCreated, - EventTime: time.Now(), - } - e.EmitEventSilently(ctx, event) -} - -func (e EventEmitter) EmitBidReceived( - ctx context.Context, result compute.BidResult) { - e.EmitEventSilently(ctx, e.constructEvent(result.RoutingMetadata, result.ExecutionMetadata, models.JobEventBid)) -} - -func (e EventEmitter) EmitBidAccepted( - ctx context.Context, request compute.BidAcceptedRequest, response compute.BidAcceptedResponse) { - e.EmitEventSilently(ctx, e.constructEvent(request.RoutingMetadata, response.ExecutionMetadata, models.JobEventBidAccepted)) -} - -func (e EventEmitter) EmitBidRejected( - ctx context.Context, request compute.BidRejectedRequest, response compute.BidRejectedResponse) { - e.EmitEventSilently(ctx, e.constructEvent(request.RoutingMetadata, response.ExecutionMetadata, models.JobEventBidRejected)) -} - -func (e EventEmitter) EmitRunComplete(ctx context.Context, response compute.RunResult) { - e.EmitEventSilently(ctx, e.constructEvent(response.RoutingMetadata, response.ExecutionMetadata, models.JobEventResultsProposed)) -} - -func (e EventEmitter) EmitComputeFailure(ctx context.Context, executionID string, err error) { - event := models.JobEvent{ - ExecutionID: executionID, - EventName: models.JobEventComputeError, - Status: err.Error(), - EventTime: time.Now(), - } - e.EmitEventSilently(ctx, event) -} - -func (e EventEmitter) constructEvent( - routingMetadata compute.RoutingMetadata, - executionMetadata compute.ExecutionMetadata, - eventName models.JobEventType) models.JobEvent { - return models.JobEvent{ - TargetNodeID: routingMetadata.TargetPeerID, - SourceNodeID: routingMetadata.SourcePeerID, - JobID: executionMetadata.JobID, - ExecutionID: executionMetadata.ExecutionID, - EventName: eventName, - EventTime: time.Now(), - } -} - -func (e EventEmitter) EmitEvent(ctx context.Context, event models.JobEvent) error { - return e.eventConsumer.HandleJobEvent(ctx, event) -} - -func (e EventEmitter) EmitEventSilently(ctx context.Context, event models.JobEvent) { - err := e.EmitEvent(ctx, event) - if err != nil { - log.Ctx(ctx).Error().Err(err).Msgf("failed to emit event %+v", event) - } -} diff --git a/pkg/orchestrator/planner/event_emitter.go b/pkg/orchestrator/planner/event_emitter.go deleted file mode 100644 index e7ea6bea28..0000000000 --- a/pkg/orchestrator/planner/event_emitter.go +++ /dev/null @@ -1,54 +0,0 @@ -package planner - -import ( - "context" - "time" - - "github.com/bacalhau-project/bacalhau/pkg/models" - "github.com/bacalhau-project/bacalhau/pkg/orchestrator" -) - -// EventEmitter is a planner implementation that emits events based on the job state. -type EventEmitter struct { - id string - eventEmitter orchestrator.EventEmitter -} - -// EventEmitterParams holds the parameters for creating a new EventEmitter. -type EventEmitterParams struct { - ID string - EventEmitter orchestrator.EventEmitter -} - -// NewEventEmitter creates a new instance of EventEmitter. -func NewEventEmitter(params EventEmitterParams) *EventEmitter { - return &EventEmitter{ - id: params.ID, - eventEmitter: params.EventEmitter, - } -} - -// Process updates the state of the executions in the plan according to the scheduler's desired state. -func (s *EventEmitter) Process(ctx context.Context, plan *models.Plan) error { - var eventName models.JobEventType - switch plan.DesiredJobState { - case models.JobStateTypeCompleted: - eventName = models.JobEventCompleted - case models.JobStateTypeFailed: - eventName = models.JobEventError - default: - } - if !eventName.IsUndefined() { - s.eventEmitter.EmitEventSilently(ctx, models.JobEvent{ - SourceNodeID: s.id, - JobID: plan.Job.ID, - Status: plan.UpdateMessage, - EventName: eventName, - EventTime: time.Now(), - }) - } - return nil -} - -// compile-time check whether the EventEmitter implements the Planner interface. -var _ orchestrator.Planner = (*EventEmitter)(nil)