Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addresses multiple changes #198

Merged
merged 10 commits into from
Jan 8, 2025
27 changes: 15 additions & 12 deletions controllers/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ const (

// ChannelController is for /channel/:prodId
type ChannelController struct {
ChannelRepo storage.ChannelRepository
ConsumersEndpoint EndpointController
MessagesEndpoint EndpointController
BroadcastEndpoint EndpointController
ChannelRepo storage.ChannelRepository
ConsumersEndpoint EndpointController
MessagesEndpoint EndpointController
BroadcastEndpoint EndpointController
MessagesStatusEndpoint EndpointController
}

// ChannelModel represents the Channel data
type ChannelModel struct {
MsgStakeholder
ConsumersURL string
MessagesURL string
BroadcastURL string
ConsumersURL string
MessagesURL string
BroadcastURL string
MessagesStatusURL string
}

// Get implements the /channel/:prodId GET endpoint
Expand Down Expand Up @@ -58,9 +60,10 @@ func (channelController *ChannelController) Put(w http.ResponseWriter, r *http.R
func (channelController *ChannelController) getChannelModel(channel *data.Channel) *ChannelModel {
channelIDParam := httprouter.Param{Key: channelIDPathParamKey, Value: channel.ChannelID}
return &ChannelModel{MsgStakeholder: *getMessageStakeholder(channel.ChannelID, &channel.MessageStakeholder),
ConsumersURL: channelController.ConsumersEndpoint.FormatAsRelativeLink(channelIDParam),
MessagesURL: channelController.MessagesEndpoint.FormatAsRelativeLink(channelIDParam),
BroadcastURL: channelController.BroadcastEndpoint.FormatAsRelativeLink(channelIDParam)}
ConsumersURL: channelController.ConsumersEndpoint.FormatAsRelativeLink(channelIDParam),
MessagesURL: channelController.MessagesEndpoint.FormatAsRelativeLink(channelIDParam),
BroadcastURL: channelController.BroadcastEndpoint.FormatAsRelativeLink(channelIDParam),
MessagesStatusURL: channelController.MessagesStatusEndpoint.FormatAsRelativeLink(channelIDParam)}
}

// GetPath returns the endpoint's path
Expand Down Expand Up @@ -105,8 +108,8 @@ func (channelsController *ChannelsController) FormatAsRelativeLink(params ...htt
}

// NewChannelController initialize new channels controller
func NewChannelController(consumersController *ConsumersController, messagesController *MessagesController, broadcastController *BroadcastController, channelRepo storage.ChannelRepository) *ChannelController {
return &ChannelController{ChannelRepo: channelRepo, ConsumersEndpoint: consumersController, MessagesEndpoint: messagesController, BroadcastEndpoint: broadcastController}
func NewChannelController(consumersController *ConsumersController, messagesController *MessagesController, broadcastController *BroadcastController, messagesStatusController *MessagesStatusController, channelRepo storage.ChannelRepository) *ChannelController {
return &ChannelController{ChannelRepo: channelRepo, ConsumersEndpoint: consumersController, MessagesEndpoint: messagesController, BroadcastEndpoint: broadcastController, MessagesStatusEndpoint: messagesStatusController}
}

// NewChannelsController initialize new channels controller
Expand Down
3 changes: 2 additions & 1 deletion controllers/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func TestChannelGet(t *testing.T) {
assert.Equal(t, "/channel/"+listTestChannelIDPrefix+"0/consumers", bodyChannel.ConsumersURL)
assert.Equal(t, "/channel/"+listTestChannelIDPrefix+"0/messages", bodyChannel.MessagesURL)
assert.Equal(t, "/channel/"+listTestChannelIDPrefix+"0/broadcast", bodyChannel.BroadcastURL)
assert.Equal(t, "/channel/"+listTestChannelIDPrefix+"0/messages-status", bodyChannel.MessagesStatusURL)
assert.NotNil(t, bodyChannel.ChangedAt)
assert.Equal(t, bodyChannel.ChangedAt.Format(http.TimeFormat), rr.HeaderMap.Get(headerLastModified))
})
Expand All @@ -150,7 +151,7 @@ func TestChannelGet(t *testing.T) {

func getNewChannelController(channelRepo storage.ChannelRepository) *ChannelController {
bc, _ := getNewBroadcastController(messageRepo)
return NewChannelController(NewConsumersController(NewConsumerController(nil, nil, getDLQControllerWithMockedRepo()), nil), getMessagesController(), bc, channelRepo)
return NewChannelController(NewConsumersController(NewConsumerController(nil, nil, getDLQControllerWithMockedRepo()), nil), getMessagesController(), bc, NewMessagesStatusController(getMessagesController(), messageRepo), channelRepo)
}

func TestChannelPut(t *testing.T) {
Expand Down
40 changes: 40 additions & 0 deletions controllers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
jobsPath = consumerPath + "/queued-jobs"
jobIDPathParamKey = "jobId"
jobPath = consumerPath + "/job/:" + jobIDPathParamKey
jobRequeuePath = consumerPath + "/job/:" + jobIDPathParamKey + "/requeue-dead-job"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a note... requeue-dead-job sounds kinda unnecessarily verbose. maybe requeue should convey the same message?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think requeue is misleading to be honest since not every job can be requeued; hence this name. I prefer verbose names since ideally no will have to "craft" the URLs but rather discover them; for example requeue URLs are discoverable in the single job get endpoint and the DLQ endpoint.

defaultPageSize = 25
maxPageSize = 100
)
Expand Down Expand Up @@ -167,6 +168,45 @@ func getConsumerWithValidation(w http.ResponseWriter, r *http.Request, params ht
return consumer, valid
}

type JobRequeueController struct {
DeliveryJobRepo storage.DeliveryJobRepository
ChannelRepo storage.ChannelRepository
ConsumerRepo storage.ConsumerRepository
}

// NewJobRequeueController creates and returns a new instance of JobRequeueController
func NewJobRequeueController(deliveryJobRepo storage.DeliveryJobRepository, channelRepo storage.ChannelRepository, consumerRepo storage.ConsumerRepository) *JobRequeueController {
return &JobRequeueController{DeliveryJobRepo: deliveryJobRepo, ChannelRepo: channelRepo, ConsumerRepo: consumerRepo}
}

// GetPath returns the endpoint's path
func (controller *JobRequeueController) GetPath() string {
return jobRequeuePath
}

// FormatAsRelativeLink Format as relative URL of this resource based on the params
func (controller *JobRequeueController) FormatAsRelativeLink(params ...httprouter.Param) string {
return formatURL(params, controller.GetPath(), channelIDPathParamKey, consumerIDPathParamKey, jobIDPathParamKey)
}

// Post implements the POST /channel/:channelId/consumer/:consumerId/job/:jobId/requeue-dead-job
func (controller *JobRequeueController) Post(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
job, valid := getJobWithValidation(w, r, params, controller.ChannelRepo, controller.ConsumerRepo, controller.DeliveryJobRepo)
if !valid {
return
}
if job.Status != data.JobDead {
writeStatus(w, http.StatusBadRequest, errJobDoesNotExist)
return
}
err := controller.DeliveryJobRepo.RequeueDeadJob(job)
if err == nil {
writeStatus(w, http.StatusAccepted, nil)
} else {
writeErr(w, err)
}
}

// JobController represents all endpoints related to a single job for a consumer
type JobController struct {
ChannelRepo storage.ChannelRepository
Expand Down
110 changes: 110 additions & 0 deletions controllers/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package controllers

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
Expand Down Expand Up @@ -877,3 +879,111 @@ func TestJobControllerPost_TransitionFailureTimeout(t *testing.T) {
runInvalidTransitionTest(invalidTransition.nextState, invalidTransition.timeout)
}
}

func TestJobRequeueController_Post(t *testing.T) {
repo := new(storagemocks.DeliveryJobRepository)
channelRepo := new(storagemocks.ChannelRepository)
consumerRepo := new(storagemocks.ConsumerRepository)

controller := NewJobRequeueController(repo, channelRepo, consumerRepo)
router := createTestRouter(controller)

channelID := "test-channel"
consumerID := "test-consumer"
jobID := "test-job-id"
token := "test-token"

t.Run("Invalid Job Status", func(t *testing.T) {
mockChannel, _ := data.NewChannel(channelID, token)
mockChannel.QuickFix()
channelRepo.On("Get", channelID).Return(mockChannel, nil)
mockConsumer, _ := data.NewConsumer(mockChannel, consumerID, token, callbackURL, data.PullConsumer.String())
mockConsumer.QuickFix()
consumerRepo.On("Get", channelID, consumerID).Return(mockConsumer, nil)
mockProducer, _ := data.NewProducer("test-producer", token)
mockProducer.QuickFix()
mockMsg, _ := data.NewMessage(mockChannel, mockProducer, "test-payload", "text/plain", data.HeadersMap{})
mockMsg.QuickFix()
job, _ := data.NewDeliveryJob(mockMsg, mockConsumer)
job.QuickFix()
job.Status = data.JobQueued
repo.On("GetByID", jobID).Return(job, nil).Once()

url := fmt.Sprintf("/channel/%s/consumer/%s/job/%s/requeue-dead-job", channelID, consumerID, jobID)
req, _ := http.NewRequest("POST", url, nil)
req.Header.Set(headerChannelToken, token)
req.Header.Set(headerConsumerToken, token)
rr := httptest.NewRecorder()

router.ServeHTTP(rr, req)
assert.Equal(t, http.StatusBadRequest, rr.Code)
repo.AssertExpectations(t)
channelRepo.AssertExpectations(t)
consumerRepo.AssertExpectations(t)

})

t.Run("Job Requeue Successful", func(t *testing.T) {
mockChannel, _ := data.NewChannel(channelID, token)
mockChannel.QuickFix()
channelRepo.On("Get", channelID).Return(mockChannel, nil)
mockConsumer, _ := data.NewConsumer(mockChannel, consumerID, token, callbackURL, data.PullConsumer.String())
mockConsumer.QuickFix()
consumerRepo.On("Get", channelID, consumerID).Return(mockConsumer, nil)
mockProducer, _ := data.NewProducer("test-producer", token)
mockProducer.QuickFix()
mockMsg, _ := data.NewMessage(mockChannel, mockProducer, "test-payload", "text/plain", data.HeadersMap{})
mockMsg.QuickFix()
job, _ := data.NewDeliveryJob(mockMsg, mockConsumer)
job.QuickFix()
job.Status = data.JobDead
repo.On("GetByID", jobID).Return(job, nil).Once()
repo.On("RequeueDeadJob", job).Return(nil).Once()

url := fmt.Sprintf("/channel/%s/consumer/%s/job/%s/requeue-dead-job", channelID, consumerID, jobID)
req, _ := http.NewRequest("POST", url, nil)
req.Header.Set(headerChannelToken, token)
req.Header.Set(headerConsumerToken, token)

rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)

assert.Equal(t, http.StatusAccepted, rr.Code)
repo.AssertExpectations(t)
channelRepo.AssertExpectations(t)
consumerRepo.AssertExpectations(t)
})

t.Run("Job Requeue Error", func(t *testing.T) {
expectedErr := errors.New("requeue error")
mockChannel, _ := data.NewChannel(channelID, token)
mockChannel.QuickFix()
channelRepo.On("Get", channelID).Return(mockChannel, nil)
mockConsumer, _ := data.NewConsumer(mockChannel, consumerID, token, callbackURL, data.PullConsumer.String())
mockConsumer.QuickFix()
consumerRepo.On("Get", channelID, consumerID).Return(mockConsumer, nil)
mockProducer, _ := data.NewProducer("test-producer", token)
mockProducer.QuickFix()
mockMsg, _ := data.NewMessage(mockChannel, mockProducer, "test-payload", "text/plain", data.HeadersMap{})
mockMsg.QuickFix()
job, _ := data.NewDeliveryJob(mockMsg, mockConsumer)
job.QuickFix()
job.Status = data.JobDead
repo.On("GetByID", jobID).Return(job, nil).Once()
repo.On("RequeueDeadJob", job).Return(expectedErr).Once()

url := fmt.Sprintf("/channel/%s/consumer/%s/job/%s/requeue-dead-job", channelID, consumerID, jobID)
req, _ := http.NewRequest("POST", url, nil)
req.Header.Set(headerChannelToken, token)
req.Header.Set(headerConsumerToken, token)

rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)
assert.Equal(t, http.StatusInternalServerError, rr.Code)
assert.Equal(t, expectedErr.Error(), rr.Body.String())

repo.AssertExpectations(t)
channelRepo.AssertExpectations(t)
consumerRepo.AssertExpectations(t)
})
}
Loading
Loading