Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ ext {

subprojects {
group = "com.github.sonus21"
version = "4.0.0-RC8"
version = "4.0.0-RC9"

dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
Expand Down
12 changes: 12 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ foundational Spring Boot 4 and Jackson 3 migration notes; RC3 for the Java 17
baseline change; RC4–RC6 below for the NATS backend, broker SPI, dashboard
work, and middleware additions that build on top.

## Release [4.0.0.RC9] 2026-05-13

{: .highlight}
Release candidate.

### Features
* **Global retry limit** — added `rqueue.retry.max` to cap the implicit
retry-forever default for listeners that do not configure `numRetries`.
Explicit per-listener retry counts and the existing DLQ retry default continue
to take precedence. On NATS, the effective retry count maps to JetStream
`maxDeliver` as `retries + 1`.

## Release [4.0.0.RC8] 2026-05-09

{: .highlight}
Expand Down
13 changes: 11 additions & 2 deletions docs/configuration/retry-and-backoff.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ Note: Messages handled this way are neither retried nor moved to the Dead Letter
When a message handler fails, the message can be retried immediately, delayed for a
future retry, moved to a dead letter queue, or dropped.

### Global Retry Limit

Set `rqueue.retry.max=N` to limit listeners that do not configure `numRetries` and
would otherwise retry forever. The default is `-1`, which leaves the retry-forever
default unchanged. Explicit `@RqueueListener(numRetries = "...")` values and the
dead-letter queue default retry count continue to take precedence.

For the NATS backend, this retry count is translated to JetStream `maxDeliver` as
`N + 1`, because JetStream counts the initial delivery plus retries. For example,
`rqueue.retry.max=3` creates consumers with at most four total deliveries.

### Immediate Retries
To retry a message immediately within the same polling cycle, set
`rqueue.retry.per.poll` to a positive integer (e.g., `2`). This will cause the
Expand Down Expand Up @@ -75,5 +86,3 @@ public class RqueueConfiguration {
}
```



Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ private static String generateBrokerId() {
@Value("${rqueue.retry.per.poll:1}")
private int retryPerPoll;

@Value("${rqueue.retry.max:-1}")
private int maxRetry = -1;

@Value("${rqueue.net.proxy.host:}")
private String proxyHost;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,20 +471,20 @@ private AsyncTaskExecutor createTaskExecutor(

private List<QueueDetail> getQueueDetail(String queue, MappingInformation mappingInformation) {
int numRetry = mappingInformation.getNumRetry();
RqueueConfig rqueueConfig = rqueueBeanProvider.getRqueueConfig();
if (!StringUtils.isEmpty(mappingInformation.getDeadLetterQueueName()) && numRetry == -1) {
log.warn(
"Dead letter queue {} is set but retry is not set",
mappingInformation.getDeadLetterQueueName());
numRetry = Constants.DEFAULT_RETRY_DEAD_LETTER_QUEUE;
} else if (numRetry == -1) {
numRetry = Integer.MAX_VALUE;
numRetry = rqueueConfig.getMaxRetry() >= 0 ? rqueueConfig.getMaxRetry() : Integer.MAX_VALUE;
}
String priorityGroup = mappingInformation.getPriorityGroup();
Map<String, Integer> priority = mappingInformation.getPriority();
if (StringUtils.isEmpty(priorityGroup) && priority.size() == 1) {
priorityGroup = Constants.DEFAULT_PRIORITY_GROUP;
}
RqueueConfig rqueueConfig = rqueueBeanProvider.getRqueueConfig();
QueueDetail queueDetail = QueueDetail.builder()
.name(queue)
.queueName(rqueueConfig.getQueueName(queue))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
{
"groups": [],
"properties": [
{
"name": "rqueue.retry.max",
"type": "java.lang.Integer",
"description": "Global retry limit used when a listener does not configure numRetries and would otherwise retry forever. -1 preserves the retry-forever default. Explicit listener retry counts and DLQ defaults take precedence.",
"defaultValue": -1,
"sourceType": "com.github.sonus21.rqueue.config.RqueueConfig"
},
{
"name": "rqueue.nats.auto-create-streams",
"type": "java.lang.Boolean",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,9 @@ void workerRegistryProperties() {
assertEquals(
Duration.ofSeconds(15), rqueueConfigVersion2.getWorkerRegistryQueueHeartbeatInterval());
}

@Test
void defaultMaxRetryIsDisabled() {
assertEquals(-1, rqueueConfigVersion2.getMaxRetry());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import com.github.sonus21.rqueue.common.RqueueLockManager;
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.EndpointRegistry;
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
import com.github.sonus21.rqueue.models.Concurrency;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.QueueConfig;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
Expand Down Expand Up @@ -67,6 +69,7 @@
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

@CoreUnitTest
class RqueueMessageListenerContainerTest extends TestBase {
Expand Down Expand Up @@ -172,6 +175,68 @@ void setTaskExecutor() {
((ThreadPoolTaskExecutor) container.getTaskExecutor()).getThreadNamePrefix());
}

@Test
void globalMaxRetryCapsDefaultRetryForever() throws Exception {
beanProvider.getRqueueConfig().setMaxRetry(5);
doReturn(handlerMap(mapping(slowQueue, -1, null)))
.when(rqueueMessageHandler)
.getHandlerMethodMap();

container.afterPropertiesSet();

assertEquals(5, EndpointRegistry.get(slowQueue).getNumRetry());
}

@Test
void globalMaxRetryDoesNotOverrideExplicitRetryCount() throws Exception {
beanProvider.getRqueueConfig().setMaxRetry(2);
doReturn(handlerMap(mapping(slowQueue, 10, null)))
.when(rqueueMessageHandler)
.getHandlerMethodMap();

container.afterPropertiesSet();

assertEquals(10, EndpointRegistry.get(slowQueue).getNumRetry());
}

@Test
void globalMaxRetryDoesNotRaiseLowerExplicitRetryCount() throws Exception {
beanProvider.getRqueueConfig().setMaxRetry(5);
doReturn(handlerMap(mapping(slowQueue, 2, null)))
.when(rqueueMessageHandler)
.getHandlerMethodMap();

container.afterPropertiesSet();

assertEquals(2, EndpointRegistry.get(slowQueue).getNumRetry());
}

@Test
void globalMaxRetryCanDisableRetries() throws Exception {
beanProvider.getRqueueConfig().setMaxRetry(0);
doReturn(handlerMap(mapping(slowQueue, -1, null)))
.when(rqueueMessageHandler)
.getHandlerMethodMap();

container.afterPropertiesSet();

assertEquals(0, EndpointRegistry.get(slowQueue).getNumRetry());
}

@Test
void globalMaxRetryDoesNotOverrideDeadLetterDefaultRetryCount() throws Exception {
beanProvider.getRqueueConfig().setMaxRetry(0);
doReturn(handlerMap(mapping(slowQueue, -1, slowQueue + "-dlq")))
.when(rqueueMessageHandler)
.getHandlerMethodMap();

container.afterPropertiesSet();

assertEquals(
Constants.DEFAULT_RETRY_DEAD_LETTER_QUEUE,
EndpointRegistry.get(slowQueue).getNumRetry());
}

@Test
void phaseSetting() {
assertEquals(Integer.MAX_VALUE, container.getPhase());
Expand Down Expand Up @@ -534,6 +599,28 @@ private class TestListenerContainer extends RqueueMessageListenerContainer {
}
}

private MultiValueMap<MappingInformation, RqueueMessageHandler.HandlerMethodWithPrimary>
handlerMap(MappingInformation mappingInformation) {
LinkedMultiValueMap<MappingInformation, RqueueMessageHandler.HandlerMethodWithPrimary> map =
new LinkedMultiValueMap<>();
map.add(mappingInformation, new RqueueMessageHandler.HandlerMethodWithPrimary(null, false));
return map;
}

private MappingInformation mapping(String queue, int numRetry, String deadLetterQueueName) {
return MappingInformation.builder()
.queueNames(Collections.singleton(queue))
.numRetry(numRetry)
.deadLetterQueueName(deadLetterQueueName)
.deadLetterConsumerEnabled(false)
.visibilityTimeout(VISIBILITY_TIMEOUT)
.active(false)
.concurrency(new Concurrency(0, 0))
.priority(Collections.singletonMap(Constants.DEFAULT_PRIORITY_KEY, 1))
.batchSize(1)
.build();
}

@Getter
private class StubMessageSchedulerListenerContainer extends RqueueMessageListenerContainer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.github.sonus21.rqueue.models.db.RqueueJob;
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
import io.nats.client.JetStreamApiException;
import io.nats.client.KeyValue;
import io.nats.client.api.KeyValueEntry;
Expand Down Expand Up @@ -69,15 +70,15 @@ public void createJob(RqueueJob rqueueJob, Duration expiry) {
@Override
public void save(RqueueJob rqueueJob, Duration expiry) {
try {
kv(expiry).put(sanitize(rqueueJob.getId()), serialize(rqueueJob));
kv(expiry).put(NatsKvKeys.sanitize(rqueueJob.getId()), serialize(rqueueJob));
} catch (IOException | JetStreamApiException e) {
log.log(Level.WARNING, "save job " + rqueueJob.getId() + " failed", e);
}
}

@Override
public RqueueJob findById(String jobId) {
return loadByKey(sanitize(jobId));
return loadByKey(NatsKvKeys.sanitize(jobId));
}

@Override
Expand Down Expand Up @@ -111,7 +112,7 @@ public List<RqueueJob> finByMessageIdIn(List<String> messageIds) {
@Override
public void delete(String jobId) {
try {
kv(null).delete(sanitize(jobId));
kv(null).delete(NatsKvKeys.sanitize(jobId));
} catch (IOException | JetStreamApiException e) {
log.log(Level.WARNING, "delete job " + jobId + " failed", e);
}
Expand Down Expand Up @@ -164,9 +165,4 @@ private RqueueJob deserialize(byte[] bytes) {
return null;
}
}

/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
private static String sanitize(String key) {
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.github.sonus21.rqueue.models.db.QueueStatistics;
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
import io.nats.client.JetStreamApiException;
import io.nats.client.KeyValue;
import io.nats.client.api.KeyValueEntry;
Expand Down Expand Up @@ -72,7 +73,7 @@ public QueueStatistics findById(String id) {
return null;
}
try {
KeyValueEntry entry = kv().get(sanitize(id));
KeyValueEntry entry = kv().get(NatsKvKeys.sanitize(id));
if (entry == null || entry.getValue() == null) {
return null;
}
Expand Down Expand Up @@ -104,7 +105,7 @@ public void save(QueueStatistics queueStatistics) {
throw new IllegalArgumentException("id cannot be null: " + queueStatistics);
}
try {
kv().put(sanitize(queueStatistics.getId()), serialize(queueStatistics));
kv().put(NatsKvKeys.sanitize(queueStatistics.getId()), serialize(queueStatistics));
} catch (IOException | JetStreamApiException e) {
log.log(Level.WARNING, "save id=" + queueStatistics.getId() + " failed", e);
}
Expand All @@ -124,9 +125,4 @@ private QueueStatistics deserialize(byte[] bytes) {
return null;
}
}

/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
private static String sanitize(String key) {
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.github.sonus21.rqueue.models.db.QueueConfig;
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
import io.nats.client.JetStreamApiException;
import io.nats.client.KeyValue;
import io.nats.client.api.KeyValueEntry;
Expand Down Expand Up @@ -74,7 +75,7 @@ public QueueConfig getConfigByName(String name, boolean cached) {
return hit;
}
}
QueueConfig loaded = loadByKey(sanitize(name));
QueueConfig loaded = loadByKey(NatsKvKeys.sanitize(name));
if (loaded != null) {
cache.put(name, loaded);
}
Expand Down Expand Up @@ -120,7 +121,7 @@ public List<QueueConfig> findAllQConfig(Collection<String> ids) {
@Override
public void saveQConfig(QueueConfig queueConfig) {
try {
kv().put(sanitize(queueConfig.getName()), serialize(queueConfig));
kv().put(NatsKvKeys.sanitize(queueConfig.getName()), serialize(queueConfig));
cache.put(queueConfig.getName(), queueConfig);
} catch (IOException | JetStreamApiException e) {
log.log(Level.WARNING, "saveQConfig " + queueConfig.getName() + " failed", e);
Expand Down Expand Up @@ -188,9 +189,4 @@ private QueueConfig deserialize(byte[] bytes) {
return null;
}
}

/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
private static String sanitize(String key) {
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,15 +510,16 @@ public static Duration resolveAckWait(QueueDetail q, RqueueNatsConfig config) {
/**
* Resolve the JetStream {@code maxDeliver} from per-queue {@link QueueDetail#getNumRetry()}
* (counted as initial delivery + N retries = numRetry + 1). The {@link Integer#MAX_VALUE}
* "retry forever" sentinel maps to JetStream's unlimited value ({@code -1}); non-positive
* numRetry falls back to {@code RqueueNatsConfig.ConsumerDefaults.getMaxDeliver()}.
* "retry forever" sentinel maps to JetStream's unlimited value ({@code -1}); zero means
* one total delivery and no retries. Negative numRetry falls back to
* {@code RqueueNatsConfig.ConsumerDefaults.getMaxDeliver()}.
*/
public static long resolveMaxDeliver(QueueDetail q, RqueueNatsConfig config) {
int numRetry = q.getNumRetry();
if (numRetry == Integer.MAX_VALUE) {
return -1L;
}
if (numRetry > 0) {
if (numRetry >= 0) {
return numRetry + 1L;
}
return config.getConsumerDefaults().getMaxDeliver();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2026 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*/
package com.github.sonus21.rqueue.nats.kv;

/** Utility methods for converting rqueue keys into NATS JetStream KV keys. */
public final class NatsKvKeys {

private NatsKvKeys() {}

/**
* NATS KV keys do not need Redis Cluster hash tags. Strip any {@code {name}} tag first, then
* coerce the remaining key into the conservative KV key character set used by this module.
*/
public static String sanitize(String key) {
if (key == null) {
return "_";
}
String sanitized = key.replaceAll("\\{([^{}]*)}", "$1").replaceAll("[^A-Za-z0-9_=.-]", "_");
return sanitized.isEmpty() ? "_" : sanitized;
}
}
Loading