Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2159 from Netflix/remove_unimportant_apis
Browse files Browse the repository at this point in the history
Removed unimportant and potentially expensive APIs
  • Loading branch information
apanicker-nflx authored Apr 9, 2021
2 parents ed2a340 + 0febc74 commit 87d7ddc
Show file tree
Hide file tree
Showing 21 changed files with 82 additions and 562 deletions.
16 changes: 0 additions & 16 deletions .github/ISSUE_TEMPLATE/bug.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -198,36 +198,6 @@ private void populateTaskPayloads(Task task) {
}
}

/**
* Retrieve pending tasks by type
*
* @param taskType Type of task
* @param startKey id of the task from where to return the results. NULL to start from the beginning.
* @param count number of tasks to retrieve
* @return Returns the list of PENDING tasks by type, starting with a given task Id.
*/
public List<Task> getPendingTasksByType(String taskType, String startKey, Integer count) {
Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank");

Object[] params = new Object[]{"startKey", startKey, "count", count};
return getForEntity("tasks/in_progress/{taskType}", params, taskList, taskType);
}

/**
* Retrieve pending task identified by reference name for a workflow
*
* @param workflowId Workflow instance id
* @param taskReferenceName reference name of the task
* @return Returns the pending workflow task identified by the reference name
*/
public Task getPendingTaskForWorkflow(String workflowId, String taskReferenceName) {
Preconditions.checkArgument(StringUtils.isNotBlank(workflowId), "Workflow id cannot be blank");
Preconditions.checkArgument(StringUtils.isNotBlank(taskReferenceName), "Task reference name cannot be blank");

return getForEntity("tasks/in_progress/{workflowId}/{taskRefName}", null, Task.class, workflowId,
taskReferenceName);
}

/**
* Updates the result of a task execution. If the size of the task output payload is bigger than {@link
* ConductorClientConfiguration#getTaskOutputPayloadThresholdKB()}, it is uploaded to {@link
Expand Down
13 changes: 10 additions & 3 deletions core/src/main/java/com/netflix/conductor/service/AdminService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
package com.netflix.conductor.service;

import com.netflix.conductor.common.metadata.tasks.Task;
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.NotEmpty;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.NotEmpty;
import org.springframework.validation.annotation.Validated;

@Validated
public interface AdminService {
Expand Down Expand Up @@ -56,4 +55,12 @@ List<Task> getListOfPendingTask(@NotEmpty(message = "TaskType cannot be null or
*/
boolean verifyAndRepairWorkflowConsistency(
@NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId);

/**
* Get registered queues.
*
* @param verbose `true|false` for verbose logs
* @return map of event queues
*/
Map<String, ?> getEventQueues(boolean verbose);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueManager;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.WorkflowRepairService;
import com.netflix.conductor.dao.QueueDAO;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.springframework.stereotype.Service;

@Audit
@Trace
Expand All @@ -35,13 +35,15 @@ public class AdminServiceImpl implements AdminService {
private final ExecutionService executionService;
private final QueueDAO queueDAO;
private final WorkflowRepairService workflowRepairService;
private final EventQueueManager eventQueueManager;

public AdminServiceImpl(ConductorProperties properties, ExecutionService executionService, QueueDAO queueDAO,
Optional<WorkflowRepairService> workflowRepairService) {
Optional<WorkflowRepairService> workflowRepairService, Optional<EventQueueManager> eventQueueManager) {
this.properties = properties;
this.executionService = executionService;
this.queueDAO = queueDAO;
this.workflowRepairService = workflowRepairService.orElse(null);
this.eventQueueManager = eventQueueManager.orElse(null);
}

/**
Expand Down Expand Up @@ -87,7 +89,21 @@ public boolean verifyAndRepairWorkflowConsistency(String workflowId) {
*/
public String requeueSweep(String workflowId) {
boolean pushed = queueDAO
.pushIfNotExists(WorkflowExecutor.DECIDER_QUEUE, workflowId, properties.getWorkflowOffsetTimeout().getSeconds());
.pushIfNotExists(WorkflowExecutor.DECIDER_QUEUE, workflowId,
properties.getWorkflowOffsetTimeout().getSeconds());
return pushed + "." + workflowId;
}

/**
* Get registered queues.
*
* @param verbose `true|false` for verbose logs
* @return map of event queues
*/
public Map<String, ?> getEventQueues(boolean verbose) {
if (eventQueueManager == null) {
throw new IllegalStateException("Event processing is DISABLED");
}
return (verbose ? eventQueueManager.getQueueSizes() : eventQueueManager.getQueues());
}
}
21 changes: 2 additions & 19 deletions core/src/main/java/com/netflix/conductor/service/EventService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@
package com.netflix.conductor.service;

import com.netflix.conductor.common.metadata.events.EventHandler;
import org.springframework.validation.annotation.Validated;

import java.util.List;
import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;
import org.springframework.validation.annotation.Validated;

