Umstellung MQTT -> Websocket

This commit is contained in:
2026-02-09 19:30:54 +01:00
parent 6d43421544
commit 47938dda70
43 changed files with 692 additions and 3672 deletions

14
pom.xml
View File

@@ -64,20 +64,10 @@
<artifactId>spring-boot-starter-actuator</artifactId> <artifactId>spring-boot-starter-actuator</artifactId>
</dependency> </dependency>
<!-- Spring Integration for MQTT (managed by Spring Boot BOM) --> <!-- Spring WebSocket for messaging plugin -->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId> <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- HiveMQ MQTT v5 Client -->
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.3.3</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@@ -24,35 +24,26 @@ import de.assecutor.votianlt.model.Comment;
import de.assecutor.votianlt.service.JobHistoryService; import de.assecutor.votianlt.service.JobHistoryService;
import de.assecutor.votianlt.service.EmailService; import de.assecutor.votianlt.service.EmailService;
import de.assecutor.votianlt.service.MessageService; import de.assecutor.votianlt.service.MessageService;
import de.assecutor.votianlt.service.ClientConnectionService;
import de.assecutor.votianlt.model.JobStatus; import de.assecutor.votianlt.model.JobStatus;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.ObjectMapper; import de.assecutor.votianlt.messaging.MessagingPublisher;
import de.assecutor.votianlt.mqtt.MqttPublisher;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.bson.types.ObjectId; import org.bson.types.ObjectId;
/** /**
* MQTT message controller for handling real-time communication with apps. * Message controller for handling real-time communication with apps.
* Provides endpoints for sending and receiving messages via WebSocket/MQTT. * Provides endpoints for sending and receiving messages via WebSocket.
*/ */
@Component @Component
@Slf4j @Slf4j
public class MessageController { public class MessageController {
// Map to store userId -> clientId mapping for active sessions private final MessagingPublisher messagingPublisher;
private final Map<String, String> userClientIdMapping = new ConcurrentHashMap<>();
// Map to store clientId -> userId mapping for active sessions (reverse lookup)
private final Map<String, String> clientIdUserMapping = new ConcurrentHashMap<>();
private final MqttPublisher mqttPublisher;
private final AppUserRepository appUserRepository; private final AppUserRepository appUserRepository;
@@ -70,16 +61,13 @@ public class MessageController {
private final JobHistoryService jobHistoryService; private final JobHistoryService jobHistoryService;
private final EmailService emailService; private final EmailService emailService;
private final MessageService messageService; private final MessageService messageService;
private final ObjectMapper objectMapper;
private final ClientConnectionService clientConnectionService;
public MessageController(MqttPublisher mqttPublisher, AppUserRepository appUserRepository, public MessageController(MessagingPublisher messagingPublisher, AppUserRepository appUserRepository,
AppUserService appUserService, JobRepository jobRepository, CargoItemRepository cargoItemRepository, AppUserService appUserService, JobRepository jobRepository, CargoItemRepository cargoItemRepository,
TaskRepository taskRepository, PhotoRepository photoRepository, BarcodeRepository barcodeRepository, TaskRepository taskRepository, PhotoRepository photoRepository, BarcodeRepository barcodeRepository,
SignatureRepository signatureRepository, CommentRepository commentRepository, SignatureRepository signatureRepository, CommentRepository commentRepository,
JobHistoryService jobHistoryService, EmailService emailService, MessageService messageService, JobHistoryService jobHistoryService, EmailService emailService, MessageService messageService) {
ObjectMapper objectMapper, ClientConnectionService clientConnectionService) { this.messagingPublisher = messagingPublisher;
this.mqttPublisher = mqttPublisher;
this.appUserRepository = appUserRepository; this.appUserRepository = appUserRepository;
this.appUserService = appUserService; this.appUserService = appUserService;
this.jobRepository = jobRepository; this.jobRepository = jobRepository;
@@ -92,92 +80,52 @@ public class MessageController {
this.jobHistoryService = jobHistoryService; this.jobHistoryService = jobHistoryService;
this.emailService = emailService; this.emailService = emailService;
this.messageService = messageService; this.messageService = messageService;
this.objectMapper = objectMapper;
this.clientConnectionService = clientConnectionService;
} }
/** /**
* Authentication endpoint for mobile app users via MQTT. Client sends to * Authentication endpoint for mobile app users via WebSocket. Client sends to
* /server/login with payload { email, password, clientId }. The response is * /server/login with payload { email, password }. Returns the result to the
* sent back to the requesting client on /client/{clientId}/auth * caller (MessagingConfig) which handles session registration and response
* sending.
*/ */
public void handleAppLogin(AppLoginRequest request) { public AppLoginResponse handleAppLogin(AppLoginRequest request) {
AppLoginResponse response;
if (request == null || request.getEmail() == null || request.getPassword() == null if (request == null || request.getEmail() == null || request.getPassword() == null
|| request.getClientId() == null || request.getEmail().isBlank() || request.getPassword().isBlank() || request.getEmail().isBlank() || request.getPassword().isBlank()) {
|| request.getClientId().isBlank()) { return new AppLoginResponse(false, "E-Mail und Passwort sind erforderlich", null);
response = new AppLoginResponse(false, "E-Mail, Passwort und Client-ID sind erforderlich", null, null,
null);
} else {
AppUser user = appUserRepository.findByEmail(request.getEmail());
if (user == null) {
response = new AppLoginResponse(false, "Benutzer nicht gefunden", null, null, null);
} else {
boolean ok = appUserService.verifyPassword(request.getPassword(), user.getPassword());
if (!ok) {
response = new AppLoginResponse(false, "Ungültige Anmeldedaten", null, null, null);
} else {
response = new AppLoginResponse(true, "Anmeldung erfolgreich", null, null, user.getIdAsString());
// Store clientId mapping for this user session
storeClientIdMapping(user.getIdAsString(), request.getClientId());
// Register client for ping/pong monitoring
clientConnectionService.registerClient(request.getClientId(), user.getIdAsString());
}
}
} }
// Send response via MQTT to specific client AppUser user = appUserRepository.findByEmail(request.getEmail());
if (request != null && request.getClientId() != null && !request.getClientId().isBlank()) { if (user == null) {
mqttPublisher.publishAsJson("/client/" + request.getClientId() + "/auth", response, false); return new AppLoginResponse(false, "Benutzer nicht gefunden", null);
} }
boolean ok = appUserService.verifyPassword(request.getPassword(), user.getPassword());
if (!ok) {
return new AppLoginResponse(false, "Ungültige Anmeldedaten", null);
}
return new AppLoginResponse(true, "Anmeldung erfolgreich", user.getIdAsString());
} }
/** /**
* Endpoint to retrieve jobs assigned to a specific app user with related cargo * Retrieve jobs assigned to a specific app user with related cargo items and
* items and tasks. Client sends to /server/{clientId}/jobs/assigned with * tasks. The appUserId is determined from the authenticated WebSocket session.
* payload { appUserId }. The response is sent back to the requesting client on * Response is sent back on /client/jobs.
* /client/{clientId}/jobs
*/ */
public void handleGetAssignedJobs(Map<String, Object> request) { public void handleGetAssignedJobs(String appUserId) {
if (request == null || !request.containsKey("appUserId")) {
return;
}
String appUserId = request.get("appUserId").toString();
if (appUserId == null || appUserId.isBlank()) { if (appUserId == null || appUserId.isBlank()) {
return; return;
} }
// Attempt to get clientId from request (injected from topic) or from stored
// mapping
String clientId = null;
try {
Object cid = request.get("clientId");
if (cid != null)
clientId = cid.toString();
} catch (Exception e) {
// Ignore
}
if (clientId == null || clientId.isBlank()) {
clientId = getClientIdForUserId(appUserId);
}
// Find jobs assigned to this app user
List<Job> assignedJobs = jobRepository.findByAppUser(appUserId); List<Job> assignedJobs = jobRepository.findByAppUser(appUserId);
// For each job, fetch related cargo items and tasks (ordered by task order)
List<JobWithRelatedDataDTO> jobsWithRelatedData = assignedJobs.stream().map(job -> { List<JobWithRelatedDataDTO> jobsWithRelatedData = assignedJobs.stream().map(job -> {
List<CargoItem> cargoItems = cargoItemRepository.findByJobId(job.getId()); List<CargoItem> cargoItems = cargoItemRepository.findByJobId(job.getId());
List<BaseTask> tasks = taskRepository.findByJobIdOrderByTaskOrderAsc(job.getId()); List<BaseTask> tasks = taskRepository.findByJobIdOrderByTaskOrderAsc(job.getId());
return new JobWithRelatedDataDTO(job, cargoItems, tasks); return new JobWithRelatedDataDTO(job, cargoItems, tasks);
}).toList(); }).toList();
// Publish to the requesting client's topic if clientId is known messagingPublisher.publishAsJson(appUserId, "jobs", jobsWithRelatedData);
if (clientId != null && !clientId.isBlank()) {
String topic = "/client/" + clientId + "/jobs";
mqttPublisher.publishAsJson(topic, jobsWithRelatedData, false);
}
} }
/** /**
@@ -460,49 +408,20 @@ public class MessageController {
} }
/** /**
* Store the mapping between userId and clientId for active session * Handle incoming message from a client via WebSocket. Client sends to
*/ * /server/message with payload: { "content": "message payload",
private void storeClientIdMapping(String userId, String clientId) {
userClientIdMapping.put(userId, clientId);
clientIdUserMapping.put(clientId, userId);
}
/**
* Get the clientId for a given userId
*/
private String getClientIdForUserId(String userId) {
return userClientIdMapping.get(userId);
}
/**
* Handle pong response from a client. Client sends to /server/{clientId}/pong
* with payload { timestamp }. Used for connection monitoring.
*/
public void handlePong(Map<String, Object> payload) {
String clientId = payload.get("clientId") != null ? payload.get("clientId").toString() : null;
if (clientId != null && !clientId.isBlank()) {
clientConnectionService.handlePong(clientId);
}
}
/**
* Handle incoming message from a client via MQTT. Client sends to
* /server/{clientId}/message with payload: { "content": "message payload",
* "contentType": "TEXT|IMAGE", "jobId": "optional job id", "jobNumber": * "contentType": "TEXT|IMAGE", "jobId": "optional job id", "jobNumber":
* "optional job number" } * "optional job number" }
* *
* The clientId is extracted from the MQTT topic and represents the AppUser ID. * The appUserId is determined from the authenticated WebSocket session.
* This clientId is stored as the receiver field in the message.
*/ */
public void handleIncomingMessage(Map<String, Object> payload) { public void handleIncomingMessage(String appUserId, Map<String, Object> payload) {
try { try {
String clientId = payload.get("clientId") != null ? payload.get("clientId").toString() : null; if (appUserId == null || appUserId.isBlank()) {
if (clientId == null || clientId.isBlank()) {
return; return;
} }
payload.put("receiver", clientId); payload.put("receiver", appUserId);
ChatMessageInboundPayload inboundPayload = ChatMessageInboundPayload.fromPayload(payload); ChatMessageInboundPayload inboundPayload = ChatMessageInboundPayload.fromPayload(payload);
messageService.receiveMessageFromClient(inboundPayload); messageService.receiveMessageFromClient(inboundPayload);
} catch (Exception e) { } catch (Exception e) {

View File

@@ -1,5 +1,6 @@
package de.assecutor.votianlt.dto; package de.assecutor.votianlt.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@@ -7,8 +8,8 @@ import lombok.NoArgsConstructor;
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class AppLoginRequest { public class AppLoginRequest {
private String email; private String email;
private String password; private String password;
private String clientId;
} }

View File

@@ -10,7 +10,6 @@ import lombok.NoArgsConstructor;
public class AppLoginResponse { public class AppLoginResponse {
private boolean success; private boolean success;
private String message; private String message;
private String token; /** Only populated on success, for internal server-side routing. Not sent to client. */
private String userId;
private String appUserId; private String appUserId;
} }

View File

@@ -5,8 +5,8 @@ import java.util.Map;
import org.bson.types.ObjectId; import org.bson.types.ObjectId;
/** /**
* Normalized payload for chat messages sent by mobile clients via MQTT. * Normalized payload for chat messages sent by mobile clients via WebSocket.
* receiver = AppUser ID (clientId) extracted from MQTT topic * receiver = AppUser ID (clientId) extracted from topic
*/ */
public record ChatMessageInboundPayload(String receiver, String content, MessageContentType contentType, ObjectId jobId, public record ChatMessageInboundPayload(String receiver, String content, MessageContentType contentType, ObjectId jobId,
String jobNumber) { String jobNumber) {

View File

@@ -7,8 +7,8 @@ import de.assecutor.votianlt.model.MessageType;
import java.time.LocalDateTime; import java.time.LocalDateTime;
/** /**
* Outbound chat message payload published to MQTT subscribers. The receiver is * Outbound chat message payload published to subscribers. The receiver is
* implicit from the MQTT topic (/client/{appUserId}/message) * implicit from the WebSocket session (/client/message)
*/ */
public record ChatMessageOutboundPayload(String messageId, String content, MessageContentType contentType, public record ChatMessageOutboundPayload(String messageId, String content, MessageContentType contentType,
MessageOrigin origin, MessageType messageType, LocalDateTime createdAt, String jobId, String jobNumber, MessageOrigin origin, MessageType messageType, LocalDateTime createdAt, String jobId, String jobNumber,

View File

@@ -0,0 +1,131 @@
package de.assecutor.votianlt.messaging;
import de.assecutor.votianlt.controller.MessageController;
import de.assecutor.votianlt.dto.AppLoginRequest;
import de.assecutor.votianlt.dto.AppLoginResponse;
import de.assecutor.votianlt.service.ClientConnectionService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* Configuration for the messaging system. Sets up message routing after
* application startup.
*/
@Configuration
@Slf4j
public class MessagingConfig {
private final WebSocketService webSocketService;
private final ObjectMapper objectMapper;
public MessagingConfig(WebSocketService webSocketService, ObjectMapper objectMapper) {
this.webSocketService = webSocketService;
this.objectMapper = objectMapper;
}
/**
* Set up message routing after application startup.
*/
@EventListener(ApplicationReadyEvent.class)
public void setupMessaging(ApplicationReadyEvent event) {
try {
MessageController messageController = event.getApplicationContext().getBean(MessageController.class);
ClientConnectionService clientConnectionService = event.getApplicationContext()
.getBean(ClientConnectionService.class);
setupSubscriptions(messageController, clientConnectionService);
log.info("[Messaging] Message routing configured");
} catch (Exception e) {
log.error("[Messaging] Failed to initialize: {}", e.getMessage());
throw new RuntimeException("Failed to initialize messaging", e);
}
}
/**
* Setup message subscriptions on the WebSocket service.
*/
private void setupSubscriptions(MessageController messageController,
ClientConnectionService clientConnectionService) {
// Login handler: authenticate and register session
webSocketService.registerMessageHandler("login", (wsSessionId, payload) -> {
handleLoginMessage(wsSessionId, payload, messageController, clientConnectionService);
});
// Task completion handler
webSocketService.registerMessageHandler("task_completed", (appUserId, payload) -> {
handlePayload(payload, payloadMap -> {
String taskType = payloadMap.get("taskType") != null ? payloadMap.get("taskType").toString() : null;
messageController.handleTaskCompleted(payloadMap, taskType);
});
});
// Jobs assigned handler
webSocketService.registerMessageHandler("jobs/assigned", (appUserId, payload) -> {
messageController.handleGetAssignedJobs(appUserId);
});
// Chat message handler
webSocketService.registerMessageHandler("message", (appUserId, payload) -> {
handlePayload(payload, payloadMap -> {
messageController.handleIncomingMessage(appUserId, payloadMap);
});
});
}
/**
* Handle login message. The wsSessionId identifies the pending WebSocket
* session. On success, registers the session under the appUserId and sends an
* auth response. On failure, sends an error response to the pending session.
*/
@SuppressWarnings("unchecked")
private void handleLoginMessage(String wsSessionId, byte[] payload, MessageController messageController,
ClientConnectionService clientConnectionService) {
try {
String json = new String(payload, StandardCharsets.UTF_8);
Map<String, Object> payloadMap = objectMapper.readValue(json, Map.class);
AppLoginRequest loginRequest = objectMapper.convertValue(payloadMap, AppLoginRequest.class);
AppLoginResponse response = messageController.handleAppLogin(loginRequest);
if (response.isSuccess()) {
String appUserId = response.getAppUserId();
webSocketService.registerAuthenticatedSession(wsSessionId, appUserId);
clientConnectionService.registerClient(appUserId);
// Send success response to the now-authenticated session
Map<String, Object> authResponse = Map.of("success", true, "message", response.getMessage());
byte[] responseBytes = objectMapper.writeValueAsBytes(authResponse);
webSocketService.sendToClient(appUserId, "auth", responseBytes);
} else {
// Send failure response to the pending session
Map<String, Object> authResponse = Map.of("success", false, "message", response.getMessage());
byte[] responseBytes = objectMapper.writeValueAsBytes(authResponse);
webSocketService.sendToSessionById(wsSessionId, "/client/auth", responseBytes);
}
} catch (Exception e) {
log.error("[Messaging] Login handling error: {}", e.getMessage());
}
}
/**
* Parse payload bytes to a Map and pass to the consumer.
*/
@SuppressWarnings("unchecked")
private void handlePayload(byte[] payload, java.util.function.Consumer<Map<String, Object>> handler) {
try {
String json = new String(payload, StandardCharsets.UTF_8);
Map<String, Object> payloadMap = objectMapper.readValue(json, Map.class);
handler.accept(payloadMap);
} catch (Exception e) {
log.error("[Messaging] Error parsing payload: {}", e.getMessage());
}
}
}

View File

@@ -0,0 +1,43 @@
package de.assecutor.votianlt.messaging;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* Publishing helper to send JSON payloads to clients via WebSocket.
*/
public interface MessagingPublisher {
void publishAsJson(String clientId, String messageType, Object payload);
}
@Component
@Slf4j
class MessagingPublisherImpl implements MessagingPublisher {
private final WebSocketService webSocketService;
private final ObjectMapper objectMapper;
public MessagingPublisherImpl(WebSocketService webSocketService, ObjectMapper objectMapper) {
this.webSocketService = webSocketService;
this.objectMapper = objectMapper;
}
@Override
public void publishAsJson(String clientId, String messageType, Object payload) {
try {
String json = objectMapper.writeValueAsString(payload);
byte[] data = json.getBytes(StandardCharsets.UTF_8);
webSocketService.sendToClient(clientId, messageType, data).exceptionally(ex -> {
log.error("[Messaging] Failed to deliver to {}/{}: {}", clientId, messageType, ex.getMessage());
return null;
});
} catch (Exception e) {
log.error("[Messaging] Failed to publish to {}/{}: {}", clientId, messageType, e.getMessage());
}
}
}

View File

@@ -0,0 +1,35 @@
package de.assecutor.votianlt.messaging;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
/**
* WebSocket configuration that registers the WebSocketService as a handler on
* the configured endpoint.
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final WebSocketService webSocketService;
@Value("${app.messaging.websocket.path:/ws/messaging}")
private String wsPath;
@Value("${app.messaging.websocket.allowed-origins:*}")
private String allowedOrigins;
public WebSocketConfig(WebSocketService webSocketService) {
this.webSocketService = webSocketService;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketService, wsPath).setAllowedOrigins(allowedOrigins.split(","))
.addInterceptors(new HttpSessionHandshakeInterceptor());
}
}

View File

@@ -0,0 +1,384 @@
package de.assecutor.votianlt.messaging;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.*;
/**
* WebSocket service for direct bidirectional communication with mobile clients.
*
* Wire Protocol: Each WebSocket message is a JSON document with a "topic" and
* "payload" field:
*
* <pre>
* {
* "topic": "/server/login",
* "payload": { ... }
* }
* </pre>
*
* Topic Structure:
* <ul>
* <li>Server to Client: /client/{messageType}</li>
* <li>Client to Server: /server/{messageType}</li>
* <li>Login (special): /server/login (unauthenticated)</li>
* </ul>
*/
@Component
@Slf4j
public class WebSocketService extends TextWebSocketHandler {
@FunctionalInterface
public interface MessageHandler {
void onMessageReceived(String clientId, byte[] payload);
}
private static final String TOPIC_TO_CLIENT = "/client/%s";
private static final long PENDING_SESSION_TIMEOUT_MS = 30_000;
private final ObjectMapper objectMapper;
// appUserId -> WebSocketSession
private final ConcurrentHashMap<String, WebSocketSession> clientSessions = new ConcurrentHashMap<>();
// sessionId -> appUserId (reverse lookup for cleanup on disconnect)
private final ConcurrentHashMap<String, String> sessionToClient = new ConcurrentHashMap<>();
// sessionId -> PendingSession (connected but not yet logged in)
private final ConcurrentHashMap<String, PendingSession> pendingSessions = new ConcurrentHashMap<>();
private final Map<String, MessageHandler> messageHandlers = new ConcurrentHashMap<>();
private volatile boolean initialized = false;
private ScheduledExecutorService pendingSessionCleanup;
public WebSocketService(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
// ==========================================
// Lifecycle
// ==========================================
@PostConstruct
public void init() {
pendingSessionCleanup = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "ws-pending-cleanup");
t.setDaemon(true);
return t;
});
pendingSessionCleanup.scheduleAtFixedRate(this::cleanupPendingSessions, 30, 30, TimeUnit.SECONDS);
initialized = true;
log.info("[WebSocket] Service initialized on endpoint /ws/messaging");
}
@PreDestroy
public void shutdown() {
if (pendingSessionCleanup != null) {
pendingSessionCleanup.shutdownNow();
}
for (var entry : clientSessions.entrySet()) {
try {
WebSocketSession session = entry.getValue();
if (session.isOpen()) {
session.close(CloseStatus.GOING_AWAY);
}
} catch (Exception e) {
log.warn("[WebSocket] Error closing session for client {}: {}", entry.getKey(), e.getMessage());
}
}
for (var entry : pendingSessions.entrySet()) {
try {
if (entry.getValue().session.isOpen()) {
entry.getValue().session.close(CloseStatus.GOING_AWAY);
}
} catch (Exception ignored) {
}
}
clientSessions.clear();
sessionToClient.clear();
pendingSessions.clear();
messageHandlers.clear();
initialized = false;
log.info("[WebSocket] Service shut down");
}
// ==========================================
// Public API
// ==========================================
public CompletableFuture<Void> sendToClient(String clientId, String messageType, byte[] payload) {
WebSocketSession session = clientSessions.get(clientId);
if (session == null || !session.isOpen()) {
return CompletableFuture
.failedFuture(new IOException("No active WebSocket session for client: " + clientId));
}
try {
String topic = String.format(TOPIC_TO_CLIENT, messageType);
String payloadJson = new String(payload, StandardCharsets.UTF_8);
ObjectNode wireMessage = objectMapper.createObjectNode();
wireMessage.put("topic", topic);
wireMessage.set("payload", objectMapper.readTree(payloadJson));
String wireJson = objectMapper.writeValueAsString(wireMessage);
log.info("[WebSocket OUT] {} -> {}", topic, wireJson);
sendToSession(session, wireJson);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
log.error("[WebSocket] Failed to send to client {}: {}", clientId, e.getMessage());
return CompletableFuture.failedFuture(new IOException("Failed to send WebSocket message", e));
}
}
public void registerMessageHandler(String messageType, MessageHandler handler) {
messageHandlers.put(messageType, handler);
log.debug("[WebSocket] Registered handler for messageType: {}", messageType);
}
public boolean isConnected() {
return initialized;
}
public boolean isClientConnected(String clientId) {
WebSocketSession session = clientSessions.get(clientId);
return session != null && session.isOpen();
}
public int getConnectedClientCount() {
return clientSessions.size();
}
// ==========================================
// WebSocket handler methods
// ==========================================
@Override
public void afterConnectionEstablished(WebSocketSession session) {
pendingSessions.put(session.getId(), new PendingSession(session, Instant.now()));
log.info("[WebSocket] New connection: sessionId={}, remote={}", session.getId(), session.getRemoteAddress());
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
try {
String json = message.getPayload();
JsonNode wireMessage = objectMapper.readTree(json);
JsonNode topicNode = wireMessage.get("topic");
JsonNode payloadNode = wireMessage.get("payload");
if (topicNode == null || payloadNode == null) {
log.warn("[WebSocket] Invalid message format (missing topic or payload): {}", json);
return;
}
String topic = topicNode.asText();
byte[] payloadBytes = objectMapper.writeValueAsBytes(payloadNode);
log.info("[WebSocket IN] {} <- {}", topic, json);
// Login message (special: unauthenticated)
if ("/server/login".equals(topic)) {
handleLoginMessage(session, payloadBytes);
return;
}
// Regular client message: /server/{messageType}
if (topic.startsWith("/server/")) {
// Verify session is authenticated
String appUserId = sessionToClient.get(session.getId());
if (appUserId == null) {
log.warn("[WebSocket] Unauthenticated session {} tried to send: {}", session.getId(), topic);
return;
}
handleClientMessage(topic, appUserId, payloadBytes);
}
} catch (Exception e) {
log.error("[WebSocket] Error handling message: {}", e.getMessage(), e);
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String sessionId = session.getId();
// Remove from pending sessions
pendingSessions.remove(sessionId);
// Remove from authenticated sessions
String clientId = sessionToClient.remove(sessionId);
if (clientId != null) {
clientSessions.remove(clientId, session);
log.info("[WebSocket] Client disconnected: clientId={}, reason={}", clientId, status);
} else {
log.info("[WebSocket] Unauthenticated session closed: sessionId={}", sessionId);
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
log.error("[WebSocket] Transport error for session {}: {}", session.getId(), exception.getMessage());
}
// ==========================================
// Internal message routing
// ==========================================
private void handleLoginMessage(WebSocketSession session, byte[] payloadBytes) {
MessageHandler handler = messageHandlers.get("login");
if (handler != null) {
handler.onMessageReceived(session.getId(), payloadBytes);
}
}
/**
* Register a pending session as authenticated under the given appUserId.
* Called by MessagingConfig after successful login.
*/
public void registerAuthenticatedSession(String wsSessionId, String appUserId) {
PendingSession pending = pendingSessions.get(wsSessionId);
if (pending == null) {
log.warn("[WebSocket] No pending session for wsSessionId={}", wsSessionId);
return;
}
registerClientSession(appUserId, pending.session());
}
/**
* Send a wire-format message directly to a session by its WebSocket sessionId.
* Used for sending login responses to pending (not yet authenticated) sessions.
*/
public void sendToSessionById(String wsSessionId, String topic, byte[] payload) {
try {
// Check pending sessions first
PendingSession pending = pendingSessions.get(wsSessionId);
WebSocketSession session = pending != null ? pending.session() : null;
// Fallback: check authenticated sessions via reverse lookup
if (session == null) {
String appUserId = sessionToClient.get(wsSessionId);
if (appUserId != null) {
session = clientSessions.get(appUserId);
}
}
if (session == null || !session.isOpen()) {
log.warn("[WebSocket] Cannot send to session {}: not found or closed", wsSessionId);
return;
}
String payloadJson = new String(payload, StandardCharsets.UTF_8);
ObjectNode wireMessage = objectMapper.createObjectNode();
wireMessage.put("topic", topic);
wireMessage.set("payload", objectMapper.readTree(payloadJson));
String wireJson = objectMapper.writeValueAsString(wireMessage);
log.info("[WebSocket OUT] {} -> {}", topic, wireJson);
sendToSession(session, wireJson);
} catch (Exception e) {
log.error("[WebSocket] Error sending to session {}: {}", wsSessionId, e.getMessage());
}
}
private void handleClientMessage(String topic, String appUserId, byte[] payload) {
String[] parts = topic.split("/");
// Handle /server/{messageType} where messageType can contain slashes
if (parts.length >= 3) {
String messageType = String.join("/", Arrays.copyOfRange(parts, 2, parts.length));
MessageHandler handler = messageHandlers.get(messageType);
if (handler != null) {
handler.onMessageReceived(appUserId, payload);
} else {
log.warn("[WebSocket] No handler registered for messageType: {}", messageType);
}
}
}
// ==========================================
// Session management
// ==========================================
private void registerClientSession(String clientId, WebSocketSession session) {
// Close old session if same clientId reconnects
WebSocketSession oldSession = clientSessions.put(clientId, session);
if (oldSession != null && oldSession.isOpen() && !oldSession.getId().equals(session.getId())) {
try {
String oldSessionId = oldSession.getId();
sessionToClient.remove(oldSessionId);
oldSession.close(CloseStatus.NORMAL.withReason("Replaced by new connection"));
log.info("[WebSocket] Closed old session for clientId={} (replaced)", clientId);
} catch (IOException e) {
log.warn("[WebSocket] Error closing old session for client {}: {}", clientId, e.getMessage());
}
}
sessionToClient.put(session.getId(), clientId);
pendingSessions.remove(session.getId());
log.info("[WebSocket] Client registered: clientId={}, sessionId={}", clientId, session.getId());
}
private void cleanupPendingSessions() {
Instant cutoff = Instant.now().minusMillis(PENDING_SESSION_TIMEOUT_MS);
pendingSessions.entrySet().removeIf(entry -> {
if (entry.getValue().connectedAt.isBefore(cutoff)) {
try {
WebSocketSession session = entry.getValue().session;
if (session.isOpen()) {
session.close(CloseStatus.POLICY_VIOLATION.withReason("Login timeout"));
}
log.info("[WebSocket] Closed pending session (login timeout): sessionId={}", entry.getKey());
} catch (IOException e) {
log.warn("[WebSocket] Error closing pending session: {}", e.getMessage());
}
return true;
}
return false;
});
}
// ==========================================
// Utility methods
// ==========================================
private void sendToSession(WebSocketSession session, String message) throws IOException {
synchronized (session) {
if (session.isOpen()) {
session.sendMessage(new TextMessage(message));
}
}
}
// ==========================================
// Internal types
// ==========================================
private record PendingSession(WebSocketSession session, Instant connectedAt) {
}
}

View File

@@ -1,242 +0,0 @@
package de.assecutor.votianlt.messaging.config;
import de.assecutor.votianlt.controller.MessageController;
import de.assecutor.votianlt.dto.AppLoginRequest;
import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService;
import de.assecutor.votianlt.messaging.model.AcknowledgmentMessage;
import de.assecutor.votianlt.messaging.model.MessageEnvelope;
import de.assecutor.votianlt.messaging.plugin.*;
import de.assecutor.votianlt.messaging.plugin.mqtt.MqttMessagingPlugin;
import de.assecutor.votianlt.service.ClientConnectionService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* Configuration for the plugin-based messaging system. Initializes the selected
* plugin and sets up message routing.
*/
@Configuration
@Slf4j
public class PluginMessagingConfig {
@Value("${app.messaging.plugin.type:mqtt}")
private String pluginType;
@Value("${app.messaging.plugin.mqtt.broker.host:mqtt-2.assecutor.de}")
private String mqttBrokerHost;
@Value("${app.messaging.plugin.mqtt.broker.port:1883}")
private int mqttBrokerPort;
@Value("${app.messaging.plugin.mqtt.username:app}")
private String mqttUsername;
@Value("${app.messaging.plugin.mqtt.password:apppwd}")
private String mqttPassword;
@Value("${app.messaging.plugin.mqtt.client.id:votianlt-server}")
private String mqttClientId;
private final PluginManager pluginManager;
private final ObjectMapper objectMapper;
public PluginMessagingConfig(PluginManager pluginManager, ObjectMapper objectMapper) {
this.pluginManager = pluginManager;
this.objectMapper = objectMapper;
}
/**
* Initialize the messaging plugin after application startup. This method is
* called after all beans are created, so we can safely access
* MessageDeliveryService.
*/
@EventListener(ApplicationReadyEvent.class)
public void initializePlugin(ApplicationReadyEvent event) {
try {
MessagingPlugin plugin = createPlugin(pluginType);
PluginConfig config = createPluginConfig(pluginType);
// Get beans from context (after all beans are created)
MessageDeliveryService deliveryService = event.getApplicationContext()
.getBean(MessageDeliveryService.class);
MessageController messageController = event.getApplicationContext().getBean(MessageController.class);
ClientConnectionService clientConnectionService = event.getApplicationContext()
.getBean(ClientConnectionService.class);
// Set up a listener to subscribe when connected
pluginManager.addStateListener(stateEvent -> {
if (stateEvent.isConnected()) {
try {
setupSubscriptions(deliveryService, messageController, clientConnectionService);
} catch (Exception e) {
log.error("[MQTT] Error setting up subscriptions: {}", e.getMessage());
}
} else if (stateEvent.getState() == ConnectionStateEvent.ConnectionState.DISCONNECTED) {
log.info("[MQTT] Disconnected from broker");
} else if (stateEvent.getState() == ConnectionStateEvent.ConnectionState.ERROR) {
log.error("[MQTT] Connection error: {}", stateEvent.getErrorMessage());
}
});
// Activate plugin
pluginManager.activatePlugin(plugin, config);
} catch (Exception e) {
log.error("[MQTT] Failed to initialize: {}", e.getMessage());
throw new RuntimeException("Failed to initialize messaging plugin", e);
}
}
/**
* Create a plugin instance based on the plugin type.
*/
private MessagingPlugin createPlugin(String type) {
return switch (type.toLowerCase()) {
case "mqtt" -> new MqttMessagingPlugin();
// Add more plugin types here in the future
// case "websocket" -> new WebSocketMessagingPlugin();
// case "grpc" -> new GrpcMessagingPlugin();
default -> throw new IllegalArgumentException("Unknown plugin type: " + type);
};
}
/**
* Create plugin configuration based on the plugin type.
*/
private PluginConfig createPluginConfig(String type) {
PluginConfig config = new PluginConfig();
switch (type.toLowerCase()) {
case "mqtt" -> {
config.setProperty("broker.host", mqttBrokerHost);
config.setProperty("broker.port", mqttBrokerPort);
config.setProperty("username", mqttUsername);
config.setProperty("password", mqttPassword);
config.setProperty("client.id", mqttClientId);
config.setProperty("auto.reconnect", true);
config.setProperty("clean.start", true);
}
// Add more plugin configurations here
default -> throw new IllegalArgumentException("Unknown plugin type: " + type);
}
return config;
}
/**
* Setup message subscriptions using the new plugin API.
*/
private void setupSubscriptions(MessageDeliveryService deliveryService, MessageController messageController,
ClientConnectionService clientConnectionService) {
try {
// Register ACK handler
pluginManager.registerAckHandler((messageId, payload) -> {
try {
String json = new String(payload, StandardCharsets.UTF_8);
MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class);
AcknowledgmentMessage ack = objectMapper.convertValue(envelope.getPayload(),
AcknowledgmentMessage.class);
deliveryService.handleAcknowledgment(ack);
} catch (Exception e) {
// Ignore ACK handling errors
}
});
// Register message handlers for different message types
String[] messageTypes = { "task_completed", "jobs/assigned", "message", "login", "pong" };
for (String messageType : messageTypes) {
pluginManager.registerMessageHandler(messageType,
(clientId, payload) -> handleEnvelopedMessage(clientId, payload, deliveryService,
messageController, clientConnectionService));
}
} catch (Exception e) {
log.error("[MQTT] Error setting up subscriptions: {}", e.getMessage());
throw new RuntimeException("Failed to setup subscriptions", e);
}
}
/**
* Handle incoming enveloped message. Supports both new envelope format and
* legacy format for backwards compatibility.
*/
private void handleEnvelopedMessage(String clientId, byte[] payload, MessageDeliveryService deliveryService,
MessageController messageController, ClientConnectionService clientConnectionService) {
try {
String json = new String(payload, StandardCharsets.UTF_8);
// Try to parse as envelope first
try {
MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class);
if (envelope.getMessageId() != null && envelope.getTopic() != null) {
deliveryService.handleIncomingMessage(envelope);
return;
}
} catch (Exception e) {
// Not a valid envelope, try legacy format
}
// Handle legacy format (direct payload without envelope)
handleLegacyMessage(clientId, json, messageController, clientConnectionService);
} catch (Exception e) {
// Ignore message handling errors
}
}
/**
* Handle legacy message format (without envelope wrapper). This supports older
* clients that don't use the envelope format.
*/
@SuppressWarnings("unchecked")
private void handleLegacyMessage(String clientId, String json, MessageController messageController,
ClientConnectionService clientConnectionService) {
try {
Map<String, Object> payload = objectMapper.readValue(json, Map.class);
// Check if this is a login request (has email, password, clientId)
if (payload.containsKey("email") && payload.containsKey("password") && payload.containsKey("clientId")) {
AppLoginRequest loginRequest = objectMapper.convertValue(payload, AppLoginRequest.class);
messageController.handleAppLogin(loginRequest);
return;
}
// Check if this is a pong response
if ("pong".equals(payload.get("type"))) {
String pongClientId = clientId != null ? clientId : (String) payload.get("clientId");
if (pongClientId != null) {
clientConnectionService.handlePong(pongClientId);
}
return;
}
// Check if this is a task completion
if (payload.containsKey("taskType") || payload.containsKey("taskId")) {
String taskType = payload.get("taskType") != null ? payload.get("taskType").toString() : null;
messageController.handleTaskCompleted(payload, taskType);
return;
}
// Check if this is a jobs/assigned request
if (payload.containsKey("appUserId")) {
if (clientId != null) {
payload.put("clientId", clientId);
}
messageController.handleGetAssignedJobs(payload);
return;
}
} catch (Exception e) {
// Ignore legacy message handling errors
}
}
}

View File

@@ -1,147 +0,0 @@
package de.assecutor.votianlt.messaging.delivery;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.assecutor.votianlt.controller.MessageController;
import de.assecutor.votianlt.dto.AppLoginRequest;
import de.assecutor.votianlt.messaging.model.MessageEnvelope;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Handles acknowledgments and routes incoming messages to application layer.
* Acts as a bridge between the messaging layer and the application logic.
*/
@Component
@Slf4j
public class AcknowledgmentHandler {
private final MessageController messageController;
private final ObjectMapper objectMapper;
public AcknowledgmentHandler(@Lazy MessageController messageController, ObjectMapper objectMapper) {
this.messageController = messageController;
this.objectMapper = objectMapper;
}
/**
* Route incoming message envelope to appropriate application handler. Unwraps
* the envelope and delegates to MessageController.
*/
public void routeIncomingMessage(MessageEnvelope envelope) {
try {
String topic = envelope.getTopic();
Object payload = envelope.getPayload();
log.debug("[AckHandler] Routing message {} on topic {}", envelope.getMessageId(), topic);
// Convert payload to Map for routing
Map<String, Object> payloadMap = objectMapper.convertValue(payload,
new TypeReference<Map<String, Object>>() {
});
// Route based on topic pattern
if (topic.matches("/server/.+/task_completed")) {
handleTaskCompleted(payloadMap);
} else if (topic.matches("/server/.+/jobs/assigned")) {
handleJobsAssigned(topic, payloadMap);
} else if (topic.equals("/server/login")) {
handleLogin(payloadMap);
} else if (topic.matches("/server/.+/message")) {
handleIncomingMessage(topic, payloadMap);
} else if (topic.matches("/server/.+/pong")) {
handlePong(topic, payloadMap);
} else {
log.debug("[AckHandler] No route for topic {}", topic);
}
} catch (Exception e) {
log.error("[AckHandler] Error routing message {}: {}", envelope.getMessageId(), e.getMessage(), e);
}
}
/**
* Handle task completion message
*/
private void handleTaskCompleted(Map<String, Object> payload) {
try {
Object tt = payload.get("taskType");
String taskType = tt != null ? tt.toString() : null;
messageController.handleTaskCompleted(payload, taskType);
} catch (Exception e) {
log.error("[AckHandler] Error handling task_completed: {}", e.getMessage(), e);
}
}
/**
* Handle jobs assigned request
*/
private void handleJobsAssigned(String topic, Map<String, Object> payload) {
try {
// Extract clientId from topic: /server/{clientId}/jobs/assigned
String[] parts = topic.split("/");
String clientId = parts.length > 2 ? parts[2] : null;
if (clientId != null && !clientId.isBlank()) {
payload.put("clientId", clientId);
} else {
log.warn("[AckHandler] Couldn't extract clientId from topic {} for jobs/assigned", topic);
}
messageController.handleGetAssignedJobs(payload);
} catch (Exception e) {
log.error("[AckHandler] Error handling jobs/assigned: {}", e.getMessage(), e);
}
}
/**
* Handle login request Topic: /server/login
*/
private void handleLogin(Map<String, Object> payload) {
try {
AppLoginRequest req = objectMapper.convertValue(payload, AppLoginRequest.class);
messageController.handleAppLogin(req);
} catch (Exception e) {
log.error("[AckHandler] Error handling login: {}", e.getMessage(), e);
}
}
/**
* Handle incoming chat message
*/
private void handleIncomingMessage(String topic, Map<String, Object> payload) {
try {
// Extract clientId from topic: /server/{clientId}/message
String[] parts = topic.split("/");
String clientId = parts.length > 2 ? parts[2] : null;
if (clientId != null && !clientId.isBlank()) {
payload.put("clientId", clientId);
} else {
log.warn("[AckHandler] Couldn't extract clientId from topic {} for message", topic);
}
messageController.handleIncomingMessage(payload);
} catch (Exception e) {
log.error("[AckHandler] Error handling incoming message: {}", e.getMessage(), e);
}
}
/**
* Handle pong response from client for connection monitoring
*/
private void handlePong(String topic, Map<String, Object> payload) {
try {
// Extract clientId from topic: /server/{clientId}/pong
String[] parts = topic.split("/");
String clientId = parts.length > 2 ? parts[2] : null;
if (clientId != null && !clientId.isBlank()) {
payload.put("clientId", clientId);
} else {
log.warn("[AckHandler] Couldn't extract clientId from topic {} for pong", topic);
}
messageController.handlePong(payload);
} catch (Exception e) {
log.error("[AckHandler] Error handling pong: {}", e.getMessage(), e);
}
}
}

View File

@@ -1,61 +0,0 @@
package de.assecutor.votianlt.messaging.delivery;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
/**
* Configuration for message delivery service.
*/
@Configuration
@ConfigurationProperties(prefix = "app.messaging.delivery")
@Data
public class DeliveryConfig {
/**
* Maximum number of retry attempts for failed deliveries
*/
private int maxRetries = 3;
/**
* Initial delay before first retry
*/
private Duration retryInitialDelay = Duration.ofSeconds(5);
/**
* Maximum delay between retries
*/
private Duration retryMaxDelay = Duration.ofMinutes(5);
/**
* Backoff multiplier for exponential backoff
*/
private double retryBackoffMultiplier = 2.0;
/**
* Timeout for waiting for acknowledgment
*/
private Duration ackTimeout = Duration.ofSeconds(30);
/**
* Default message expiry duration
*/
private Duration messageExpiry = Duration.ofHours(24);
/**
* Interval for cleanup task (in minutes)
*/
private int cleanupIntervalMinutes = 60;
/**
* Interval for retry task (in seconds)
*/
private int retryIntervalSeconds = 30;
/**
* Retention period for acknowledged deliveries (in days)
*/
private int acknowledgedRetentionDays = 7;
}

View File

@@ -1,135 +0,0 @@
package de.assecutor.votianlt.messaging.delivery;
import de.assecutor.votianlt.messaging.model.*;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
* Service for reliable message delivery with acknowledgment tracking. Provides
* guaranteed delivery with retry mechanism and acknowledgment handling.
*/
public interface MessageDeliveryService {
/**
* Send a message to a specific client with delivery tracking and
* acknowledgment.
*
* @param clientId
* The target client identifier
* @param messageType
* The type of message (e.g., "jobs", "message", "auth", "task")
* @param payload
* The message payload (will be serialized to JSON)
* @param options
* Delivery options (retries, timeout, etc.)
* @return CompletableFuture with delivery receipt
*/
CompletableFuture<DeliveryReceipt> sendToClient(String clientId, String messageType, Object payload,
DeliveryOptions options);
/**
* Send a message to a specific client with default delivery options.
*
* @param clientId
* The target client identifier
* @param messageType
* The type of message
* @param payload
* The message payload
* @return CompletableFuture with delivery receipt
*/
default CompletableFuture<DeliveryReceipt> sendToClient(String clientId, String messageType, Object payload) {
return sendToClient(clientId, messageType, payload, DeliveryOptions.standard());
}
/**
* Send a message with delivery tracking and acknowledgment.
*
* @deprecated Use
* {@link #sendToClient(String, String, Object, DeliveryOptions)}
* instead
*
* @param topic
* The destination topic
* @param payload
* The message payload (will be serialized to JSON)
* @param options
* Delivery options (retries, timeout, etc.)
* @return CompletableFuture with delivery receipt
*/
@Deprecated
CompletableFuture<DeliveryReceipt> sendMessage(String topic, Object payload, DeliveryOptions options);
/**
* Send a message with default delivery options.
*
* @deprecated Use {@link #sendToClient(String, String, Object)} instead
*
* @param topic
* The destination topic
* @param payload
* The message payload
* @return CompletableFuture with delivery receipt
*/
@Deprecated
default CompletableFuture<DeliveryReceipt> sendMessage(String topic, Object payload) {
return sendMessage(topic, payload, DeliveryOptions.standard());
}
/**
* Handle incoming message envelope from transport layer. Extracts payload and
* routes to application layer.
*
* @param envelope
* The received message envelope
*/
void handleIncomingMessage(MessageEnvelope envelope);
/**
* Handle acknowledgment from client. Updates delivery status and removes from
* pending queue.
*
* @param ack
* The acknowledgment message
*/
void handleAcknowledgment(AcknowledgmentMessage ack);
/**
* Get the current delivery status for a message.
*
* @param messageId
* The message ID
* @return Optional containing the delivery status, or empty if not found
*/
Optional<DeliveryStatus> getDeliveryStatus(String messageId);
/**
* Get detailed pending delivery information.
*
* @param messageId
* The message ID
* @return Optional containing the pending delivery, or empty if not found
*/
Optional<PendingDelivery> getPendingDelivery(String messageId);
/**
* Retry all pending deliveries that are ready for retry. Called by scheduled
* task.
*/
void retryPendingDeliveries();
/**
* Retry pending deliveries for a specific client. Called when a client
* reconnects.
*
* @param clientId
* The client identifier
*/
void retryPendingDeliveriesForClient(String clientId);
/**
* Clean up expired and completed deliveries. Called by scheduled task.
*/
void cleanupOldDeliveries();
}

View File

@@ -1,403 +0,0 @@
package de.assecutor.votianlt.messaging.delivery;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.assecutor.votianlt.messaging.model.*;
import de.assecutor.votianlt.messaging.plugin.PluginManager;
import de.assecutor.votianlt.messaging.plugin.SendOptions;
import de.assecutor.votianlt.repository.PendingDeliveryRepository;
import de.assecutor.votianlt.service.ClientConnectionService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Implementation of MessageDeliveryService with reliable delivery guarantees.
*/
@Service
@Slf4j
public class MessageDeliveryServiceImpl implements MessageDeliveryService {
@Value("${app.messaging.delivery.ack-retry-interval-seconds:5}")
private int ackRetryIntervalSeconds;
@Value("${app.messaging.delivery.ack-max-retries:4}")
private int ackMaxRetries;
private final PluginManager pluginManager;
private final PendingDeliveryRepository pendingDeliveryRepository;
private final AcknowledgmentHandler acknowledgmentHandler;
private final DeliveryConfig config;
private final ObjectMapper objectMapper;
private final ClientConnectionService clientConnectionService;
private ScheduledExecutorService retryScheduler;
public MessageDeliveryServiceImpl(PluginManager pluginManager, PendingDeliveryRepository pendingDeliveryRepository,
AcknowledgmentHandler acknowledgmentHandler, DeliveryConfig config, ObjectMapper objectMapper,
ClientConnectionService clientConnectionService) {
this.pluginManager = pluginManager;
this.pendingDeliveryRepository = pendingDeliveryRepository;
this.acknowledgmentHandler = acknowledgmentHandler;
this.config = config;
this.objectMapper = objectMapper;
this.clientConnectionService = clientConnectionService;
}
@PostConstruct
public void startRetryScheduler() {
retryScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "message-retry-scheduler");
t.setDaemon(true);
return t;
});
retryScheduler.scheduleAtFixedRate(this::retryPendingDeliveries, ackRetryIntervalSeconds,
ackRetryIntervalSeconds, TimeUnit.SECONDS);
log.info("[MessageDelivery] Started retry scheduler (interval: {}s, max retries: {})", ackRetryIntervalSeconds,
ackMaxRetries);
}
@PreDestroy
public void stopRetryScheduler() {
if (retryScheduler != null) {
retryScheduler.shutdown();
try {
if (!retryScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
retryScheduler.shutdownNow();
}
} catch (InterruptedException e) {
retryScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
log.info("[MessageDelivery] Stopped retry scheduler");
}
}
@Override
public CompletableFuture<DeliveryReceipt> sendToClient(String clientId, String messageType, Object payload,
DeliveryOptions options) {
try {
String destination = clientId + "/" + messageType;
final LocalDateTime expiresAt = options.calculateExpiryTime();
MessageEnvelope envelope = new MessageEnvelope(destination, payload, options.isRequiresAck(), expiresAt);
final String messageId = envelope.getMessageId();
String json = objectMapper.writeValueAsString(envelope);
byte[] envelopeData = json.getBytes(StandardCharsets.UTF_8);
if (options.isRequiresAck()) {
PendingDelivery pending = new PendingDelivery(messageId, destination, envelopeData,
options.getMaxRetries(), expiresAt);
pendingDeliveryRepository.save(pending);
}
SendOptions sendOptions = SendOptions.builder().qos(options.getQos()).retained(options.isRetained())
.build();
final boolean requiresAck = options.isRequiresAck();
final Duration ackTimeout = options.getAckTimeout();
log.info("[MessageDelivery] Sending message {} to client {} (type: {})", messageId, clientId, messageType);
return pluginManager.sendToClient(clientId, messageType, envelopeData, sendOptions).thenApply(v -> {
if (requiresAck) {
updatePendingDeliveryAfterSend(messageId, ackTimeout);
}
return DeliveryReceipt.submitted(messageId, destination, expiresAt);
}).exceptionally(ex -> {
log.error("[MessageDelivery] Failed to send message {}: {}", messageId, ex.getMessage());
if (requiresAck) {
markPendingDeliveryFailed(messageId, ex.getMessage());
}
return DeliveryReceipt.failed(messageId, destination);
});
} catch (Exception e) {
log.error("[MessageDelivery] Error creating message for client {}: {}", clientId, e.getMessage());
return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", clientId + "/" + messageType));
}
}
@Override
@Deprecated
public CompletableFuture<DeliveryReceipt> sendMessage(String topic, Object payload, DeliveryOptions options) {
String[] parts = topic.split("/");
if (parts.length >= 4 && parts[1].equals("client")) {
String clientId = parts[2];
String messageType = parts[3];
return sendToClient(clientId, messageType, payload, options);
}
log.warn("[MessageDelivery] Using deprecated sendMessage with topic: {}", topic);
return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", topic));
}
@Override
public void handleIncomingMessage(MessageEnvelope envelope) {
try {
log.info("[MessageDelivery] Received message {} on topic {}", envelope.getMessageId(), envelope.getTopic());
if (envelope.isRequiresAck()) {
sendAcknowledgment(envelope);
}
acknowledgmentHandler.routeIncomingMessage(envelope);
} catch (Exception e) {
log.error("[MessageDelivery] Error handling incoming message {}: {}", envelope.getMessageId(),
e.getMessage());
}
}
@Override
public void handleAcknowledgment(AcknowledgmentMessage ack) {
try {
log.info("[MessageDelivery] Received ACK for message {} (status: {})", ack.getMessageId(), ack.getStatus());
Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(ack.getMessageId());
if (pendingOpt.isEmpty()) {
return;
}
PendingDelivery pending = pendingOpt.get();
switch (ack.getStatus()) {
case RECEIVED, PROCESSED -> {
pendingDeliveryRepository.delete(pending);
}
case FAILED -> {
pending.markAsFailed(ack.getErrorMessage());
pendingDeliveryRepository.save(pending);
log.warn("[MessageDelivery] Message {} failed on client: {}", ack.getMessageId(),
ack.getErrorMessage());
}
}
} catch (Exception e) {
log.error("[MessageDelivery] Error handling ACK for message {}: {}", ack.getMessageId(), e.getMessage());
}
}
@Override
public Optional<DeliveryStatus> getDeliveryStatus(String messageId) {
return pendingDeliveryRepository.findByMessageId(messageId).map(PendingDelivery::getStatus);
}
@Override
public Optional<PendingDelivery> getPendingDelivery(String messageId) {
return pendingDeliveryRepository.findByMessageId(messageId);
}
@Override
public void retryPendingDeliveries() {
try {
List<PendingDelivery> readyForRetry = pendingDeliveryRepository
.findByStatusAndNextRetryAtBefore(DeliveryStatus.SENT, LocalDateTime.now());
if (readyForRetry.isEmpty()) {
return;
}
for (PendingDelivery pending : readyForRetry) {
retryDelivery(pending);
}
} catch (Exception e) {
log.error("[MessageDelivery] Error during retry process: {}", e.getMessage());
}
}
@Override
public void retryPendingDeliveriesForClient(String clientId) {
if (clientId == null || clientId.isBlank()) {
return;
}
try {
List<PendingDelivery> pendingDeliveries = pendingDeliveryRepository
.findByStatusIn(List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT));
for (PendingDelivery pending : pendingDeliveries) {
String topic = pending.getTopic();
if (topic != null && topic.startsWith(clientId + "/")) {
retryDelivery(pending);
}
}
} catch (Exception e) {
log.error("[MessageDelivery] Error retrying deliveries for client {}: {}", clientId, e.getMessage());
}
}
@Override
public void cleanupOldDeliveries() {
try {
LocalDateTime cutoff = LocalDateTime.now().minus(Duration.ofDays(7));
List<PendingDelivery> oldAcknowledged = pendingDeliveryRepository
.findByStatusAndAcknowledgedAtBefore(DeliveryStatus.ACKNOWLEDGED, cutoff);
if (!oldAcknowledged.isEmpty()) {
pendingDeliveryRepository.deleteAll(oldAcknowledged);
}
List<PendingDelivery> expired = pendingDeliveryRepository.findByStatusInAndExpiresAtBefore(
List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT), LocalDateTime.now());
for (PendingDelivery pending : expired) {
pending.markAsExpired();
pendingDeliveryRepository.save(pending);
}
} catch (Exception e) {
log.error("[MessageDelivery] Error during cleanup: {}", e.getMessage());
}
}
private void updatePendingDeliveryAfterSend(String messageId, Duration ackTimeout) {
try {
Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(messageId);
if (pendingOpt.isPresent()) {
PendingDelivery pending = pendingOpt.get();
LocalDateTime nextRetry = LocalDateTime.now().plus(ackTimeout);
pending.markAsSent(nextRetry);
pendingDeliveryRepository.save(pending);
}
} catch (Exception e) {
log.error("[MessageDelivery] Error updating pending delivery {}: {}", messageId, e.getMessage());
}
}
private void markPendingDeliveryFailed(String messageId, String reason) {
try {
Optional<PendingDelivery> pendingOpt = pendingDeliveryRepository.findByMessageId(messageId);
if (pendingOpt.isPresent()) {
PendingDelivery pending = pendingOpt.get();
pending.markAsFailed(reason);
pendingDeliveryRepository.save(pending);
}
} catch (Exception e) {
log.error("[MessageDelivery] Error marking delivery as failed {}: {}", messageId, e.getMessage());
}
}
private void retryDelivery(PendingDelivery pending) {
try {
if (pending.isExpired()) {
pending.markAsExpired();
pendingDeliveryRepository.save(pending);
return;
}
if (pending.hasReachedMaxRetries()) {
pending.markAsFailed("Max retries reached");
pendingDeliveryRepository.save(pending);
return;
}
String topic = pending.getTopic();
String[] parts = topic.split("/");
if (parts.length < 2) {
pending.markAsFailed("Invalid topic format");
pendingDeliveryRepository.save(pending);
return;
}
String clientId = parts[0];
String messageType = parts[1];
Duration backoffDelay = calculateBackoff(pending.getRetryCount() + 1);
LocalDateTime nextRetry = LocalDateTime.now().plus(backoffDelay);
if (!clientConnectionService.isClientConnected(clientId)) {
pending.setNextRetryAt(nextRetry);
pendingDeliveryRepository.save(pending);
return;
}
pending.incrementRetryCount();
SendOptions options = SendOptions.reliable();
pluginManager.sendToClient(clientId, messageType, pending.getEnvelopeData(), options).thenAccept(v -> {
pending.markAsSent(nextRetry);
pendingDeliveryRepository.save(pending);
}).exceptionally(ex -> {
log.error("[MessageDelivery] Retry failed for message {}: {}", pending.getMessageId(), ex.getMessage());
pending.markAsFailed(ex.getMessage());
pendingDeliveryRepository.save(pending);
return null;
});
} catch (Exception e) {
log.error("[MessageDelivery] Error retrying delivery {}: {}", pending.getMessageId(), e.getMessage());
}
}
private void sendAcknowledgment(MessageEnvelope envelope) {
try {
String clientId = extractClientIdFromTopic(envelope.getTopic());
if (clientId == null) {
return;
}
AcknowledgmentMessage ack = new AcknowledgmentMessage(envelope.getMessageId(), AckStatus.RECEIVED,
"server");
String ackJson = objectMapper.writeValueAsString(ack);
byte[] ackData = ackJson.getBytes(StandardCharsets.UTF_8);
log.info("[MessageDelivery] Sending ACK for message {} to client {}", envelope.getMessageId(), clientId);
pluginManager.sendAckToClient(clientId, envelope.getMessageId(), ackData, SendOptions.fireAndForget())
.exceptionally(ex -> {
log.error("[MessageDelivery] Failed to send ACK for message {}: {}", envelope.getMessageId(),
ex.getMessage());
return null;
});
} catch (Exception e) {
log.error("[MessageDelivery] Error sending ACK for message {}: {}", envelope.getMessageId(),
e.getMessage());
}
}
private Duration calculateBackoff(int retryCount) {
long delayMs = (long) (config.getRetryInitialDelay().toMillis()
* Math.pow(config.getRetryBackoffMultiplier(), retryCount - 1));
long maxDelayMs = config.getRetryMaxDelay().toMillis();
return Duration.ofMillis(Math.min(delayMs, maxDelayMs));
}
private String extractClientIdFromTopic(String topic) {
if (topic == null) {
return null;
}
if (topic.startsWith("/server/")) {
String[] parts = topic.split("/");
if (parts.length > 2) {
return parts[2];
}
}
if (topic.contains("/")) {
String[] parts = topic.split("/");
if (parts.length >= 1) {
return parts[0];
}
}
return null;
}
}

View File

@@ -1,41 +0,0 @@
package de.assecutor.votianlt.messaging.delivery;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* Scheduled tasks for message delivery retry and cleanup.
*/
@Component
public class RetryScheduler {
private final MessageDeliveryService deliveryService;
public RetryScheduler(MessageDeliveryService deliveryService) {
this.deliveryService = deliveryService;
}
/**
* Retry pending deliveries every 30 seconds (configurable)
*/
@Scheduled(fixedDelayString = "${app.messaging.delivery.retry-interval-seconds:30}000")
public void retryPendingDeliveries() {
try {
deliveryService.retryPendingDeliveries();
} catch (Exception e) {
// Ignore retry errors
}
}
/**
* Cleanup old deliveries every hour (configurable)
*/
@Scheduled(fixedDelayString = "${app.messaging.delivery.cleanup-interval-minutes:60}000")
public void cleanupOldDeliveries() {
try {
deliveryService.cleanupOldDeliveries();
} catch (Exception e) {
// Ignore cleanup errors
}
}
}

View File

@@ -1,21 +0,0 @@
package de.assecutor.votianlt.messaging.model;
/**
* Status of message acknowledgment from client.
*/
public enum AckStatus {
/**
* Message was received by the client
*/
RECEIVED,
/**
* Message was successfully processed by the client
*/
PROCESSED,
/**
* Message processing failed on the client side
*/
FAILED
}

View File

@@ -1,62 +0,0 @@
package de.assecutor.votianlt.messaging.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* Acknowledgment message sent by clients to confirm message receipt/processing.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AcknowledgmentMessage {
/**
* ID of the message being acknowledged
*/
private String messageId;
/**
* Status of the acknowledgment
*/
private AckStatus status;
/**
* Timestamp when the acknowledgment was created
*/
private LocalDateTime timestamp;
/**
* ID of the client sending the acknowledgment
*/
private String clientId;
/**
* Optional error message if status is FAILED
*/
private String errorMessage;
/**
* Constructor for successful acknowledgment
*/
public AcknowledgmentMessage(String messageId, AckStatus status, String clientId) {
this.messageId = messageId;
this.status = status;
this.timestamp = LocalDateTime.now();
this.clientId = clientId;
}
/**
* Constructor for failed acknowledgment with error message
*/
public AcknowledgmentMessage(String messageId, String clientId, String errorMessage) {
this.messageId = messageId;
this.status = AckStatus.FAILED;
this.timestamp = LocalDateTime.now();
this.clientId = clientId;
this.errorMessage = errorMessage;
}
}

View File

@@ -1,84 +0,0 @@
package de.assecutor.votianlt.messaging.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Duration;
import java.time.LocalDateTime;
/**
* Options for message delivery configuration.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeliveryOptions {
/**
* Whether this message requires acknowledgment
*/
@Builder.Default
private boolean requiresAck = true;
/**
* Maximum number of retry attempts
*/
@Builder.Default
private int maxRetries = 3;
/**
* Timeout for acknowledgment
*/
@Builder.Default
private Duration ackTimeout = Duration.ofSeconds(30);
/**
* Message expiry duration from now
*/
@Builder.Default
private Duration expiryDuration = Duration.ofHours(24);
/**
* QoS level for transport (MQTT specific, but kept generic)
*/
@Builder.Default
private int qos = 2;
/**
* Whether message should be retained by broker
*/
@Builder.Default
private boolean retained = false;
/**
* Calculate expiry timestamp from duration
*/
public LocalDateTime calculateExpiryTime() {
return LocalDateTime.now().plus(expiryDuration);
}
/**
* Default options for standard messages
*/
public static DeliveryOptions standard() {
return DeliveryOptions.builder().build();
}
/**
* Options for fire-and-forget messages (no acknowledgment required)
*/
public static DeliveryOptions fireAndForget() {
return DeliveryOptions.builder().requiresAck(false).maxRetries(0).build();
}
/**
* Options for critical messages with extended retry
*/
public static DeliveryOptions critical() {
return DeliveryOptions.builder().requiresAck(true).maxRetries(5).ackTimeout(Duration.ofMinutes(2))
.expiryDuration(Duration.ofDays(7)).build();
}
}

View File

@@ -1,55 +0,0 @@
package de.assecutor.votianlt.messaging.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* Receipt returned when a message is submitted for delivery.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DeliveryReceipt {
/**
* Unique message identifier
*/
private String messageId;
/**
* Topic where message was sent
*/
private String topic;
/**
* When the message was submitted
*/
private LocalDateTime submittedAt;
/**
* Initial delivery status
*/
private DeliveryStatus status;
/**
* When the message will expire
*/
private LocalDateTime expiresAt;
/**
* Create a receipt for a successfully submitted message
*/
public static DeliveryReceipt submitted(String messageId, String topic, LocalDateTime expiresAt) {
return new DeliveryReceipt(messageId, topic, LocalDateTime.now(), DeliveryStatus.PENDING, expiresAt);
}
/**
* Create a receipt for a failed submission
*/
public static DeliveryReceipt failed(String messageId, String topic) {
return new DeliveryReceipt(messageId, topic, LocalDateTime.now(), DeliveryStatus.FAILED, null);
}
}

View File

@@ -1,31 +0,0 @@
package de.assecutor.votianlt.messaging.model;
/**
* Status of a message delivery attempt.
*/
public enum DeliveryStatus {
/**
* Message is queued but not yet sent
*/
PENDING,
/**
* Message has been sent to the transport layer
*/
SENT,
/**
* Client has acknowledged receipt of the message
*/
ACKNOWLEDGED,
/**
* Delivery failed after all retry attempts
*/
FAILED,
/**
* Message expired before delivery could be confirmed
*/
EXPIRED
}

View File

@@ -1,101 +0,0 @@
package de.assecutor.votianlt.messaging.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* Envelope that wraps all messages sent through the messaging system. Contains
* metadata for delivery tracking and acknowledgment. This is a DTO class - not
* persisted to MongoDB.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class MessageEnvelope {
/**
* Unique identifier for this message (UUID)
*/
private String messageId;
/**
* Timestamp when the envelope was created
*/
private LocalDateTime timestamp;
/**
* Target topic for this message
*/
private String topic;
/**
* The actual message payload (can be any serializable object)
*/
private Object payload;
/**
* Whether this message requires acknowledgment from the receiver
*/
private boolean requiresAck;
/**
* Number of times this message has been retried
*/
private int retryCount;
/**
* When this message expires and should no longer be delivered
*/
private LocalDateTime expiresAt;
/**
* Additional metadata for the message
*/
private Map<String, String> metadata;
/**
* Constructor for creating a new envelope with payload
*/
public MessageEnvelope(String topic, Object payload, boolean requiresAck, LocalDateTime expiresAt) {
this.messageId = UUID.randomUUID().toString();
this.timestamp = LocalDateTime.now();
this.topic = topic;
this.payload = payload;
this.requiresAck = requiresAck;
this.retryCount = 0;
this.expiresAt = expiresAt;
this.metadata = new HashMap<>();
}
/**
* Check if this message has expired
*/
public boolean isExpired() {
return expiresAt != null && LocalDateTime.now().isAfter(expiresAt);
}
/**
* Increment the retry counter
*/
public void incrementRetryCount() {
this.retryCount++;
}
/**
* Add metadata to the envelope
*/
public void addMetadata(String key, String value) {
if (this.metadata == null) {
this.metadata = new HashMap<>();
}
this.metadata.put(key, value);
}
}

View File

@@ -1,210 +0,0 @@
package de.assecutor.votianlt.messaging.model;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
import java.time.LocalDateTime;
/**
* Represents a message delivery that is pending acknowledgment. Stored in
* MongoDB for retry and tracking purposes.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "pending_deliveries")
public class PendingDelivery {
@Id
@JsonIgnore
private ObjectId id;
/**
* Unique message identifier
*/
@Field("message_id")
@Indexed(unique = true)
private String messageId;
/**
* Target topic for this message
*/
@Field("topic")
private String topic;
/**
* Serialized envelope data (JSON bytes)
*/
@Field("envelope_data")
private byte[] envelopeData;
/**
* Current delivery status
*/
@Field("status")
@Indexed
private DeliveryStatus status;
/**
* When the delivery record was created
*/
@Field("created_at")
private LocalDateTime createdAt;
/**
* When the message was last sent
*/
@Field("sent_at")
private LocalDateTime sentAt;
/**
* When acknowledgment was received
*/
@Field("acknowledged_at")
private LocalDateTime acknowledgedAt;
/**
* Number of retry attempts made
*/
@Field("retry_count")
private int retryCount;
/**
* Maximum number of retries allowed
*/
@Field("max_retries")
private int maxRetries;
/**
* When the next retry should be attempted
*/
@Field("next_retry_at")
@Indexed
private LocalDateTime nextRetryAt;
/**
* When this delivery expires
*/
@Field("expires_at")
@Indexed
private LocalDateTime expiresAt;
/**
* Reason for failure (if status is FAILED)
*/
@Field("failure_reason")
private String failureReason;
/**
* Client ID (extracted from topic if available)
*/
@Field("client_id")
private String clientId;
/**
* Constructor for new pending delivery
*/
public PendingDelivery(String messageId, String topic, byte[] envelopeData, int maxRetries,
LocalDateTime expiresAt) {
this.messageId = messageId;
this.topic = topic;
this.envelopeData = envelopeData;
this.status = DeliveryStatus.PENDING;
this.createdAt = LocalDateTime.now();
this.retryCount = 0;
this.maxRetries = maxRetries;
this.expiresAt = expiresAt;
this.clientId = extractClientIdFromTopic(topic);
}
/**
* Mark as sent and schedule next retry
*/
public void markAsSent(LocalDateTime nextRetryAt) {
this.status = DeliveryStatus.SENT;
this.sentAt = LocalDateTime.now();
this.nextRetryAt = nextRetryAt;
}
/**
* Mark as acknowledged
*/
public void markAsAcknowledged() {
this.status = DeliveryStatus.ACKNOWLEDGED;
this.acknowledgedAt = LocalDateTime.now();
}
/**
* Mark as failed with reason
*/
public void markAsFailed(String reason) {
this.status = DeliveryStatus.FAILED;
this.failureReason = reason;
}
/**
* Mark as expired
*/
public void markAsExpired() {
this.status = DeliveryStatus.EXPIRED;
this.failureReason = "Message expired before delivery could be confirmed";
}
/**
* Increment retry count
*/
public void incrementRetryCount() {
this.retryCount++;
}
/**
* Check if max retries reached
*/
public boolean hasReachedMaxRetries() {
return retryCount >= maxRetries;
}
/**
* Check if expired
*/
public boolean isExpired() {
return expiresAt != null && LocalDateTime.now().isAfter(expiresAt);
}
/**
* Check if ready for retry
*/
public boolean isReadyForRetry() {
return status == DeliveryStatus.SENT && nextRetryAt != null && LocalDateTime.now().isAfter(nextRetryAt)
&& !hasReachedMaxRetries() && !isExpired();
}
/**
* Extract client ID from topic pattern /client/{clientId}/...
*/
private String extractClientIdFromTopic(String topic) {
if (topic != null && topic.startsWith("/client/")) {
String[] parts = topic.split("/");
if (parts.length > 2) {
return parts[2];
}
}
return null;
}
/**
* Returns the ObjectId as string for JSON serialization
*/
@JsonGetter("id")
public String getIdAsString() {
return id != null ? id.toString() : null;
}
}

View File

@@ -1,107 +0,0 @@
package de.assecutor.votianlt.messaging.plugin;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* Event representing a connection state change.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ConnectionStateEvent {
/**
* Connection state
*/
private ConnectionState state;
/**
* Previous connection state
*/
private ConnectionState previousState;
/**
* Timestamp of the state change
*/
@Builder.Default
private LocalDateTime timestamp = LocalDateTime.now();
/**
* Optional error message if state is ERROR or DISCONNECTED
*/
private String errorMessage;
/**
* Optional exception if state is ERROR
*/
private Throwable exception;
/**
* Plugin that generated this event
*/
private String pluginName;
/**
* Connection states
*/
public enum ConnectionState {
/**
* Plugin is initializing
*/
INITIALIZING,
/**
* Plugin is connecting to the transport
*/
CONNECTING,
/**
* Plugin is connected and ready
*/
CONNECTED,
/**
* Plugin is disconnecting
*/
DISCONNECTING,
/**
* Plugin is disconnected
*/
DISCONNECTED,
/**
* Plugin encountered an error
*/
ERROR,
/**
* Plugin is reconnecting after a failure
*/
RECONNECTING
}
/**
* Check if the connection is active.
*
* @return true if connected
*/
public boolean isConnected() {
return state == ConnectionState.CONNECTED;
}
/**
* Check if there was an error.
*
* @return true if state is ERROR
*/
public boolean isError() {
return state == ConnectionState.ERROR;
}
}

View File

@@ -1,182 +0,0 @@
package de.assecutor.votianlt.messaging.plugin;
import java.util.concurrent.CompletableFuture;
/**
* Interface for messaging transport plugins. Plugins implement specific
* transport protocols (MQTT, WebSocket, gRPC, etc.) and provide a unified
* interface for the messaging layer.
*
* The plugin is responsible for managing the internal topic/channel structure.
* The messaging layer only uses clientId and messageType as identifiers.
*/
public interface MessagingPlugin {
/**
* Initialize the plugin with configuration. Called once during application
* startup.
*
* @param config
* Plugin-specific configuration
* @throws PluginException
* if initialization fails
*/
void init(PluginConfig config) throws PluginException;
/**
* Shutdown the plugin and release resources. Called during application
* shutdown.
*
* @throws PluginException
* if shutdown fails
*/
void exit() throws PluginException;
/**
* Callback when connection state changes. The plugin should call this method
* when the underlying transport connection state changes (connected,
* disconnected, error).
*
* @param listener
* Connection state listener
*/
void setConnectionListener(ConnectionStateListener listener);
/**
* Send a message to a specific client. The plugin is responsible for
* determining the correct topic/channel based on the messageType.
*
* @param clientId
* Target client identifier
* @param messageType
* Type of message (e.g., "jobs", "message", "auth", "task")
* @param payload
* Message payload as byte array
* @param options
* Transport-specific options
* @return CompletableFuture that completes when message is sent
* @throws PluginException
* if sending fails
*/
CompletableFuture<Void> sendToClient(String clientId, String messageType, byte[] payload, SendOptions options)
throws PluginException;
/**
* Send an acknowledgment to a specific client. The plugin is responsible for
* determining the correct ACK topic/channel.
*
* @param clientId
* Target client identifier
* @param messageId
* Message ID being acknowledged
* @param payload
* ACK payload as byte array
* @param options
* Transport-specific options
* @return CompletableFuture that completes when ACK is sent
* @throws PluginException
* if sending fails
*/
CompletableFuture<Void> sendAckToClient(String clientId, String messageId, byte[] payload, SendOptions options)
throws PluginException;
/**
* Register a handler for incoming messages of a specific type from clients. The
* plugin is responsible for subscribing to the appropriate topics/channels.
*
* @param messageType
* Type of message to handle (e.g., "task_completed", "message",
* "jobs/assigned", "login")
* @param handler
* Message handler to be called when a message is received
* @throws PluginException
* if registration fails
*/
void registerMessageHandler(String messageType, ClientMessageHandler handler) throws PluginException;
/**
* Register a handler for incoming acknowledgments from clients. The plugin is
* responsible for subscribing to the appropriate ACK topics/channels.
*
* @param handler
* ACK handler to be called when an ACK is received
* @throws PluginException
* if registration fails
*/
void registerAckHandler(AckHandler handler) throws PluginException;
/**
* Check if the plugin is currently connected.
*
* @return true if connected, false otherwise
*/
boolean isConnected();
/**
* Get the plugin name/type identifier.
*
* @return Plugin name (e.g., "mqtt", "websocket", "grpc")
*/
String getPluginName();
/**
* Get plugin version.
*
* @return Plugin version string
*/
String getPluginVersion();
/**
* Get plugin metadata/information.
*
* @return Plugin metadata
*/
PluginMetadata getMetadata();
/**
* Callback interface for connection state changes.
*/
@FunctionalInterface
interface ConnectionStateListener {
/**
* Called when connection state changes.
*
* @param event
* Connection state event
*/
void onConnectionStateChanged(ConnectionStateEvent event);
}
/**
* Handler for received messages from clients. Includes the clientId extracted
* from the topic/channel.
*/
@FunctionalInterface
interface ClientMessageHandler {
/**
* Called when a message is received from a client.
*
* @param clientId
* Client identifier extracted from the topic/channel
* @param payload
* Message payload as byte array
*/
void onMessageReceived(String clientId, byte[] payload);
}
/**
* Handler for received acknowledgments from clients.
*/
@FunctionalInterface
interface AckHandler {
/**
* Called when an ACK is received from a client.
*
* @param messageId
* Message ID being acknowledged
* @param payload
* ACK payload as byte array
*/
void onAckReceived(String messageId, byte[] payload);
}
}

View File

@@ -1,141 +0,0 @@
package de.assecutor.votianlt.messaging.plugin;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
/**
* Configuration for messaging plugins. Provides a flexible key-value store for
* plugin-specific settings.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PluginConfig {
/**
* Plugin-specific properties
*/
@Builder.Default
private Map<String, Object> properties = new HashMap<>();
/**
* Get a string property.
*
* @param key
* Property key
* @return Property value or null if not found
*/
public String getString(String key) {
Object value = properties.get(key);
return value != null ? value.toString() : null;
}
/**
* Get a string property with default value.
*
* @param key
* Property key
* @param defaultValue
* Default value if property not found
* @return Property value or default
*/
public String getString(String key, String defaultValue) {
String value = getString(key);
return value != null ? value : defaultValue;
}
/**
* Get an integer property.
*
* @param key
* Property key
* @return Property value or null if not found
*/
public Integer getInt(String key) {
Object value = properties.get(key);
if (value instanceof Integer) {
return (Integer) value;
} else if (value instanceof String) {
try {
return Integer.parseInt((String) value);
} catch (NumberFormatException e) {
return null;
}
}
return null;
}
/**
* Get an integer property with default value.
*
* @param key
* Property key
* @param defaultValue
* Default value if property not found
* @return Property value or default
*/
public int getInt(String key, int defaultValue) {
Integer value = getInt(key);
return value != null ? value : defaultValue;
}
/**
* Get a boolean property.
*
* @param key
* Property key
* @return Property value or null if not found
*/
public Boolean getBoolean(String key) {
Object value = properties.get(key);
if (value instanceof Boolean) {
return (Boolean) value;
} else if (value instanceof String) {
return Boolean.parseBoolean((String) value);
}
return null;
}
/**
* Get a boolean property with default value.
*
* @param key
* Property key
* @param defaultValue
* Default value if property not found
* @return Property value or default
*/
public boolean getBoolean(String key, boolean defaultValue) {
Boolean value = getBoolean(key);
return value != null ? value : defaultValue;
}
/**
* Set a property.
*
* @param key
* Property key
* @param value
* Property value
*/
public void setProperty(String key, Object value) {
properties.put(key, value);
}
/**
* Check if a property exists.
*
* @param key
* Property key
* @return true if property exists
*/
public boolean hasProperty(String key) {
return properties.containsKey(key);
}
}

View File

@@ -1,19 +0,0 @@
package de.assecutor.votianlt.messaging.plugin;
/**
* Exception thrown by messaging plugins.
*/
public class PluginException extends Exception {
public PluginException(String message) {
super(message);
}
public PluginException(String message, Throwable cause) {
super(message, cause);
}
public PluginException(Throwable cause) {
super(cause);
}
}

View File

@@ -1,271 +0,0 @@
package de.assecutor.votianlt.messaging.plugin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
* Manager for messaging plugins. Handles plugin lifecycle, registration, and
* delegation.
*/
@Component
@Slf4j
public class PluginManager {
private MessagingPlugin activePlugin;
private final List<ConnectionStateEvent> connectionHistory = new ArrayList<>();
private final List<PluginStateListener> stateListeners = new ArrayList<>();
/**
* Initialize and activate a plugin.
*
* @param plugin
* Plugin to activate
* @param config
* Plugin configuration
* @throws PluginException
* if initialization fails
*/
public void activatePlugin(MessagingPlugin plugin, PluginConfig config) throws PluginException {
log.info("[PluginManager] Activating plugin: {}", plugin.getPluginName());
// Shutdown existing plugin if any
if (activePlugin != null) {
log.info("[PluginManager] Shutting down existing plugin: {}", activePlugin.getPluginName());
try {
activePlugin.exit();
} catch (Exception e) {
log.error("[PluginManager] Error shutting down existing plugin: {}", e.getMessage(), e);
}
}
// Set connection listener
plugin.setConnectionListener(event -> {
String previousState = event.getPreviousState() != null ? event.getPreviousState().toString() : "NONE";
log.info("[PluginManager] Connection state changed: {} -> {}", previousState, event.getState());
connectionHistory.add(event);
notifyStateListeners(event);
});
// Initialize plugin
plugin.init(config);
activePlugin = plugin;
log.info("[PluginManager] Plugin activated: {} v{}", plugin.getPluginName(), plugin.getPluginVersion());
}
/**
* Get the currently active plugin.
*
* @return Active plugin or empty if none
*/
public Optional<MessagingPlugin> getActivePlugin() {
return Optional.ofNullable(activePlugin);
}
/**
* Send a message to a specific client via the active plugin.
*
* @param clientId
* Target client identifier
* @param messageType
* Type of message (e.g., "jobs", "message", "auth", "task")
* @param payload
* Message payload
* @param options
* Send options
* @return CompletableFuture that completes when message is sent
* @throws PluginException
* if no plugin is active or sending fails
*/
public CompletableFuture<Void> sendToClient(String clientId, String messageType, byte[] payload,
SendOptions options) throws PluginException {
if (activePlugin == null) {
return CompletableFuture.failedFuture(new PluginException("No active plugin"));
}
if (!activePlugin.isConnected()) {
return CompletableFuture.failedFuture(new PluginException("Plugin is not connected"));
}
return activePlugin.sendToClient(clientId, messageType, payload, options);
}
/**
* Send an acknowledgment to a specific client via the active plugin.
*
* @param clientId
* Target client identifier
* @param messageId
* Message ID being acknowledged
* @param payload
* ACK payload
* @param options
* Send options
* @return CompletableFuture that completes when ACK is sent
* @throws PluginException
* if no plugin is active or sending fails
*/
public CompletableFuture<Void> sendAckToClient(String clientId, String messageId, byte[] payload,
SendOptions options) throws PluginException {
if (activePlugin == null) {
return CompletableFuture.failedFuture(new PluginException("No active plugin"));
}
if (!activePlugin.isConnected()) {
return CompletableFuture.failedFuture(new PluginException("Plugin is not connected"));
}
return activePlugin.sendAckToClient(clientId, messageId, payload, options);
}
/**
* Register a handler for incoming messages of a specific type from clients.
*
* @param messageType
* Type of message to handle
* @param handler
* Message handler
* @throws PluginException
* if no plugin is active or registration fails
*/
public void registerMessageHandler(String messageType, MessagingPlugin.ClientMessageHandler handler)
throws PluginException {
if (activePlugin == null) {
throw new PluginException("No active plugin");
}
activePlugin.registerMessageHandler(messageType, handler);
}
/**
* Register a handler for incoming acknowledgments from clients.
*
* @param handler
* ACK handler
* @throws PluginException
* if no plugin is active or registration fails
*/
public void registerAckHandler(MessagingPlugin.AckHandler handler) throws PluginException {
if (activePlugin == null) {
throw new PluginException("No active plugin");
}
activePlugin.registerAckHandler(handler);
}
/**
* Check if the active plugin is connected.
*
* @return true if connected, false otherwise
*/
public boolean isConnected() {
return activePlugin != null && activePlugin.isConnected();
}
/**
* Get metadata of the active plugin.
*
* @return Plugin metadata or empty if no plugin is active
*/
public Optional<PluginMetadata> getActivePluginMetadata() {
return Optional.ofNullable(activePlugin).map(MessagingPlugin::getMetadata);
}
/**
* Get connection history.
*
* @return List of connection state events
*/
public List<ConnectionStateEvent> getConnectionHistory() {
return new ArrayList<>(connectionHistory);
}
/**
* Get the last connection state event.
*
* @return Last connection state event or empty if none
*/
public Optional<ConnectionStateEvent> getLastConnectionState() {
if (connectionHistory.isEmpty()) {
return Optional.empty();
}
return Optional.of(connectionHistory.get(connectionHistory.size() - 1));
}
/**
* Add a plugin state listener.
*
* @param listener
* State listener
*/
public void addStateListener(PluginStateListener listener) {
stateListeners.add(listener);
}
/**
* Remove a plugin state listener.
*
* @param listener
* State listener
*/
public void removeStateListener(PluginStateListener listener) {
stateListeners.remove(listener);
}
/**
* Notify all state listeners of a connection state change.
*
* @param event
* Connection state event
*/
private void notifyStateListeners(ConnectionStateEvent event) {
for (PluginStateListener listener : stateListeners) {
try {
listener.onConnectionStateChanged(event);
} catch (Exception e) {
log.error("[PluginManager] Error in state listener: {}", e.getMessage(), e);
}
}
}
/**
* Shutdown the plugin manager and active plugin.
*/
@PreDestroy
public void shutdown() {
log.info("[PluginManager] Shutting down plugin manager");
if (activePlugin != null) {
try {
activePlugin.exit();
log.info("[PluginManager] Active plugin shut down successfully");
} catch (Exception e) {
log.error("[PluginManager] Error shutting down active plugin: {}", e.getMessage(), e);
}
activePlugin = null;
}
stateListeners.clear();
connectionHistory.clear();
}
/**
* Listener interface for plugin state changes.
*/
@FunctionalInterface
public interface PluginStateListener {
/**
* Called when plugin connection state changes.
*
* @param event
* Connection state event
*/
void onConnectionStateChanged(ConnectionStateEvent event);
}
}

View File

@@ -1,92 +0,0 @@
package de.assecutor.votianlt.messaging.plugin;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* Metadata about a messaging plugin.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PluginMetadata {
/**
* Plugin name
*/
private String name;
/**
* Plugin version
*/
private String version;
/**
* Plugin description
*/
private String description;
/**
* Plugin author/vendor
*/
private String author;
/**
* Supported features
*/
@Builder.Default
private List<String> supportedFeatures = new ArrayList<>();
/**
* Whether the plugin supports wildcards in topic patterns
*/
@Builder.Default
private boolean supportsWildcards = false;
/**
* Whether the plugin supports retained messages
*/
@Builder.Default
private boolean supportsRetainedMessages = false;
/**
* Whether the plugin supports QoS levels
*/
@Builder.Default
private boolean supportsQos = false;
/**
* Maximum QoS level supported (0, 1, 2)
*/
@Builder.Default
private int maxQosLevel = 0;
/**
* Check if a feature is supported.
*
* @param feature
* Feature name
* @return true if supported
*/
public boolean supportsFeature(String feature) {
return supportedFeatures.contains(feature);
}
/**
* Add a supported feature.
*
* @param feature
* Feature name
*/
public void addSupportedFeature(String feature) {
if (!supportedFeatures.contains(feature)) {
supportedFeatures.add(feature);
}
}
}

View File

@@ -1,84 +0,0 @@
package de.assecutor.votianlt.messaging.plugin;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
/**
* Represents a message received from a messaging plugin.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ReceivedMessage {
/**
* Topic/channel the message was received on
*/
private String topic;
/**
* Message payload
*/
private byte[] payload;
/**
* Quality of Service level (if applicable)
*/
private int qos;
/**
* Whether the message was retained
*/
private boolean retained;
/**
* Timestamp when message was received
*/
@Builder.Default
private LocalDateTime receivedAt = LocalDateTime.now();
/**
* Additional metadata from the transport
*/
@Builder.Default
private Map<String, Object> metadata = new HashMap<>();
/**
* Get metadata value.
*
* @param key
* Metadata key
* @return Metadata value or null
*/
public Object getMetadata(String key) {
return metadata.get(key);
}
/**
* Set metadata value.
*
* @param key
* Metadata key
* @param value
* Metadata value
*/
public void setMetadata(String key, Object value) {
metadata.put(key, value);
}
/**
* Get payload as UTF-8 string.
*
* @return Payload as string
*/
public String getPayloadAsString() {
return payload != null ? new String(payload, java.nio.charset.StandardCharsets.UTF_8) : null;
}
}

View File

@@ -1,99 +0,0 @@
package de.assecutor.votianlt.messaging.plugin;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
/**
* Options for sending messages via plugins. Provides transport-agnostic options
* with extensibility for plugin-specific settings.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SendOptions {
/**
* Quality of Service level (0, 1, 2 for MQTT-like transports)
*/
@Builder.Default
private int qos = 1;
/**
* Whether the message should be retained by the broker/server
*/
@Builder.Default
private boolean retained = false;
/**
* Message priority (if supported by transport)
*/
@Builder.Default
private int priority = 0;
/**
* Message expiry time in seconds (if supported by transport)
*/
private Long expirySeconds;
/**
* Additional plugin-specific options
*/
@Builder.Default
private Map<String, Object> additionalOptions = new HashMap<>();
/**
* Get an additional option.
*
* @param key
* Option key
* @return Option value or null
*/
public Object getAdditionalOption(String key) {
return additionalOptions.get(key);
}
/**
* Set an additional option.
*
* @param key
* Option key
* @param value
* Option value
*/
public void setAdditionalOption(String key, Object value) {
additionalOptions.put(key, value);
}
/**
* Create default send options.
*
* @return Default options
*/
public static SendOptions defaults() {
return SendOptions.builder().build();
}
/**
* Create options for fire-and-forget messages.
*
* @return Fire-and-forget options
*/
public static SendOptions fireAndForget() {
return SendOptions.builder().qos(0).retained(false).build();
}
/**
* Create options for reliable delivery.
*
* @return Reliable delivery options
*/
public static SendOptions reliable() {
return SendOptions.builder().qos(2).retained(false).build();
}
}

View File

@@ -1,416 +0,0 @@
package de.assecutor.votianlt.messaging.plugin.mqtt;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import de.assecutor.votianlt.messaging.plugin.*;
import de.assecutor.votianlt.messaging.plugin.ConnectionStateEvent.ConnectionState;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
* MQTT implementation of the MessagingPlugin interface. Uses HiveMQ MQTT 5
* client for communication.
*
* Topic Structure (managed internally): - Server -> Client:
* /client/{clientId}/{messageType} - Client -> Server:
* /server/{clientId}/{messageType} - ACK Server -> Client:
* /client/{clientId}/ack (messageId in payload) - ACK Client -> Server:
* /server/{clientId}/ack (messageId in payload)
*/
@Slf4j
public class MqttMessagingPlugin implements MessagingPlugin {
private static final String PLUGIN_NAME = "mqtt";
private static final String PLUGIN_VERSION = "2.0.0";
// Topic templates
private static final String TOPIC_TO_CLIENT = "/client/%s/%s"; // /client/{clientId}/{messageType}
private static final String TOPIC_ACK_TO_CLIENT = "/client/%s/ack"; // /client/{clientId}/ack (messageId in payload)
// Subscription patterns
private static final String PATTERN_FROM_CLIENT = "/server/+/%s"; // /server/+/{messageType}
private static final String PATTERN_ACK_FROM_CLIENT = "/server/+/ack"; // /server/+/ack
private Mqtt5AsyncClient mqttClient;
private ConnectionStateListener connectionListener;
private final Map<String, ClientMessageHandler> messageHandlers = new ConcurrentHashMap<>();
private AckHandler ackHandler;
private volatile boolean connected = false;
// Configuration keys
private static final String CONFIG_BROKER_HOST = "broker.host";
private static final String CONFIG_BROKER_PORT = "broker.port";
private static final String CONFIG_USERNAME = "username";
private static final String CONFIG_PASSWORD = "password";
private static final String CONFIG_CLIENT_ID = "client.id";
private static final String CONFIG_CLEAN_START = "clean.start";
private static final String CONFIG_CONNECTION_TIMEOUT = "connection.timeout.seconds";
private static final String CONFIG_KEEP_ALIVE = "keep.alive.seconds";
@Override
public void init(PluginConfig config) throws PluginException {
try {
notifyConnectionState(ConnectionState.INITIALIZING, null);
// Extract configuration
String brokerHost = config.getString(CONFIG_BROKER_HOST, "localhost");
int brokerPort = config.getInt(CONFIG_BROKER_PORT, 1883);
String username = config.getString(CONFIG_USERNAME);
String password = config.getString(CONFIG_PASSWORD);
String clientId = config.getString(CONFIG_CLIENT_ID, "votianlt-" + UUID.randomUUID());
boolean cleanStart = config.getBoolean(CONFIG_CLEAN_START, true);
int connectionTimeout = config.getInt(CONFIG_CONNECTION_TIMEOUT, 60);
int keepAlive = config.getInt(CONFIG_KEEP_ALIVE, 60);
// Build MQTT client
var clientBuilder = MqttClient.builder().useMqttVersion5().identifier(clientId).serverHost(brokerHost)
.serverPort(brokerPort).automaticReconnect().initialDelay(1, java.util.concurrent.TimeUnit.SECONDS)
.maxDelay(30, java.util.concurrent.TimeUnit.SECONDS).applyAutomaticReconnect();
mqttClient = clientBuilder.buildAsync();
// Build connect options
var connectBuilder = com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect.builder()
.cleanStart(cleanStart).keepAlive(keepAlive);
if (username != null && password != null) {
connectBuilder.simpleAuth().username(username).password(password.getBytes(StandardCharsets.UTF_8))
.applySimpleAuth();
}
// Connect asynchronously
notifyConnectionState(ConnectionState.CONNECTING, null);
mqttClient.connect(connectBuilder.build())
.orTimeout(connectionTimeout, java.util.concurrent.TimeUnit.SECONDS)
.whenComplete((connAck, throwable) -> {
if (throwable != null) {
String errorMsg = String.format("Connection to %s:%d failed: %s", brokerHost, brokerPort,
throwable.getMessage());
log.error("[MQTT] Connection failed: {}", errorMsg);
connected = false;
notifyConnectionState(ConnectionState.ERROR, errorMsg);
} else {
log.info("[MQTT] Server connected to {}:{}", brokerHost, brokerPort);
connected = true;
setupGlobalMessageHandler();
notifyConnectionState(ConnectionState.CONNECTED, null);
}
});
} catch (Exception e) {
log.error("[MQTT] Initialization failed: {}", e.getMessage(), e);
throw new PluginException("Failed to initialize MQTT plugin", e);
}
}
@Override
public void exit() throws PluginException {
try {
notifyConnectionState(ConnectionState.DISCONNECTING, null);
if (mqttClient != null) {
var clientState = mqttClient.getState();
if (clientState.isConnected()) {
try {
mqttClient.disconnect().join();
} catch (Exception disconnectEx) {
// Client may already be disconnected
}
}
}
connected = false;
messageHandlers.clear();
ackHandler = null;
notifyConnectionState(ConnectionState.DISCONNECTED, null);
} catch (Exception e) {
log.error("[MQTT] Shutdown failed: {}", e.getMessage());
connected = false;
messageHandlers.clear();
ackHandler = null;
}
}
@Override
public void setConnectionListener(ConnectionStateListener listener) {
this.connectionListener = listener;
}
@Override
public CompletableFuture<Void> sendToClient(String clientId, String messageType, byte[] payload,
SendOptions options) throws PluginException {
if (!connected) {
return CompletableFuture.failedFuture(new PluginException("MQTT client is not connected"));
}
String topic = String.format(TOPIC_TO_CLIENT, clientId, messageType);
String json = new String(payload, StandardCharsets.UTF_8);
log.info("[MQTT OUT] {} -> {}", topic, json);
return sendToTopic(topic, payload, options);
}
@Override
public CompletableFuture<Void> sendAckToClient(String clientId, String messageId, byte[] payload,
SendOptions options) throws PluginException {
if (!connected) {
return CompletableFuture.failedFuture(new PluginException("MQTT client is not connected"));
}
String topic = String.format(TOPIC_ACK_TO_CLIENT, clientId);
return sendToTopic(topic, payload, options);
}
@Override
public void registerMessageHandler(String messageType, ClientMessageHandler handler) throws PluginException {
if (!connected) {
throw new PluginException("MQTT client is not connected");
}
messageHandlers.put(messageType, handler);
// Special case for login: subscribe to /server/login (without clientId)
if ("login".equals(messageType)) {
String loginTopic = "/server/login";
mqttClient.subscribeWith().topicFilter(loginTopic).qos(MqttQos.EXACTLY_ONCE).send()
.whenComplete((subAck, throwable) -> {
if (throwable != null) {
log.error("[MQTT] Subscription to {} failed: {}", loginTopic, throwable.getMessage());
messageHandlers.remove(messageType);
}
});
} else {
// Standard pattern: /server/+/{messageType}
String topicPattern = String.format(PATTERN_FROM_CLIENT, messageType);
mqttClient.subscribeWith().topicFilter(topicPattern).qos(MqttQos.EXACTLY_ONCE).send()
.whenComplete((subAck, throwable) -> {
if (throwable != null) {
log.error("[MQTT] Subscription to {} failed: {}", topicPattern, throwable.getMessage());
messageHandlers.remove(messageType);
}
});
}
}
@Override
public void registerAckHandler(AckHandler handler) throws PluginException {
if (!connected) {
throw new PluginException("MQTT client is not connected");
}
this.ackHandler = handler;
// Subscribe to ACK topic pattern
mqttClient.subscribeWith().topicFilter(PATTERN_ACK_FROM_CLIENT).qos(MqttQos.EXACTLY_ONCE).send()
.whenComplete((subAck, throwable) -> {
if (throwable != null) {
log.error("[MQTT] Subscription to {} failed: {}", PATTERN_ACK_FROM_CLIENT,
throwable.getMessage());
this.ackHandler = null;
}
});
}
@Override
public boolean isConnected() {
return connected;
}
@Override
public String getPluginName() {
return PLUGIN_NAME;
}
@Override
public String getPluginVersion() {
return PLUGIN_VERSION;
}
@Override
public PluginMetadata getMetadata() {
return PluginMetadata.builder().name(PLUGIN_NAME).version(PLUGIN_VERSION)
.description("MQTT v5 messaging plugin using HiveMQ client").supportsWildcards(true)
.supportsRetainedMessages(true).supportsQos(true).maxQosLevel(2).build();
}
/**
* Setup global message handler to route incoming messages to registered
* handlers.
*/
private void setupGlobalMessageHandler() {
mqttClient.publishes(com.hivemq.client.mqtt.MqttGlobalPublishFilter.ALL, publish -> {
handleIncomingMessage(publish);
});
}
/**
* Handle incoming MQTT message and route to appropriate handler.
*/
private void handleIncomingMessage(Mqtt5Publish publish) {
String topic = publish.getTopic().toString();
byte[] payload = publish.getPayloadAsBytes();
String json = new String(payload, StandardCharsets.UTF_8);
// Log incoming message with topic and JSON
log.info("[MQTT IN] {} <- {}", topic, json);
try {
// Check if it's an ACK message (topic ends with /ack)
if (topic.startsWith("/server/") && topic.endsWith("/ack")) {
handleAckMessage(topic, payload);
}
// Check if it's a client message
else if (topic.startsWith("/server/")) {
handleClientMessage(topic, payload);
}
} catch (Exception e) {
log.error("[MQTT] Error handling message on topic {}: {}", topic, e.getMessage());
}
}
/**
* Handle ACK message from client. Topic format: /server/{clientId}/ack
* (messageId in payload)
*/
private void handleAckMessage(String topic, byte[] payload) {
if (ackHandler == null) {
return;
}
// Extract clientId from topic: /server/{clientId}/ack
String[] parts = topic.split("/");
if (parts.length >= 4) {
// Extract messageId from payload
String payloadStr = new String(payload, StandardCharsets.UTF_8);
String messageId = extractMessageIdFromPayload(payloadStr);
if (messageId != null) {
ackHandler.onAckReceived(messageId, payload);
}
}
}
/**
* Extract messageId from ACK payload. Expected payload format: JSON with
* "messageId" field, e.g., {"messageId": "abc-123"} or plain messageId string.
*/
private String extractMessageIdFromPayload(String payload) {
if (payload == null || payload.isBlank()) {
return null;
}
payload = payload.trim();
// Try to extract from JSON format: {"messageId": "..."}
if (payload.startsWith("{")) {
// Simple JSON parsing for messageId field
int keyIndex = payload.indexOf("\"messageId\"");
if (keyIndex == -1) {
keyIndex = payload.indexOf("'messageId'");
}
if (keyIndex >= 0) {
int colonIndex = payload.indexOf(":", keyIndex);
if (colonIndex >= 0) {
int valueStart = payload.indexOf("\"", colonIndex);
if (valueStart == -1) {
valueStart = payload.indexOf("'", colonIndex);
}
if (valueStart >= 0) {
char quote = payload.charAt(valueStart);
int valueEnd = payload.indexOf(quote, valueStart + 1);
if (valueEnd > valueStart) {
return payload.substring(valueStart + 1, valueEnd);
}
}
}
}
}
// If not JSON, treat the entire payload as the messageId
return payload;
}
/**
* Handle client message. Topic format: /server/{clientId}/{messageType} or
* /server/{messageType} (for login) messageType can contain slashes, e.g.,
* "jobs/assigned"
*/
private void handleClientMessage(String topic, byte[] payload) {
// Extract clientId and messageType from topic
String[] parts = topic.split("/");
// Handle /server/login (without clientId)
if (parts.length == 3 && "login".equals(parts[2])) {
String messageType = parts[2];
ClientMessageHandler handler = messageHandlers.get(messageType);
if (handler != null) {
handler.onMessageReceived(null, payload);
}
return;
}
// Handle /server/{clientId}/{messageType} where messageType can contain slashes
if (parts.length >= 4) {
String clientId = parts[2];
// Join all parts from index 3 onwards to form the full messageType
// e.g., /server/clientId/jobs/assigned -> messageType = "jobs/assigned"
String messageType = String.join("/", java.util.Arrays.copyOfRange(parts, 3, parts.length));
ClientMessageHandler handler = messageHandlers.get(messageType);
if (handler != null) {
handler.onMessageReceived(clientId, payload);
}
}
}
/**
* Send message to a specific MQTT topic.
*/
private CompletableFuture<Void> sendToTopic(String topic, byte[] payload, SendOptions options) {
try {
var publishBuilder = Mqtt5Publish.builder().topic(topic).payload(payload).qos(mapQos(options.getQos()))
.retain(options.isRetained());
return mqttClient.publish(publishBuilder.build()).thenApply(publishResult -> null);
} catch (Exception e) {
log.error("[MQTT] Failed to publish to topic {}: {}", topic, e.getMessage());
return CompletableFuture.failedFuture(new PluginException("Failed to publish message", e));
}
}
/**
* Map QoS level to MQTT QoS.
*/
private MqttQos mapQos(int qos) {
return switch (qos) {
case 0 -> MqttQos.AT_MOST_ONCE;
case 1 -> MqttQos.AT_LEAST_ONCE;
case 2 -> MqttQos.EXACTLY_ONCE;
default -> MqttQos.AT_LEAST_ONCE;
};
}
/**
* Notify connection state listener.
*/
private void notifyConnectionState(ConnectionState state, String message) {
if (connectionListener != null) {
ConnectionStateEvent event = ConnectionStateEvent.builder().state(state).previousState(null)
.errorMessage(message).pluginName(PLUGIN_NAME).build();
try {
connectionListener.onConnectionStateChanged(event);
} catch (Exception e) {
log.error("[MQTT] Error in connection listener: {}", e.getMessage());
}
}
}
}

View File

@@ -1,56 +0,0 @@
package de.assecutor.votianlt.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "pending_mqtt_messages")
public class PendingMqttMessage {
@Id
private ObjectId id;
@Field("topic")
private String topic;
@Field("payload")
private byte[] payload;
@Field("qos")
private int qos;
@Field("retained")
private boolean retained;
@Field("created_at")
private LocalDateTime createdAt;
@Field("retry_count")
private int retryCount = 0;
@Field("last_retry_at")
private LocalDateTime lastRetryAt;
public PendingMqttMessage(String topic, byte[] payload, int qos, boolean retained) {
this.topic = topic;
this.payload = payload;
this.qos = qos;
this.retained = retained;
this.createdAt = LocalDateTime.now();
this.retryCount = 0;
}
public void incrementRetryCount() {
this.retryCount++;
this.lastRetryAt = LocalDateTime.now();
}
}

View File

@@ -1,68 +0,0 @@
package de.assecutor.votianlt.mqtt;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService;
import de.assecutor.votianlt.messaging.model.DeliveryOptions;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.context.annotation.Lazy;
/**
* Simple MQTT publishing helper to send JSON payloads.
*
* This implementation now uses MessageDeliveryService for reliable delivery
* with acknowledgment tracking and retry mechanism.
*
* Note: In environments where Spring Integration MQTT is unavailable (e.g.,
* offline CI), this implementation degrades to a no-op publisher that logs the
* intended message.
*/
public interface MqttPublisher {
void publishAsJson(String topic, Object payload);
void publishAsJson(String topic, Object payload, boolean retained);
}
@Component
@Slf4j
class MqttPublisherImpl implements MqttPublisher {
private final ObjectMapper objectMapper;
private final MessageDeliveryService deliveryService;
public MqttPublisherImpl(@Lazy MessageDeliveryService deliveryService, ObjectMapper objectMapper) {
this.deliveryService = deliveryService;
this.objectMapper = objectMapper;
}
@Override
public void publishAsJson(String topic, Object payload) {
publishAsJson(topic, payload, false);
}
@Override
public void publishAsJson(String topic, Object payload, boolean retained) {
try {
// Parse topic to extract clientId and messageType
// Expected format: /client/{clientId}/{messageType}
String[] parts = topic.split("/");
if (parts.length < 4 || !"client".equals(parts[1])) {
log.warn("Invalid topic format: {}. Expected /client/{clientId}/{messageType}", topic);
return;
}
String clientId = parts[2];
String messageType = parts[3];
// Use MessageDeliveryService for reliable delivery
DeliveryOptions options = DeliveryOptions.builder().requiresAck(true).retained(retained).build();
deliveryService.sendToClient(clientId, messageType, payload, options).exceptionally(ex -> {
log.error("[MQTT] Failed to deliver to {}: {}", topic, ex.getMessage());
return null;
});
} catch (Exception e) {
log.error("[MQTT] Failed to publish to {}: {}", topic, e.getMessage());
}
}
}

View File

@@ -5,7 +5,7 @@ import de.assecutor.votianlt.model.CargoItem;
import de.assecutor.votianlt.model.Job; import de.assecutor.votianlt.model.Job;
import de.assecutor.votianlt.model.JobStatus; import de.assecutor.votianlt.model.JobStatus;
import de.assecutor.votianlt.model.task.BaseTask; import de.assecutor.votianlt.model.task.BaseTask;
import de.assecutor.votianlt.mqtt.MqttPublisher; import de.assecutor.votianlt.messaging.MessagingPublisher;
import de.assecutor.votianlt.repository.JobRepository; import de.assecutor.votianlt.repository.JobRepository;
import de.assecutor.votianlt.repository.TaskRepository; import de.assecutor.votianlt.repository.TaskRepository;
import de.assecutor.votianlt.security.SecurityService; import de.assecutor.votianlt.security.SecurityService;
@@ -36,7 +36,7 @@ public class AddJobService {
private final JobHistoryService jobHistoryService; private final JobHistoryService jobHistoryService;
private final EmailService emailService; private final EmailService emailService;
private final ClientConnectionService clientConnectionService; private final ClientConnectionService clientConnectionService;
private final MqttPublisher mqttPublisher; private final MessagingPublisher messagingPublisher;
/** /**
* Speichert einen neuen Auftrag samt CargoItems und Tasks * Speichert einen neuen Auftrag samt CargoItems und Tasks
@@ -123,7 +123,7 @@ public class AddJobService {
e.getMessage()); e.getMessage());
} }
// MQTT-Benachrichtigung an Client senden, wenn online // Benachrichtigung an Client senden, wenn online
notifyClientJobCreated(savedJob); notifyClientJobCreated(savedJob);
log.info("Auftrag erfolgreich gespeichert: {}", savedJob.getJobNumber()); log.info("Auftrag erfolgreich gespeichert: {}", savedJob.getJobNumber());
@@ -182,7 +182,7 @@ public class AddJobService {
} }
/** /**
* Sendet den neu erstellten Job per MQTT an den zugewiesenen Client, falls dieser * Sendet den neu erstellten Job per WebSocket an den zugewiesenen Client, falls dieser
* online ist. * online ist.
*/ */
private void notifyClientJobCreated(Job job) { private void notifyClientJobCreated(Job job) {
@@ -208,10 +208,9 @@ public class AddJobService {
// Erstelle DTO mit allen Daten // Erstelle DTO mit allen Daten
JobWithRelatedDataDTO jobData = new JobWithRelatedDataDTO(job, cargoItems, tasks); JobWithRelatedDataDTO jobData = new JobWithRelatedDataDTO(job, cargoItems, tasks);
String topic = "/client/" + appUserId + "/job_created"; log.info("[JOB] Sending job_created to {}: jobId={}, jobNumber={}", appUserId, job.getId().toHexString(),
log.info("[JOB] Sending job_created to {}: jobId={}, jobNumber={}", topic, job.getId().toHexString(),
job.getJobNumber()); job.getJobNumber());
mqttPublisher.publishAsJson(topic, jobData, false); messagingPublisher.publishAsJson(appUserId, "job_created", jobData);
} catch (Exception e) { } catch (Exception e) {
log.warn("[JOB] Failed to send job_created notification: {}", e.getMessage()); log.warn("[JOB] Failed to send job_created notification: {}", e.getMessage());
} }

View File

@@ -40,7 +40,6 @@ public class AdminDashboardView extends Main {
private final BarcodeRepository barcodeRepository; private final BarcodeRepository barcodeRepository;
private final SignatureRepository signatureRepository; private final SignatureRepository signatureRepository;
private final CommentRepository commentRepository; private final CommentRepository commentRepository;
private final PendingMqttMessageRepository pendingMqttMessageRepository;
private final Div statisticsContainer; private final Div statisticsContainer;
@@ -48,8 +47,7 @@ public class AdminDashboardView extends Main {
public AdminDashboardView(JobRepository jobRepository, TaskRepository taskRepository, UserRepository userRepository, public AdminDashboardView(JobRepository jobRepository, TaskRepository taskRepository, UserRepository userRepository,
AppUserRepository appUserRepository, CargoItemRepository cargoItemRepository, AppUserRepository appUserRepository, CargoItemRepository cargoItemRepository,
PhotoRepository photoRepository, BarcodeRepository barcodeRepository, PhotoRepository photoRepository, BarcodeRepository barcodeRepository,
SignatureRepository signatureRepository, CommentRepository commentRepository, SignatureRepository signatureRepository, CommentRepository commentRepository) {
PendingMqttMessageRepository pendingMqttMessageRepository) {
this.jobRepository = jobRepository; this.jobRepository = jobRepository;
this.taskRepository = taskRepository; this.taskRepository = taskRepository;
@@ -60,7 +58,6 @@ public class AdminDashboardView extends Main {
this.barcodeRepository = barcodeRepository; this.barcodeRepository = barcodeRepository;
this.signatureRepository = signatureRepository; this.signatureRepository = signatureRepository;
this.commentRepository = commentRepository; this.commentRepository = commentRepository;
this.pendingMqttMessageRepository = pendingMqttMessageRepository;
setSizeFull(); setSizeFull();
addClassNames(LumoUtility.BoxSizing.BORDER, LumoUtility.Display.FLEX, LumoUtility.FlexDirection.COLUMN, addClassNames(LumoUtility.BoxSizing.BORDER, LumoUtility.Display.FLEX, LumoUtility.FlexDirection.COLUMN,
@@ -291,11 +288,8 @@ public class AdminDashboardView extends Main {
cards.add(createStatCard("Datenbank", "Fehler", VaadinIcon.DATABASE, "red")); cards.add(createStatCard("Datenbank", "Fehler", VaadinIcon.DATABASE, "red"));
} }
// Pending MQTT messages // Messaging status
long pendingMqttMessages = pendingMqttMessageRepository.count(); cards.add(createStatCard("WebSocket", "Aktiv", VaadinIcon.CONNECT, "green"));
String mqttStatus = pendingMqttMessages == 0 ? "OK" : "Warteschlange: " + pendingMqttMessages;
String mqttColor = pendingMqttMessages == 0 ? "green" : "orange";
cards.add(createStatCard("MQTT", mqttStatus, VaadinIcon.CONNECT, mqttColor));
// System uptime (placeholder) // System uptime (placeholder)
cards.add(createStatCard("Anwendung", "Läuft", VaadinIcon.HEART, "green")); cards.add(createStatCard("Anwendung", "Läuft", VaadinIcon.HEART, "green"));

View File

@@ -20,7 +20,7 @@ import com.vaadin.flow.router.PageTitle;
import com.vaadin.flow.router.Route; import com.vaadin.flow.router.Route;
import de.assecutor.votianlt.model.Job; import de.assecutor.votianlt.model.Job;
import de.assecutor.votianlt.model.JobStatus; import de.assecutor.votianlt.model.JobStatus;
import de.assecutor.votianlt.mqtt.MqttPublisher; import de.assecutor.votianlt.messaging.MessagingPublisher;
import de.assecutor.votianlt.util.DateTimeFormatUtil; import de.assecutor.votianlt.util.DateTimeFormatUtil;
import de.assecutor.votianlt.repository.JobRepository; import de.assecutor.votianlt.repository.JobRepository;
import de.assecutor.votianlt.security.SecurityService; import de.assecutor.votianlt.security.SecurityService;
@@ -47,18 +47,18 @@ public class ShowJobsView extends VerticalLayout {
private final JobHistoryService jobHistoryService; private final JobHistoryService jobHistoryService;
private final SecurityService securityService; private final SecurityService securityService;
private final ClientConnectionService clientConnectionService; private final ClientConnectionService clientConnectionService;
private final MqttPublisher mqttPublisher; private final MessagingPublisher messagingPublisher;
private final Grid<Job> grid = new Grid<>(Job.class, false); private final Grid<Job> grid = new Grid<>(Job.class, false);
@Autowired @Autowired
public ShowJobsView(JobRepository jobRepository, JobHistoryService jobHistoryService, public ShowJobsView(JobRepository jobRepository, JobHistoryService jobHistoryService,
SecurityService securityService, ClientConnectionService clientConnectionService, SecurityService securityService, ClientConnectionService clientConnectionService,
MqttPublisher mqttPublisher) { MessagingPublisher messagingPublisher) {
this.jobRepository = jobRepository; this.jobRepository = jobRepository;
this.jobHistoryService = jobHistoryService; this.jobHistoryService = jobHistoryService;
this.securityService = securityService; this.securityService = securityService;
this.clientConnectionService = clientConnectionService; this.clientConnectionService = clientConnectionService;
this.mqttPublisher = mqttPublisher; this.messagingPublisher = messagingPublisher;
setSizeFull(); setSizeFull();
setPadding(true); setPadding(true);
setSpacing(true); setSpacing(true);
@@ -230,9 +230,8 @@ public class ShowJobsView extends VerticalLayout {
"jobNumber", job.getJobNumber() != null ? job.getJobNumber() : "", "jobNumber", job.getJobNumber() != null ? job.getJobNumber() : "",
"deletedAt", LocalDateTime.now().toString()); "deletedAt", LocalDateTime.now().toString());
String topic = "/client/" + appUserId + "/job_deleted"; log.info("[JOB] Sending job_deleted to {}: {}", appUserId, payload);
log.info("[JOB] Sending job_deleted to {}: {}", topic, payload); messagingPublisher.publishAsJson(appUserId, "job_deleted", payload);
mqttPublisher.publishAsJson(topic, payload, false);
} }
private void loadData() { private void loadData() {

View File

@@ -1,69 +0,0 @@
package de.assecutor.votianlt.repository;
import de.assecutor.votianlt.messaging.model.DeliveryStatus;
import de.assecutor.votianlt.messaging.model.PendingDelivery;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
/**
* Repository for PendingDelivery entities.
*/
@Repository
public interface PendingDeliveryRepository extends MongoRepository<PendingDelivery, ObjectId> {
/**
* Find pending delivery by message ID
*/
Optional<PendingDelivery> findByMessageId(String messageId);
/**
* Find all deliveries with a specific status
*/
List<PendingDelivery> findByStatus(DeliveryStatus status);
/**
* Find deliveries ready for retry (status = SENT and nextRetryAt is in the
* past)
*/
List<PendingDelivery> findByStatusAndNextRetryAtBefore(DeliveryStatus status, LocalDateTime dateTime);
/**
* Find acknowledged deliveries older than specified time
*/
List<PendingDelivery> findByStatusAndAcknowledgedAtBefore(DeliveryStatus status, LocalDateTime dateTime);
/**
* Find deliveries with specific statuses that have expired
*/
List<PendingDelivery> findByStatusInAndExpiresAtBefore(List<DeliveryStatus> statuses, LocalDateTime dateTime);
/**
* Find deliveries with specific statuses
*/
List<PendingDelivery> findByStatusIn(List<DeliveryStatus> statuses);
/**
* Find all deliveries for a specific client
*/
List<PendingDelivery> findByClientId(String clientId);
/**
* Find all deliveries for a specific topic
*/
List<PendingDelivery> findByTopic(String topic);
/**
* Count deliveries by status
*/
long countByStatus(DeliveryStatus status);
/**
* Delete deliveries older than specified time
*/
void deleteByCreatedAtBefore(LocalDateTime dateTime);
}

View File

@@ -1,33 +0,0 @@
package de.assecutor.votianlt.repository;
import de.assecutor.votianlt.model.PendingMqttMessage;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
import java.time.LocalDateTime;
import java.util.List;
@Repository
public interface PendingMqttMessageRepository extends MongoRepository<PendingMqttMessage, ObjectId> {
/**
* Find all pending messages ordered by creation time (oldest first)
*/
List<PendingMqttMessage> findAllByOrderByCreatedAtAsc();
/**
* Find messages that haven't been retried for a while (for cleanup)
*/
List<PendingMqttMessage> findByLastRetryAtBeforeOrLastRetryAtIsNull(LocalDateTime before);
/**
* Count pending messages
*/
long count();
/**
* Delete messages older than specified date (for cleanup)
*/
void deleteByCreatedAtBefore(LocalDateTime before);
}

View File

@@ -27,11 +27,13 @@ public class SecurityConfig extends VaadinWebSecurity {
new AntPathRequestMatcher("/frontend/**"), new AntPathRequestMatcher("/webjars/**"), new AntPathRequestMatcher("/frontend/**"), new AntPathRequestMatcher("/webjars/**"),
new AntPathRequestMatcher("/h2-console/**"), new AntPathRequestMatcher("/h2-console/**"),
new AntPathRequestMatcher("/frontend-es5/**", "/frontend-es6/**"), new AntPathRequestMatcher("/frontend-es5/**", "/frontend-es6/**"),
new AntPathRequestMatcher("/mcp/**")) new AntPathRequestMatcher("/mcp/**"),
new AntPathRequestMatcher("/ws/**"))
.permitAll()); .permitAll());
// Standard-CSRF-Konfiguration // Standard-CSRF-Konfiguration
http.csrf(csrf -> csrf.ignoringRequestMatchers(new AntPathRequestMatcher("/h2-console/**"))); http.csrf(csrf -> csrf.ignoringRequestMatchers(new AntPathRequestMatcher("/h2-console/**"),
new AntPathRequestMatcher("/ws/**")));
// Delegiere die Basis-Konfiguration an VaadinWebSecurity // Delegiere die Basis-Konfiguration an VaadinWebSecurity
// Dies fügt automatisch .anyRequest().authenticated() hinzu // Dies fügt automatisch .anyRequest().authenticated() hinzu

View File

@@ -1,248 +1,46 @@
package de.assecutor.votianlt.service; package de.assecutor.votianlt.service;
import com.fasterxml.jackson.databind.ObjectMapper; import de.assecutor.votianlt.messaging.WebSocketService;
import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService;
import de.assecutor.votianlt.messaging.plugin.PluginManager;
import de.assecutor.votianlt.messaging.plugin.SendOptions;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* Service for managing client connections via Ping/Pong mechanism. Tracks * Service for managing client connections. Connection state is determined
* connected clients and periodically checks their connectivity. * directly from the WebSocket session lifecycle.
*/ */
@Service @Service
@Slf4j @Slf4j
public class ClientConnectionService { public class ClientConnectionService {
/** private final WebSocketService webSocketService;
* Represents the connection state of a client.
*/
public record ClientState(String clientId, String userId, boolean connected, Instant lastPingSent,
Instant lastPongReceived, Instant connectedAt) {
public ClientState withPingSent(Instant pingSent) {
return new ClientState(clientId, userId, connected, pingSent, lastPongReceived, connectedAt);
}
public ClientState withPongReceived(Instant pongReceived) { public ClientConnectionService(WebSocketService webSocketService) {
return new ClientState(clientId, userId, true, lastPingSent, pongReceived, connectedAt); this.webSocketService = webSocketService;
}
public ClientState withConnected(boolean isConnected) {
return new ClientState(clientId, userId, isConnected, lastPingSent, lastPongReceived, connectedAt);
}
}
private final Map<String, ClientState> connectedClients = new ConcurrentHashMap<>();
private final PluginManager pluginManager;
private final ObjectMapper objectMapper;
private final MessageDeliveryService messageDeliveryService;
@Value("${app.client.ping.interval-seconds:15}")
private int pingIntervalSeconds;
@Value("${app.client.ping.timeout-seconds:5}")
private int pingTimeoutSeconds;
public ClientConnectionService(PluginManager pluginManager, ObjectMapper objectMapper,
@Lazy MessageDeliveryService messageDeliveryService) {
this.pluginManager = pluginManager;
this.objectMapper = objectMapper;
this.messageDeliveryService = messageDeliveryService;
} }
/** /**
* Registers a client as connected after successful login. * Called after successful login.
* *
* @param clientId * @param appUserId
* The unique client identifier * The app user ID (MongoDB ObjectId)
* @param userId
* The user ID associated with this client
*/ */
public void registerClient(String clientId, String userId) { public void registerClient(String appUserId) {
if (clientId == null || clientId.isBlank()) { if (appUserId == null || appUserId.isBlank()) {
return; return;
} }
ClientState previousState = connectedClients.get(clientId); log.info("[CLIENT] Registered: {}", appUserId);
boolean wasDisconnected = previousState != null && !previousState.connected();
Instant now = Instant.now();
ClientState state = new ClientState(clientId, userId, true, null, now, now);
connectedClients.put(clientId, state);
log.info("[CLIENT] Connected: {}", clientId);
// If client was previously disconnected, retry pending messages
if (wasDisconnected) {
messageDeliveryService.retryPendingDeliveriesForClient(clientId);
}
} }
/** /**
* Unregisters a client (e.g., on explicit logout). * Checks if a client is currently connected via WebSocket.
* *
* @param clientId * @param appUserId
* The client identifier to unregister * The app user ID
* @return true if the client has an active WebSocket session
*/ */
public void unregisterClient(String clientId) { public boolean isClientConnected(String appUserId) {
ClientState removed = connectedClients.remove(clientId); return webSocketService.isClientConnected(appUserId);
if (removed != null) {
log.info("[CLIENT] Disconnected: {}", clientId);
}
}
/**
* Handles a pong response from a client. Searches by both clientId and userId
* since pong is sent to /server/{userId}/pong.
*
* @param id
* The client or user identifier that sent the pong
*/
public void handlePong(String id) {
if (id == null || id.isBlank()) {
return;
}
// First try direct lookup by clientId
String clientId = id;
ClientState state = connectedClients.get(id);
// If not found, search by userId
if (state == null) {
for (Map.Entry<String, ClientState> entry : connectedClients.entrySet()) {
if (id.equals(entry.getValue().userId())) {
clientId = entry.getKey();
state = entry.getValue();
break;
}
}
}
if (state != null) {
log.info("[PONG] Received from {}", clientId);
boolean wasDisconnected = !state.connected();
ClientState updatedState = state.withPongReceived(Instant.now());
connectedClients.put(clientId, updatedState);
// If client was disconnected and is now reconnected, retry pending messages
if (wasDisconnected) {
messageDeliveryService.retryPendingDeliveriesForClient(clientId);
}
}
}
/**
* Checks if a client is currently connected. Searches by both clientId and
* userId.
*
* @param id
* The client or user identifier
* @return true if the client is connected
*/
public boolean isClientConnected(String id) {
if (id == null || id.isBlank()) {
return false;
}
// First try direct lookup by clientId
ClientState state = connectedClients.get(id);
if (state != null && state.connected()) {
return true;
}
// Then search by userId
return connectedClients.values().stream().anyMatch(s -> s.connected() && id.equals(s.userId()));
}
/**
* Gets all connected client IDs.
*
* @return Set of connected client IDs
*/
public Set<String> getConnectedClientIds() {
return connectedClients.entrySet().stream().filter(e -> e.getValue().connected()).map(Map.Entry::getKey)
.collect(java.util.stream.Collectors.toSet());
}
/**
* Gets the connection state for a specific client.
*
* @param clientId
* The client identifier
* @return ClientState or null if not found
*/
public ClientState getClientState(String clientId) {
return connectedClients.get(clientId);
}
/**
* Scheduled task to send pings to all connected clients. Runs based on the
* configured interval (app.client.ping.interval-seconds).
*/
@Scheduled(fixedRateString = "${app.client.ping.interval-seconds:15}000")
public void sendPingsToAllClients() {
if (!pluginManager.isConnected() || getConnectedClientCount() == 0) {
return;
}
Instant now = Instant.now();
for (Map.Entry<String, ClientState> entry : connectedClients.entrySet()) {
String clientId = entry.getKey();
ClientState state = entry.getValue();
// Check if previous ping timed out
if (state.lastPingSent() != null && state.connected()) {
Instant expectedPongBy = state.lastPingSent().plusSeconds(pingTimeoutSeconds);
boolean pongReceivedAfterPing = state.lastPongReceived() != null
&& state.lastPongReceived().isAfter(state.lastPingSent());
if (now.isAfter(expectedPongBy) && !pongReceivedAfterPing) {
// Client did not respond in time - mark as disconnected
ClientState disconnectedState = state.withConnected(false);
connectedClients.put(clientId, disconnectedState);
log.info("[CLIENT] Timeout: {}", clientId);
continue;
}
}
// Send ping to connected clients (use userId for topic)
if (state.connected() && state.userId() != null) {
sendPing(state.userId());
ClientState updatedState = state.withPingSent(now);
connectedClients.put(clientId, updatedState);
}
}
}
/**
* Sends a ping message to a specific user.
*
* @param userId
* The target user ID (MongoDB ObjectId)
*/
private void sendPing(String userId) {
try {
Map<String, Object> pingPayload = Map.of("type", "ping", "timestamp", Instant.now().toEpochMilli());
String json = objectMapper.writeValueAsString(pingPayload);
byte[] payload = json.getBytes(StandardCharsets.UTF_8);
SendOptions options = SendOptions.builder().qos(1).retained(false).build();
log.info("[PING] Sent to {}", userId);
pluginManager.sendToClient(userId, "ping", payload, options);
} catch (Exception e) {
log.error("[PING] Error sending to {}: {}", userId, e.getMessage());
}
} }
/** /**
@@ -251,15 +49,6 @@ public class ClientConnectionService {
* @return Number of connected clients * @return Number of connected clients
*/ */
public int getConnectedClientCount() { public int getConnectedClientCount() {
return (int) connectedClients.values().stream().filter(ClientState::connected).count(); return webSocketService.getConnectedClientCount();
}
/**
* Gets the total number of registered clients (connected and disconnected).
*
* @return Total number of registered clients
*/
public int getTotalClientCount() {
return connectedClients.size();
} }
} }

View File

@@ -8,7 +8,7 @@ import de.assecutor.votianlt.model.MessageType;
import de.assecutor.votianlt.dto.ChatMessageInboundPayload; import de.assecutor.votianlt.dto.ChatMessageInboundPayload;
import de.assecutor.votianlt.dto.ChatMessageOutboundPayload; import de.assecutor.votianlt.dto.ChatMessageOutboundPayload;
import de.assecutor.votianlt.event.MessageReceivedEvent; import de.assecutor.votianlt.event.MessageReceivedEvent;
import de.assecutor.votianlt.mqtt.MqttPublisher; import de.assecutor.votianlt.messaging.MessagingPublisher;
import de.assecutor.votianlt.repository.JobRepository; import de.assecutor.votianlt.repository.JobRepository;
import de.assecutor.votianlt.repository.MessageRepository; import de.assecutor.votianlt.repository.MessageRepository;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -26,14 +26,14 @@ public class MessageService {
private final MessageRepository messageRepository; private final MessageRepository messageRepository;
private final JobRepository jobRepository; private final JobRepository jobRepository;
private final MqttPublisher mqttPublisher; private final MessagingPublisher messagingPublisher;
private final ApplicationEventPublisher eventPublisher; private final ApplicationEventPublisher eventPublisher;
public MessageService(MessageRepository messageRepository, JobRepository jobRepository, MqttPublisher mqttPublisher, public MessageService(MessageRepository messageRepository, JobRepository jobRepository, MessagingPublisher messagingPublisher,
ApplicationEventPublisher eventPublisher) { ApplicationEventPublisher eventPublisher) {
this.messageRepository = messageRepository; this.messageRepository = messageRepository;
this.jobRepository = jobRepository; this.jobRepository = jobRepository;
this.mqttPublisher = mqttPublisher; this.messagingPublisher = messagingPublisher;
this.eventPublisher = eventPublisher; this.eventPublisher = eventPublisher;
} }
@@ -45,7 +45,7 @@ public class MessageService {
} }
/** /**
* Send a general message to a client via MQTT * Send a general message to a client via WebSocket
* *
* @param content * @param content
* Message content * Message content
@@ -59,12 +59,12 @@ public class MessageService {
public Message sendGeneralMessageToClient(String content, String receiver, MessageContentType contentType) { public Message sendGeneralMessageToClient(String content, String receiver, MessageContentType contentType) {
Message message = new Message(content, receiver, MessageOrigin.SERVER, contentType); Message message = new Message(content, receiver, MessageOrigin.SERVER, contentType);
message = saveMessage(message); message = saveMessage(message);
publishMessageToMqtt(message, receiver); publishMessage(message, receiver);
return message; return message;
} }
/** /**
* Send a job-related message to a client via MQTT * Send a job-related message to a client via WebSocket
* *
* @param content * @param content
* Message content * Message content
@@ -85,7 +85,7 @@ public class MessageService {
Message message = new Message(content, receiver, MessageOrigin.SERVER, contentType, context.jobId(), Message message = new Message(content, receiver, MessageOrigin.SERVER, contentType, context.jobId(),
context.jobNumber()); context.jobNumber());
message = saveMessage(message); message = saveMessage(message);
publishMessageToMqtt(message, receiver); publishMessage(message, receiver);
return message; return message;
} }
@@ -111,15 +111,14 @@ public class MessageService {
} }
/** /**
* Publish message to MQTT topic for the receiver * Publish message to topic for the receiver
*/ */
private void publishMessageToMqtt(Message message, String receiver) { private void publishMessage(Message message, String receiver) {
try { try {
String topic = "/client/" + receiver + "/message";
ChatMessageOutboundPayload payload = ChatMessageOutboundPayload.fromMessage(message); ChatMessageOutboundPayload payload = ChatMessageOutboundPayload.fromMessage(message);
mqttPublisher.publishAsJson(topic, payload, false); messagingPublisher.publishAsJson(receiver, "message", payload);
} catch (Exception e) { } catch (Exception e) {
log.error("[MQTT] Error publishing message: {}", e.getMessage()); log.error("[Messaging] Error publishing message: {}", e.getMessage());
} }
} }

View File

@@ -59,45 +59,11 @@ spring.jackson.default-property-inclusion=non_null
# 2FA Configuration (global toggle - individual users can disable in their profile) # 2FA Configuration (global toggle - individual users can disable in their profile)
app.security.two-factor.enabled=true app.security.two-factor.enabled=true
# Message Delivery Layer Configuration # WebSocket Configuration
app.messaging.delivery.max-retries=3 app.messaging.websocket.path=/ws/messaging
app.messaging.delivery.retry-initial-delay=5s app.messaging.websocket.max-text-message-size=65536
app.messaging.delivery.retry-max-delay=5m app.messaging.websocket.max-session-idle-timeout=300000
app.messaging.delivery.retry-backoff-multiplier=2.0 app.messaging.websocket.allowed-origins=*
app.messaging.delivery.ack-timeout=30s
app.messaging.delivery.message-expiry=24h
app.messaging.delivery.cleanup-interval-minutes=60
app.messaging.delivery.retry-interval-seconds=30
app.messaging.delivery.acknowledged-retention-days=7
app.messaging.delivery.ack-retry-interval-seconds=5
app.messaging.delivery.ack-max-retries=4
# Messaging Plugin Configuration
app.messaging.plugin.type=mqtt
app.messaging.plugin.mqtt.broker.host=mqtt-2.assecutor.de
app.messaging.plugin.mqtt.broker.port=42099
app.messaging.plugin.mqtt.username=app
app.messaging.plugin.mqtt.password=apppwd
app.messaging.plugin.mqtt.client.id=votianlt-server
# Client Connection Monitoring (Ping/Pong)
# Server sends ping to: /client/{clientId}/ping
# Client responds to: /server/{clientId}/pong
#
# Ping JSON (Server -> Client):
# {
# "type": "ping",
# "timestamp": 1702835000000
# }
#
# Pong JSON (Client -> Server):
# {
# "type": "pong",
# "timestamp": 1702835000000
# }
#
app.client.ping.interval-seconds=15
app.client.ping.timeout-seconds=5
# Application Version - automatically set from pom.xml during build # Application Version - automatically set from pom.xml during build
app.version=@project.version@ app.version=@project.version@