diff --git a/orkes-conductor-queues/src/main/java/io/orkes/conductor/queue/dao/BaseRedisQueueDAO.java b/orkes-conductor-queues/src/main/java/io/orkes/conductor/queue/dao/BaseRedisQueueDAO.java index b0dc844..a8f2aab 100644 --- a/orkes-conductor-queues/src/main/java/io/orkes/conductor/queue/dao/BaseRedisQueueDAO.java +++ b/orkes-conductor-queues/src/main/java/io/orkes/conductor/queue/dao/BaseRedisQueueDAO.java @@ -101,12 +101,8 @@ public final boolean pushIfNotExists( @Override public final List pop(String queueName, int count, int timeout) { - // Keep the timeout to a minimum of 100ms - if (timeout < 100) { - timeout = 100; - } List messages = get(queueName).pop(count, timeout, TimeUnit.MILLISECONDS); - return messages.stream().map(msg -> msg.getId()).collect(Collectors.toList()); + return messages.stream().map(QueueMessage::getId).collect(Collectors.toList()); } @Override diff --git a/orkes-redis-queues/src/main/java/io/orkes/conductor/mq/redis/QueueMonitor.java b/orkes-redis-queues/src/main/java/io/orkes/conductor/mq/redis/QueueMonitor.java index 78e00f1..c6f7c1c 100644 --- a/orkes-redis-queues/src/main/java/io/orkes/conductor/mq/redis/QueueMonitor.java +++ b/orkes-redis-queues/src/main/java/io/orkes/conductor/mq/redis/QueueMonitor.java @@ -56,7 +56,7 @@ public QueueMonitor(String queueName) { public List pop(int count, int waitTime, TimeUnit timeUnit) { if (count <= 0) { - log.warn("Negative poll count {}"); + log.warn("Negative poll count {}", count); // Negative number shouldn't happen, but it can be zero and in that case we don't do // anything! return new ArrayList<>(); @@ -76,7 +76,7 @@ public List pop(int count, int waitTime, TimeUnit timeUnit) { // The sleep method below, just does Thread.wait should be more CPU friendly QueueMessage message = peekedMessages.poll(); if (message == null) { - if (!waited) { + if (!waited && waitTime > 0) { Uninterruptibles.sleepUninterruptibly(waitTime, timeUnit); waited = true; continue;