@Validated
public interface EventService {
Expand Down Expand Up @@ -62,19 +60,4 @@ public interface EventService {
List<EventHandler> getEventHandlersForEvent(
@NotEmpty(message = "Event cannot be null or empty.") String event,
boolean activeOnly);

/**
* Get registered queues.
*
* @param verbose `true|false` for verbose logs
* @return map of event queues
*/
Map<String, ?> getEventQueues(boolean verbose);

/**
* Get registered queue providers.
*
* @return list of registered queue providers.
*/
List<String> getEventQueueProviders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,17 @@
package com.netflix.conductor.service;

import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.core.events.EventQueueManager;
import com.netflix.conductor.core.events.EventQueues;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.springframework.stereotype.Service;

@Service
public class EventServiceImpl implements EventService {

private final MetadataService metadataService;
private final EventQueueManager eventQueueManager;
private final EventQueues eventQueues;

public EventServiceImpl(MetadataService metadataService, Optional<EventQueueManager> optionalEventQueueManager,
EventQueues eventQueues) {
public EventServiceImpl(MetadataService metadataService, EventQueues eventQueues) {
this.metadataService = metadataService;
// EventQueueManager is optional and may not be enabled
this.eventQueueManager = optionalEventQueueManager.orElse(null);
this.eventQueues = eventQueues;
}

/**
Expand Down Expand Up @@ -81,26 +72,4 @@ public List<EventHandler> getEventHandlers() {
public List<EventHandler> getEventHandlersForEvent(String event, boolean activeOnly) {
return metadataService.getEventHandlersForEvent(event, activeOnly);
}

/**
* Get registered queues.
*
* @param verbose `true|false` for verbose logs
* @return map of event queues
*/
public Map<String, ?> getEventQueues(boolean verbose) {
if (eventQueueManager == null) {
throw new IllegalStateException("Event processing is DISABLED");
}
return (verbose ? eventQueueManager.getQueueSizes() : eventQueueManager.getQueues());
}

/**
* Get registered queue providers.
*
* @return list of registered queue providers.
*/
public List<String> getEventQueueProviders() {
return eventQueues.getProviders();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

import com.netflix.conductor.core.events.EventQueueManager;
import com.netflix.conductor.core.events.EventQueues;
import com.netflix.conductor.core.events.DefaultEventQueueManager;
import java.util.Optional;
import java.util.Set;
import javax.validation.ConstraintViolationException;
import org.junit.Test;
Expand All @@ -43,9 +40,8 @@ static class TestEventConfiguration {
@Bean
public EventService eventService() {
MetadataService metadataService = mock(MetadataService.class);
EventQueueManager eventQueueManager = mock(DefaultEventQueueManager.class);
EventQueues eventQueues = mock(EventQueues.class);
return new EventServiceImpl(metadataService, Optional.of(eventQueueManager), eventQueues);
return new EventServiceImpl(metadataService, eventQueues);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,53 +103,6 @@ public Iterator<Task> batchPollTasksByTaskTypeAsync(String taskType, String work
return Iterators.transform(it, protoMapper::fromProto);
}

/**
* Retrieve pending tasks by type
*
* @param taskType Type of task
* @param startKey id of the task from where to return the results. NULL to start from the beginning.
* @param count number of tasks to retrieve
* @return Returns the list of PENDING tasks by type, starting with a given task Id.
*/
public List<Task> getPendingTasksByType(String taskType, @Nullable String startKey, @Nullable Integer count) {
Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank");

TaskServicePb.TasksInProgressRequest.Builder request = TaskServicePb.TasksInProgressRequest.newBuilder();
request.setTaskType(taskType);
if (startKey != null) {
request.setStartKey(startKey);
}
if (count != null) {
request.setCount(count);
}

return stub.getTasksInProgress(request.build())
.getTasksList()
.stream()
.map(protoMapper::fromProto)
.collect(Collectors.toList());
}

/**
* Retrieve pending task identified by reference name for a workflow
*
* @param workflowId Workflow instance id
* @param taskReferenceName reference name of the task
* @return Returns the pending workflow task identified by the reference name
*/
public Task getPendingTaskForWorkflow(String workflowId, String taskReferenceName) {
Preconditions.checkArgument(StringUtils.isNotBlank(workflowId), "Workflow id cannot be blank");
Preconditions.checkArgument(StringUtils.isNotBlank(taskReferenceName), "Task reference name cannot be blank");

TaskServicePb.PendingTaskResponse response = stub.getPendingTaskForWorkflow(
TaskServicePb.PendingTaskRequest.newBuilder()
.setWorkflowId(workflowId)
.setTaskRefName(taskReferenceName)
.build()
);
return protoMapper.fromProto(response.getTask());
}

/**
* Updates the result of a task execution.
*
Expand All @@ -163,26 +116,6 @@ public void updateTask(TaskResult taskResult) {
);
}

/**
* Ack for the task poll.
*
* @param taskId Id of the task to be polled
* @param workerId user identified worker.
* @return true if the task was found with the given ID and acknowledged. False otherwise. If the server returns
* false, the client should NOT attempt to ack again.
*/
public boolean ack(String taskId, @Nullable String workerId) {
Preconditions.checkArgument(StringUtils.isNotBlank(taskId), "Task id cannot be blank");

TaskServicePb.AckTaskRequest.Builder request = TaskServicePb.AckTaskRequest.newBuilder();
request.setTaskId(taskId);
if (workerId != null) {
request.setWorkerId(workerId);
}

return stub.ackTask(request.build()).getAck();
}

/**
* Log execution messages for a task.
*
Expand Down Expand Up @@ -230,23 +163,6 @@ public Task getTaskDetails(String taskId) {
);
}

/**
* Removes a task from a taskType queue
*
* @param taskType the taskType to identify the queue
* @param taskId the id of the task to be removed
*/
public void removeTaskFromQueue(String taskType, String taskId) {
Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank");
Preconditions.checkArgument(StringUtils.isNotBlank(taskId), "Task id cannot be blank");
stub.removeTaskFromQueue(
TaskServicePb.RemoveTaskRequest.newBuilder()
.setTaskType(taskType)
.setTaskId(taskId)
.build()
);
}

public int getQueueSizeForTask(String taskType) {
Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank");

Expand Down
Loading

0 comments on commit 87d7ddc

Please sign in to comment.