From 084f0025d1fd4f8b3fcc7735a7e0319f7cc0d7ce Mon Sep 17 00:00:00 2001
From: igorjava2025 <->
Date: Sun, 8 Mar 2026 22:12:57 +0300
Subject: [PATCH 1/8] Add HARD_STRICT mode See details there
https://github.com/sonus21/rqueue/issues/276
(cherry picked from commit acc09bf4cbac84694e7217920d6d51869712adf9)
---
.../SimpleRqueueListenerContainerFactory.java | 14 ++
.../listener/HardStrictPriorityPoller.java | 206 ++++++++++++++++++
.../HardStrictPriorityPollerProperties.java | 24 ++
.../RqueueMessageListenerContainer.java | 24 ++
.../rqueue/models/enums/PriorityMode.java | 3 +-
.../HardStrictPriorityPollerTest.java | 178 +++++++++++++++
6 files changed, 448 insertions(+), 1 deletion(-)
create mode 100644 rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java
create mode 100644 rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java
create mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java
index 5da2e641..ac7a4e90 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java
@@ -27,6 +27,7 @@
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
import com.github.sonus21.rqueue.core.middleware.Middleware;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
+import com.github.sonus21.rqueue.listener.HardStrictPriorityPollerProperties;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.models.enums.PriorityMode;
@@ -92,6 +93,8 @@ public class SimpleRqueueListenerContainerFactory {
// Set priority mode for the pollers
private PriorityMode priorityMode = PriorityMode.WEIGHTED;
+ // Set HardStrictPriorityPollerProperties for HARD_STRICT priority mode poller
+ private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties;
/**
* Whether all beans of spring application should be inspected to find methods annotated with
@@ -348,6 +351,9 @@ public RqueueMessageListenerContainer createMessageListenerContainer() {
if (messageHeaders != null) {
messageListenerContainer.setMessageHeaders(messageHeaders);
}
+ if (hardStrictPriorityPollerProperties != null) {
+ messageListenerContainer.setHardStrictPriorityPollerProperties(hardStrictPriorityPollerProperties);
+ }
return messageListenerContainer;
}
@@ -486,6 +492,14 @@ public void setMessageHeaders(MessageHeaders messageHeaders) {
this.messageHeaders = messageHeaders;
}
+ public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() {
+ return this.hardStrictPriorityPollerProperties;
+ }
+
+ public void setHardStrictPriorityPollerProperties(HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) {
+ this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties;
+ }
+
/**
* Rqueue scans all beans to find method annotated with {@link RqueueListener}.
*
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java
new file mode 100644
index 00000000..3d824bc3
--- /dev/null
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright (c) 2026 Sonu Kumar
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * You may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ *
+ */
+
+package com.github.sonus21.rqueue.listener;
+
+import com.github.sonus21.rqueue.core.RqueueBeanProvider;
+import com.github.sonus21.rqueue.core.middleware.Middleware;
+import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr;
+import com.github.sonus21.rqueue.utils.Constants;
+import com.github.sonus21.rqueue.utils.QueueThreadPool;
+import com.github.sonus21.rqueue.utils.TimeoutUtils;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.slf4j.event.Level;
+import org.springframework.messaging.MessageHeaders;
+
+/**
+ * Use it only with priority queues.
+ * Message processing can be slow.
+ * The hard strict priority algorithm is better in HardStrictPriorityPoller than in StrictPriorityPoller
+ * More details see in GitHub project issue
+ */
+class HardStrictPriorityPoller extends RqueueMessagePoller {
+
+ private final RqueueBeanProvider rqueueBeanProvider;
+
+ private final Map queueNameToDetail;
+ private final Map queueNameToThread;
+ private final Map queueDeactivationTime = new HashMap<>();
+ private final HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties;
+
+ HardStrictPriorityPoller(
+ String groupName,
+ final List queueDetails,
+ final Map queueNameToThread,
+ RqueueBeanProvider rqueueBeanProvider,
+ QueueStateMgr queueStateMgr,
+ List middlewares,
+ long pollingInterval,
+ long backoffTime,
+ PostProcessingHandler postProcessingHandler,
+ MessageHeaders messageHeaders,
+ HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) {
+ super(
+ "HardStrict-" + groupName,
+ rqueueBeanProvider,
+ queueStateMgr,
+ middlewares,
+ pollingInterval,
+ backoffTime,
+ postProcessingHandler,
+ messageHeaders);
+
+ this.rqueueBeanProvider = rqueueBeanProvider;
+ // Sort queues by priority once during initialization
+ List queueDetailList = new ArrayList<>(queueDetails);
+ queueDetailList.sort(
+ (o1, o2) ->
+ o2.getPriority().get(Constants.DEFAULT_PRIORITY_KEY)
+ - o1.getPriority().get(Constants.DEFAULT_PRIORITY_KEY));
+
+ this.queues = queueDetailList.stream().map(QueueDetail::getName).collect(Collectors.toList());
+ this.queueNameToDetail =
+ queueDetailList.stream()
+ .collect(Collectors.toMap(QueueDetail::getName, Function.identity()));
+ this.queueNameToThread = queueNameToThread;
+ this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties != null
+ ? hardStrictPriorityPollerProperties
+ : new HardStrictPriorityPollerProperties();
+ }
+
+ @Override
+ public void start() {
+ log(Level.DEBUG, "Running, Ordered Queues: {}", null, queues);
+ while (true) {
+ if (shouldExit()) {
+ return;
+ }
+
+ boolean messageFoundInAnyQueue = false;
+
+ try {
+ for (String queue : queues) {
+ if (eligibleForPolling(queue) && !isDeactivated(queue)) {
+ QueueThreadPool queueThreadPool = queueNameToThread.get(queue);
+ QueueDetail queueDetail = queueNameToDetail.get(queue);
+ poll(-1, queue, queueDetail, queueThreadPool);
+
+ if (hardStrictPriorityPollerProperties.getAfterPollSleepInterval() != null) {
+ TimeoutUtils.sleepLog(hardStrictPriorityPollerProperties.getAfterPollSleepInterval(), false);
+ }
+
+ if (existMessagesInCurrentQueueOrHigherPriorityQueue(queue, queues)) {
+ // break current cycle and start new cycle
+ // it allow to process queue with the higher priority
+ messageFoundInAnyQueue = true;
+ break;
+ }
+ }
+ }
+
+ // If no messages were found across all queues, sleep for the polling interval
+ if (!messageFoundInAnyQueue) {
+ TimeoutUtils.sleepLog(pollingInterval, false);
+ }
+
+ } catch (Throwable e) {
+ log(Level.ERROR, "Exception in the poller {}", e, e.getMessage());
+ if (shouldExit()) {
+ return;
+ }
+ TimeoutUtils.sleepLog(backoffTime, false);
+ }
+ }
+ }
+
+ boolean existMessagesInCurrentQueueOrHigherPriorityQueue(String currentQueue, List queues) {
+ for (String queue : queues) {
+ if (eligibleForPolling(queue) && !isDeactivated(queue)) {
+ QueueDetail queueDetail = queueNameToDetail.get(queue);
+ if (existAvailableMessagesForPoll(queueDetail)) {
+ // the current or higher priority queue contains messages that need to be processed.
+ return true;
+ }
+ }
+ // we check all queues from the highest priority to current queue
+ if (queue.equals(currentQueue)) {
+ return false;
+ }
+ }
+ // unexpected behavior, need more details if it occurs
+ log(Level.WARN, "current queue '{}' not found in queues list '{}'", null, currentQueue, queues);
+ return false;
+ }
+
+ protected boolean existAvailableMessagesForPoll(QueueDetail queueDetail) {
+ List> readyMessages = rqueueBeanProvider
+ .getRqueueMessageTemplate()
+ .readFromList(queueDetail.getQueueName(), 0, 0);
+
+ if (readyMessages != null && !readyMessages.isEmpty()) {
+ log(Level.TRACE, "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName());
+ return true;
+ }
+
+ // Only check delayed messages with score <= current time
+ long currentTime = System.currentTimeMillis();
+ List> delayedMessages = rqueueBeanProvider
+ .getRqueueMessageTemplate()
+ .readFromZsetWithScore(queueDetail.getScheduledQueueName(), 0, currentTime);
+
+ if (delayedMessages != null && !delayedMessages.isEmpty()) {
+ log(Level.TRACE, "delayedMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName());
+ return true;
+ }
+
+ return false;
+ }
+
+ private boolean isDeactivated(String queue) {
+ Long deactivationTime = queueDeactivationTime.get(queue);
+ if (deactivationTime == null) {
+ return false;
+ }
+ if (System.currentTimeMillis() - deactivationTime > pollingInterval) {
+ queueDeactivationTime.remove(queue);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ long getSemaphoreWaitTime() {
+ return hardStrictPriorityPollerProperties.getSemaphoreWaitTime() != null
+ ? hardStrictPriorityPollerProperties.getSemaphoreWaitTime()
+ : 20L;
+ }
+
+ @Override
+ void deactivate(int index, String queue, DeactivateType deactivateType) {
+ if (deactivateType == DeactivateType.POLL_FAILED) {
+ // Pause in case of connection errors or polling failures
+ TimeoutUtils.sleepLog(backoffTime, false);
+ } else {
+ // Mark deactivation time if the queue is empty
+ queueDeactivationTime.put(queue, System.currentTimeMillis());
+ }
+ }
+}
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java
new file mode 100644
index 00000000..99ca44b9
--- /dev/null
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java
@@ -0,0 +1,24 @@
+package com.github.sonus21.rqueue.listener;
+
+public class HardStrictPriorityPollerProperties {
+ // set such default values for parameters "afterPollSleepInterval" and "semaphoreWaitTime"
+ // because local load tests have correct strict priority algorithm work and good performance with them
+ private Long afterPollSleepInterval = 30L;
+ private Long semaphoreWaitTime = 15L;
+
+ public Long getAfterPollSleepInterval() {
+ return afterPollSleepInterval;
+ }
+
+ public void setAfterPollSleepInterval(Long afterPollSleepInterval) {
+ this.afterPollSleepInterval = afterPollSleepInterval;
+ }
+
+ public Long getSemaphoreWaitTime() {
+ return this.semaphoreWaitTime;
+ }
+
+ public void setSemaphoreWaitTime(Long semaphoreWaitTime) {
+ this.semaphoreWaitTime = semaphoreWaitTime;
+ }
+}
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java
index 440a836a..36b8dc73 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java
@@ -106,6 +106,7 @@ public class RqueueMessageListenerContainer
private int phase = Integer.MAX_VALUE;
private PriorityMode priorityMode;
private MessageHeaders messageHeaders;
+ private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties;
public RqueueMessageListenerContainer(
RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate) {
@@ -515,6 +516,21 @@ protected void startGroup(String groupName, List queueDetails) {
backOffTime,
postProcessingHandler,
getMessageHeaders()));
+ } else if (getPriorityMode() == PriorityMode.HARD_STRICT) {
+ future =
+ taskExecutor.submit(
+ new HardStrictPriorityPoller(
+ StringUtils.groupName(groupName),
+ queueDetails,
+ queueThread,
+ rqueueBeanProvider,
+ queueStateMgr,
+ getMiddleWares(),
+ pollingInterval,
+ backOffTime,
+ postProcessingHandler,
+ getMessageHeaders(),
+ getHardStrictPriorityPollerProperties()));
} else {
future =
taskExecutor.submit(
@@ -712,6 +728,14 @@ public void setMessageHeaders(MessageHeaders messageHeaders) {
this.messageHeaders = messageHeaders;
}
+ public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() {
+ return this.hardStrictPriorityPollerProperties;
+ }
+
+ public void setHardStrictPriorityPollerProperties(HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) {
+ this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties;
+ }
+
class QueueStateMgr {
Set pausedQueues = ConcurrentHashMap.newKeySet();
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java
index 0b6e556a..0fa83405 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java
@@ -18,5 +18,6 @@
public enum PriorityMode {
STRICT,
- WEIGHTED
+ WEIGHTED,
+ HARD_STRICT
}
diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java
new file mode 100644
index 00000000..9315c2c9
--- /dev/null
+++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java
@@ -0,0 +1,178 @@
+package com.github.sonus21.rqueue.listener;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import com.github.sonus21.TestBase;
+import com.github.sonus21.rqueue.CoreUnitTest;
+import com.github.sonus21.rqueue.core.RqueueBeanProvider;
+import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr;
+import com.github.sonus21.rqueue.utils.Constants;
+import com.github.sonus21.rqueue.utils.QueueThreadPool;
+import com.github.sonus21.rqueue.utils.TimeoutUtils;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.messaging.MessageHeaders;
+
+@CoreUnitTest
+class HardStrictPriorityPollerTest extends TestBase {
+
+ @Mock
+ private RqueueBeanProvider rqueueBeanProvider;
+ @Mock
+ private QueueStateMgr queueStateMgr;
+ @Mock
+ private PostProcessingHandler postProcessingHandler;
+
+ private final String highPriorityQueue = "high-priority-" + UUID.randomUUID();
+ private final String lowPriorityQueue = "low-priority-" + UUID.randomUUID();
+ private HardStrictPriorityPoller poller;
+ private QueueDetail highDetail;
+ private QueueDetail lowDetail;
+
+ @BeforeEach
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+
+ highDetail = createQueueDetail(highPriorityQueue, 100);
+ lowDetail = createQueueDetail(lowPriorityQueue, 10);
+
+ List queueDetails = Arrays.asList(lowDetail, highDetail);
+ Map queueNameToThread = new HashMap<>();
+ queueNameToThread.put(highPriorityQueue, mock(QueueThreadPool.class));
+ queueNameToThread.put(lowPriorityQueue, mock(QueueThreadPool.class));
+
+ poller = spy(new HardStrictPriorityPoller(
+ "test-group",
+ queueDetails,
+ queueNameToThread,
+ rqueueBeanProvider,
+ queueStateMgr,
+ Collections.emptyList(),
+ 50L,
+ 50L,
+ postProcessingHandler,
+ new MessageHeaders(Collections.emptyMap()),
+ new HardStrictPriorityPollerProperties()
+ ));
+
+ // КРИТИЧЕСКИ ВАЖНО: Разрешаем опрос очередей
+ lenient().doReturn(true).when(poller).eligibleForPolling(anyString());
+ // КРИТИЧЕСКИ ВАЖНО: Запрещаем немедленный выход из цикла
+ lenient().doReturn(false).when(poller).shouldExit();
+ }
+
+ private QueueDetail createQueueDetail(String name, int priority) {
+ QueueDetail detail = mock(QueueDetail.class);
+ when(detail.getName()).thenReturn(name);
+ Map priorityMap = new HashMap<>();
+ priorityMap.put(Constants.DEFAULT_PRIORITY_KEY, priority);
+ when(detail.getPriority()).thenReturn(priorityMap);
+ return detail;
+ }
+
+ @Test
+ void testQueuesAreSortedByPriority() {
+ List sortedQueues = poller.queues;
+ assertTrue(sortedQueues.indexOf(highPriorityQueue) < sortedQueues.indexOf(lowPriorityQueue),
+ "High priority queue should be first");
+ }
+
+ @Test
+ void testExistMessagesInHigherPriorityQueueReturnsTrue() {
+ List queues = Arrays.asList(highPriorityQueue, lowPriorityQueue);
+
+ // High queue has messages
+ lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(highDetail);
+
+ // Checking from low priority perspective
+ boolean result = poller.existMessagesInCurrentQueueOrHigherPriorityQueue(lowPriorityQueue, queues);
+ assertTrue(result, "Should return true because high priority queue has messages");
+ }
+
+ @Test
+ void testStrictExecutionPreventsLowPriorityPoll() throws Exception {
+ AtomicInteger highQueuePollCount = new AtomicInteger(0);
+ AtomicInteger lowQueuePollCount = new AtomicInteger(0);
+
+ // High Priority always has messages
+ lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(highDetail);
+ lenient().doAnswer(invocation -> {
+ highQueuePollCount.incrementAndGet();
+ return 1;
+ }).when(poller).poll(anyInt(), eq(highPriorityQueue), eq(highDetail), any());
+
+ // Low Priority also has messages
+ lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(lowDetail);
+ lenient().doAnswer(invocation -> {
+ lowQueuePollCount.incrementAndGet();
+ return 1;
+ }).when(poller).poll(anyInt(), eq(lowPriorityQueue), eq(lowDetail), any());
+
+ Thread pollerThread = new Thread(poller::start);
+ pollerThread.start();
+
+ try {
+ // Wait for multiple polls of High priority
+ TimeoutUtils.waitFor(() -> highQueuePollCount.get() > 5, 2000, "high priority polls");
+
+ // Low priority must NEVER be polled because 'break' happens after high poll
+ assertTrue(lowQueuePollCount.get() == 0, "Low priority queue should not be polled");
+ } finally {
+ stop(poller, pollerThread);
+ }
+ }
+
+ @Test
+ void testLowPriorityIsPolledWhenHighIsEmpty() throws Exception {
+ AtomicInteger lowQueuePollCount = new AtomicInteger(0);
+
+ // High is empty
+ lenient().doReturn(false).when(poller).existAvailableMessagesForPoll(highDetail);
+
+ // Low has messages
+ lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(lowDetail);
+ lenient().doAnswer(invocation -> {
+ lowQueuePollCount.incrementAndGet();
+ return 1;
+ }).when(poller).poll(anyInt(), eq(lowPriorityQueue), eq(lowDetail), any());
+
+ Thread pollerThread = new Thread(poller::start);
+ pollerThread.start();
+
+ try {
+ TimeoutUtils.waitFor(() -> lowQueuePollCount.get() > 0, 2000, "low priority poll");
+ assertTrue(lowQueuePollCount.get() > 0);
+ } finally {
+ stop(poller, pollerThread);
+ }
+ }
+
+ private void stop(HardStrictPriorityPoller poller, Thread thread) {
+ lenient().doReturn(true).when(poller).shouldExit();
+ if (thread != null && thread.isAlive()) {
+ thread.interrupt();
+ try {
+ thread.join(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+}
\ No newline at end of file
From 3f5b0a0637b253591025b7290bcc5b48b61b8a03 Mon Sep 17 00:00:00 2001
From: igorjava2025 <->
Date: Mon, 9 Mar 2026 10:41:05 +0300
Subject: [PATCH 2/8] fix comment
(cherry picked from commit c92cb75dc50a1590f3881739db9f43b110c99217)
---
.../sonus21/rqueue/listener/HardStrictPriorityPollerTest.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java
index 9315c2c9..2392ebb1 100644
--- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java
+++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java
@@ -72,9 +72,9 @@ public void setUp() {
new HardStrictPriorityPollerProperties()
));
- // КРИТИЧЕСКИ ВАЖНО: Разрешаем опрос очередей
+ // Allowing queue polling
lenient().doReturn(true).when(poller).eligibleForPolling(anyString());
- // КРИТИЧЕСКИ ВАЖНО: Запрещаем немедленный выход из цикла
+ // Disable immediate exit from the loop
lenient().doReturn(false).when(poller).shouldExit();
}
From c6459d3e6efb26622c06308fe8c50a1e686ae424 Mon Sep 17 00:00:00 2001
From: igorjava2025 <->
Date: Fri, 13 Mar 2026 12:54:02 +0300
Subject: [PATCH 3/8] add fixes after review
(cherry picked from commit 21d6d8643aabe24b13f28244952a6d778b8732a4)
---
.../rqueue/core/RqueueMessageTemplate.java | 7 +++
.../core/impl/RqueueMessageTemplateImpl.java | 16 ++++++
.../listener/HardStrictPriorityPoller.java | 38 +++++++++-----
.../HardStrictPriorityPollerProperties.java | 9 ++++
...ardStrictPriorityPollerPropertiesTest.java | 49 +++++++++++++++++++
5 files changed, 107 insertions(+), 12 deletions(-)
create mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java
index 1885eafd..caf2e25f 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java
@@ -18,6 +18,7 @@
import com.github.sonus21.rqueue.models.MessageMoveResult;
import java.util.List;
+import java.util.Optional;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
import reactor.core.publisher.Flux;
@@ -92,4 +93,10 @@ Long scheduleMessage(
Flux addReactiveMessageWithDelay(
String scheduledQueueName, String scheduledQueueChannelName, RqueueMessage rqueueMessage);
+
+ Optional findFirstElementFromList(String name);
+
+ Optional findFirstElementFromZset(String name);
+
+ Optional> findFirstElementFromZsetWithScore(String name);
}
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java
index fa6b288d..f121e775 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java
@@ -30,6 +30,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
@@ -343,4 +344,19 @@ public RedisTemplate getTemplate() {
public Long removeElementFromZset(String zsetName, RqueueMessage rqueueMessage) {
return super.removeFromZset(zsetName, rqueueMessage);
}
+
+ @Override
+ public Optional findFirstElementFromList(String name) {
+ return readFromList(name, 0, 0).stream().findFirst();
+ }
+
+ @Override
+ public Optional findFirstElementFromZset(String name) {
+ return readFromZset(name, 0, 0).stream().findFirst();
+ }
+
+ @Override
+ public Optional> findFirstElementFromZsetWithScore(String name) {
+ return readFromZsetWithScore(name, 0, 0).stream().findFirst();
+ }
}
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java
index 3d824bc3..04443827 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java
@@ -151,23 +151,37 @@ boolean existMessagesInCurrentQueueOrHigherPriorityQueue(String currentQueue, Li
}
protected boolean existAvailableMessagesForPoll(QueueDetail queueDetail) {
- List> readyMessages = rqueueBeanProvider
- .getRqueueMessageTemplate()
- .readFromList(queueDetail.getQueueName(), 0, 0);
-
- if (readyMessages != null && !readyMessages.isEmpty()) {
- log(Level.TRACE, "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName());
+ boolean readyMessagesExists =
+ rqueueBeanProvider
+ .getRqueueMessageTemplate()
+ .findFirstElementFromList(queueDetail.getQueueName())
+ .isPresent();
+ if (readyMessagesExists) {
+ log(
+ Level.TRACE,
+ "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.",
+ null,
+ queueDetail.getName());
return true;
}
// Only check delayed messages with score <= current time
long currentTime = System.currentTimeMillis();
- List> delayedMessages = rqueueBeanProvider
- .getRqueueMessageTemplate()
- .readFromZsetWithScore(queueDetail.getScheduledQueueName(), 0, currentTime);
-
- if (delayedMessages != null && !delayedMessages.isEmpty()) {
- log(Level.TRACE, "delayedMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName());
+ boolean delayedMessagesExists =
+ rqueueBeanProvider
+ .getRqueueMessageTemplate()
+ .findFirstElementFromZsetWithScore(queueDetail.getScheduledQueueName())
+ .filter(element -> element.getScore() <= currentTime)
+ .isPresent();
+
+ if (delayedMessagesExists) {
+ log(
+ Level.TRACE,
+ "delayedMessages exists for scheduled queue '{}' and currentTime '{}',"
+ + " existAvailableMessagesForPoll = true.",
+ null,
+ queueDetail.getScheduledQueueName(),
+ currentTime);
return true;
}
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java
index 99ca44b9..618aadac 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java
@@ -11,6 +11,7 @@ public Long getAfterPollSleepInterval() {
}
public void setAfterPollSleepInterval(Long afterPollSleepInterval) {
+ validateTimeInterval(afterPollSleepInterval);
this.afterPollSleepInterval = afterPollSleepInterval;
}
@@ -19,6 +20,14 @@ public Long getSemaphoreWaitTime() {
}
public void setSemaphoreWaitTime(Long semaphoreWaitTime) {
+ validateTimeInterval(semaphoreWaitTime);
this.semaphoreWaitTime = semaphoreWaitTime;
}
+
+ private void validateTimeInterval(Long value) {
+ if (value == null || value > 0) {
+ return;
+ }
+ throw new IllegalArgumentException("Value must be positive: " + value);
+ }
}
diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java
new file mode 100644
index 00000000..efa4e261
--- /dev/null
+++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java
@@ -0,0 +1,49 @@
+package com.github.sonus21.rqueue.listener;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.stream.LongStream;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.NullSource;
+
+class HardStrictPriorityPollerPropertiesTest {
+
+ private final HardStrictPriorityPollerProperties properties =
+ new HardStrictPriorityPollerProperties();
+
+ static LongStream invalidValues() {
+ return LongStream.of(0L, -1L, -100L, Long.MIN_VALUE);
+ }
+
+ static LongStream validValues() {
+ return LongStream.of(1L, Long.MAX_VALUE);
+ }
+
+ @ParameterizedTest
+ @MethodSource("invalidValues")
+ void setAfterPollSleepIntervalInvalidValues(Long value) {
+ assertThrows(IllegalArgumentException.class, () -> properties.setAfterPollSleepInterval(value));
+ }
+
+ @ParameterizedTest
+ @NullSource
+ @MethodSource("validValues")
+ void setAfterPollSleepIntervalValidValues(Long value) {
+ assertDoesNotThrow(() -> properties.setAfterPollSleepInterval(value));
+ }
+
+ @ParameterizedTest
+ @MethodSource("invalidValues")
+ void setSemaphoreWaitTimeInvalidValues(Long value) {
+ assertThrows(IllegalArgumentException.class, () -> properties.setSemaphoreWaitTime(value));
+ }
+
+ @ParameterizedTest
+ @NullSource
+ @MethodSource("validValues")
+ void setSemaphoreWaitTimeValidValues(Long value) {
+ assertDoesNotThrow(() -> properties.setSemaphoreWaitTime(value));
+ }
+}
From 5aff890c62520aae24a235fadd994b6bf504edf9 Mon Sep 17 00:00:00 2001
From: Sonu Kumar
Date: Sun, 15 Mar 2026 01:37:43 +0530
Subject: [PATCH 4/8] Prepare 3.4.1 backport release
---
build.gradle | 2 +-
docs/CHANGELOG.md | 7 +++++++
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/build.gradle b/build.gradle
index 74f54259..432ecfee 100644
--- a/build.gradle
+++ b/build.gradle
@@ -69,7 +69,7 @@ ext {
subprojects {
group = "com.github.sonus21"
- version = "3.4.0-RELEASE"
+ version = "3.4.1-RELEASE"
dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 80a1c68e..f1b768b8 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -8,6 +8,13 @@ layout: default
All notable user-facing changes to this project are documented in this file.
+## Release [3.4.1] 15-March-2026
+### Features
+* Backported `HARD_STRICT` priority mode from #279 for stricter priority queue polling on the 3.x line
+
+### Fixes
+* Optimized hard-strict polling queue availability checks to avoid full list and sorted-set reads
+
## Release [3.4.0] 22-July-2025
### Fixes
* Fixed unique enqueue message to reject the message upfront instead of identifying it later #259
From aaf18cde81bcbd9ea654515312e7bc1e0a2899d6 Mon Sep 17 00:00:00 2001
From: Sonu Kumar
Date: Wed, 18 Mar 2026 15:46:11 +0530
Subject: [PATCH 5/8] add support for genric object
---
.../converter/GenericMessageConverter.java | 46 +++++++++--
.../GenericMessageConverterTest.java | 80 ++++++++++++++++++-
.../core/support/RqueueMessageUtilsTest.java | 10 ++-
3 files changed, 125 insertions(+), 11 deletions(-)
diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java
index 515dd1bb..549af6ec 100644
--- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java
+++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.sonus21.rqueue.utils.SerializationUtils;
+import java.lang.reflect.Field;
import java.lang.reflect.TypeVariable;
import java.util.Collection;
import java.util.List;
@@ -37,8 +38,8 @@
/**
* A converter to turn the payload of a {@link Message} from serialized form to a typed String and
- * vice versa. This class does not support generic class except {@link List},even for list the
- * entries should be non generic.
+ * vice versa. Supports {@link List} and single-level generic envelope types (e.g. {@code Event})
+ * where type parameters are non-generic and can be resolved from non-null field values.
*/
@Slf4j
public class GenericMessageConverter implements SmartMessageConverter {
@@ -137,7 +138,12 @@ private String getClassNameForCollection(String name, Collection> payload) {
if (payload.isEmpty()) {
return null;
}
- String itemClassName = getClassName(((List>) payload).get(0));
+ Object firstItem = ((List>) payload).get(0);
+ // Only support non-generic item classes in lists to avoid ambiguous encoding
+ if (firstItem.getClass().getTypeParameters().length > 0) {
+ return null;
+ }
+ String itemClassName = getClassName(firstItem);
if (itemClassName == null) {
return null;
}
@@ -146,12 +152,40 @@ private String getClassNameForCollection(String name, Collection> payload) {
return null;
}
- private String getGenericFieldBasedClassName(Class> clazz) {
+ private Class> resolveTypeVariable(Class> clazz, TypeVariable> tv, Object payload) {
+ // TypeVariable instances are scoped to the class that declares them, so
+ // field.getGenericType().equals(tv) can only match fields declared on clazz itself.
+ // Superclass fields reference their own TypeVariable instances, which are distinct objects.
+ for (Field field : clazz.getDeclaredFields()) {
+ if (field.getGenericType().equals(tv)) {
+ field.setAccessible(true);
+ try {
+ Object value = field.get(payload);
+ if (value != null) {
+ return value.getClass();
+ }
+ } catch (IllegalAccessException e) {
+ log.debug("Cannot access field {}", field.getName(), e);
+ }
+ }
+ }
+ return null;
+ }
+
+ private String getGenericFieldBasedClassName(Class> clazz, Object payload) {
TypeVariable>[] typeVariables = clazz.getTypeParameters();
if (typeVariables.length == 0) {
return clazz.getName();
}
- return null;
+ StringBuilder sb = new StringBuilder(clazz.getName());
+ for (TypeVariable> tv : typeVariables) {
+ Class> resolved = resolveTypeVariable(clazz, tv, payload);
+ if (resolved == null || resolved.getTypeParameters().length > 0) {
+ return null;
+ }
+ sb.append('#').append(resolved.getName());
+ }
+ return sb.toString();
}
private String getClassName(Object payload) {
@@ -160,7 +194,7 @@ private String getClassName(Object payload) {
if (payload instanceof Collection) {
return getClassNameForCollection(name, (Collection>) payload);
}
- return getGenericFieldBasedClassName(payloadClass);
+ return getGenericFieldBasedClassName(payloadClass, payload);
}
private JavaType getTargetType(Msg msg) throws ClassNotFoundException {
diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java
index 32d991f2..8b67b46f 100644
--- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java
+++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java
@@ -116,13 +116,45 @@ void toAndFromMessageList() {
}
@Test
- void genericMessageToReturnNull() {
+ void genericEnvelopeToAndFromMessage() {
GenericTestData data = new GenericTestData<>(10, comment);
Message message =
genericMessageConverter.toMessage(data, RqueueMessageHeaders.emptyMessageHeaders());
+ GenericTestData fromMessage =
+ (GenericTestData) genericMessageConverter.fromMessage(message, null);
+ assertEquals(data, fromMessage);
+ }
+
+ @Test
+ void envelopeEventToAndFromMessage() {
+ Event event = new Event<>("evt-1", comment);
+ Message message =
+ genericMessageConverter.toMessage(event, RqueueMessageHeaders.emptyMessageHeaders());
+ Event fromMessage =
+ (Event) genericMessageConverter.fromMessage(message, null);
+ assertEquals(event, fromMessage);
+ }
+
+ @Test
+ void envelopeEventWithNullPayloadToReturnNull() {
+ Event event = new Event<>("evt-1", null);
+ Message message =
+ genericMessageConverter.toMessage(event, RqueueMessageHeaders.emptyMessageHeaders());
assertNull(message);
}
+ @Test
+ void envelopeEventWithInheritedTypeToAndFromMessage() {
+ // T=Notification extends Alert extends BaseAlert — runtime class is Notification
+ Notification notification = new Notification("n-1", "hello", 42);
+ Event event = new Event<>("evt-2", notification);
+ Message message =
+ genericMessageConverter.toMessage(event, RqueueMessageHeaders.emptyMessageHeaders());
+ Event fromMessage =
+ (Event) genericMessageConverter.fromMessage(message, null);
+ assertEquals(event, fromMessage);
+ }
+
@Test
@Disabled
void multipleGenericFieldMessageToAndFrom() {
@@ -379,4 +411,50 @@ public static class GenericTestDataWithPredefinedType {
private Integer index;
private MultiGenericTestData data;
}
+
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class Event {
+
+ private String id;
+ private T payload;
+ }
+
+ // Three-level hierarchy: Notification extends Alert extends BaseAlert
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class BaseAlert {
+
+ private String id;
+ }
+
+ @Data
+ @EqualsAndHashCode(callSuper = true)
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class Alert extends BaseAlert {
+
+ private String message;
+
+ public Alert(String id, String message) {
+ super(id);
+ this.message = message;
+ }
+ }
+
+ @Data
+ @EqualsAndHashCode(callSuper = true)
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class Notification extends Alert {
+
+ private int priority;
+
+ public Notification(String id, String message, int priority) {
+ super(id, message);
+ this.priority = priority;
+ }
+ }
}
diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java
index 7071dfed..be618e1e 100644
--- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java
+++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java
@@ -148,8 +148,8 @@ void buildMessageWithDelay() {
@Test
void buildMessageNull() {
+ // id is null so the type parameter T cannot be resolved, making conversion fail
GenericClass genericClass = new GenericClass<>();
- genericClass.id = UUID.randomUUID().toString();
try {
RqueueMessageUtils.buildMessage(
messageConverter,
@@ -167,8 +167,8 @@ void buildMessageNull() {
@Test
void buildPeriodicMessageNull() {
+ // id is null so the type parameter T cannot be resolved, making conversion fail
GenericClass genericClass = new GenericClass<>();
- genericClass.id = UUID.randomUUID().toString();
try {
RqueueMessageUtils.buildPeriodicMessage(
messageConverter,
@@ -186,8 +186,9 @@ void buildPeriodicMessageNull() {
@Test
void buildMessageReturnInvalidType() {
+ // id is null so GenericMessageConverter returns null; falls through to NoMessageConverter
+ // which wraps the object in a GenericMessage with a non-String/non-byte[] payload
GenericClass genericClass = new GenericClass<>();
- genericClass.id = UUID.randomUUID().toString();
try {
RqueueMessageUtils.buildMessage(
messageConverter2,
@@ -205,8 +206,9 @@ void buildMessageReturnInvalidType() {
@Test
void buildPeriodicMessageReturnInvalidType() {
+ // id is null so GenericMessageConverter returns null; falls through to NoMessageConverter
+ // which wraps the object in a GenericMessage with a non-String/non-byte[] payload
GenericClass genericClass = new GenericClass<>();
- genericClass.id = UUID.randomUUID().toString();
try {
RqueueMessageUtils.buildPeriodicMessage(
messageConverter2,
From 226827b604ff42237249fac45b53e760182c2be5 Mon Sep 17 00:00:00 2001
From: Sonu Kumar
Date: Wed, 18 Mar 2026 15:59:55 +0530
Subject: [PATCH 6/8] add github aciotns
---
.circleci/{config.yml => config.yml.disabled} | 0
.github/workflows/java-ci.yaml | 385 ++++++++++++++++++
.github/workflows/pr-format.yaml | 97 +++++
.../GenericMessageConverterTest.java | 14 +-
4 files changed, 483 insertions(+), 13 deletions(-)
rename .circleci/{config.yml => config.yml.disabled} (100%)
create mode 100644 .github/workflows/java-ci.yaml
create mode 100644 .github/workflows/pr-format.yaml
diff --git a/.circleci/config.yml b/.circleci/config.yml.disabled
similarity index 100%
rename from .circleci/config.yml
rename to .circleci/config.yml.disabled
diff --git a/.github/workflows/java-ci.yaml b/.github/workflows/java-ci.yaml
new file mode 100644
index 00000000..cd5ed2c8
--- /dev/null
+++ b/.github/workflows/java-ci.yaml
@@ -0,0 +1,385 @@
+name: Java CI
+
+on:
+ push:
+ branches:
+ - master
+ paths:
+ - "**/*.java"
+ - "**/*.gradle"
+ - "**/*.gradle.kts"
+ - "gradle.properties"
+ - "settings.gradle"
+ - "settings.gradle.kts"
+ - "gradle/**"
+ - "gradlew"
+ - "gradlew.bat"
+ pull_request:
+
+permissions:
+ contents: read
+
+concurrency:
+ group: java-ci-${{ github.ref }}
+ cancel-in-progress: true
+
+env:
+ REDIS_RUNNING: "true"
+ USER_NAME: rqueue
+ TERM: dumb
+ JVM_OPTS: -Xmx8g
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+
+ - name: Set up Java 17
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: "17"
+ cache: gradle
+
+ - name: Expose Java 17 to Gradle toolchains
+ run: |
+ echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV"
+ echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV"
+ java -version
+ javac -version
+
+ - name: Set up Gradle
+ uses: gradle/actions/setup-gradle@v4
+
+ - name: Resolve dependencies
+ run: ./gradlew dependencies
+
+ - name: Compile main sources
+ run: ./gradlew compileJava
+
+ - name: Compile test sources
+ run: ./gradlew compileTestJava
+
+ unit_test:
+ needs: build
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+
+ - name: Set up Java 17
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: "17"
+ cache: gradle
+
+ - name: Expose Java 17 to Gradle toolchains
+ run: |
+ echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV"
+ echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV"
+ java -version
+ javac -version
+
+ - name: Set up Gradle
+ uses: gradle/actions/setup-gradle@v4
+
+ - name: Run unit tests
+ run: ./gradlew test -DincludeTags=unit
+
+ - name: Upload JaCoCo exec data
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: coverage-unit
+ path: "**/build/reports/jacoco/*.exec"
+ if-no-files-found: error
+
+ - name: Upload JUnit reports
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: unit-test-results
+ path: |
+ rqueue-spring-boot-starter/build/reports/junit/xml
+ rqueue-spring/build/reports/junit/xml
+ rqueue-core/build/reports/junit/xml
+ if-no-files-found: ignore
+
+ producer_only_test:
+ needs: build
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+
+ - name: Set up Java 17
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: "17"
+ cache: gradle
+
+ - name: Expose Java 17 to Gradle toolchains
+ run: |
+ echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV"
+ echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV"
+ java -version
+ javac -version
+
+ - name: Set up Gradle
+ uses: gradle/actions/setup-gradle@v4
+
+ - name: Install Redis
+ run: |
+ sudo apt-get update
+ sudo apt-get install -y redis-server
+ redis-cli --version
+
+ - name: Run producer-only tests
+ run: ./gradlew test -DincludeTags=producerOnly
+
+ - name: Upload JaCoCo exec data
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: coverage-producer
+ path: "**/build/reports/jacoco/*.exec"
+ if-no-files-found: error
+
+ - name: Upload JUnit reports
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: producer-test-results
+ path: |
+ rqueue-spring-boot-starter/build/reports/junit/xml
+ rqueue-spring/build/reports/junit/xml
+ rqueue-core/build/reports/junit/xml
+ if-no-files-found: ignore
+
+ integration_test:
+ needs: build
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+
+ - name: Set up Java 17
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: "17"
+ cache: gradle
+
+ - name: Expose Java 17 to Gradle toolchains
+ run: |
+ echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV"
+ echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV"
+ java -version
+ javac -version
+
+ - name: Set up Gradle
+ uses: gradle/actions/setup-gradle@v4
+
+ - name: Install Redis
+ run: |
+ sudo apt-get update
+ sudo apt-get install -y redis-server
+ redis-cli --version
+
+ - name: Run integration tests
+ run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local
+
+ - name: Upload JaCoCo exec data
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: coverage-integration
+ path: "**/build/reports/jacoco/*.exec"
+ if-no-files-found: error
+
+ - name: Upload JUnit reports
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: integration-test-results
+ path: |
+ rqueue-spring-boot-starter/build/reports/junit/xml
+ rqueue-spring/build/reports/junit/xml
+ rqueue-core/build/reports/junit/xml
+ if-no-files-found: ignore
+
+ redis_cluster_test:
+ needs: build
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+
+ - name: Set up Java 17
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: "17"
+ cache: gradle
+
+ - name: Expose Java 17 to Gradle toolchains
+ run: |
+ echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV"
+ echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV"
+ java -version
+ javac -version
+
+ - name: Set up Gradle
+ uses: gradle/actions/setup-gradle@v4
+
+ - name: Install Redis
+ run: |
+ sudo apt-get update
+ sudo apt-get install -y redis-server
+ redis-cli --version
+
+ - name: Setup Redis Cluster
+ run: |
+ mkdir 9000 9001 9002 9003 9004 9005
+ printf "port 9000 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9000/redis.conf
+ printf "port 9001 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9001/redis.conf
+ printf "port 9002 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9002/redis.conf
+ printf "port 9003 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9003/redis.conf
+ printf "port 9004 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9004/redis.conf
+ printf "port 9005 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9005/redis.conf
+ (cd 9000 && redis-server ./redis.conf) &
+ (cd 9001 && redis-server ./redis.conf) &
+ (cd 9002 && redis-server ./redis.conf) &
+ (cd 9003 && redis-server ./redis.conf) &
+ (cd 9004 && redis-server ./redis.conf) &
+ (cd 9005 && redis-server ./redis.conf) &
+ sleep 30
+ yes yes | redis-cli --cluster create 127.0.0.1:9000 127.0.0.1:9001 127.0.0.1:9002 127.0.0.1:9003 127.0.0.1:9004 127.0.0.1:9005 --cluster-replicas 1
+
+ - name: Run Redis cluster tests
+ run: ./gradlew test -DincludeTags=redisCluster
+
+ - name: Upload JaCoCo exec data
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: coverage-redis-cluster
+ path: "**/build/reports/jacoco/*.exec"
+ if-no-files-found: error
+
+ - name: Upload JUnit reports
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: redis-cluster-test-results
+ path: |
+ rqueue-spring-boot-starter/build/reports/junit/xml
+ rqueue-spring/build/reports/junit/xml
+ rqueue-core/build/reports/junit/xml
+ if-no-files-found: ignore
+
+ reactive_integration_test:
+ needs: build
+ runs-on: ubuntu-latest
+ env:
+ RQUEUE_REACTIVE_ENABLED: "true"
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+
+ - name: Set up Java 17
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: "17"
+ cache: gradle
+
+ - name: Expose Java 17 to Gradle toolchains
+ run: |
+ echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV"
+ echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV"
+ java -version
+ javac -version
+
+ - name: Set up Gradle
+ uses: gradle/actions/setup-gradle@v4
+
+ - name: Install Redis
+ run: |
+ sudo apt-get update
+ sudo apt-get install -y redis-server
+ redis-cli --version
+
+ - name: Run reactive integration tests
+ run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local
+
+ - name: Upload JaCoCo exec data
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: coverage-reactive
+ path: "**/build/reports/jacoco/*.exec"
+ if-no-files-found: error
+
+ - name: Upload JUnit reports
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: reactive-integration-test-results
+ path: |
+ rqueue-spring-boot-starter/build/reports/junit/xml
+ rqueue-spring/build/reports/junit/xml
+ rqueue-core/build/reports/junit/xml
+ if-no-files-found: ignore
+
+ coverage_report:
+ needs:
+ - unit_test
+ - producer_only_test
+ - integration_test
+ - redis_cluster_test
+ - reactive_integration_test
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+
+ - name: Set up Java 17
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: "17"
+ cache: gradle
+
+ - name: Expose Java 17 to Gradle toolchains
+ run: |
+ echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV"
+ echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV"
+ java -version
+ javac -version
+
+ - name: Set up Gradle
+ uses: gradle/actions/setup-gradle@v4
+
+ - name: Download coverage artifacts
+ uses: actions/download-artifact@v4
+ with:
+ pattern: coverage-*
+ path: coverage-artifacts
+ merge-multiple: false
+
+ - name: Generate merged coverage report
+ run: ./gradlew coverageReportOnly
+
+ - name: Upload coverage report
+ uses: actions/upload-artifact@v4
+ with:
+ name: coverage-report
+ path: |
+ build/reports/jacoco/test/jacocoTestReport.xml
+ build/reports/jacoco/coverageReportOnly/html
+ if-no-files-found: ignore
diff --git a/.github/workflows/pr-format.yaml b/.github/workflows/pr-format.yaml
new file mode 100644
index 00000000..6e90d7ff
--- /dev/null
+++ b/.github/workflows/pr-format.yaml
@@ -0,0 +1,97 @@
+name: Format PR Code
+
+on:
+ pull_request:
+ types:
+ - opened
+ - synchronize
+ - reopened
+ - ready_for_review
+ push:
+ branches:
+ - master
+ paths:
+ - "**/*.java"
+
+jobs:
+ format:
+ if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name == github.repository
+ runs-on: ubuntu-latest
+ permissions:
+ contents: write
+
+ steps:
+ - name: Checkout branch
+ uses: actions/checkout@v4
+ with:
+ ref: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.ref || github.ref_name }}
+ fetch-depth: 0
+
+ - name: Set up Java 21
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: "21"
+ cache: gradle
+
+ - name: Expose Java 21 to Gradle toolchains
+ run: |
+ echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV"
+ echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV"
+ java -version
+ javac -version
+
+ - name: Set up Gradle
+ uses: gradle/actions/setup-gradle@v4
+
+ - name: Format Java sources
+ run: ./gradlew formatJava
+
+ - name: Commit formatted changes
+ run: |
+ if git diff --quiet; then
+ echo "No formatting changes detected"
+ exit 0
+ fi
+ git config user.name "github-actions[bot]"
+ git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
+ git add -A
+ git commit -m "Apply Palantir Java Format"
+ git push
+
+ check-format:
+ if: github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository
+ runs-on: ubuntu-latest
+ permissions:
+ contents: read
+
+ steps:
+ - name: Checkout branch
+ uses: actions/checkout@v4
+
+ - name: Set up Java 21
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: "21"
+ cache: gradle
+
+ - name: Expose Java 21 to Gradle toolchains
+ run: |
+ echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV"
+ echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV"
+ java -version
+ javac -version
+
+ - name: Set up Gradle
+ uses: gradle/actions/setup-gradle@v4
+
+ - name: Check Java formatting
+ run: ./gradlew checkFormatJava
+
+ - name: Explain how to fix formatting
+ if: failure()
+ run: |
+ echo "Java formatting check failed."
+ echo "This pull request comes from a fork, so CI cannot push formatting commits back to the branch."
+ echo "Run './gradlew formatJava' locally and push the formatted changes to update the PR."
diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java
index 8b67b46f..2e264276 100644
--- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java
+++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java
@@ -146,7 +146,7 @@ void envelopeEventWithNullPayloadToReturnNull() {
@Test
void envelopeEventWithInheritedTypeToAndFromMessage() {
// T=Notification extends Alert extends BaseAlert — runtime class is Notification
- Notification notification = new Notification("n-1", "hello", 42);
+ Notification notification = new Notification(42);
Event event = new Event<>("evt-2", notification);
Message message =
genericMessageConverter.toMessage(event, RqueueMessageHeaders.emptyMessageHeaders());
@@ -431,30 +431,18 @@ public static class BaseAlert {
}
@Data
- @EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@AllArgsConstructor
public static class Alert extends BaseAlert {
private String message;
-
- public Alert(String id, String message) {
- super(id);
- this.message = message;
- }
}
@Data
- @EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@AllArgsConstructor
public static class Notification extends Alert {
private int priority;
-
- public Notification(String id, String message, int priority) {
- super(id, message);
- this.priority = priority;
- }
}
}
From 14595e1f0fe1fa95514edca2f4956b1eb6aac13f Mon Sep 17 00:00:00 2001
From: Sonu Kumar
Date: Wed, 18 Mar 2026 16:02:31 +0530
Subject: [PATCH 7/8] remvoe formatter
---
.github/workflows/pr-format.yaml | 97 --------------------------------
1 file changed, 97 deletions(-)
delete mode 100644 .github/workflows/pr-format.yaml
diff --git a/.github/workflows/pr-format.yaml b/.github/workflows/pr-format.yaml
deleted file mode 100644
index 6e90d7ff..00000000
--- a/.github/workflows/pr-format.yaml
+++ /dev/null
@@ -1,97 +0,0 @@
-name: Format PR Code
-
-on:
- pull_request:
- types:
- - opened
- - synchronize
- - reopened
- - ready_for_review
- push:
- branches:
- - master
- paths:
- - "**/*.java"
-
-jobs:
- format:
- if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name == github.repository
- runs-on: ubuntu-latest
- permissions:
- contents: write
-
- steps:
- - name: Checkout branch
- uses: actions/checkout@v4
- with:
- ref: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.ref || github.ref_name }}
- fetch-depth: 0
-
- - name: Set up Java 21
- uses: actions/setup-java@v4
- with:
- distribution: temurin
- java-version: "21"
- cache: gradle
-
- - name: Expose Java 21 to Gradle toolchains
- run: |
- echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV"
- echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV"
- java -version
- javac -version
-
- - name: Set up Gradle
- uses: gradle/actions/setup-gradle@v4
-
- - name: Format Java sources
- run: ./gradlew formatJava
-
- - name: Commit formatted changes
- run: |
- if git diff --quiet; then
- echo "No formatting changes detected"
- exit 0
- fi
- git config user.name "github-actions[bot]"
- git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
- git add -A
- git commit -m "Apply Palantir Java Format"
- git push
-
- check-format:
- if: github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository
- runs-on: ubuntu-latest
- permissions:
- contents: read
-
- steps:
- - name: Checkout branch
- uses: actions/checkout@v4
-
- - name: Set up Java 21
- uses: actions/setup-java@v4
- with:
- distribution: temurin
- java-version: "21"
- cache: gradle
-
- - name: Expose Java 21 to Gradle toolchains
- run: |
- echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV"
- echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV"
- java -version
- javac -version
-
- - name: Set up Gradle
- uses: gradle/actions/setup-gradle@v4
-
- - name: Check Java formatting
- run: ./gradlew checkFormatJava
-
- - name: Explain how to fix formatting
- if: failure()
- run: |
- echo "Java formatting check failed."
- echo "This pull request comes from a fork, so CI cannot push formatting commits back to the branch."
- echo "Run './gradlew formatJava' locally and push the formatted changes to update the PR."
From fa8241f40dc5daef06cb9ef8412395983fb7881d Mon Sep 17 00:00:00 2001
From: Sonu Kumar
Date: Wed, 18 Mar 2026 16:16:08 +0530
Subject: [PATCH 8/8] update change log
---
docs/CHANGELOG.md | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index f1b768b8..56b6ac62 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -8,9 +8,10 @@ layout: default
All notable user-facing changes to this project are documented in this file.
-## Release [3.4.1] 15-March-2026
+## Release [3.4.1] 18-March-2026
### Features
* Backported `HARD_STRICT` priority mode from #279 for stricter priority queue polling on the 3.x line
+* Added typed/generic message parameter support — listeners can now declare strongly-typed message parameters and rely on `GenericMessageConverter` for automatic deserialization
### Fixes
* Optimized hard-strict polling queue availability checks to avoid full list and sorted-set reads