Erweiterungen

This commit is contained in:
2026-01-07 08:52:14 +01:00
parent cd1a2fc2be
commit 9b838863d9
6 changed files with 83 additions and 338 deletions

View File

@@ -10,7 +10,8 @@
"Bash(xargs kill:*)", "Bash(xargs kill:*)",
"Bash(cat:*)", "Bash(cat:*)",
"Bash(mongosh:*)", "Bash(mongosh:*)",
"Bash(mongo:*)" "Bash(mongo:*)",
"Bash(kill:*)"
], ],
"deny": [], "deny": [],
"ask": [] "ask": []

View File

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import de.assecutor.votianlt.messaging.model.*; import de.assecutor.votianlt.messaging.model.*;
import de.assecutor.votianlt.messaging.plugin.PluginManager; import de.assecutor.votianlt.messaging.plugin.PluginManager;
import de.assecutor.votianlt.messaging.plugin.SendOptions; import de.assecutor.votianlt.messaging.plugin.SendOptions;
import de.assecutor.votianlt.repository.MessageEnvelopeRepository;
import de.assecutor.votianlt.repository.PendingDeliveryRepository; import de.assecutor.votianlt.repository.PendingDeliveryRepository;
import de.assecutor.votianlt.service.ClientConnectionService; import de.assecutor.votianlt.service.ClientConnectionService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -17,12 +16,9 @@ import jakarta.annotation.PreDestroy;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -42,48 +38,22 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
private final PluginManager pluginManager; private final PluginManager pluginManager;
private final PendingDeliveryRepository pendingDeliveryRepository; private final PendingDeliveryRepository pendingDeliveryRepository;
private final MessageEnvelopeRepository envelopeRepository;
private final AcknowledgmentHandler acknowledgmentHandler; private final AcknowledgmentHandler acknowledgmentHandler;
private final DeliveryConfig config; private final DeliveryConfig config;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final ClientConnectionService clientConnectionService; private final ClientConnectionService clientConnectionService;
// In-memory tracking of messages awaiting acknowledgment
private final Map<String, PendingAckMessage> pendingAckMessages = new ConcurrentHashMap<>();
private ScheduledExecutorService retryScheduler; private ScheduledExecutorService retryScheduler;
/**
* Holds information about a message pending acknowledgment.
*/
private static class PendingAckMessage {
final String messageId;
final String clientId;
final String messageType;
final byte[] envelopeData;
final String json;
int retryCount;
PendingAckMessage(String messageId, String clientId, String messageType, byte[] envelopeData, String json) {
this.messageId = messageId;
this.clientId = clientId;
this.messageType = messageType;
this.envelopeData = envelopeData;
this.json = json;
this.retryCount = 0;
}
}
public MessageDeliveryServiceImpl( public MessageDeliveryServiceImpl(
PluginManager pluginManager, PluginManager pluginManager,
PendingDeliveryRepository pendingDeliveryRepository, PendingDeliveryRepository pendingDeliveryRepository,
MessageEnvelopeRepository envelopeRepository,
AcknowledgmentHandler acknowledgmentHandler, AcknowledgmentHandler acknowledgmentHandler,
DeliveryConfig config, DeliveryConfig config,
ObjectMapper objectMapper, ObjectMapper objectMapper,
ClientConnectionService clientConnectionService) { ClientConnectionService clientConnectionService) {
this.pluginManager = pluginManager; this.pluginManager = pluginManager;
this.pendingDeliveryRepository = pendingDeliveryRepository; this.pendingDeliveryRepository = pendingDeliveryRepository;
this.envelopeRepository = envelopeRepository;
this.acknowledgmentHandler = acknowledgmentHandler; this.acknowledgmentHandler = acknowledgmentHandler;
this.config = config; this.config = config;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
@@ -98,7 +68,7 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
return t; return t;
}); });
retryScheduler.scheduleAtFixedRate( retryScheduler.scheduleAtFixedRate(
this::checkAndRetryPendingAckMessages, this::retryPendingDeliveries,
ackRetryIntervalSeconds, ackRetryIntervalSeconds,
ackRetryIntervalSeconds, ackRetryIntervalSeconds,
TimeUnit.SECONDS TimeUnit.SECONDS
@@ -123,81 +93,17 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
} }
} }
/**
* Check for pending messages and retry sending them.
* Called every RETRY_INTERVAL_SECONDS seconds.
*/
private void checkAndRetryPendingAckMessages() {
if (pendingAckMessages.isEmpty()) {
return;
}
log.debug("[MessageDelivery] Checking {} pending messages for retry", pendingAckMessages.size());
List<String> toRemove = new ArrayList<>();
for (PendingAckMessage pending : pendingAckMessages.values()) {
// Only retry if client is connected
if (!clientConnectionService.isClientConnected(pending.clientId)) {
log.debug("[MessageDelivery] Skipping retry for message {} - client {} is not connected",
pending.messageId, pending.clientId);
continue;
}
pending.retryCount++;
if (pending.retryCount > ackMaxRetries) {
// Max retries reached - log and remove
log.error("[MessageDelivery] Message {} not acknowledged after {} attempts. Removing from retry queue. JSON: {}",
pending.messageId, ackMaxRetries, pending.json);
toRemove.add(pending.messageId);
} else {
// Retry sending
log.info("[MessageDelivery] Retrying message {} (attempt {}/{})",
pending.messageId, pending.retryCount, ackMaxRetries);
try {
SendOptions sendOptions = SendOptions.reliable();
pluginManager.sendToClient(pending.clientId, pending.messageType, pending.envelopeData, sendOptions)
.thenAccept(v -> log.info("[MessageDelivery] Retry {} sent successfully for message {}",
pending.retryCount, pending.messageId))
.exceptionally(ex -> {
log.error("[MessageDelivery] Retry {} failed for message {}: {}",
pending.retryCount, pending.messageId, ex.getMessage());
return null;
});
} catch (Exception e) {
log.error("[MessageDelivery] Error during retry for message {}: {}",
pending.messageId, e.getMessage(), e);
}
}
}
// Remove messages that exceeded max retries
toRemove.forEach(pendingAckMessages::remove);
}
@Override @Override
public CompletableFuture<DeliveryReceipt> sendToClient(String clientId, String messageType, Object payload, DeliveryOptions options) { public CompletableFuture<DeliveryReceipt> sendToClient(String clientId, String messageType, Object payload, DeliveryOptions options) {
try { try {
// Create destination identifier for tracking
String destination = clientId + "/" + messageType; String destination = clientId + "/" + messageType;
// Create message envelope
final LocalDateTime expiresAt = options.calculateExpiryTime(); final LocalDateTime expiresAt = options.calculateExpiryTime();
MessageEnvelope envelope = new MessageEnvelope(destination, payload, options.isRequiresAck(), expiresAt); MessageEnvelope envelope = new MessageEnvelope(destination, payload, options.isRequiresAck(), expiresAt);
// Save envelope to database
envelope = envelopeRepository.save(envelope);
final String messageId = envelope.getMessageId(); final String messageId = envelope.getMessageId();
log.debug("[MessageDelivery] Created envelope {} for client {} (type: {})", messageId, clientId, messageType);
// Serialize envelope to JSON
String json = objectMapper.writeValueAsString(envelope); String json = objectMapper.writeValueAsString(envelope);
byte[] envelopeData = json.getBytes(StandardCharsets.UTF_8); byte[] envelopeData = json.getBytes(StandardCharsets.UTF_8);
log.info("[MessageDelivery] Sending JSON to client {} (type: {}): {}", clientId, messageType, json);
// Create pending delivery record if acknowledgment is required
if (options.isRequiresAck()) { if (options.isRequiresAck()) {
PendingDelivery pending = new PendingDelivery( PendingDelivery pending = new PendingDelivery(
messageId, messageId,
@@ -207,15 +113,8 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
expiresAt expiresAt
); );
pendingDeliveryRepository.save(pending); pendingDeliveryRepository.save(pending);
log.debug("[MessageDelivery] Created pending delivery for message {}", messageId);
// Add to in-memory retry queue
PendingAckMessage pendingAck = new PendingAckMessage(messageId, clientId, messageType, envelopeData, json);
pendingAckMessages.put(messageId, pendingAck);
log.info("[MessageDelivery] Added message {} to retry queue (requires ACK)", messageId);
} }
// Send via plugin manager
SendOptions sendOptions = SendOptions.builder() SendOptions sendOptions = SendOptions.builder()
.qos(options.getQos()) .qos(options.getQos())
.retained(options.isRetained()) .retained(options.isRetained())
@@ -224,19 +123,17 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
final boolean requiresAck = options.isRequiresAck(); final boolean requiresAck = options.isRequiresAck();
final Duration ackTimeout = options.getAckTimeout(); final Duration ackTimeout = options.getAckTimeout();
log.info("[MessageDelivery] Sending message {} to client {} (type: {})", messageId, clientId, messageType);
return pluginManager.sendToClient(clientId, messageType, envelopeData, sendOptions) return pluginManager.sendToClient(clientId, messageType, envelopeData, sendOptions)
.thenApply(v -> { .thenApply(v -> {
// Update pending delivery status
if (requiresAck) { if (requiresAck) {
updatePendingDeliveryAfterSend(messageId, ackTimeout); updatePendingDeliveryAfterSend(messageId, ackTimeout);
} }
log.info("[MessageDelivery] Successfully sent message {} to client {} (type: {})",
messageId, clientId, messageType);
return DeliveryReceipt.submitted(messageId, destination, expiresAt); return DeliveryReceipt.submitted(messageId, destination, expiresAt);
}) })
.exceptionally(ex -> { .exceptionally(ex -> {
log.error("[MessageDelivery] Failed to send message {} to client {} (type: {}): {}", log.error("[MessageDelivery] Failed to send message {}: {}", messageId, ex.getMessage());
messageId, clientId, messageType, ex.getMessage());
if (requiresAck) { if (requiresAck) {
markPendingDeliveryFailed(messageId, ex.getMessage()); markPendingDeliveryFailed(messageId, ex.getMessage());
} }
@@ -244,8 +141,7 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
}); });
} catch (Exception e) { } catch (Exception e) {
log.error("[MessageDelivery] Error creating message for client {} (type: {}): {}", log.error("[MessageDelivery] Error creating message for client {}: {}", clientId, e.getMessage());
clientId, messageType, e.getMessage(), e);
return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", clientId + "/" + messageType)); return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", clientId + "/" + messageType));
} }
} }
@@ -253,16 +149,12 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
@Override @Override
@Deprecated @Deprecated
public CompletableFuture<DeliveryReceipt> sendMessage(String topic, Object payload, DeliveryOptions options) { public CompletableFuture<DeliveryReceipt> sendMessage(String topic, Object payload, DeliveryOptions options) {
// Extract clientId and messageType from topic
// Topic format: /client/{clientId}/{messageType}
String[] parts = topic.split("/"); String[] parts = topic.split("/");
if (parts.length >= 4 && parts[1].equals("client")) { if (parts.length >= 4 && parts[1].equals("client")) {
String clientId = parts[2]; String clientId = parts[2];
String messageType = parts[3]; String messageType = parts[3];
return sendToClient(clientId, messageType, payload, options); return sendToClient(clientId, messageType, payload, options);
} }
// Fallback for legacy topics - log warning
log.warn("[MessageDelivery] Using deprecated sendMessage with topic: {}", topic); log.warn("[MessageDelivery] Using deprecated sendMessage with topic: {}", topic);
return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", topic)); return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", topic));
} }
@@ -270,40 +162,30 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
@Override @Override
public void handleIncomingMessage(MessageEnvelope envelope) { public void handleIncomingMessage(MessageEnvelope envelope) {
try { try {
log.info("[MessageDelivery] Received message {} on topic {}", log.info("[MessageDelivery] Received message {} on topic {}",
envelope.getMessageId(), envelope.getTopic()); envelope.getMessageId(), envelope.getTopic());
// Send acknowledgment if required
if (envelope.isRequiresAck()) { if (envelope.isRequiresAck()) {
sendAcknowledgment(envelope); sendAcknowledgment(envelope);
} }
// Forward to acknowledgment handler for application routing
acknowledgmentHandler.routeIncomingMessage(envelope); acknowledgmentHandler.routeIncomingMessage(envelope);
} catch (Exception e) { } catch (Exception e) {
log.error("[MessageDelivery] Error handling incoming message {}: {}", log.error("[MessageDelivery] Error handling incoming message {}: {}",
envelope.getMessageId(), e.getMessage(), e); envelope.getMessageId(), e.getMessage());
} }
} }
@Override @Override
public void handleAcknowledgment(AcknowledgmentMessage ack) { public void handleAcknowledgment(AcknowledgmentMessage ack) {
try { try {
log.info("[MessageDelivery] Received acknowledgment for message {} with status {}", log.info("[MessageDelivery] Received ACK for message {} (status: {})",
ack.getMessageId(), ack.getStatus()); ack.getMessageId(), ack.getStatus());
// Remove from in-memory retry queue
PendingAckMessage removed = pendingAckMessages.remove(ack.getMessageId());
if (removed != null) {
log.info("[MessageDelivery] Removed message {} from retry queue (ACK received)", ack.getMessageId());
}
Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(ack.getMessageId()); Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(ack.getMessageId());
if (pendingOpt.isEmpty()) { if (pendingOpt.isEmpty()) {
log.warn("[MessageDelivery] No pending delivery found for acknowledged message {}",
ack.getMessageId());
return; return;
} }
@@ -311,21 +193,19 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
switch (ack.getStatus()) { switch (ack.getStatus()) {
case RECEIVED, PROCESSED -> { case RECEIVED, PROCESSED -> {
pending.markAsAcknowledged(); pendingDeliveryRepository.delete(pending);
pendingDeliveryRepository.save(pending);
log.info("[MessageDelivery] Message {} acknowledged successfully", ack.getMessageId());
} }
case FAILED -> { case FAILED -> {
pending.markAsFailed(ack.getErrorMessage()); pending.markAsFailed(ack.getErrorMessage());
pendingDeliveryRepository.save(pending); pendingDeliveryRepository.save(pending);
log.warn("[MessageDelivery] Message {} failed on client: {}", log.warn("[MessageDelivery] Message {} failed on client: {}",
ack.getMessageId(), ack.getErrorMessage()); ack.getMessageId(), ack.getErrorMessage());
} }
} }
} catch (Exception e) { } catch (Exception e) {
log.error("[MessageDelivery] Error handling acknowledgment for message {}: {}", log.error("[MessageDelivery] Error handling ACK for message {}: {}",
ack.getMessageId(), e.getMessage(), e); ack.getMessageId(), e.getMessage());
} }
} }
@@ -347,18 +227,15 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
.findByStatusAndNextRetryAtBefore(DeliveryStatus.SENT, LocalDateTime.now()); .findByStatusAndNextRetryAtBefore(DeliveryStatus.SENT, LocalDateTime.now());
if (readyForRetry.isEmpty()) { if (readyForRetry.isEmpty()) {
log.debug("[MessageDelivery] No pending deliveries ready for retry");
return; return;
} }
log.info("[MessageDelivery] Retrying {} pending deliveries", readyForRetry.size());
for (PendingDelivery pending : readyForRetry) { for (PendingDelivery pending : readyForRetry) {
retryDelivery(pending); retryDelivery(pending);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("[MessageDelivery] Error during retry process: {}", e.getMessage(), e); log.error("[MessageDelivery] Error during retry process: {}", e.getMessage());
} }
} }
@@ -368,90 +245,49 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
return; return;
} }
log.info("[MessageDelivery] Client {} reconnected - retrying pending messages", clientId);
// Retry in-memory pending ACK messages for this client
int inMemoryCount = 0;
for (PendingAckMessage pending : pendingAckMessages.values()) {
if (clientId.equals(pending.clientId)) {
inMemoryCount++;
try {
SendOptions sendOptions = SendOptions.reliable();
pluginManager.sendToClient(pending.clientId, pending.messageType, pending.envelopeData, sendOptions)
.thenAccept(v -> log.info("[MessageDelivery] Reconnect retry sent for message {} to client {}",
pending.messageId, clientId))
.exceptionally(ex -> {
log.error("[MessageDelivery] Reconnect retry failed for message {}: {}",
pending.messageId, ex.getMessage());
return null;
});
} catch (Exception e) {
log.error("[MessageDelivery] Error during reconnect retry for message {}: {}",
pending.messageId, e.getMessage(), e);
}
}
}
// Retry database pending deliveries for this client
try { try {
List<PendingDelivery> pendingDeliveries = pendingDeliveryRepository List<PendingDelivery> pendingDeliveries = pendingDeliveryRepository
.findByStatusIn(List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT)); .findByStatusIn(List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT));
int dbCount = 0;
for (PendingDelivery pending : pendingDeliveries) { for (PendingDelivery pending : pendingDeliveries) {
String topic = pending.getTopic(); String topic = pending.getTopic();
if (topic != null && topic.startsWith(clientId + "/")) { if (topic != null && topic.startsWith(clientId + "/")) {
dbCount++;
retryDelivery(pending); retryDelivery(pending);
} }
} }
log.info("[MessageDelivery] Triggered retry for client {}: {} in-memory, {} from database",
clientId, inMemoryCount, dbCount);
} catch (Exception e) { } catch (Exception e) {
log.error("[MessageDelivery] Error retrying database deliveries for client {}: {}", log.error("[MessageDelivery] Error retrying deliveries for client {}: {}", clientId, e.getMessage());
clientId, e.getMessage(), e);
} }
} }
@Override @Override
public void cleanupOldDeliveries() { public void cleanupOldDeliveries() {
try { try {
// Clean up acknowledged deliveries older than configured retention
LocalDateTime cutoff = LocalDateTime.now().minus(Duration.ofDays(7)); LocalDateTime cutoff = LocalDateTime.now().minus(Duration.ofDays(7));
List<PendingDelivery> oldAcknowledged = pendingDeliveryRepository List<PendingDelivery> oldAcknowledged = pendingDeliveryRepository
.findByStatusAndAcknowledgedAtBefore(DeliveryStatus.ACKNOWLEDGED, cutoff); .findByStatusAndAcknowledgedAtBefore(DeliveryStatus.ACKNOWLEDGED, cutoff);
if (!oldAcknowledged.isEmpty()) { if (!oldAcknowledged.isEmpty()) {
pendingDeliveryRepository.deleteAll(oldAcknowledged); pendingDeliveryRepository.deleteAll(oldAcknowledged);
log.info("[MessageDelivery] Cleaned up {} old acknowledged deliveries", oldAcknowledged.size());
} }
// Mark expired deliveries
List<PendingDelivery> expired = pendingDeliveryRepository List<PendingDelivery> expired = pendingDeliveryRepository
.findByStatusInAndExpiresAtBefore( .findByStatusInAndExpiresAtBefore(
List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT), List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT),
LocalDateTime.now() LocalDateTime.now()
); );
for (PendingDelivery pending : expired) { for (PendingDelivery pending : expired) {
pending.markAsExpired(); pending.markAsExpired();
pendingDeliveryRepository.save(pending); pendingDeliveryRepository.save(pending);
} }
if (!expired.isEmpty()) {
log.info("[MessageDelivery] Marked {} deliveries as expired", expired.size());
}
} catch (Exception e) { } catch (Exception e) {
log.error("[MessageDelivery] Error during cleanup: {}", e.getMessage(), e); log.error("[MessageDelivery] Error during cleanup: {}", e.getMessage());
} }
} }
/**
* Update pending delivery after successful send
*/
private void updatePendingDeliveryAfterSend(String messageId, Duration ackTimeout) { private void updatePendingDeliveryAfterSend(String messageId, Duration ackTimeout) {
try { try {
Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(messageId); Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(messageId);
@@ -466,9 +302,6 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
} }
} }
/**
* Mark pending delivery as failed
*/
private void markPendingDeliveryFailed(String messageId, String reason) { private void markPendingDeliveryFailed(String messageId, String reason) {
try { try {
Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(messageId); Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(messageId);
@@ -482,40 +315,23 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
} }
} }
/**
* Retry a pending delivery
*/
private void retryDelivery(PendingDelivery pending) { private void retryDelivery(PendingDelivery pending) {
try { try {
// Check if expired
if (pending.isExpired()) { if (pending.isExpired()) {
pending.markAsExpired(); pending.markAsExpired();
pendingDeliveryRepository.save(pending); pendingDeliveryRepository.save(pending);
log.warn("[MessageDelivery] Message {} expired, not retrying", pending.getMessageId());
return; return;
} }
// Check if max retries reached
if (pending.hasReachedMaxRetries()) { if (pending.hasReachedMaxRetries()) {
pending.markAsFailed("Max retries reached"); pending.markAsFailed("Max retries reached");
pendingDeliveryRepository.save(pending); pendingDeliveryRepository.save(pending);
log.warn("[MessageDelivery] Message {} reached max retries", pending.getMessageId());
return; return;
} }
// Increment retry count
pending.incrementRetryCount();
// Calculate next retry time with exponential backoff
Duration backoffDelay = calculateBackoff(pending.getRetryCount());
LocalDateTime nextRetry = LocalDateTime.now().plus(backoffDelay);
// Extract clientId and messageType from topic
// Topic format: clientId/messageType
String topic = pending.getTopic(); String topic = pending.getTopic();
String[] parts = topic.split("/"); String[] parts = topic.split("/");
if (parts.length < 2) { if (parts.length < 2) {
log.error("[MessageDelivery] Invalid topic format for retry: {}", topic);
pending.markAsFailed("Invalid topic format"); pending.markAsFailed("Invalid topic format");
pendingDeliveryRepository.save(pending); pendingDeliveryRepository.save(pending);
return; return;
@@ -524,76 +340,64 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
String clientId = parts[0]; String clientId = parts[0];
String messageType = parts[1]; String messageType = parts[1];
// Only retry if client is connected Duration backoffDelay = calculateBackoff(pending.getRetryCount() + 1);
LocalDateTime nextRetry = LocalDateTime.now().plus(backoffDelay);
if (!clientConnectionService.isClientConnected(clientId)) { if (!clientConnectionService.isClientConnected(clientId)) {
log.debug("[MessageDelivery] Skipping retry for message {} - client {} is not connected", pending.setNextRetryAt(nextRetry);
pending.getMessageId(), clientId); pendingDeliveryRepository.save(pending);
return; return;
} }
// Send via plugin manager pending.incrementRetryCount();
SendOptions options = SendOptions.reliable(); SendOptions options = SendOptions.reliable();
pluginManager.sendToClient(clientId, messageType, pending.getEnvelopeData(), options) pluginManager.sendToClient(clientId, messageType, pending.getEnvelopeData(), options)
.thenAccept(v -> { .thenAccept(v -> {
pending.markAsSent(nextRetry); pending.markAsSent(nextRetry);
pendingDeliveryRepository.save(pending); pendingDeliveryRepository.save(pending);
log.info("[MessageDelivery] Retry {} successful for message {}",
pending.getRetryCount(), pending.getMessageId());
}) })
.exceptionally(ex -> { .exceptionally(ex -> {
log.error("[MessageDelivery] Retry failed for message {}: {}", log.error("[MessageDelivery] Retry failed for message {}: {}", pending.getMessageId(), ex.getMessage());
pending.getMessageId(), ex.getMessage());
pending.markAsFailed(ex.getMessage()); pending.markAsFailed(ex.getMessage());
pendingDeliveryRepository.save(pending); pendingDeliveryRepository.save(pending);
return null; return null;
}); });
} catch (Exception e) { } catch (Exception e) {
log.error("[MessageDelivery] Error retrying delivery {}: {}", log.error("[MessageDelivery] Error retrying delivery {}: {}", pending.getMessageId(), e.getMessage());
pending.getMessageId(), e.getMessage(), e);
} }
} }
/**
* Send acknowledgment back to client
*/
private void sendAcknowledgment(MessageEnvelope envelope) { private void sendAcknowledgment(MessageEnvelope envelope) {
try { try {
// Extract client ID from topic (e.g., /server/{clientId}/... or clientId/messageType)
String clientId = extractClientIdFromTopic(envelope.getTopic()); String clientId = extractClientIdFromTopic(envelope.getTopic());
if (clientId == null) { if (clientId == null) {
log.warn("[MessageDelivery] Cannot send ACK, no clientId in topic: {}", envelope.getTopic());
return; return;
} }
// Create acknowledgment message
AcknowledgmentMessage ack = new AcknowledgmentMessage( AcknowledgmentMessage ack = new AcknowledgmentMessage(
envelope.getMessageId(), envelope.getMessageId(),
AckStatus.RECEIVED, AckStatus.RECEIVED,
"server" "server"
); );
// Send ACK to client using new API
String ackJson = objectMapper.writeValueAsString(ack); String ackJson = objectMapper.writeValueAsString(ack);
byte[] ackData = ackJson.getBytes(StandardCharsets.UTF_8); byte[] ackData = ackJson.getBytes(StandardCharsets.UTF_8);
log.info("[MessageDelivery] Sending ACK for message {} to client {}", envelope.getMessageId(), clientId);
pluginManager.sendAckToClient(clientId, envelope.getMessageId(), ackData, SendOptions.fireAndForget()) pluginManager.sendAckToClient(clientId, envelope.getMessageId(), ackData, SendOptions.fireAndForget())
.thenAccept(v -> log.debug("[MessageDelivery] Sent ACK for message {}", envelope.getMessageId()))
.exceptionally(ex -> { .exceptionally(ex -> {
log.error("[MessageDelivery] Failed to send ACK for message {}: {}", log.error("[MessageDelivery] Failed to send ACK for message {}: {}", envelope.getMessageId(), ex.getMessage());
envelope.getMessageId(), ex.getMessage());
return null; return null;
}); });
} catch (Exception e) { } catch (Exception e) {
log.error("[MessageDelivery] Error sending acknowledgment for message {}: {}", log.error("[MessageDelivery] Error sending ACK for message {}: {}", envelope.getMessageId(), e.getMessage());
envelope.getMessageId(), e.getMessage(), e);
} }
} }
/**
* Calculate exponential backoff delay
*/
private Duration calculateBackoff(int retryCount) { private Duration calculateBackoff(int retryCount) {
long delayMs = (long) (config.getRetryInitialDelay().toMillis() long delayMs = (long) (config.getRetryInitialDelay().toMillis()
* Math.pow(config.getRetryBackoffMultiplier(), retryCount - 1)); * Math.pow(config.getRetryBackoffMultiplier(), retryCount - 1));
@@ -601,16 +405,11 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
return Duration.ofMillis(Math.min(delayMs, maxDelayMs)); return Duration.ofMillis(Math.min(delayMs, maxDelayMs));
} }
/**
* Extract client ID from topic pattern.
* Supports both old format (/server/{clientId}/...) and new format (clientId/messageType)
*/
private String extractClientIdFromTopic(String topic) { private String extractClientIdFromTopic(String topic) {
if (topic == null) { if (topic == null) {
return null; return null;
} }
// Old format: /server/{clientId}/...
if (topic.startsWith("/server/")) { if (topic.startsWith("/server/")) {
String[] parts = topic.split("/"); String[] parts = topic.split("/");
if (parts.length > 2) { if (parts.length > 2) {
@@ -618,7 +417,6 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
} }
} }
// New format: clientId/messageType
if (topic.contains("/")) { if (topic.contains("/")) {
String[] parts = topic.split("/"); String[] parts = topic.split("/");
if (parts.length >= 1) { if (parts.length >= 1) {
@@ -629,4 +427,3 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
return null; return null;
} }
} }

View File

@@ -1,15 +1,9 @@
package de.assecutor.votianlt.messaging.model; package de.assecutor.votianlt.messaging.model;
import com.fasterxml.jackson.annotation.JsonGetter; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
@@ -19,64 +13,52 @@ import java.util.UUID;
/** /**
* Envelope that wraps all messages sent through the messaging system. * Envelope that wraps all messages sent through the messaging system.
* Contains metadata for delivery tracking and acknowledgment. * Contains metadata for delivery tracking and acknowledgment.
* This is a DTO class - not persisted to MongoDB.
*/ */
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@Document(collection = "message_envelopes") @JsonIgnoreProperties(ignoreUnknown = true)
public class MessageEnvelope { public class MessageEnvelope {
@Id
@JsonIgnore
private ObjectId id;
/** /**
* Unique identifier for this message (UUID) * Unique identifier for this message (UUID)
*/ */
@Field("message_id")
@Indexed(unique = true)
private String messageId; private String messageId;
/** /**
* Timestamp when the envelope was created * Timestamp when the envelope was created
*/ */
@Field("timestamp")
private LocalDateTime timestamp; private LocalDateTime timestamp;
/** /**
* Target topic for this message * Target topic for this message
*/ */
@Field("topic")
private String topic; private String topic;
/** /**
* The actual message payload (can be any serializable object) * The actual message payload (can be any serializable object)
*/ */
@Field("payload")
private Object payload; private Object payload;
/** /**
* Whether this message requires acknowledgment from the receiver * Whether this message requires acknowledgment from the receiver
*/ */
@Field("requires_ack")
private boolean requiresAck; private boolean requiresAck;
/** /**
* Number of times this message has been retried * Number of times this message has been retried
*/ */
@Field("retry_count")
private int retryCount; private int retryCount;
/** /**
* When this message expires and should no longer be delivered * When this message expires and should no longer be delivered
*/ */
@Field("expires_at")
private LocalDateTime expiresAt; private LocalDateTime expiresAt;
/** /**
* Additional metadata for the message * Additional metadata for the message
*/ */
@Field("metadata")
private Map<String, String> metadata; private Map<String, String> metadata;
/** /**
@@ -116,13 +98,4 @@ public class MessageEnvelope {
} }
this.metadata.put(key, value); this.metadata.put(key, value);
} }
/**
* Returns the ObjectId as string for JSON serialization
*/
@JsonGetter("id")
public String getIdAsString() {
return id != null ? id.toString() : null;
}
} }

View File

@@ -98,9 +98,11 @@ public class AuthenticatedStartView extends VerticalLayout {
// Features Grid // Features Grid
HorizontalLayout featuresGrid = new HorizontalLayout(); HorizontalLayout featuresGrid = new HorizontalLayout();
featuresGrid.setWidthFull();
featuresGrid.setSpacing(true); featuresGrid.setSpacing(true);
featuresGrid.setJustifyContentMode(FlexComponent.JustifyContentMode.CENTER);
featuresGrid.setDefaultVerticalComponentAlignment(FlexComponent.Alignment.START); featuresGrid.setDefaultVerticalComponentAlignment(FlexComponent.Alignment.START);
featuresGrid.getStyle().set("flex-wrap", "wrap");
featuresGrid.getStyle().set("width", "100%");
// Feature Cards // Feature Cards
featuresGrid.add(createFeatureCard(VaadinIcon.COG, "Einrichtungsassistent", featuresGrid.add(createFeatureCard(VaadinIcon.COG, "Einrichtungsassistent",

View File

@@ -1,43 +0,0 @@
package de.assecutor.votianlt.repository;
import de.assecutor.votianlt.messaging.model.MessageEnvelope;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
/**
* Repository for MessageEnvelope entities.
*/
@Repository
public interface MessageEnvelopeRepository extends MongoRepository<MessageEnvelope, ObjectId> {
/**
* Find envelope by message ID
*/
Optional<MessageEnvelope> findByMessageId(String messageId);
/**
* Find all envelopes for a specific topic
*/
List<MessageEnvelope> findByTopic(String topic);
/**
* Find expired envelopes
*/
List<MessageEnvelope> findByExpiresAtBefore(LocalDateTime dateTime);
/**
* Find envelopes created after a specific time
*/
List<MessageEnvelope> findByTimestampAfter(LocalDateTime dateTime);
/**
* Delete envelopes older than specified time
*/
void deleteByTimestampBefore(LocalDateTime dateTime);
}

View File

@@ -108,21 +108,35 @@ public class ClientConnectionService {
/** /**
* Handles a pong response from a client. * Handles a pong response from a client.
* Searches by both clientId and userId since pong is sent to /server/{userId}/pong.
* *
* @param clientId The client that sent the pong * @param id The client or user identifier that sent the pong
*/ */
public void handlePong(String clientId) { public void handlePong(String id) {
if (clientId == null || clientId.isBlank()) { if (id == null || id.isBlank()) {
log.warn("Received pong from null or blank clientId"); log.warn("Received pong from null or blank id");
return; return;
} }
ClientState state = connectedClients.get(clientId); // First try direct lookup by clientId
String clientId = id;
ClientState state = connectedClients.get(id);
// If not found, search by userId
if (state == null) {
for (Map.Entry<String, ClientState> entry : connectedClients.entrySet()) {
if (id.equals(entry.getValue().userId())) {
clientId = entry.getKey();
state = entry.getValue();
break;
}
}
}
if (state != null) { if (state != null) {
boolean wasDisconnected = !state.connected(); boolean wasDisconnected = !state.connected();
ClientState updatedState = state.withPongReceived(Instant.now()); ClientState updatedState = state.withPongReceived(Instant.now());
connectedClients.put(clientId, updatedState); connectedClients.put(clientId, updatedState);
log.debug("Pong received from client: clientId={}", clientId);
// If client was disconnected and is now reconnected, retry pending messages // If client was disconnected and is now reconnected, retry pending messages
if (wasDisconnected) { if (wasDisconnected) {
@@ -130,19 +144,29 @@ public class ClientConnectionService {
messageDeliveryService.retryPendingDeliveriesForClient(clientId); messageDeliveryService.retryPendingDeliveriesForClient(clientId);
} }
} else { } else {
log.warn("Received pong from unknown client: clientId={}", clientId); log.warn("Received pong from unknown id: {}", id);
} }
} }
/** /**
* Checks if a client is currently connected. * Checks if a client is currently connected.
* Searches by both clientId and userId.
* *
* @param clientId The client identifier * @param id The client or user identifier
* @return true if the client is connected * @return true if the client is connected
*/ */
public boolean isClientConnected(String clientId) { public boolean isClientConnected(String id) {
ClientState state = connectedClients.get(clientId); if (id == null || id.isBlank()) {
return state != null && state.connected(); return false;
}
// First try direct lookup by clientId
ClientState state = connectedClients.get(id);
if (state != null && state.connected()) {
return true;
}
// Then search by userId
return connectedClients.values().stream()
.anyMatch(s -> s.connected() && id.equals(s.userId()));
} }
/** /**
@@ -209,10 +233,9 @@ public class ClientConnectionService {
} }
} }
// Send ping to connected clients // Send ping to connected clients (use userId for topic)
if (state.connected()) { if (state.connected() && state.userId() != null) {
log.info("[ClientConnectionService] Sending ping to client: {}", clientId); sendPing(state.userId());
sendPing(clientId);
ClientState updatedState = state.withPingSent(now); ClientState updatedState = state.withPingSent(now);
connectedClients.put(clientId, updatedState); connectedClients.put(clientId, updatedState);
} }
@@ -220,11 +243,11 @@ public class ClientConnectionService {
} }
/** /**
* Sends a ping message to a specific client. * Sends a ping message to a specific user.
* *
* @param clientId The target client * @param userId The target user ID (MongoDB ObjectId)
*/ */
private void sendPing(String clientId) { private void sendPing(String userId) {
try { try {
Map<String, Object> pingPayload = Map.of( Map<String, Object> pingPayload = Map.of(
"type", "ping", "type", "ping",
@@ -233,24 +256,16 @@ public class ClientConnectionService {
String json = objectMapper.writeValueAsString(pingPayload); String json = objectMapper.writeValueAsString(pingPayload);
byte[] payload = json.getBytes(StandardCharsets.UTF_8); byte[] payload = json.getBytes(StandardCharsets.UTF_8);
log.info("[ClientConnectionService] Ping payload for client {}: {}", clientId, json);
SendOptions options = SendOptions.builder() SendOptions options = SendOptions.builder()
.qos(1) .qos(1)
.retained(false) .retained(false)
.build(); .build();
pluginManager.sendToClient(clientId, "ping", payload, options) pluginManager.sendToClient(userId, "ping", payload, options);
.whenComplete((result, error) -> {
if (error != null) {
log.warn("[ClientConnectionService] Failed to send ping to client {}: {}", clientId, error.getMessage());
} else {
log.info("[ClientConnectionService] Ping sent successfully to client: {}", clientId);
}
});
} catch (Exception e) { } catch (Exception e) {
log.error("Error sending ping to client {}: {}", clientId, e.getMessage(), e); log.error("Error sending ping to user {}: {}", userId, e.getMessage());
} }
} }