From 084f0025d1fd4f8b3fcc7735a7e0319f7cc0d7ce Mon Sep 17 00:00:00 2001 From: igorjava2025 <-> Date: Sun, 8 Mar 2026 22:12:57 +0300 Subject: [PATCH 1/8] Add HARD_STRICT mode See details there https://github.com/sonus21/rqueue/issues/276 (cherry picked from commit acc09bf4cbac84694e7217920d6d51869712adf9) --- .../SimpleRqueueListenerContainerFactory.java | 14 ++ .../listener/HardStrictPriorityPoller.java | 206 ++++++++++++++++++ .../HardStrictPriorityPollerProperties.java | 24 ++ .../RqueueMessageListenerContainer.java | 24 ++ .../rqueue/models/enums/PriorityMode.java | 3 +- .../HardStrictPriorityPollerTest.java | 178 +++++++++++++++ 6 files changed, 448 insertions(+), 1 deletion(-) create mode 100644 rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java create mode 100644 rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java create mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java index 5da2e641..ac7a4e90 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java @@ -27,6 +27,7 @@ import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl; import com.github.sonus21.rqueue.core.middleware.Middleware; import com.github.sonus21.rqueue.core.support.MessageProcessor; +import com.github.sonus21.rqueue.listener.HardStrictPriorityPollerProperties; import com.github.sonus21.rqueue.listener.RqueueMessageHandler; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; import com.github.sonus21.rqueue.models.enums.PriorityMode; @@ -92,6 +93,8 @@ public class SimpleRqueueListenerContainerFactory { // Set priority mode for the pollers private PriorityMode priorityMode = PriorityMode.WEIGHTED; + // Set HardStrictPriorityPollerProperties for HARD_STRICT priority mode poller + private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties; /** * Whether all beans of spring application should be inspected to find methods annotated with @@ -348,6 +351,9 @@ public RqueueMessageListenerContainer createMessageListenerContainer() { if (messageHeaders != null) { messageListenerContainer.setMessageHeaders(messageHeaders); } + if (hardStrictPriorityPollerProperties != null) { + messageListenerContainer.setHardStrictPriorityPollerProperties(hardStrictPriorityPollerProperties); + } return messageListenerContainer; } @@ -486,6 +492,14 @@ public void setMessageHeaders(MessageHeaders messageHeaders) { this.messageHeaders = messageHeaders; } + public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() { + return this.hardStrictPriorityPollerProperties; + } + + public void setHardStrictPriorityPollerProperties(HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) { + this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties; + } + /** * Rqueue scans all beans to find method annotated with {@link RqueueListener}. * diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java new file mode 100644 index 00000000..3d824bc3 --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.listener; + +import com.github.sonus21.rqueue.core.RqueueBeanProvider; +import com.github.sonus21.rqueue.core.middleware.Middleware; +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr; +import com.github.sonus21.rqueue.utils.Constants; +import com.github.sonus21.rqueue.utils.QueueThreadPool; +import com.github.sonus21.rqueue.utils.TimeoutUtils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.slf4j.event.Level; +import org.springframework.messaging.MessageHeaders; + +/** + * Use it only with priority queues. + * Message processing can be slow. + * The hard strict priority algorithm is better in HardStrictPriorityPoller than in StrictPriorityPoller + * More details see in GitHub project issue + */ +class HardStrictPriorityPoller extends RqueueMessagePoller { + + private final RqueueBeanProvider rqueueBeanProvider; + + private final Map queueNameToDetail; + private final Map queueNameToThread; + private final Map queueDeactivationTime = new HashMap<>(); + private final HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties; + + HardStrictPriorityPoller( + String groupName, + final List queueDetails, + final Map queueNameToThread, + RqueueBeanProvider rqueueBeanProvider, + QueueStateMgr queueStateMgr, + List middlewares, + long pollingInterval, + long backoffTime, + PostProcessingHandler postProcessingHandler, + MessageHeaders messageHeaders, + HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) { + super( + "HardStrict-" + groupName, + rqueueBeanProvider, + queueStateMgr, + middlewares, + pollingInterval, + backoffTime, + postProcessingHandler, + messageHeaders); + + this.rqueueBeanProvider = rqueueBeanProvider; + // Sort queues by priority once during initialization + List queueDetailList = new ArrayList<>(queueDetails); + queueDetailList.sort( + (o1, o2) -> + o2.getPriority().get(Constants.DEFAULT_PRIORITY_KEY) + - o1.getPriority().get(Constants.DEFAULT_PRIORITY_KEY)); + + this.queues = queueDetailList.stream().map(QueueDetail::getName).collect(Collectors.toList()); + this.queueNameToDetail = + queueDetailList.stream() + .collect(Collectors.toMap(QueueDetail::getName, Function.identity())); + this.queueNameToThread = queueNameToThread; + this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties != null + ? hardStrictPriorityPollerProperties + : new HardStrictPriorityPollerProperties(); + } + + @Override + public void start() { + log(Level.DEBUG, "Running, Ordered Queues: {}", null, queues); + while (true) { + if (shouldExit()) { + return; + } + + boolean messageFoundInAnyQueue = false; + + try { + for (String queue : queues) { + if (eligibleForPolling(queue) && !isDeactivated(queue)) { + QueueThreadPool queueThreadPool = queueNameToThread.get(queue); + QueueDetail queueDetail = queueNameToDetail.get(queue); + poll(-1, queue, queueDetail, queueThreadPool); + + if (hardStrictPriorityPollerProperties.getAfterPollSleepInterval() != null) { + TimeoutUtils.sleepLog(hardStrictPriorityPollerProperties.getAfterPollSleepInterval(), false); + } + + if (existMessagesInCurrentQueueOrHigherPriorityQueue(queue, queues)) { + // break current cycle and start new cycle + // it allow to process queue with the higher priority + messageFoundInAnyQueue = true; + break; + } + } + } + + // If no messages were found across all queues, sleep for the polling interval + if (!messageFoundInAnyQueue) { + TimeoutUtils.sleepLog(pollingInterval, false); + } + + } catch (Throwable e) { + log(Level.ERROR, "Exception in the poller {}", e, e.getMessage()); + if (shouldExit()) { + return; + } + TimeoutUtils.sleepLog(backoffTime, false); + } + } + } + + boolean existMessagesInCurrentQueueOrHigherPriorityQueue(String currentQueue, List queues) { + for (String queue : queues) { + if (eligibleForPolling(queue) && !isDeactivated(queue)) { + QueueDetail queueDetail = queueNameToDetail.get(queue); + if (existAvailableMessagesForPoll(queueDetail)) { + // the current or higher priority queue contains messages that need to be processed. + return true; + } + } + // we check all queues from the highest priority to current queue + if (queue.equals(currentQueue)) { + return false; + } + } + // unexpected behavior, need more details if it occurs + log(Level.WARN, "current queue '{}' not found in queues list '{}'", null, currentQueue, queues); + return false; + } + + protected boolean existAvailableMessagesForPoll(QueueDetail queueDetail) { + List readyMessages = rqueueBeanProvider + .getRqueueMessageTemplate() + .readFromList(queueDetail.getQueueName(), 0, 0); + + if (readyMessages != null && !readyMessages.isEmpty()) { + log(Level.TRACE, "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName()); + return true; + } + + // Only check delayed messages with score <= current time + long currentTime = System.currentTimeMillis(); + List delayedMessages = rqueueBeanProvider + .getRqueueMessageTemplate() + .readFromZsetWithScore(queueDetail.getScheduledQueueName(), 0, currentTime); + + if (delayedMessages != null && !delayedMessages.isEmpty()) { + log(Level.TRACE, "delayedMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName()); + return true; + } + + return false; + } + + private boolean isDeactivated(String queue) { + Long deactivationTime = queueDeactivationTime.get(queue); + if (deactivationTime == null) { + return false; + } + if (System.currentTimeMillis() - deactivationTime > pollingInterval) { + queueDeactivationTime.remove(queue); + return false; + } + return true; + } + + @Override + long getSemaphoreWaitTime() { + return hardStrictPriorityPollerProperties.getSemaphoreWaitTime() != null + ? hardStrictPriorityPollerProperties.getSemaphoreWaitTime() + : 20L; + } + + @Override + void deactivate(int index, String queue, DeactivateType deactivateType) { + if (deactivateType == DeactivateType.POLL_FAILED) { + // Pause in case of connection errors or polling failures + TimeoutUtils.sleepLog(backoffTime, false); + } else { + // Mark deactivation time if the queue is empty + queueDeactivationTime.put(queue, System.currentTimeMillis()); + } + } +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java new file mode 100644 index 00000000..99ca44b9 --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java @@ -0,0 +1,24 @@ +package com.github.sonus21.rqueue.listener; + +public class HardStrictPriorityPollerProperties { + // set such default values for parameters "afterPollSleepInterval" and "semaphoreWaitTime" + // because local load tests have correct strict priority algorithm work and good performance with them + private Long afterPollSleepInterval = 30L; + private Long semaphoreWaitTime = 15L; + + public Long getAfterPollSleepInterval() { + return afterPollSleepInterval; + } + + public void setAfterPollSleepInterval(Long afterPollSleepInterval) { + this.afterPollSleepInterval = afterPollSleepInterval; + } + + public Long getSemaphoreWaitTime() { + return this.semaphoreWaitTime; + } + + public void setSemaphoreWaitTime(Long semaphoreWaitTime) { + this.semaphoreWaitTime = semaphoreWaitTime; + } +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index 440a836a..36b8dc73 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -106,6 +106,7 @@ public class RqueueMessageListenerContainer private int phase = Integer.MAX_VALUE; private PriorityMode priorityMode; private MessageHeaders messageHeaders; + private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties; public RqueueMessageListenerContainer( RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate) { @@ -515,6 +516,21 @@ protected void startGroup(String groupName, List queueDetails) { backOffTime, postProcessingHandler, getMessageHeaders())); + } else if (getPriorityMode() == PriorityMode.HARD_STRICT) { + future = + taskExecutor.submit( + new HardStrictPriorityPoller( + StringUtils.groupName(groupName), + queueDetails, + queueThread, + rqueueBeanProvider, + queueStateMgr, + getMiddleWares(), + pollingInterval, + backOffTime, + postProcessingHandler, + getMessageHeaders(), + getHardStrictPriorityPollerProperties())); } else { future = taskExecutor.submit( @@ -712,6 +728,14 @@ public void setMessageHeaders(MessageHeaders messageHeaders) { this.messageHeaders = messageHeaders; } + public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() { + return this.hardStrictPriorityPollerProperties; + } + + public void setHardStrictPriorityPollerProperties(HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) { + this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties; + } + class QueueStateMgr { Set pausedQueues = ConcurrentHashMap.newKeySet(); diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java index 0b6e556a..0fa83405 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java @@ -18,5 +18,6 @@ public enum PriorityMode { STRICT, - WEIGHTED + WEIGHTED, + HARD_STRICT } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java new file mode 100644 index 00000000..9315c2c9 --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java @@ -0,0 +1,178 @@ +package com.github.sonus21.rqueue.listener; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import com.github.sonus21.TestBase; +import com.github.sonus21.rqueue.CoreUnitTest; +import com.github.sonus21.rqueue.core.RqueueBeanProvider; +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr; +import com.github.sonus21.rqueue.utils.Constants; +import com.github.sonus21.rqueue.utils.QueueThreadPool; +import com.github.sonus21.rqueue.utils.TimeoutUtils; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.messaging.MessageHeaders; + +@CoreUnitTest +class HardStrictPriorityPollerTest extends TestBase { + + @Mock + private RqueueBeanProvider rqueueBeanProvider; + @Mock + private QueueStateMgr queueStateMgr; + @Mock + private PostProcessingHandler postProcessingHandler; + + private final String highPriorityQueue = "high-priority-" + UUID.randomUUID(); + private final String lowPriorityQueue = "low-priority-" + UUID.randomUUID(); + private HardStrictPriorityPoller poller; + private QueueDetail highDetail; + private QueueDetail lowDetail; + + @BeforeEach + public void setUp() { + MockitoAnnotations.openMocks(this); + + highDetail = createQueueDetail(highPriorityQueue, 100); + lowDetail = createQueueDetail(lowPriorityQueue, 10); + + List queueDetails = Arrays.asList(lowDetail, highDetail); + Map queueNameToThread = new HashMap<>(); + queueNameToThread.put(highPriorityQueue, mock(QueueThreadPool.class)); + queueNameToThread.put(lowPriorityQueue, mock(QueueThreadPool.class)); + + poller = spy(new HardStrictPriorityPoller( + "test-group", + queueDetails, + queueNameToThread, + rqueueBeanProvider, + queueStateMgr, + Collections.emptyList(), + 50L, + 50L, + postProcessingHandler, + new MessageHeaders(Collections.emptyMap()), + new HardStrictPriorityPollerProperties() + )); + + // КРИТИЧЕСКИ ВАЖНО: Разрешаем опрос очередей + lenient().doReturn(true).when(poller).eligibleForPolling(anyString()); + // КРИТИЧЕСКИ ВАЖНО: Запрещаем немедленный выход из цикла + lenient().doReturn(false).when(poller).shouldExit(); + } + + private QueueDetail createQueueDetail(String name, int priority) { + QueueDetail detail = mock(QueueDetail.class); + when(detail.getName()).thenReturn(name); + Map priorityMap = new HashMap<>(); + priorityMap.put(Constants.DEFAULT_PRIORITY_KEY, priority); + when(detail.getPriority()).thenReturn(priorityMap); + return detail; + } + + @Test + void testQueuesAreSortedByPriority() { + List sortedQueues = poller.queues; + assertTrue(sortedQueues.indexOf(highPriorityQueue) < sortedQueues.indexOf(lowPriorityQueue), + "High priority queue should be first"); + } + + @Test + void testExistMessagesInHigherPriorityQueueReturnsTrue() { + List queues = Arrays.asList(highPriorityQueue, lowPriorityQueue); + + // High queue has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(highDetail); + + // Checking from low priority perspective + boolean result = poller.existMessagesInCurrentQueueOrHigherPriorityQueue(lowPriorityQueue, queues); + assertTrue(result, "Should return true because high priority queue has messages"); + } + + @Test + void testStrictExecutionPreventsLowPriorityPoll() throws Exception { + AtomicInteger highQueuePollCount = new AtomicInteger(0); + AtomicInteger lowQueuePollCount = new AtomicInteger(0); + + // High Priority always has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(highDetail); + lenient().doAnswer(invocation -> { + highQueuePollCount.incrementAndGet(); + return 1; + }).when(poller).poll(anyInt(), eq(highPriorityQueue), eq(highDetail), any()); + + // Low Priority also has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(lowDetail); + lenient().doAnswer(invocation -> { + lowQueuePollCount.incrementAndGet(); + return 1; + }).when(poller).poll(anyInt(), eq(lowPriorityQueue), eq(lowDetail), any()); + + Thread pollerThread = new Thread(poller::start); + pollerThread.start(); + + try { + // Wait for multiple polls of High priority + TimeoutUtils.waitFor(() -> highQueuePollCount.get() > 5, 2000, "high priority polls"); + + // Low priority must NEVER be polled because 'break' happens after high poll + assertTrue(lowQueuePollCount.get() == 0, "Low priority queue should not be polled"); + } finally { + stop(poller, pollerThread); + } + } + + @Test + void testLowPriorityIsPolledWhenHighIsEmpty() throws Exception { + AtomicInteger lowQueuePollCount = new AtomicInteger(0); + + // High is empty + lenient().doReturn(false).when(poller).existAvailableMessagesForPoll(highDetail); + + // Low has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(lowDetail); + lenient().doAnswer(invocation -> { + lowQueuePollCount.incrementAndGet(); + return 1; + }).when(poller).poll(anyInt(), eq(lowPriorityQueue), eq(lowDetail), any()); + + Thread pollerThread = new Thread(poller::start); + pollerThread.start(); + + try { + TimeoutUtils.waitFor(() -> lowQueuePollCount.get() > 0, 2000, "low priority poll"); + assertTrue(lowQueuePollCount.get() > 0); + } finally { + stop(poller, pollerThread); + } + } + + private void stop(HardStrictPriorityPoller poller, Thread thread) { + lenient().doReturn(true).when(poller).shouldExit(); + if (thread != null && thread.isAlive()) { + thread.interrupt(); + try { + thread.join(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} \ No newline at end of file From 3f5b0a0637b253591025b7290bcc5b48b61b8a03 Mon Sep 17 00:00:00 2001 From: igorjava2025 <-> Date: Mon, 9 Mar 2026 10:41:05 +0300 Subject: [PATCH 2/8] fix comment (cherry picked from commit c92cb75dc50a1590f3881739db9f43b110c99217) --- .../sonus21/rqueue/listener/HardStrictPriorityPollerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java index 9315c2c9..2392ebb1 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java @@ -72,9 +72,9 @@ public void setUp() { new HardStrictPriorityPollerProperties() )); - // КРИТИЧЕСКИ ВАЖНО: Разрешаем опрос очередей + // Allowing queue polling lenient().doReturn(true).when(poller).eligibleForPolling(anyString()); - // КРИТИЧЕСКИ ВАЖНО: Запрещаем немедленный выход из цикла + // Disable immediate exit from the loop lenient().doReturn(false).when(poller).shouldExit(); } From c6459d3e6efb26622c06308fe8c50a1e686ae424 Mon Sep 17 00:00:00 2001 From: igorjava2025 <-> Date: Fri, 13 Mar 2026 12:54:02 +0300 Subject: [PATCH 3/8] add fixes after review (cherry picked from commit 21d6d8643aabe24b13f28244952a6d778b8732a4) --- .../rqueue/core/RqueueMessageTemplate.java | 7 +++ .../core/impl/RqueueMessageTemplateImpl.java | 16 ++++++ .../listener/HardStrictPriorityPoller.java | 38 +++++++++----- .../HardStrictPriorityPollerProperties.java | 9 ++++ ...ardStrictPriorityPollerPropertiesTest.java | 49 +++++++++++++++++++ 5 files changed, 107 insertions(+), 12 deletions(-) create mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java index 1885eafd..caf2e25f 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java @@ -18,6 +18,7 @@ import com.github.sonus21.rqueue.models.MessageMoveResult; import java.util.List; +import java.util.Optional; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ZSetOperations.TypedTuple; import reactor.core.publisher.Flux; @@ -92,4 +93,10 @@ Long scheduleMessage( Flux addReactiveMessageWithDelay( String scheduledQueueName, String scheduledQueueChannelName, RqueueMessage rqueueMessage); + + Optional findFirstElementFromList(String name); + + Optional findFirstElementFromZset(String name); + + Optional> findFirstElementFromZsetWithScore(String name); } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java index fa6b288d..f121e775 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; @@ -343,4 +344,19 @@ public RedisTemplate getTemplate() { public Long removeElementFromZset(String zsetName, RqueueMessage rqueueMessage) { return super.removeFromZset(zsetName, rqueueMessage); } + + @Override + public Optional findFirstElementFromList(String name) { + return readFromList(name, 0, 0).stream().findFirst(); + } + + @Override + public Optional findFirstElementFromZset(String name) { + return readFromZset(name, 0, 0).stream().findFirst(); + } + + @Override + public Optional> findFirstElementFromZsetWithScore(String name) { + return readFromZsetWithScore(name, 0, 0).stream().findFirst(); + } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java index 3d824bc3..04443827 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java @@ -151,23 +151,37 @@ boolean existMessagesInCurrentQueueOrHigherPriorityQueue(String currentQueue, Li } protected boolean existAvailableMessagesForPoll(QueueDetail queueDetail) { - List readyMessages = rqueueBeanProvider - .getRqueueMessageTemplate() - .readFromList(queueDetail.getQueueName(), 0, 0); - - if (readyMessages != null && !readyMessages.isEmpty()) { - log(Level.TRACE, "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName()); + boolean readyMessagesExists = + rqueueBeanProvider + .getRqueueMessageTemplate() + .findFirstElementFromList(queueDetail.getQueueName()) + .isPresent(); + if (readyMessagesExists) { + log( + Level.TRACE, + "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.", + null, + queueDetail.getName()); return true; } // Only check delayed messages with score <= current time long currentTime = System.currentTimeMillis(); - List delayedMessages = rqueueBeanProvider - .getRqueueMessageTemplate() - .readFromZsetWithScore(queueDetail.getScheduledQueueName(), 0, currentTime); - - if (delayedMessages != null && !delayedMessages.isEmpty()) { - log(Level.TRACE, "delayedMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName()); + boolean delayedMessagesExists = + rqueueBeanProvider + .getRqueueMessageTemplate() + .findFirstElementFromZsetWithScore(queueDetail.getScheduledQueueName()) + .filter(element -> element.getScore() <= currentTime) + .isPresent(); + + if (delayedMessagesExists) { + log( + Level.TRACE, + "delayedMessages exists for scheduled queue '{}' and currentTime '{}'," + + " existAvailableMessagesForPoll = true.", + null, + queueDetail.getScheduledQueueName(), + currentTime); return true; } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java index 99ca44b9..618aadac 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java @@ -11,6 +11,7 @@ public Long getAfterPollSleepInterval() { } public void setAfterPollSleepInterval(Long afterPollSleepInterval) { + validateTimeInterval(afterPollSleepInterval); this.afterPollSleepInterval = afterPollSleepInterval; } @@ -19,6 +20,14 @@ public Long getSemaphoreWaitTime() { } public void setSemaphoreWaitTime(Long semaphoreWaitTime) { + validateTimeInterval(semaphoreWaitTime); this.semaphoreWaitTime = semaphoreWaitTime; } + + private void validateTimeInterval(Long value) { + if (value == null || value > 0) { + return; + } + throw new IllegalArgumentException("Value must be positive: " + value); + } } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java new file mode 100644 index 00000000..efa4e261 --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java @@ -0,0 +1,49 @@ +package com.github.sonus21.rqueue.listener; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.stream.LongStream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.NullSource; + +class HardStrictPriorityPollerPropertiesTest { + + private final HardStrictPriorityPollerProperties properties = + new HardStrictPriorityPollerProperties(); + + static LongStream invalidValues() { + return LongStream.of(0L, -1L, -100L, Long.MIN_VALUE); + } + + static LongStream validValues() { + return LongStream.of(1L, Long.MAX_VALUE); + } + + @ParameterizedTest + @MethodSource("invalidValues") + void setAfterPollSleepIntervalInvalidValues(Long value) { + assertThrows(IllegalArgumentException.class, () -> properties.setAfterPollSleepInterval(value)); + } + + @ParameterizedTest + @NullSource + @MethodSource("validValues") + void setAfterPollSleepIntervalValidValues(Long value) { + assertDoesNotThrow(() -> properties.setAfterPollSleepInterval(value)); + } + + @ParameterizedTest + @MethodSource("invalidValues") + void setSemaphoreWaitTimeInvalidValues(Long value) { + assertThrows(IllegalArgumentException.class, () -> properties.setSemaphoreWaitTime(value)); + } + + @ParameterizedTest + @NullSource + @MethodSource("validValues") + void setSemaphoreWaitTimeValidValues(Long value) { + assertDoesNotThrow(() -> properties.setSemaphoreWaitTime(value)); + } +} From 5aff890c62520aae24a235fadd994b6bf504edf9 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Sun, 15 Mar 2026 01:37:43 +0530 Subject: [PATCH 4/8] Prepare 3.4.1 backport release --- build.gradle | 2 +- docs/CHANGELOG.md | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 74f54259..432ecfee 100644 --- a/build.gradle +++ b/build.gradle @@ -69,7 +69,7 @@ ext { subprojects { group = "com.github.sonus21" - version = "3.4.0-RELEASE" + version = "3.4.1-RELEASE" dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-messaging diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 80a1c68e..f1b768b8 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -8,6 +8,13 @@ layout: default All notable user-facing changes to this project are documented in this file. +## Release [3.4.1] 15-March-2026 +### Features +* Backported `HARD_STRICT` priority mode from #279 for stricter priority queue polling on the 3.x line + +### Fixes +* Optimized hard-strict polling queue availability checks to avoid full list and sorted-set reads + ## Release [3.4.0] 22-July-2025 ### Fixes * Fixed unique enqueue message to reject the message upfront instead of identifying it later #259 From aaf18cde81bcbd9ea654515312e7bc1e0a2899d6 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Wed, 18 Mar 2026 15:46:11 +0530 Subject: [PATCH 5/8] add support for genric object --- .../converter/GenericMessageConverter.java | 46 +++++++++-- .../GenericMessageConverterTest.java | 80 ++++++++++++++++++- .../core/support/RqueueMessageUtilsTest.java | 10 ++- 3 files changed, 125 insertions(+), 11 deletions(-) diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java index 515dd1bb..549af6ec 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.sonus21.rqueue.utils.SerializationUtils; +import java.lang.reflect.Field; import java.lang.reflect.TypeVariable; import java.util.Collection; import java.util.List; @@ -37,8 +38,8 @@ /** * A converter to turn the payload of a {@link Message} from serialized form to a typed String and - * vice versa. This class does not support generic class except {@link List},even for list the - * entries should be non generic. + * vice versa. Supports {@link List} and single-level generic envelope types (e.g. {@code Event}) + * where type parameters are non-generic and can be resolved from non-null field values. */ @Slf4j public class GenericMessageConverter implements SmartMessageConverter { @@ -137,7 +138,12 @@ private String getClassNameForCollection(String name, Collection payload) { if (payload.isEmpty()) { return null; } - String itemClassName = getClassName(((List) payload).get(0)); + Object firstItem = ((List) payload).get(0); + // Only support non-generic item classes in lists to avoid ambiguous encoding + if (firstItem.getClass().getTypeParameters().length > 0) { + return null; + } + String itemClassName = getClassName(firstItem); if (itemClassName == null) { return null; } @@ -146,12 +152,40 @@ private String getClassNameForCollection(String name, Collection payload) { return null; } - private String getGenericFieldBasedClassName(Class clazz) { + private Class resolveTypeVariable(Class clazz, TypeVariable tv, Object payload) { + // TypeVariable instances are scoped to the class that declares them, so + // field.getGenericType().equals(tv) can only match fields declared on clazz itself. + // Superclass fields reference their own TypeVariable instances, which are distinct objects. + for (Field field : clazz.getDeclaredFields()) { + if (field.getGenericType().equals(tv)) { + field.setAccessible(true); + try { + Object value = field.get(payload); + if (value != null) { + return value.getClass(); + } + } catch (IllegalAccessException e) { + log.debug("Cannot access field {}", field.getName(), e); + } + } + } + return null; + } + + private String getGenericFieldBasedClassName(Class clazz, Object payload) { TypeVariable[] typeVariables = clazz.getTypeParameters(); if (typeVariables.length == 0) { return clazz.getName(); } - return null; + StringBuilder sb = new StringBuilder(clazz.getName()); + for (TypeVariable tv : typeVariables) { + Class resolved = resolveTypeVariable(clazz, tv, payload); + if (resolved == null || resolved.getTypeParameters().length > 0) { + return null; + } + sb.append('#').append(resolved.getName()); + } + return sb.toString(); } private String getClassName(Object payload) { @@ -160,7 +194,7 @@ private String getClassName(Object payload) { if (payload instanceof Collection) { return getClassNameForCollection(name, (Collection) payload); } - return getGenericFieldBasedClassName(payloadClass); + return getGenericFieldBasedClassName(payloadClass, payload); } private JavaType getTargetType(Msg msg) throws ClassNotFoundException { diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java index 32d991f2..8b67b46f 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java @@ -116,13 +116,45 @@ void toAndFromMessageList() { } @Test - void genericMessageToReturnNull() { + void genericEnvelopeToAndFromMessage() { GenericTestData data = new GenericTestData<>(10, comment); Message message = genericMessageConverter.toMessage(data, RqueueMessageHeaders.emptyMessageHeaders()); + GenericTestData fromMessage = + (GenericTestData) genericMessageConverter.fromMessage(message, null); + assertEquals(data, fromMessage); + } + + @Test + void envelopeEventToAndFromMessage() { + Event event = new Event<>("evt-1", comment); + Message message = + genericMessageConverter.toMessage(event, RqueueMessageHeaders.emptyMessageHeaders()); + Event fromMessage = + (Event) genericMessageConverter.fromMessage(message, null); + assertEquals(event, fromMessage); + } + + @Test + void envelopeEventWithNullPayloadToReturnNull() { + Event event = new Event<>("evt-1", null); + Message message = + genericMessageConverter.toMessage(event, RqueueMessageHeaders.emptyMessageHeaders()); assertNull(message); } + @Test + void envelopeEventWithInheritedTypeToAndFromMessage() { + // T=Notification extends Alert extends BaseAlert — runtime class is Notification + Notification notification = new Notification("n-1", "hello", 42); + Event event = new Event<>("evt-2", notification); + Message message = + genericMessageConverter.toMessage(event, RqueueMessageHeaders.emptyMessageHeaders()); + Event fromMessage = + (Event) genericMessageConverter.fromMessage(message, null); + assertEquals(event, fromMessage); + } + @Test @Disabled void multipleGenericFieldMessageToAndFrom() { @@ -379,4 +411,50 @@ public static class GenericTestDataWithPredefinedType { private Integer index; private MultiGenericTestData data; } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Event { + + private String id; + private T payload; + } + + // Three-level hierarchy: Notification extends Alert extends BaseAlert + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class BaseAlert { + + private String id; + } + + @Data + @EqualsAndHashCode(callSuper = true) + @NoArgsConstructor + @AllArgsConstructor + public static class Alert extends BaseAlert { + + private String message; + + public Alert(String id, String message) { + super(id); + this.message = message; + } + } + + @Data + @EqualsAndHashCode(callSuper = true) + @NoArgsConstructor + @AllArgsConstructor + public static class Notification extends Alert { + + private int priority; + + public Notification(String id, String message, int priority) { + super(id, message); + this.priority = priority; + } + } } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java index 7071dfed..be618e1e 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java @@ -148,8 +148,8 @@ void buildMessageWithDelay() { @Test void buildMessageNull() { + // id is null so the type parameter T cannot be resolved, making conversion fail GenericClass genericClass = new GenericClass<>(); - genericClass.id = UUID.randomUUID().toString(); try { RqueueMessageUtils.buildMessage( messageConverter, @@ -167,8 +167,8 @@ void buildMessageNull() { @Test void buildPeriodicMessageNull() { + // id is null so the type parameter T cannot be resolved, making conversion fail GenericClass genericClass = new GenericClass<>(); - genericClass.id = UUID.randomUUID().toString(); try { RqueueMessageUtils.buildPeriodicMessage( messageConverter, @@ -186,8 +186,9 @@ void buildPeriodicMessageNull() { @Test void buildMessageReturnInvalidType() { + // id is null so GenericMessageConverter returns null; falls through to NoMessageConverter + // which wraps the object in a GenericMessage with a non-String/non-byte[] payload GenericClass genericClass = new GenericClass<>(); - genericClass.id = UUID.randomUUID().toString(); try { RqueueMessageUtils.buildMessage( messageConverter2, @@ -205,8 +206,9 @@ void buildMessageReturnInvalidType() { @Test void buildPeriodicMessageReturnInvalidType() { + // id is null so GenericMessageConverter returns null; falls through to NoMessageConverter + // which wraps the object in a GenericMessage with a non-String/non-byte[] payload GenericClass genericClass = new GenericClass<>(); - genericClass.id = UUID.randomUUID().toString(); try { RqueueMessageUtils.buildPeriodicMessage( messageConverter2, From 226827b604ff42237249fac45b53e760182c2be5 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Wed, 18 Mar 2026 15:59:55 +0530 Subject: [PATCH 6/8] add github aciotns --- .circleci/{config.yml => config.yml.disabled} | 0 .github/workflows/java-ci.yaml | 385 ++++++++++++++++++ .github/workflows/pr-format.yaml | 97 +++++ .../GenericMessageConverterTest.java | 14 +- 4 files changed, 483 insertions(+), 13 deletions(-) rename .circleci/{config.yml => config.yml.disabled} (100%) create mode 100644 .github/workflows/java-ci.yaml create mode 100644 .github/workflows/pr-format.yaml diff --git a/.circleci/config.yml b/.circleci/config.yml.disabled similarity index 100% rename from .circleci/config.yml rename to .circleci/config.yml.disabled diff --git a/.github/workflows/java-ci.yaml b/.github/workflows/java-ci.yaml new file mode 100644 index 00000000..cd5ed2c8 --- /dev/null +++ b/.github/workflows/java-ci.yaml @@ -0,0 +1,385 @@ +name: Java CI + +on: + push: + branches: + - master + paths: + - "**/*.java" + - "**/*.gradle" + - "**/*.gradle.kts" + - "gradle.properties" + - "settings.gradle" + - "settings.gradle.kts" + - "gradle/**" + - "gradlew" + - "gradlew.bat" + pull_request: + +permissions: + contents: read + +concurrency: + group: java-ci-${{ github.ref }} + cancel-in-progress: true + +env: + REDIS_RUNNING: "true" + USER_NAME: rqueue + TERM: dumb + JVM_OPTS: -Xmx8g + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Resolve dependencies + run: ./gradlew dependencies + + - name: Compile main sources + run: ./gradlew compileJava + + - name: Compile test sources + run: ./gradlew compileTestJava + + unit_test: + needs: build + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Run unit tests + run: ./gradlew test -DincludeTags=unit + + - name: Upload JaCoCo exec data + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-unit + path: "**/build/reports/jacoco/*.exec" + if-no-files-found: error + + - name: Upload JUnit reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: unit-test-results + path: | + rqueue-spring-boot-starter/build/reports/junit/xml + rqueue-spring/build/reports/junit/xml + rqueue-core/build/reports/junit/xml + if-no-files-found: ignore + + producer_only_test: + needs: build + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Install Redis + run: | + sudo apt-get update + sudo apt-get install -y redis-server + redis-cli --version + + - name: Run producer-only tests + run: ./gradlew test -DincludeTags=producerOnly + + - name: Upload JaCoCo exec data + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-producer + path: "**/build/reports/jacoco/*.exec" + if-no-files-found: error + + - name: Upload JUnit reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: producer-test-results + path: | + rqueue-spring-boot-starter/build/reports/junit/xml + rqueue-spring/build/reports/junit/xml + rqueue-core/build/reports/junit/xml + if-no-files-found: ignore + + integration_test: + needs: build + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Install Redis + run: | + sudo apt-get update + sudo apt-get install -y redis-server + redis-cli --version + + - name: Run integration tests + run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local + + - name: Upload JaCoCo exec data + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-integration + path: "**/build/reports/jacoco/*.exec" + if-no-files-found: error + + - name: Upload JUnit reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: integration-test-results + path: | + rqueue-spring-boot-starter/build/reports/junit/xml + rqueue-spring/build/reports/junit/xml + rqueue-core/build/reports/junit/xml + if-no-files-found: ignore + + redis_cluster_test: + needs: build + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Install Redis + run: | + sudo apt-get update + sudo apt-get install -y redis-server + redis-cli --version + + - name: Setup Redis Cluster + run: | + mkdir 9000 9001 9002 9003 9004 9005 + printf "port 9000 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9000/redis.conf + printf "port 9001 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9001/redis.conf + printf "port 9002 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9002/redis.conf + printf "port 9003 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9003/redis.conf + printf "port 9004 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9004/redis.conf + printf "port 9005 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9005/redis.conf + (cd 9000 && redis-server ./redis.conf) & + (cd 9001 && redis-server ./redis.conf) & + (cd 9002 && redis-server ./redis.conf) & + (cd 9003 && redis-server ./redis.conf) & + (cd 9004 && redis-server ./redis.conf) & + (cd 9005 && redis-server ./redis.conf) & + sleep 30 + yes yes | redis-cli --cluster create 127.0.0.1:9000 127.0.0.1:9001 127.0.0.1:9002 127.0.0.1:9003 127.0.0.1:9004 127.0.0.1:9005 --cluster-replicas 1 + + - name: Run Redis cluster tests + run: ./gradlew test -DincludeTags=redisCluster + + - name: Upload JaCoCo exec data + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-redis-cluster + path: "**/build/reports/jacoco/*.exec" + if-no-files-found: error + + - name: Upload JUnit reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: redis-cluster-test-results + path: | + rqueue-spring-boot-starter/build/reports/junit/xml + rqueue-spring/build/reports/junit/xml + rqueue-core/build/reports/junit/xml + if-no-files-found: ignore + + reactive_integration_test: + needs: build + runs-on: ubuntu-latest + env: + RQUEUE_REACTIVE_ENABLED: "true" + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Install Redis + run: | + sudo apt-get update + sudo apt-get install -y redis-server + redis-cli --version + + - name: Run reactive integration tests + run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local + + - name: Upload JaCoCo exec data + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-reactive + path: "**/build/reports/jacoco/*.exec" + if-no-files-found: error + + - name: Upload JUnit reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: reactive-integration-test-results + path: | + rqueue-spring-boot-starter/build/reports/junit/xml + rqueue-spring/build/reports/junit/xml + rqueue-core/build/reports/junit/xml + if-no-files-found: ignore + + coverage_report: + needs: + - unit_test + - producer_only_test + - integration_test + - redis_cluster_test + - reactive_integration_test + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Download coverage artifacts + uses: actions/download-artifact@v4 + with: + pattern: coverage-* + path: coverage-artifacts + merge-multiple: false + + - name: Generate merged coverage report + run: ./gradlew coverageReportOnly + + - name: Upload coverage report + uses: actions/upload-artifact@v4 + with: + name: coverage-report + path: | + build/reports/jacoco/test/jacocoTestReport.xml + build/reports/jacoco/coverageReportOnly/html + if-no-files-found: ignore diff --git a/.github/workflows/pr-format.yaml b/.github/workflows/pr-format.yaml new file mode 100644 index 00000000..6e90d7ff --- /dev/null +++ b/.github/workflows/pr-format.yaml @@ -0,0 +1,97 @@ +name: Format PR Code + +on: + pull_request: + types: + - opened + - synchronize + - reopened + - ready_for_review + push: + branches: + - master + paths: + - "**/*.java" + +jobs: + format: + if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name == github.repository + runs-on: ubuntu-latest + permissions: + contents: write + + steps: + - name: Checkout branch + uses: actions/checkout@v4 + with: + ref: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.ref || github.ref_name }} + fetch-depth: 0 + + - name: Set up Java 21 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "21" + cache: gradle + + - name: Expose Java 21 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Format Java sources + run: ./gradlew formatJava + + - name: Commit formatted changes + run: | + if git diff --quiet; then + echo "No formatting changes detected" + exit 0 + fi + git config user.name "github-actions[bot]" + git config user.email "41898282+github-actions[bot]@users.noreply.github.com" + git add -A + git commit -m "Apply Palantir Java Format" + git push + + check-format: + if: github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository + runs-on: ubuntu-latest + permissions: + contents: read + + steps: + - name: Checkout branch + uses: actions/checkout@v4 + + - name: Set up Java 21 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "21" + cache: gradle + + - name: Expose Java 21 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Check Java formatting + run: ./gradlew checkFormatJava + + - name: Explain how to fix formatting + if: failure() + run: | + echo "Java formatting check failed." + echo "This pull request comes from a fork, so CI cannot push formatting commits back to the branch." + echo "Run './gradlew formatJava' locally and push the formatted changes to update the PR." diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java index 8b67b46f..2e264276 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java @@ -146,7 +146,7 @@ void envelopeEventWithNullPayloadToReturnNull() { @Test void envelopeEventWithInheritedTypeToAndFromMessage() { // T=Notification extends Alert extends BaseAlert — runtime class is Notification - Notification notification = new Notification("n-1", "hello", 42); + Notification notification = new Notification(42); Event event = new Event<>("evt-2", notification); Message message = genericMessageConverter.toMessage(event, RqueueMessageHeaders.emptyMessageHeaders()); @@ -431,30 +431,18 @@ public static class BaseAlert { } @Data - @EqualsAndHashCode(callSuper = true) @NoArgsConstructor @AllArgsConstructor public static class Alert extends BaseAlert { private String message; - - public Alert(String id, String message) { - super(id); - this.message = message; - } } @Data - @EqualsAndHashCode(callSuper = true) @NoArgsConstructor @AllArgsConstructor public static class Notification extends Alert { private int priority; - - public Notification(String id, String message, int priority) { - super(id, message); - this.priority = priority; - } } } From 14595e1f0fe1fa95514edca2f4956b1eb6aac13f Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Wed, 18 Mar 2026 16:02:31 +0530 Subject: [PATCH 7/8] remvoe formatter --- .github/workflows/pr-format.yaml | 97 -------------------------------- 1 file changed, 97 deletions(-) delete mode 100644 .github/workflows/pr-format.yaml diff --git a/.github/workflows/pr-format.yaml b/.github/workflows/pr-format.yaml deleted file mode 100644 index 6e90d7ff..00000000 --- a/.github/workflows/pr-format.yaml +++ /dev/null @@ -1,97 +0,0 @@ -name: Format PR Code - -on: - pull_request: - types: - - opened - - synchronize - - reopened - - ready_for_review - push: - branches: - - master - paths: - - "**/*.java" - -jobs: - format: - if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name == github.repository - runs-on: ubuntu-latest - permissions: - contents: write - - steps: - - name: Checkout branch - uses: actions/checkout@v4 - with: - ref: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.ref || github.ref_name }} - fetch-depth: 0 - - - name: Set up Java 21 - uses: actions/setup-java@v4 - with: - distribution: temurin - java-version: "21" - cache: gradle - - - name: Expose Java 21 to Gradle toolchains - run: | - echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" - echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" - java -version - javac -version - - - name: Set up Gradle - uses: gradle/actions/setup-gradle@v4 - - - name: Format Java sources - run: ./gradlew formatJava - - - name: Commit formatted changes - run: | - if git diff --quiet; then - echo "No formatting changes detected" - exit 0 - fi - git config user.name "github-actions[bot]" - git config user.email "41898282+github-actions[bot]@users.noreply.github.com" - git add -A - git commit -m "Apply Palantir Java Format" - git push - - check-format: - if: github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository - runs-on: ubuntu-latest - permissions: - contents: read - - steps: - - name: Checkout branch - uses: actions/checkout@v4 - - - name: Set up Java 21 - uses: actions/setup-java@v4 - with: - distribution: temurin - java-version: "21" - cache: gradle - - - name: Expose Java 21 to Gradle toolchains - run: | - echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" - echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" - java -version - javac -version - - - name: Set up Gradle - uses: gradle/actions/setup-gradle@v4 - - - name: Check Java formatting - run: ./gradlew checkFormatJava - - - name: Explain how to fix formatting - if: failure() - run: | - echo "Java formatting check failed." - echo "This pull request comes from a fork, so CI cannot push formatting commits back to the branch." - echo "Run './gradlew formatJava' locally and push the formatted changes to update the PR." From fa8241f40dc5daef06cb9ef8412395983fb7881d Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Wed, 18 Mar 2026 16:16:08 +0530 Subject: [PATCH 8/8] update change log --- docs/CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f1b768b8..56b6ac62 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -8,9 +8,10 @@ layout: default All notable user-facing changes to this project are documented in this file. -## Release [3.4.1] 15-March-2026 +## Release [3.4.1] 18-March-2026 ### Features * Backported `HARD_STRICT` priority mode from #279 for stricter priority queue polling on the 3.x line +* Added typed/generic message parameter support — listeners can now declare strongly-typed message parameters and rely on `GenericMessageConverter` for automatic deserialization ### Fixes * Optimized hard-strict polling queue availability checks to avoid full list and sorted-set reads