diff --git a/.circleci/config.yml b/.circleci/config.yml.disabled similarity index 100% rename from .circleci/config.yml rename to .circleci/config.yml.disabled diff --git a/.github/workflows/java-ci.yaml b/.github/workflows/java-ci.yaml new file mode 100644 index 00000000..cd5ed2c8 --- /dev/null +++ b/.github/workflows/java-ci.yaml @@ -0,0 +1,385 @@ +name: Java CI + +on: + push: + branches: + - master + paths: + - "**/*.java" + - "**/*.gradle" + - "**/*.gradle.kts" + - "gradle.properties" + - "settings.gradle" + - "settings.gradle.kts" + - "gradle/**" + - "gradlew" + - "gradlew.bat" + pull_request: + +permissions: + contents: read + +concurrency: + group: java-ci-${{ github.ref }} + cancel-in-progress: true + +env: + REDIS_RUNNING: "true" + USER_NAME: rqueue + TERM: dumb + JVM_OPTS: -Xmx8g + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Resolve dependencies + run: ./gradlew dependencies + + - name: Compile main sources + run: ./gradlew compileJava + + - name: Compile test sources + run: ./gradlew compileTestJava + + unit_test: + needs: build + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Run unit tests + run: ./gradlew test -DincludeTags=unit + + - name: Upload JaCoCo exec data + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-unit + path: "**/build/reports/jacoco/*.exec" + if-no-files-found: error + + - name: Upload JUnit reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: unit-test-results + path: | + rqueue-spring-boot-starter/build/reports/junit/xml + rqueue-spring/build/reports/junit/xml + rqueue-core/build/reports/junit/xml + if-no-files-found: ignore + + producer_only_test: + needs: build + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Install Redis + run: | + sudo apt-get update + sudo apt-get install -y redis-server + redis-cli --version + + - name: Run producer-only tests + run: ./gradlew test -DincludeTags=producerOnly + + - name: Upload JaCoCo exec data + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-producer + path: "**/build/reports/jacoco/*.exec" + if-no-files-found: error + + - name: Upload JUnit reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: producer-test-results + path: | + rqueue-spring-boot-starter/build/reports/junit/xml + rqueue-spring/build/reports/junit/xml + rqueue-core/build/reports/junit/xml + if-no-files-found: ignore + + integration_test: + needs: build + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Install Redis + run: | + sudo apt-get update + sudo apt-get install -y redis-server + redis-cli --version + + - name: Run integration tests + run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local + + - name: Upload JaCoCo exec data + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-integration + path: "**/build/reports/jacoco/*.exec" + if-no-files-found: error + + - name: Upload JUnit reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: integration-test-results + path: | + rqueue-spring-boot-starter/build/reports/junit/xml + rqueue-spring/build/reports/junit/xml + rqueue-core/build/reports/junit/xml + if-no-files-found: ignore + + redis_cluster_test: + needs: build + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Install Redis + run: | + sudo apt-get update + sudo apt-get install -y redis-server + redis-cli --version + + - name: Setup Redis Cluster + run: | + mkdir 9000 9001 9002 9003 9004 9005 + printf "port 9000 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9000/redis.conf + printf "port 9001 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9001/redis.conf + printf "port 9002 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9002/redis.conf + printf "port 9003 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9003/redis.conf + printf "port 9004 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9004/redis.conf + printf "port 9005 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9005/redis.conf + (cd 9000 && redis-server ./redis.conf) & + (cd 9001 && redis-server ./redis.conf) & + (cd 9002 && redis-server ./redis.conf) & + (cd 9003 && redis-server ./redis.conf) & + (cd 9004 && redis-server ./redis.conf) & + (cd 9005 && redis-server ./redis.conf) & + sleep 30 + yes yes | redis-cli --cluster create 127.0.0.1:9000 127.0.0.1:9001 127.0.0.1:9002 127.0.0.1:9003 127.0.0.1:9004 127.0.0.1:9005 --cluster-replicas 1 + + - name: Run Redis cluster tests + run: ./gradlew test -DincludeTags=redisCluster + + - name: Upload JaCoCo exec data + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-redis-cluster + path: "**/build/reports/jacoco/*.exec" + if-no-files-found: error + + - name: Upload JUnit reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: redis-cluster-test-results + path: | + rqueue-spring-boot-starter/build/reports/junit/xml + rqueue-spring/build/reports/junit/xml + rqueue-core/build/reports/junit/xml + if-no-files-found: ignore + + reactive_integration_test: + needs: build + runs-on: ubuntu-latest + env: + RQUEUE_REACTIVE_ENABLED: "true" + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Install Redis + run: | + sudo apt-get update + sudo apt-get install -y redis-server + redis-cli --version + + - name: Run reactive integration tests + run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local + + - name: Upload JaCoCo exec data + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-reactive + path: "**/build/reports/jacoco/*.exec" + if-no-files-found: error + + - name: Upload JUnit reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: reactive-integration-test-results + path: | + rqueue-spring-boot-starter/build/reports/junit/xml + rqueue-spring/build/reports/junit/xml + rqueue-core/build/reports/junit/xml + if-no-files-found: ignore + + coverage_report: + needs: + - unit_test + - producer_only_test + - integration_test + - redis_cluster_test + - reactive_integration_test + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "17" + cache: gradle + + - name: Expose Java 17 to Gradle toolchains + run: | + echo "JAVA_HOME=$JAVA_HOME" >> "$GITHUB_ENV" + echo "ORG_GRADLE_JAVA_INSTALLATIONS_PATHS=$JAVA_HOME" >> "$GITHUB_ENV" + java -version + javac -version + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + + - name: Download coverage artifacts + uses: actions/download-artifact@v4 + with: + pattern: coverage-* + path: coverage-artifacts + merge-multiple: false + + - name: Generate merged coverage report + run: ./gradlew coverageReportOnly + + - name: Upload coverage report + uses: actions/upload-artifact@v4 + with: + name: coverage-report + path: | + build/reports/jacoco/test/jacocoTestReport.xml + build/reports/jacoco/coverageReportOnly/html + if-no-files-found: ignore diff --git a/build.gradle b/build.gradle index 74f54259..432ecfee 100644 --- a/build.gradle +++ b/build.gradle @@ -69,7 +69,7 @@ ext { subprojects { group = "com.github.sonus21" - version = "3.4.0-RELEASE" + version = "3.4.1-RELEASE" dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-messaging diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 80a1c68e..56b6ac62 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -8,6 +8,14 @@ layout: default All notable user-facing changes to this project are documented in this file. +## Release [3.4.1] 18-March-2026 +### Features +* Backported `HARD_STRICT` priority mode from #279 for stricter priority queue polling on the 3.x line +* Added typed/generic message parameter support — listeners can now declare strongly-typed message parameters and rely on `GenericMessageConverter` for automatic deserialization + +### Fixes +* Optimized hard-strict polling queue availability checks to avoid full list and sorted-set reads + ## Release [3.4.0] 22-July-2025 ### Fixes * Fixed unique enqueue message to reject the message upfront instead of identifying it later #259 diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java index 5da2e641..ac7a4e90 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java @@ -27,6 +27,7 @@ import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl; import com.github.sonus21.rqueue.core.middleware.Middleware; import com.github.sonus21.rqueue.core.support.MessageProcessor; +import com.github.sonus21.rqueue.listener.HardStrictPriorityPollerProperties; import com.github.sonus21.rqueue.listener.RqueueMessageHandler; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; import com.github.sonus21.rqueue.models.enums.PriorityMode; @@ -92,6 +93,8 @@ public class SimpleRqueueListenerContainerFactory { // Set priority mode for the pollers private PriorityMode priorityMode = PriorityMode.WEIGHTED; + // Set HardStrictPriorityPollerProperties for HARD_STRICT priority mode poller + private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties; /** * Whether all beans of spring application should be inspected to find methods annotated with @@ -348,6 +351,9 @@ public RqueueMessageListenerContainer createMessageListenerContainer() { if (messageHeaders != null) { messageListenerContainer.setMessageHeaders(messageHeaders); } + if (hardStrictPriorityPollerProperties != null) { + messageListenerContainer.setHardStrictPriorityPollerProperties(hardStrictPriorityPollerProperties); + } return messageListenerContainer; } @@ -486,6 +492,14 @@ public void setMessageHeaders(MessageHeaders messageHeaders) { this.messageHeaders = messageHeaders; } + public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() { + return this.hardStrictPriorityPollerProperties; + } + + public void setHardStrictPriorityPollerProperties(HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) { + this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties; + } + /** * Rqueue scans all beans to find method annotated with {@link RqueueListener}. * diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java index 515dd1bb..549af6ec 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.sonus21.rqueue.utils.SerializationUtils; +import java.lang.reflect.Field; import java.lang.reflect.TypeVariable; import java.util.Collection; import java.util.List; @@ -37,8 +38,8 @@ /** * A converter to turn the payload of a {@link Message} from serialized form to a typed String and - * vice versa. This class does not support generic class except {@link List},even for list the - * entries should be non generic. + * vice versa. Supports {@link List} and single-level generic envelope types (e.g. {@code Event}) + * where type parameters are non-generic and can be resolved from non-null field values. */ @Slf4j public class GenericMessageConverter implements SmartMessageConverter { @@ -137,7 +138,12 @@ private String getClassNameForCollection(String name, Collection payload) { if (payload.isEmpty()) { return null; } - String itemClassName = getClassName(((List) payload).get(0)); + Object firstItem = ((List) payload).get(0); + // Only support non-generic item classes in lists to avoid ambiguous encoding + if (firstItem.getClass().getTypeParameters().length > 0) { + return null; + } + String itemClassName = getClassName(firstItem); if (itemClassName == null) { return null; } @@ -146,12 +152,40 @@ private String getClassNameForCollection(String name, Collection payload) { return null; } - private String getGenericFieldBasedClassName(Class clazz) { + private Class resolveTypeVariable(Class clazz, TypeVariable tv, Object payload) { + // TypeVariable instances are scoped to the class that declares them, so + // field.getGenericType().equals(tv) can only match fields declared on clazz itself. + // Superclass fields reference their own TypeVariable instances, which are distinct objects. + for (Field field : clazz.getDeclaredFields()) { + if (field.getGenericType().equals(tv)) { + field.setAccessible(true); + try { + Object value = field.get(payload); + if (value != null) { + return value.getClass(); + } + } catch (IllegalAccessException e) { + log.debug("Cannot access field {}", field.getName(), e); + } + } + } + return null; + } + + private String getGenericFieldBasedClassName(Class clazz, Object payload) { TypeVariable[] typeVariables = clazz.getTypeParameters(); if (typeVariables.length == 0) { return clazz.getName(); } - return null; + StringBuilder sb = new StringBuilder(clazz.getName()); + for (TypeVariable tv : typeVariables) { + Class resolved = resolveTypeVariable(clazz, tv, payload); + if (resolved == null || resolved.getTypeParameters().length > 0) { + return null; + } + sb.append('#').append(resolved.getName()); + } + return sb.toString(); } private String getClassName(Object payload) { @@ -160,7 +194,7 @@ private String getClassName(Object payload) { if (payload instanceof Collection) { return getClassNameForCollection(name, (Collection) payload); } - return getGenericFieldBasedClassName(payloadClass); + return getGenericFieldBasedClassName(payloadClass, payload); } private JavaType getTargetType(Msg msg) throws ClassNotFoundException { diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java index 1885eafd..caf2e25f 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java @@ -18,6 +18,7 @@ import com.github.sonus21.rqueue.models.MessageMoveResult; import java.util.List; +import java.util.Optional; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ZSetOperations.TypedTuple; import reactor.core.publisher.Flux; @@ -92,4 +93,10 @@ Long scheduleMessage( Flux addReactiveMessageWithDelay( String scheduledQueueName, String scheduledQueueChannelName, RqueueMessage rqueueMessage); + + Optional findFirstElementFromList(String name); + + Optional findFirstElementFromZset(String name); + + Optional> findFirstElementFromZsetWithScore(String name); } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java index fa6b288d..f121e775 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; @@ -343,4 +344,19 @@ public RedisTemplate getTemplate() { public Long removeElementFromZset(String zsetName, RqueueMessage rqueueMessage) { return super.removeFromZset(zsetName, rqueueMessage); } + + @Override + public Optional findFirstElementFromList(String name) { + return readFromList(name, 0, 0).stream().findFirst(); + } + + @Override + public Optional findFirstElementFromZset(String name) { + return readFromZset(name, 0, 0).stream().findFirst(); + } + + @Override + public Optional> findFirstElementFromZsetWithScore(String name) { + return readFromZsetWithScore(name, 0, 0).stream().findFirst(); + } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java new file mode 100644 index 00000000..04443827 --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.listener; + +import com.github.sonus21.rqueue.core.RqueueBeanProvider; +import com.github.sonus21.rqueue.core.middleware.Middleware; +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr; +import com.github.sonus21.rqueue.utils.Constants; +import com.github.sonus21.rqueue.utils.QueueThreadPool; +import com.github.sonus21.rqueue.utils.TimeoutUtils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.slf4j.event.Level; +import org.springframework.messaging.MessageHeaders; + +/** + * Use it only with priority queues. + * Message processing can be slow. + * The hard strict priority algorithm is better in HardStrictPriorityPoller than in StrictPriorityPoller + * More details see in GitHub project issue + */ +class HardStrictPriorityPoller extends RqueueMessagePoller { + + private final RqueueBeanProvider rqueueBeanProvider; + + private final Map queueNameToDetail; + private final Map queueNameToThread; + private final Map queueDeactivationTime = new HashMap<>(); + private final HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties; + + HardStrictPriorityPoller( + String groupName, + final List queueDetails, + final Map queueNameToThread, + RqueueBeanProvider rqueueBeanProvider, + QueueStateMgr queueStateMgr, + List middlewares, + long pollingInterval, + long backoffTime, + PostProcessingHandler postProcessingHandler, + MessageHeaders messageHeaders, + HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) { + super( + "HardStrict-" + groupName, + rqueueBeanProvider, + queueStateMgr, + middlewares, + pollingInterval, + backoffTime, + postProcessingHandler, + messageHeaders); + + this.rqueueBeanProvider = rqueueBeanProvider; + // Sort queues by priority once during initialization + List queueDetailList = new ArrayList<>(queueDetails); + queueDetailList.sort( + (o1, o2) -> + o2.getPriority().get(Constants.DEFAULT_PRIORITY_KEY) + - o1.getPriority().get(Constants.DEFAULT_PRIORITY_KEY)); + + this.queues = queueDetailList.stream().map(QueueDetail::getName).collect(Collectors.toList()); + this.queueNameToDetail = + queueDetailList.stream() + .collect(Collectors.toMap(QueueDetail::getName, Function.identity())); + this.queueNameToThread = queueNameToThread; + this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties != null + ? hardStrictPriorityPollerProperties + : new HardStrictPriorityPollerProperties(); + } + + @Override + public void start() { + log(Level.DEBUG, "Running, Ordered Queues: {}", null, queues); + while (true) { + if (shouldExit()) { + return; + } + + boolean messageFoundInAnyQueue = false; + + try { + for (String queue : queues) { + if (eligibleForPolling(queue) && !isDeactivated(queue)) { + QueueThreadPool queueThreadPool = queueNameToThread.get(queue); + QueueDetail queueDetail = queueNameToDetail.get(queue); + poll(-1, queue, queueDetail, queueThreadPool); + + if (hardStrictPriorityPollerProperties.getAfterPollSleepInterval() != null) { + TimeoutUtils.sleepLog(hardStrictPriorityPollerProperties.getAfterPollSleepInterval(), false); + } + + if (existMessagesInCurrentQueueOrHigherPriorityQueue(queue, queues)) { + // break current cycle and start new cycle + // it allow to process queue with the higher priority + messageFoundInAnyQueue = true; + break; + } + } + } + + // If no messages were found across all queues, sleep for the polling interval + if (!messageFoundInAnyQueue) { + TimeoutUtils.sleepLog(pollingInterval, false); + } + + } catch (Throwable e) { + log(Level.ERROR, "Exception in the poller {}", e, e.getMessage()); + if (shouldExit()) { + return; + } + TimeoutUtils.sleepLog(backoffTime, false); + } + } + } + + boolean existMessagesInCurrentQueueOrHigherPriorityQueue(String currentQueue, List queues) { + for (String queue : queues) { + if (eligibleForPolling(queue) && !isDeactivated(queue)) { + QueueDetail queueDetail = queueNameToDetail.get(queue); + if (existAvailableMessagesForPoll(queueDetail)) { + // the current or higher priority queue contains messages that need to be processed. + return true; + } + } + // we check all queues from the highest priority to current queue + if (queue.equals(currentQueue)) { + return false; + } + } + // unexpected behavior, need more details if it occurs + log(Level.WARN, "current queue '{}' not found in queues list '{}'", null, currentQueue, queues); + return false; + } + + protected boolean existAvailableMessagesForPoll(QueueDetail queueDetail) { + boolean readyMessagesExists = + rqueueBeanProvider + .getRqueueMessageTemplate() + .findFirstElementFromList(queueDetail.getQueueName()) + .isPresent(); + if (readyMessagesExists) { + log( + Level.TRACE, + "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.", + null, + queueDetail.getName()); + return true; + } + + // Only check delayed messages with score <= current time + long currentTime = System.currentTimeMillis(); + boolean delayedMessagesExists = + rqueueBeanProvider + .getRqueueMessageTemplate() + .findFirstElementFromZsetWithScore(queueDetail.getScheduledQueueName()) + .filter(element -> element.getScore() <= currentTime) + .isPresent(); + + if (delayedMessagesExists) { + log( + Level.TRACE, + "delayedMessages exists for scheduled queue '{}' and currentTime '{}'," + + " existAvailableMessagesForPoll = true.", + null, + queueDetail.getScheduledQueueName(), + currentTime); + return true; + } + + return false; + } + + private boolean isDeactivated(String queue) { + Long deactivationTime = queueDeactivationTime.get(queue); + if (deactivationTime == null) { + return false; + } + if (System.currentTimeMillis() - deactivationTime > pollingInterval) { + queueDeactivationTime.remove(queue); + return false; + } + return true; + } + + @Override + long getSemaphoreWaitTime() { + return hardStrictPriorityPollerProperties.getSemaphoreWaitTime() != null + ? hardStrictPriorityPollerProperties.getSemaphoreWaitTime() + : 20L; + } + + @Override + void deactivate(int index, String queue, DeactivateType deactivateType) { + if (deactivateType == DeactivateType.POLL_FAILED) { + // Pause in case of connection errors or polling failures + TimeoutUtils.sleepLog(backoffTime, false); + } else { + // Mark deactivation time if the queue is empty + queueDeactivationTime.put(queue, System.currentTimeMillis()); + } + } +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java new file mode 100644 index 00000000..618aadac --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java @@ -0,0 +1,33 @@ +package com.github.sonus21.rqueue.listener; + +public class HardStrictPriorityPollerProperties { + // set such default values for parameters "afterPollSleepInterval" and "semaphoreWaitTime" + // because local load tests have correct strict priority algorithm work and good performance with them + private Long afterPollSleepInterval = 30L; + private Long semaphoreWaitTime = 15L; + + public Long getAfterPollSleepInterval() { + return afterPollSleepInterval; + } + + public void setAfterPollSleepInterval(Long afterPollSleepInterval) { + validateTimeInterval(afterPollSleepInterval); + this.afterPollSleepInterval = afterPollSleepInterval; + } + + public Long getSemaphoreWaitTime() { + return this.semaphoreWaitTime; + } + + public void setSemaphoreWaitTime(Long semaphoreWaitTime) { + validateTimeInterval(semaphoreWaitTime); + this.semaphoreWaitTime = semaphoreWaitTime; + } + + private void validateTimeInterval(Long value) { + if (value == null || value > 0) { + return; + } + throw new IllegalArgumentException("Value must be positive: " + value); + } +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index 440a836a..36b8dc73 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -106,6 +106,7 @@ public class RqueueMessageListenerContainer private int phase = Integer.MAX_VALUE; private PriorityMode priorityMode; private MessageHeaders messageHeaders; + private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties; public RqueueMessageListenerContainer( RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate) { @@ -515,6 +516,21 @@ protected void startGroup(String groupName, List queueDetails) { backOffTime, postProcessingHandler, getMessageHeaders())); + } else if (getPriorityMode() == PriorityMode.HARD_STRICT) { + future = + taskExecutor.submit( + new HardStrictPriorityPoller( + StringUtils.groupName(groupName), + queueDetails, + queueThread, + rqueueBeanProvider, + queueStateMgr, + getMiddleWares(), + pollingInterval, + backOffTime, + postProcessingHandler, + getMessageHeaders(), + getHardStrictPriorityPollerProperties())); } else { future = taskExecutor.submit( @@ -712,6 +728,14 @@ public void setMessageHeaders(MessageHeaders messageHeaders) { this.messageHeaders = messageHeaders; } + public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() { + return this.hardStrictPriorityPollerProperties; + } + + public void setHardStrictPriorityPollerProperties(HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) { + this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties; + } + class QueueStateMgr { Set pausedQueues = ConcurrentHashMap.newKeySet(); diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java index 0b6e556a..0fa83405 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java @@ -18,5 +18,6 @@ public enum PriorityMode { STRICT, - WEIGHTED + WEIGHTED, + HARD_STRICT } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java index 32d991f2..2e264276 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/converter/GenericMessageConverterTest.java @@ -116,13 +116,45 @@ void toAndFromMessageList() { } @Test - void genericMessageToReturnNull() { + void genericEnvelopeToAndFromMessage() { GenericTestData data = new GenericTestData<>(10, comment); Message message = genericMessageConverter.toMessage(data, RqueueMessageHeaders.emptyMessageHeaders()); + GenericTestData fromMessage = + (GenericTestData) genericMessageConverter.fromMessage(message, null); + assertEquals(data, fromMessage); + } + + @Test + void envelopeEventToAndFromMessage() { + Event event = new Event<>("evt-1", comment); + Message message = + genericMessageConverter.toMessage(event, RqueueMessageHeaders.emptyMessageHeaders()); + Event fromMessage = + (Event) genericMessageConverter.fromMessage(message, null); + assertEquals(event, fromMessage); + } + + @Test + void envelopeEventWithNullPayloadToReturnNull() { + Event event = new Event<>("evt-1", null); + Message message = + genericMessageConverter.toMessage(event, RqueueMessageHeaders.emptyMessageHeaders()); assertNull(message); } + @Test + void envelopeEventWithInheritedTypeToAndFromMessage() { + // T=Notification extends Alert extends BaseAlert — runtime class is Notification + Notification notification = new Notification(42); + Event event = new Event<>("evt-2", notification); + Message message = + genericMessageConverter.toMessage(event, RqueueMessageHeaders.emptyMessageHeaders()); + Event fromMessage = + (Event) genericMessageConverter.fromMessage(message, null); + assertEquals(event, fromMessage); + } + @Test @Disabled void multipleGenericFieldMessageToAndFrom() { @@ -379,4 +411,38 @@ public static class GenericTestDataWithPredefinedType { private Integer index; private MultiGenericTestData data; } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Event { + + private String id; + private T payload; + } + + // Three-level hierarchy: Notification extends Alert extends BaseAlert + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class BaseAlert { + + private String id; + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Alert extends BaseAlert { + + private String message; + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Notification extends Alert { + + private int priority; + } } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java index 7071dfed..be618e1e 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtilsTest.java @@ -148,8 +148,8 @@ void buildMessageWithDelay() { @Test void buildMessageNull() { + // id is null so the type parameter T cannot be resolved, making conversion fail GenericClass genericClass = new GenericClass<>(); - genericClass.id = UUID.randomUUID().toString(); try { RqueueMessageUtils.buildMessage( messageConverter, @@ -167,8 +167,8 @@ void buildMessageNull() { @Test void buildPeriodicMessageNull() { + // id is null so the type parameter T cannot be resolved, making conversion fail GenericClass genericClass = new GenericClass<>(); - genericClass.id = UUID.randomUUID().toString(); try { RqueueMessageUtils.buildPeriodicMessage( messageConverter, @@ -186,8 +186,9 @@ void buildPeriodicMessageNull() { @Test void buildMessageReturnInvalidType() { + // id is null so GenericMessageConverter returns null; falls through to NoMessageConverter + // which wraps the object in a GenericMessage with a non-String/non-byte[] payload GenericClass genericClass = new GenericClass<>(); - genericClass.id = UUID.randomUUID().toString(); try { RqueueMessageUtils.buildMessage( messageConverter2, @@ -205,8 +206,9 @@ void buildMessageReturnInvalidType() { @Test void buildPeriodicMessageReturnInvalidType() { + // id is null so GenericMessageConverter returns null; falls through to NoMessageConverter + // which wraps the object in a GenericMessage with a non-String/non-byte[] payload GenericClass genericClass = new GenericClass<>(); - genericClass.id = UUID.randomUUID().toString(); try { RqueueMessageUtils.buildPeriodicMessage( messageConverter2, diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java new file mode 100644 index 00000000..efa4e261 --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java @@ -0,0 +1,49 @@ +package com.github.sonus21.rqueue.listener; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.stream.LongStream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.NullSource; + +class HardStrictPriorityPollerPropertiesTest { + + private final HardStrictPriorityPollerProperties properties = + new HardStrictPriorityPollerProperties(); + + static LongStream invalidValues() { + return LongStream.of(0L, -1L, -100L, Long.MIN_VALUE); + } + + static LongStream validValues() { + return LongStream.of(1L, Long.MAX_VALUE); + } + + @ParameterizedTest + @MethodSource("invalidValues") + void setAfterPollSleepIntervalInvalidValues(Long value) { + assertThrows(IllegalArgumentException.class, () -> properties.setAfterPollSleepInterval(value)); + } + + @ParameterizedTest + @NullSource + @MethodSource("validValues") + void setAfterPollSleepIntervalValidValues(Long value) { + assertDoesNotThrow(() -> properties.setAfterPollSleepInterval(value)); + } + + @ParameterizedTest + @MethodSource("invalidValues") + void setSemaphoreWaitTimeInvalidValues(Long value) { + assertThrows(IllegalArgumentException.class, () -> properties.setSemaphoreWaitTime(value)); + } + + @ParameterizedTest + @NullSource + @MethodSource("validValues") + void setSemaphoreWaitTimeValidValues(Long value) { + assertDoesNotThrow(() -> properties.setSemaphoreWaitTime(value)); + } +} diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java new file mode 100644 index 00000000..2392ebb1 --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java @@ -0,0 +1,178 @@ +package com.github.sonus21.rqueue.listener; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import com.github.sonus21.TestBase; +import com.github.sonus21.rqueue.CoreUnitTest; +import com.github.sonus21.rqueue.core.RqueueBeanProvider; +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr; +import com.github.sonus21.rqueue.utils.Constants; +import com.github.sonus21.rqueue.utils.QueueThreadPool; +import com.github.sonus21.rqueue.utils.TimeoutUtils; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.messaging.MessageHeaders; + +@CoreUnitTest +class HardStrictPriorityPollerTest extends TestBase { + + @Mock + private RqueueBeanProvider rqueueBeanProvider; + @Mock + private QueueStateMgr queueStateMgr; + @Mock + private PostProcessingHandler postProcessingHandler; + + private final String highPriorityQueue = "high-priority-" + UUID.randomUUID(); + private final String lowPriorityQueue = "low-priority-" + UUID.randomUUID(); + private HardStrictPriorityPoller poller; + private QueueDetail highDetail; + private QueueDetail lowDetail; + + @BeforeEach + public void setUp() { + MockitoAnnotations.openMocks(this); + + highDetail = createQueueDetail(highPriorityQueue, 100); + lowDetail = createQueueDetail(lowPriorityQueue, 10); + + List queueDetails = Arrays.asList(lowDetail, highDetail); + Map queueNameToThread = new HashMap<>(); + queueNameToThread.put(highPriorityQueue, mock(QueueThreadPool.class)); + queueNameToThread.put(lowPriorityQueue, mock(QueueThreadPool.class)); + + poller = spy(new HardStrictPriorityPoller( + "test-group", + queueDetails, + queueNameToThread, + rqueueBeanProvider, + queueStateMgr, + Collections.emptyList(), + 50L, + 50L, + postProcessingHandler, + new MessageHeaders(Collections.emptyMap()), + new HardStrictPriorityPollerProperties() + )); + + // Allowing queue polling + lenient().doReturn(true).when(poller).eligibleForPolling(anyString()); + // Disable immediate exit from the loop + lenient().doReturn(false).when(poller).shouldExit(); + } + + private QueueDetail createQueueDetail(String name, int priority) { + QueueDetail detail = mock(QueueDetail.class); + when(detail.getName()).thenReturn(name); + Map priorityMap = new HashMap<>(); + priorityMap.put(Constants.DEFAULT_PRIORITY_KEY, priority); + when(detail.getPriority()).thenReturn(priorityMap); + return detail; + } + + @Test + void testQueuesAreSortedByPriority() { + List sortedQueues = poller.queues; + assertTrue(sortedQueues.indexOf(highPriorityQueue) < sortedQueues.indexOf(lowPriorityQueue), + "High priority queue should be first"); + } + + @Test + void testExistMessagesInHigherPriorityQueueReturnsTrue() { + List queues = Arrays.asList(highPriorityQueue, lowPriorityQueue); + + // High queue has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(highDetail); + + // Checking from low priority perspective + boolean result = poller.existMessagesInCurrentQueueOrHigherPriorityQueue(lowPriorityQueue, queues); + assertTrue(result, "Should return true because high priority queue has messages"); + } + + @Test + void testStrictExecutionPreventsLowPriorityPoll() throws Exception { + AtomicInteger highQueuePollCount = new AtomicInteger(0); + AtomicInteger lowQueuePollCount = new AtomicInteger(0); + + // High Priority always has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(highDetail); + lenient().doAnswer(invocation -> { + highQueuePollCount.incrementAndGet(); + return 1; + }).when(poller).poll(anyInt(), eq(highPriorityQueue), eq(highDetail), any()); + + // Low Priority also has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(lowDetail); + lenient().doAnswer(invocation -> { + lowQueuePollCount.incrementAndGet(); + return 1; + }).when(poller).poll(anyInt(), eq(lowPriorityQueue), eq(lowDetail), any()); + + Thread pollerThread = new Thread(poller::start); + pollerThread.start(); + + try { + // Wait for multiple polls of High priority + TimeoutUtils.waitFor(() -> highQueuePollCount.get() > 5, 2000, "high priority polls"); + + // Low priority must NEVER be polled because 'break' happens after high poll + assertTrue(lowQueuePollCount.get() == 0, "Low priority queue should not be polled"); + } finally { + stop(poller, pollerThread); + } + } + + @Test + void testLowPriorityIsPolledWhenHighIsEmpty() throws Exception { + AtomicInteger lowQueuePollCount = new AtomicInteger(0); + + // High is empty + lenient().doReturn(false).when(poller).existAvailableMessagesForPoll(highDetail); + + // Low has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(lowDetail); + lenient().doAnswer(invocation -> { + lowQueuePollCount.incrementAndGet(); + return 1; + }).when(poller).poll(anyInt(), eq(lowPriorityQueue), eq(lowDetail), any()); + + Thread pollerThread = new Thread(poller::start); + pollerThread.start(); + + try { + TimeoutUtils.waitFor(() -> lowQueuePollCount.get() > 0, 2000, "low priority poll"); + assertTrue(lowQueuePollCount.get() > 0); + } finally { + stop(poller, pollerThread); + } + } + + private void stop(HardStrictPriorityPoller poller, Thread thread) { + lenient().doReturn(true).when(poller).shouldExit(); + if (thread != null && thread.isAlive()) { + thread.interrupt(); + try { + thread.join(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} \ No newline at end of file