diff --git a/pom.xml b/pom.xml index 4cb2cd8..b42fc0b 100644 --- a/pom.xml +++ b/pom.xml @@ -64,20 +64,10 @@ spring-boot-starter-actuator - + org.springframework.boot - spring-boot-starter-integration - - - org.springframework.integration - spring-integration-mqtt - - - - com.hivemq - hivemq-mqtt-client - 1.3.3 + spring-boot-starter-websocket org.springframework.boot diff --git a/src/main/java/de/assecutor/votianlt/controller/MessageController.java b/src/main/java/de/assecutor/votianlt/controller/MessageController.java index 459a2c3..35a5e99 100644 --- a/src/main/java/de/assecutor/votianlt/controller/MessageController.java +++ b/src/main/java/de/assecutor/votianlt/controller/MessageController.java @@ -24,35 +24,26 @@ import de.assecutor.votianlt.model.Comment; import de.assecutor.votianlt.service.JobHistoryService; import de.assecutor.votianlt.service.EmailService; import de.assecutor.votianlt.service.MessageService; -import de.assecutor.votianlt.service.ClientConnectionService; import de.assecutor.votianlt.model.JobStatus; import lombok.extern.slf4j.Slf4j; -import com.fasterxml.jackson.databind.ObjectMapper; -import de.assecutor.votianlt.mqtt.MqttPublisher; +import de.assecutor.votianlt.messaging.MessagingPublisher; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; import org.bson.types.ObjectId; /** - * MQTT message controller for handling real-time communication with apps. - * Provides endpoints for sending and receiving messages via WebSocket/MQTT. + * Message controller for handling real-time communication with apps. + * Provides endpoints for sending and receiving messages via WebSocket. */ @Component @Slf4j public class MessageController { - // Map to store userId -> clientId mapping for active sessions - private final Map userClientIdMapping = new ConcurrentHashMap<>(); - - // Map to store clientId -> userId mapping for active sessions (reverse lookup) - private final Map clientIdUserMapping = new ConcurrentHashMap<>(); - - private final MqttPublisher mqttPublisher; + private final MessagingPublisher messagingPublisher; private final AppUserRepository appUserRepository; @@ -70,16 +61,13 @@ public class MessageController { private final JobHistoryService jobHistoryService; private final EmailService emailService; private final MessageService messageService; - private final ObjectMapper objectMapper; - private final ClientConnectionService clientConnectionService; - public MessageController(MqttPublisher mqttPublisher, AppUserRepository appUserRepository, + public MessageController(MessagingPublisher messagingPublisher, AppUserRepository appUserRepository, AppUserService appUserService, JobRepository jobRepository, CargoItemRepository cargoItemRepository, TaskRepository taskRepository, PhotoRepository photoRepository, BarcodeRepository barcodeRepository, SignatureRepository signatureRepository, CommentRepository commentRepository, - JobHistoryService jobHistoryService, EmailService emailService, MessageService messageService, - ObjectMapper objectMapper, ClientConnectionService clientConnectionService) { - this.mqttPublisher = mqttPublisher; + JobHistoryService jobHistoryService, EmailService emailService, MessageService messageService) { + this.messagingPublisher = messagingPublisher; this.appUserRepository = appUserRepository; this.appUserService = appUserService; this.jobRepository = jobRepository; @@ -92,92 +80,52 @@ public class MessageController { this.jobHistoryService = jobHistoryService; this.emailService = emailService; this.messageService = messageService; - this.objectMapper = objectMapper; - this.clientConnectionService = clientConnectionService; } /** - * Authentication endpoint for mobile app users via MQTT. Client sends to - * /server/login with payload { email, password, clientId }. The response is - * sent back to the requesting client on /client/{clientId}/auth + * Authentication endpoint for mobile app users via WebSocket. Client sends to + * /server/login with payload { email, password }. Returns the result to the + * caller (MessagingConfig) which handles session registration and response + * sending. */ - public void handleAppLogin(AppLoginRequest request) { - AppLoginResponse response; - + public AppLoginResponse handleAppLogin(AppLoginRequest request) { if (request == null || request.getEmail() == null || request.getPassword() == null - || request.getClientId() == null || request.getEmail().isBlank() || request.getPassword().isBlank() - || request.getClientId().isBlank()) { - response = new AppLoginResponse(false, "E-Mail, Passwort und Client-ID sind erforderlich", null, null, - null); - } else { - AppUser user = appUserRepository.findByEmail(request.getEmail()); - if (user == null) { - response = new AppLoginResponse(false, "Benutzer nicht gefunden", null, null, null); - } else { - boolean ok = appUserService.verifyPassword(request.getPassword(), user.getPassword()); - if (!ok) { - response = new AppLoginResponse(false, "Ungültige Anmeldedaten", null, null, null); - } else { - response = new AppLoginResponse(true, "Anmeldung erfolgreich", null, null, user.getIdAsString()); - // Store clientId mapping for this user session - storeClientIdMapping(user.getIdAsString(), request.getClientId()); - // Register client for ping/pong monitoring - clientConnectionService.registerClient(request.getClientId(), user.getIdAsString()); - } - } + || request.getEmail().isBlank() || request.getPassword().isBlank()) { + return new AppLoginResponse(false, "E-Mail und Passwort sind erforderlich", null); } - // Send response via MQTT to specific client - if (request != null && request.getClientId() != null && !request.getClientId().isBlank()) { - mqttPublisher.publishAsJson("/client/" + request.getClientId() + "/auth", response, false); + AppUser user = appUserRepository.findByEmail(request.getEmail()); + if (user == null) { + return new AppLoginResponse(false, "Benutzer nicht gefunden", null); } + + boolean ok = appUserService.verifyPassword(request.getPassword(), user.getPassword()); + if (!ok) { + return new AppLoginResponse(false, "Ungültige Anmeldedaten", null); + } + + return new AppLoginResponse(true, "Anmeldung erfolgreich", user.getIdAsString()); } /** - * Endpoint to retrieve jobs assigned to a specific app user with related cargo - * items and tasks. Client sends to /server/{clientId}/jobs/assigned with - * payload { appUserId }. The response is sent back to the requesting client on - * /client/{clientId}/jobs + * Retrieve jobs assigned to a specific app user with related cargo items and + * tasks. The appUserId is determined from the authenticated WebSocket session. + * Response is sent back on /client/jobs. */ - public void handleGetAssignedJobs(Map request) { - if (request == null || !request.containsKey("appUserId")) { - return; - } - - String appUserId = request.get("appUserId").toString(); + public void handleGetAssignedJobs(String appUserId) { if (appUserId == null || appUserId.isBlank()) { return; } - // Attempt to get clientId from request (injected from topic) or from stored - // mapping - String clientId = null; - try { - Object cid = request.get("clientId"); - if (cid != null) - clientId = cid.toString(); - } catch (Exception e) { - // Ignore - } - if (clientId == null || clientId.isBlank()) { - clientId = getClientIdForUserId(appUserId); - } - - // Find jobs assigned to this app user List assignedJobs = jobRepository.findByAppUser(appUserId); - // For each job, fetch related cargo items and tasks (ordered by task order) List jobsWithRelatedData = assignedJobs.stream().map(job -> { List cargoItems = cargoItemRepository.findByJobId(job.getId()); List tasks = taskRepository.findByJobIdOrderByTaskOrderAsc(job.getId()); return new JobWithRelatedDataDTO(job, cargoItems, tasks); }).toList(); - // Publish to the requesting client's topic if clientId is known - if (clientId != null && !clientId.isBlank()) { - String topic = "/client/" + clientId + "/jobs"; - mqttPublisher.publishAsJson(topic, jobsWithRelatedData, false); - } + messagingPublisher.publishAsJson(appUserId, "jobs", jobsWithRelatedData); } /** @@ -460,49 +408,20 @@ public class MessageController { } /** - * Store the mapping between userId and clientId for active session - */ - private void storeClientIdMapping(String userId, String clientId) { - userClientIdMapping.put(userId, clientId); - clientIdUserMapping.put(clientId, userId); - } - - /** - * Get the clientId for a given userId - */ - private String getClientIdForUserId(String userId) { - return userClientIdMapping.get(userId); - } - - /** - * Handle pong response from a client. Client sends to /server/{clientId}/pong - * with payload { timestamp }. Used for connection monitoring. - */ - public void handlePong(Map payload) { - String clientId = payload.get("clientId") != null ? payload.get("clientId").toString() : null; - if (clientId != null && !clientId.isBlank()) { - clientConnectionService.handlePong(clientId); - } - } - - /** - * Handle incoming message from a client via MQTT. Client sends to - * /server/{clientId}/message with payload: { "content": "message payload", + * Handle incoming message from a client via WebSocket. Client sends to + * /server/message with payload: { "content": "message payload", * "contentType": "TEXT|IMAGE", "jobId": "optional job id", "jobNumber": * "optional job number" } * - * The clientId is extracted from the MQTT topic and represents the AppUser ID. - * This clientId is stored as the receiver field in the message. + * The appUserId is determined from the authenticated WebSocket session. */ - public void handleIncomingMessage(Map payload) { + public void handleIncomingMessage(String appUserId, Map payload) { try { - String clientId = payload.get("clientId") != null ? payload.get("clientId").toString() : null; - - if (clientId == null || clientId.isBlank()) { + if (appUserId == null || appUserId.isBlank()) { return; } - payload.put("receiver", clientId); + payload.put("receiver", appUserId); ChatMessageInboundPayload inboundPayload = ChatMessageInboundPayload.fromPayload(payload); messageService.receiveMessageFromClient(inboundPayload); } catch (Exception e) { diff --git a/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java b/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java index f6bdc28..fcd3cb7 100644 --- a/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java +++ b/src/main/java/de/assecutor/votianlt/dto/AppLoginRequest.java @@ -1,5 +1,6 @@ package de.assecutor.votianlt.dto; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -7,8 +8,8 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) public class AppLoginRequest { private String email; private String password; - private String clientId; } \ No newline at end of file diff --git a/src/main/java/de/assecutor/votianlt/dto/AppLoginResponse.java b/src/main/java/de/assecutor/votianlt/dto/AppLoginResponse.java index aeb0394..8cb614c 100644 --- a/src/main/java/de/assecutor/votianlt/dto/AppLoginResponse.java +++ b/src/main/java/de/assecutor/votianlt/dto/AppLoginResponse.java @@ -10,7 +10,6 @@ import lombok.NoArgsConstructor; public class AppLoginResponse { private boolean success; private String message; - private String token; - private String userId; + /** Only populated on success, for internal server-side routing. Not sent to client. */ private String appUserId; } \ No newline at end of file diff --git a/src/main/java/de/assecutor/votianlt/dto/ChatMessageInboundPayload.java b/src/main/java/de/assecutor/votianlt/dto/ChatMessageInboundPayload.java index e5eb7c5..50563f8 100644 --- a/src/main/java/de/assecutor/votianlt/dto/ChatMessageInboundPayload.java +++ b/src/main/java/de/assecutor/votianlt/dto/ChatMessageInboundPayload.java @@ -5,8 +5,8 @@ import java.util.Map; import org.bson.types.ObjectId; /** - * Normalized payload for chat messages sent by mobile clients via MQTT. - * receiver = AppUser ID (clientId) extracted from MQTT topic + * Normalized payload for chat messages sent by mobile clients via WebSocket. + * receiver = AppUser ID (clientId) extracted from topic */ public record ChatMessageInboundPayload(String receiver, String content, MessageContentType contentType, ObjectId jobId, String jobNumber) { diff --git a/src/main/java/de/assecutor/votianlt/dto/ChatMessageOutboundPayload.java b/src/main/java/de/assecutor/votianlt/dto/ChatMessageOutboundPayload.java index 854f6fe..ed550ec 100644 --- a/src/main/java/de/assecutor/votianlt/dto/ChatMessageOutboundPayload.java +++ b/src/main/java/de/assecutor/votianlt/dto/ChatMessageOutboundPayload.java @@ -7,8 +7,8 @@ import de.assecutor.votianlt.model.MessageType; import java.time.LocalDateTime; /** - * Outbound chat message payload published to MQTT subscribers. The receiver is - * implicit from the MQTT topic (/client/{appUserId}/message) + * Outbound chat message payload published to subscribers. The receiver is + * implicit from the WebSocket session (/client/message) */ public record ChatMessageOutboundPayload(String messageId, String content, MessageContentType contentType, MessageOrigin origin, MessageType messageType, LocalDateTime createdAt, String jobId, String jobNumber, diff --git a/src/main/java/de/assecutor/votianlt/messaging/MessagingConfig.java b/src/main/java/de/assecutor/votianlt/messaging/MessagingConfig.java new file mode 100644 index 0000000..2343c7d --- /dev/null +++ b/src/main/java/de/assecutor/votianlt/messaging/MessagingConfig.java @@ -0,0 +1,131 @@ +package de.assecutor.votianlt.messaging; + +import de.assecutor.votianlt.controller.MessageController; +import de.assecutor.votianlt.dto.AppLoginRequest; +import de.assecutor.votianlt.dto.AppLoginResponse; +import de.assecutor.votianlt.service.ClientConnectionService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +/** + * Configuration for the messaging system. Sets up message routing after + * application startup. + */ +@Configuration +@Slf4j +public class MessagingConfig { + + private final WebSocketService webSocketService; + private final ObjectMapper objectMapper; + + public MessagingConfig(WebSocketService webSocketService, ObjectMapper objectMapper) { + this.webSocketService = webSocketService; + this.objectMapper = objectMapper; + } + + /** + * Set up message routing after application startup. + */ + @EventListener(ApplicationReadyEvent.class) + public void setupMessaging(ApplicationReadyEvent event) { + try { + MessageController messageController = event.getApplicationContext().getBean(MessageController.class); + ClientConnectionService clientConnectionService = event.getApplicationContext() + .getBean(ClientConnectionService.class); + + setupSubscriptions(messageController, clientConnectionService); + + log.info("[Messaging] Message routing configured"); + + } catch (Exception e) { + log.error("[Messaging] Failed to initialize: {}", e.getMessage()); + throw new RuntimeException("Failed to initialize messaging", e); + } + } + + /** + * Setup message subscriptions on the WebSocket service. + */ + private void setupSubscriptions(MessageController messageController, + ClientConnectionService clientConnectionService) { + // Login handler: authenticate and register session + webSocketService.registerMessageHandler("login", (wsSessionId, payload) -> { + handleLoginMessage(wsSessionId, payload, messageController, clientConnectionService); + }); + + // Task completion handler + webSocketService.registerMessageHandler("task_completed", (appUserId, payload) -> { + handlePayload(payload, payloadMap -> { + String taskType = payloadMap.get("taskType") != null ? payloadMap.get("taskType").toString() : null; + messageController.handleTaskCompleted(payloadMap, taskType); + }); + }); + + // Jobs assigned handler + webSocketService.registerMessageHandler("jobs/assigned", (appUserId, payload) -> { + messageController.handleGetAssignedJobs(appUserId); + }); + + // Chat message handler + webSocketService.registerMessageHandler("message", (appUserId, payload) -> { + handlePayload(payload, payloadMap -> { + messageController.handleIncomingMessage(appUserId, payloadMap); + }); + }); + } + + /** + * Handle login message. The wsSessionId identifies the pending WebSocket + * session. On success, registers the session under the appUserId and sends an + * auth response. On failure, sends an error response to the pending session. + */ + @SuppressWarnings("unchecked") + private void handleLoginMessage(String wsSessionId, byte[] payload, MessageController messageController, + ClientConnectionService clientConnectionService) { + try { + String json = new String(payload, StandardCharsets.UTF_8); + Map payloadMap = objectMapper.readValue(json, Map.class); + AppLoginRequest loginRequest = objectMapper.convertValue(payloadMap, AppLoginRequest.class); + AppLoginResponse response = messageController.handleAppLogin(loginRequest); + + if (response.isSuccess()) { + String appUserId = response.getAppUserId(); + webSocketService.registerAuthenticatedSession(wsSessionId, appUserId); + clientConnectionService.registerClient(appUserId); + + // Send success response to the now-authenticated session + Map authResponse = Map.of("success", true, "message", response.getMessage()); + byte[] responseBytes = objectMapper.writeValueAsBytes(authResponse); + webSocketService.sendToClient(appUserId, "auth", responseBytes); + } else { + // Send failure response to the pending session + Map authResponse = Map.of("success", false, "message", response.getMessage()); + byte[] responseBytes = objectMapper.writeValueAsBytes(authResponse); + webSocketService.sendToSessionById(wsSessionId, "/client/auth", responseBytes); + } + } catch (Exception e) { + log.error("[Messaging] Login handling error: {}", e.getMessage()); + } + } + + /** + * Parse payload bytes to a Map and pass to the consumer. + */ + @SuppressWarnings("unchecked") + private void handlePayload(byte[] payload, java.util.function.Consumer> handler) { + try { + String json = new String(payload, StandardCharsets.UTF_8); + Map payloadMap = objectMapper.readValue(json, Map.class); + handler.accept(payloadMap); + } catch (Exception e) { + log.error("[Messaging] Error parsing payload: {}", e.getMessage()); + } + } +} diff --git a/src/main/java/de/assecutor/votianlt/messaging/MessagingPublisher.java b/src/main/java/de/assecutor/votianlt/messaging/MessagingPublisher.java new file mode 100644 index 0000000..d9119d0 --- /dev/null +++ b/src/main/java/de/assecutor/votianlt/messaging/MessagingPublisher.java @@ -0,0 +1,43 @@ +package de.assecutor.votianlt.messaging; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +/** + * Publishing helper to send JSON payloads to clients via WebSocket. + */ +public interface MessagingPublisher { + void publishAsJson(String clientId, String messageType, Object payload); +} + +@Component +@Slf4j +class MessagingPublisherImpl implements MessagingPublisher { + + private final WebSocketService webSocketService; + private final ObjectMapper objectMapper; + + public MessagingPublisherImpl(WebSocketService webSocketService, ObjectMapper objectMapper) { + this.webSocketService = webSocketService; + this.objectMapper = objectMapper; + } + + @Override + public void publishAsJson(String clientId, String messageType, Object payload) { + try { + String json = objectMapper.writeValueAsString(payload); + byte[] data = json.getBytes(StandardCharsets.UTF_8); + + webSocketService.sendToClient(clientId, messageType, data).exceptionally(ex -> { + log.error("[Messaging] Failed to deliver to {}/{}: {}", clientId, messageType, ex.getMessage()); + return null; + }); + + } catch (Exception e) { + log.error("[Messaging] Failed to publish to {}/{}: {}", clientId, messageType, e.getMessage()); + } + } +} diff --git a/src/main/java/de/assecutor/votianlt/messaging/WebSocketConfig.java b/src/main/java/de/assecutor/votianlt/messaging/WebSocketConfig.java new file mode 100644 index 0000000..7ae45f7 --- /dev/null +++ b/src/main/java/de/assecutor/votianlt/messaging/WebSocketConfig.java @@ -0,0 +1,35 @@ +package de.assecutor.votianlt.messaging; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; +import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor; + +/** + * WebSocket configuration that registers the WebSocketService as a handler on + * the configured endpoint. + */ +@Configuration +@EnableWebSocket +public class WebSocketConfig implements WebSocketConfigurer { + + private final WebSocketService webSocketService; + + @Value("${app.messaging.websocket.path:/ws/messaging}") + private String wsPath; + + @Value("${app.messaging.websocket.allowed-origins:*}") + private String allowedOrigins; + + public WebSocketConfig(WebSocketService webSocketService) { + this.webSocketService = webSocketService; + } + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(webSocketService, wsPath).setAllowedOrigins(allowedOrigins.split(",")) + .addInterceptors(new HttpSessionHandshakeInterceptor()); + } +} diff --git a/src/main/java/de/assecutor/votianlt/messaging/WebSocketService.java b/src/main/java/de/assecutor/votianlt/messaging/WebSocketService.java new file mode 100644 index 0000000..fcb9aed --- /dev/null +++ b/src/main/java/de/assecutor/votianlt/messaging/WebSocketService.java @@ -0,0 +1,384 @@ +package de.assecutor.votianlt.messaging; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.*; + +/** + * WebSocket service for direct bidirectional communication with mobile clients. + * + * Wire Protocol: Each WebSocket message is a JSON document with a "topic" and + * "payload" field: + * + *
+ * {
+ *   "topic": "/server/login",
+ *   "payload": { ... }
+ * }
+ * 
+ * + * Topic Structure: + *
    + *
  • Server to Client: /client/{messageType}
  • + *
  • Client to Server: /server/{messageType}
  • + *
  • Login (special): /server/login (unauthenticated)
  • + *
+ */ +@Component +@Slf4j +public class WebSocketService extends TextWebSocketHandler { + + @FunctionalInterface + public interface MessageHandler { + void onMessageReceived(String clientId, byte[] payload); + } + + private static final String TOPIC_TO_CLIENT = "/client/%s"; + private static final long PENDING_SESSION_TIMEOUT_MS = 30_000; + + private final ObjectMapper objectMapper; + + // appUserId -> WebSocketSession + private final ConcurrentHashMap clientSessions = new ConcurrentHashMap<>(); + + // sessionId -> appUserId (reverse lookup for cleanup on disconnect) + private final ConcurrentHashMap sessionToClient = new ConcurrentHashMap<>(); + + // sessionId -> PendingSession (connected but not yet logged in) + private final ConcurrentHashMap pendingSessions = new ConcurrentHashMap<>(); + + private final Map messageHandlers = new ConcurrentHashMap<>(); + private volatile boolean initialized = false; + + private ScheduledExecutorService pendingSessionCleanup; + + public WebSocketService(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + // ========================================== + // Lifecycle + // ========================================== + + @PostConstruct + public void init() { + pendingSessionCleanup = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "ws-pending-cleanup"); + t.setDaemon(true); + return t; + }); + pendingSessionCleanup.scheduleAtFixedRate(this::cleanupPendingSessions, 30, 30, TimeUnit.SECONDS); + + initialized = true; + log.info("[WebSocket] Service initialized on endpoint /ws/messaging"); + } + + @PreDestroy + public void shutdown() { + if (pendingSessionCleanup != null) { + pendingSessionCleanup.shutdownNow(); + } + + for (var entry : clientSessions.entrySet()) { + try { + WebSocketSession session = entry.getValue(); + if (session.isOpen()) { + session.close(CloseStatus.GOING_AWAY); + } + } catch (Exception e) { + log.warn("[WebSocket] Error closing session for client {}: {}", entry.getKey(), e.getMessage()); + } + } + + for (var entry : pendingSessions.entrySet()) { + try { + if (entry.getValue().session.isOpen()) { + entry.getValue().session.close(CloseStatus.GOING_AWAY); + } + } catch (Exception ignored) { + } + } + + clientSessions.clear(); + sessionToClient.clear(); + pendingSessions.clear(); + messageHandlers.clear(); + initialized = false; + + log.info("[WebSocket] Service shut down"); + } + + // ========================================== + // Public API + // ========================================== + + public CompletableFuture sendToClient(String clientId, String messageType, byte[] payload) { + WebSocketSession session = clientSessions.get(clientId); + if (session == null || !session.isOpen()) { + return CompletableFuture + .failedFuture(new IOException("No active WebSocket session for client: " + clientId)); + } + + try { + String topic = String.format(TOPIC_TO_CLIENT, messageType); + String payloadJson = new String(payload, StandardCharsets.UTF_8); + + ObjectNode wireMessage = objectMapper.createObjectNode(); + wireMessage.put("topic", topic); + wireMessage.set("payload", objectMapper.readTree(payloadJson)); + + String wireJson = objectMapper.writeValueAsString(wireMessage); + log.info("[WebSocket OUT] {} -> {}", topic, wireJson); + + sendToSession(session, wireJson); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + log.error("[WebSocket] Failed to send to client {}: {}", clientId, e.getMessage()); + return CompletableFuture.failedFuture(new IOException("Failed to send WebSocket message", e)); + } + } + + public void registerMessageHandler(String messageType, MessageHandler handler) { + messageHandlers.put(messageType, handler); + log.debug("[WebSocket] Registered handler for messageType: {}", messageType); + } + + public boolean isConnected() { + return initialized; + } + + public boolean isClientConnected(String clientId) { + WebSocketSession session = clientSessions.get(clientId); + return session != null && session.isOpen(); + } + + public int getConnectedClientCount() { + return clientSessions.size(); + } + + // ========================================== + // WebSocket handler methods + // ========================================== + + @Override + public void afterConnectionEstablished(WebSocketSession session) { + pendingSessions.put(session.getId(), new PendingSession(session, Instant.now())); + log.info("[WebSocket] New connection: sessionId={}, remote={}", session.getId(), session.getRemoteAddress()); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) { + try { + String json = message.getPayload(); + JsonNode wireMessage = objectMapper.readTree(json); + + JsonNode topicNode = wireMessage.get("topic"); + JsonNode payloadNode = wireMessage.get("payload"); + + if (topicNode == null || payloadNode == null) { + log.warn("[WebSocket] Invalid message format (missing topic or payload): {}", json); + return; + } + + String topic = topicNode.asText(); + byte[] payloadBytes = objectMapper.writeValueAsBytes(payloadNode); + + log.info("[WebSocket IN] {} <- {}", topic, json); + + // Login message (special: unauthenticated) + if ("/server/login".equals(topic)) { + handleLoginMessage(session, payloadBytes); + return; + } + + // Regular client message: /server/{messageType} + if (topic.startsWith("/server/")) { + // Verify session is authenticated + String appUserId = sessionToClient.get(session.getId()); + if (appUserId == null) { + log.warn("[WebSocket] Unauthenticated session {} tried to send: {}", session.getId(), topic); + return; + } + handleClientMessage(topic, appUserId, payloadBytes); + } + } catch (Exception e) { + log.error("[WebSocket] Error handling message: {}", e.getMessage(), e); + } + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { + String sessionId = session.getId(); + + // Remove from pending sessions + pendingSessions.remove(sessionId); + + // Remove from authenticated sessions + String clientId = sessionToClient.remove(sessionId); + if (clientId != null) { + clientSessions.remove(clientId, session); + log.info("[WebSocket] Client disconnected: clientId={}, reason={}", clientId, status); + } else { + log.info("[WebSocket] Unauthenticated session closed: sessionId={}", sessionId); + } + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) { + log.error("[WebSocket] Transport error for session {}: {}", session.getId(), exception.getMessage()); + } + + // ========================================== + // Internal message routing + // ========================================== + + private void handleLoginMessage(WebSocketSession session, byte[] payloadBytes) { + MessageHandler handler = messageHandlers.get("login"); + if (handler != null) { + handler.onMessageReceived(session.getId(), payloadBytes); + } + } + + /** + * Register a pending session as authenticated under the given appUserId. + * Called by MessagingConfig after successful login. + */ + public void registerAuthenticatedSession(String wsSessionId, String appUserId) { + PendingSession pending = pendingSessions.get(wsSessionId); + if (pending == null) { + log.warn("[WebSocket] No pending session for wsSessionId={}", wsSessionId); + return; + } + registerClientSession(appUserId, pending.session()); + } + + /** + * Send a wire-format message directly to a session by its WebSocket sessionId. + * Used for sending login responses to pending (not yet authenticated) sessions. + */ + public void sendToSessionById(String wsSessionId, String topic, byte[] payload) { + try { + // Check pending sessions first + PendingSession pending = pendingSessions.get(wsSessionId); + WebSocketSession session = pending != null ? pending.session() : null; + + // Fallback: check authenticated sessions via reverse lookup + if (session == null) { + String appUserId = sessionToClient.get(wsSessionId); + if (appUserId != null) { + session = clientSessions.get(appUserId); + } + } + + if (session == null || !session.isOpen()) { + log.warn("[WebSocket] Cannot send to session {}: not found or closed", wsSessionId); + return; + } + + String payloadJson = new String(payload, StandardCharsets.UTF_8); + ObjectNode wireMessage = objectMapper.createObjectNode(); + wireMessage.put("topic", topic); + wireMessage.set("payload", objectMapper.readTree(payloadJson)); + + String wireJson = objectMapper.writeValueAsString(wireMessage); + log.info("[WebSocket OUT] {} -> {}", topic, wireJson); + + sendToSession(session, wireJson); + } catch (Exception e) { + log.error("[WebSocket] Error sending to session {}: {}", wsSessionId, e.getMessage()); + } + } + + private void handleClientMessage(String topic, String appUserId, byte[] payload) { + String[] parts = topic.split("/"); + + // Handle /server/{messageType} where messageType can contain slashes + if (parts.length >= 3) { + String messageType = String.join("/", Arrays.copyOfRange(parts, 2, parts.length)); + + MessageHandler handler = messageHandlers.get(messageType); + if (handler != null) { + handler.onMessageReceived(appUserId, payload); + } else { + log.warn("[WebSocket] No handler registered for messageType: {}", messageType); + } + } + } + + // ========================================== + // Session management + // ========================================== + + private void registerClientSession(String clientId, WebSocketSession session) { + // Close old session if same clientId reconnects + WebSocketSession oldSession = clientSessions.put(clientId, session); + if (oldSession != null && oldSession.isOpen() && !oldSession.getId().equals(session.getId())) { + try { + String oldSessionId = oldSession.getId(); + sessionToClient.remove(oldSessionId); + oldSession.close(CloseStatus.NORMAL.withReason("Replaced by new connection")); + log.info("[WebSocket] Closed old session for clientId={} (replaced)", clientId); + } catch (IOException e) { + log.warn("[WebSocket] Error closing old session for client {}: {}", clientId, e.getMessage()); + } + } + + sessionToClient.put(session.getId(), clientId); + pendingSessions.remove(session.getId()); + + log.info("[WebSocket] Client registered: clientId={}, sessionId={}", clientId, session.getId()); + } + + private void cleanupPendingSessions() { + Instant cutoff = Instant.now().minusMillis(PENDING_SESSION_TIMEOUT_MS); + pendingSessions.entrySet().removeIf(entry -> { + if (entry.getValue().connectedAt.isBefore(cutoff)) { + try { + WebSocketSession session = entry.getValue().session; + if (session.isOpen()) { + session.close(CloseStatus.POLICY_VIOLATION.withReason("Login timeout")); + } + log.info("[WebSocket] Closed pending session (login timeout): sessionId={}", entry.getKey()); + } catch (IOException e) { + log.warn("[WebSocket] Error closing pending session: {}", e.getMessage()); + } + return true; + } + return false; + }); + } + + // ========================================== + // Utility methods + // ========================================== + + private void sendToSession(WebSocketSession session, String message) throws IOException { + synchronized (session) { + if (session.isOpen()) { + session.sendMessage(new TextMessage(message)); + } + } + } + + // ========================================== + // Internal types + // ========================================== + + private record PendingSession(WebSocketSession session, Instant connectedAt) { + } +} diff --git a/src/main/java/de/assecutor/votianlt/messaging/config/PluginMessagingConfig.java b/src/main/java/de/assecutor/votianlt/messaging/config/PluginMessagingConfig.java deleted file mode 100644 index f31a5b5..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/config/PluginMessagingConfig.java +++ /dev/null @@ -1,242 +0,0 @@ -package de.assecutor.votianlt.messaging.config; - -import de.assecutor.votianlt.controller.MessageController; -import de.assecutor.votianlt.dto.AppLoginRequest; -import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService; -import de.assecutor.votianlt.messaging.model.AcknowledgmentMessage; -import de.assecutor.votianlt.messaging.model.MessageEnvelope; -import de.assecutor.votianlt.messaging.plugin.*; -import de.assecutor.votianlt.messaging.plugin.mqtt.MqttMessagingPlugin; -import de.assecutor.votianlt.service.ClientConnectionService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.event.EventListener; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.nio.charset.StandardCharsets; -import java.util.Map; - -/** - * Configuration for the plugin-based messaging system. Initializes the selected - * plugin and sets up message routing. - */ -@Configuration -@Slf4j -public class PluginMessagingConfig { - - @Value("${app.messaging.plugin.type:mqtt}") - private String pluginType; - - @Value("${app.messaging.plugin.mqtt.broker.host:mqtt-2.assecutor.de}") - private String mqttBrokerHost; - - @Value("${app.messaging.plugin.mqtt.broker.port:1883}") - private int mqttBrokerPort; - - @Value("${app.messaging.plugin.mqtt.username:app}") - private String mqttUsername; - - @Value("${app.messaging.plugin.mqtt.password:apppwd}") - private String mqttPassword; - - @Value("${app.messaging.plugin.mqtt.client.id:votianlt-server}") - private String mqttClientId; - - private final PluginManager pluginManager; - private final ObjectMapper objectMapper; - - public PluginMessagingConfig(PluginManager pluginManager, ObjectMapper objectMapper) { - this.pluginManager = pluginManager; - this.objectMapper = objectMapper; - } - - /** - * Initialize the messaging plugin after application startup. This method is - * called after all beans are created, so we can safely access - * MessageDeliveryService. - */ - @EventListener(ApplicationReadyEvent.class) - public void initializePlugin(ApplicationReadyEvent event) { - try { - MessagingPlugin plugin = createPlugin(pluginType); - PluginConfig config = createPluginConfig(pluginType); - - // Get beans from context (after all beans are created) - MessageDeliveryService deliveryService = event.getApplicationContext() - .getBean(MessageDeliveryService.class); - MessageController messageController = event.getApplicationContext().getBean(MessageController.class); - ClientConnectionService clientConnectionService = event.getApplicationContext() - .getBean(ClientConnectionService.class); - - // Set up a listener to subscribe when connected - pluginManager.addStateListener(stateEvent -> { - if (stateEvent.isConnected()) { - try { - setupSubscriptions(deliveryService, messageController, clientConnectionService); - } catch (Exception e) { - log.error("[MQTT] Error setting up subscriptions: {}", e.getMessage()); - } - } else if (stateEvent.getState() == ConnectionStateEvent.ConnectionState.DISCONNECTED) { - log.info("[MQTT] Disconnected from broker"); - } else if (stateEvent.getState() == ConnectionStateEvent.ConnectionState.ERROR) { - log.error("[MQTT] Connection error: {}", stateEvent.getErrorMessage()); - } - }); - - // Activate plugin - pluginManager.activatePlugin(plugin, config); - - } catch (Exception e) { - log.error("[MQTT] Failed to initialize: {}", e.getMessage()); - throw new RuntimeException("Failed to initialize messaging plugin", e); - } - } - - /** - * Create a plugin instance based on the plugin type. - */ - private MessagingPlugin createPlugin(String type) { - return switch (type.toLowerCase()) { - case "mqtt" -> new MqttMessagingPlugin(); - // Add more plugin types here in the future - // case "websocket" -> new WebSocketMessagingPlugin(); - // case "grpc" -> new GrpcMessagingPlugin(); - default -> throw new IllegalArgumentException("Unknown plugin type: " + type); - }; - } - - /** - * Create plugin configuration based on the plugin type. - */ - private PluginConfig createPluginConfig(String type) { - PluginConfig config = new PluginConfig(); - - switch (type.toLowerCase()) { - case "mqtt" -> { - config.setProperty("broker.host", mqttBrokerHost); - config.setProperty("broker.port", mqttBrokerPort); - config.setProperty("username", mqttUsername); - config.setProperty("password", mqttPassword); - config.setProperty("client.id", mqttClientId); - config.setProperty("auto.reconnect", true); - config.setProperty("clean.start", true); - } - // Add more plugin configurations here - default -> throw new IllegalArgumentException("Unknown plugin type: " + type); - } - - return config; - } - - /** - * Setup message subscriptions using the new plugin API. - */ - private void setupSubscriptions(MessageDeliveryService deliveryService, MessageController messageController, - ClientConnectionService clientConnectionService) { - try { - // Register ACK handler - pluginManager.registerAckHandler((messageId, payload) -> { - try { - String json = new String(payload, StandardCharsets.UTF_8); - MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class); - AcknowledgmentMessage ack = objectMapper.convertValue(envelope.getPayload(), - AcknowledgmentMessage.class); - deliveryService.handleAcknowledgment(ack); - } catch (Exception e) { - // Ignore ACK handling errors - } - }); - - // Register message handlers for different message types - String[] messageTypes = { "task_completed", "jobs/assigned", "message", "login", "pong" }; - - for (String messageType : messageTypes) { - pluginManager.registerMessageHandler(messageType, - (clientId, payload) -> handleEnvelopedMessage(clientId, payload, deliveryService, - messageController, clientConnectionService)); - } - - } catch (Exception e) { - log.error("[MQTT] Error setting up subscriptions: {}", e.getMessage()); - throw new RuntimeException("Failed to setup subscriptions", e); - } - } - - /** - * Handle incoming enveloped message. Supports both new envelope format and - * legacy format for backwards compatibility. - */ - private void handleEnvelopedMessage(String clientId, byte[] payload, MessageDeliveryService deliveryService, - MessageController messageController, ClientConnectionService clientConnectionService) { - try { - String json = new String(payload, StandardCharsets.UTF_8); - - // Try to parse as envelope first - try { - MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class); - if (envelope.getMessageId() != null && envelope.getTopic() != null) { - deliveryService.handleIncomingMessage(envelope); - return; - } - } catch (Exception e) { - // Not a valid envelope, try legacy format - } - - // Handle legacy format (direct payload without envelope) - handleLegacyMessage(clientId, json, messageController, clientConnectionService); - - } catch (Exception e) { - // Ignore message handling errors - } - } - - /** - * Handle legacy message format (without envelope wrapper). This supports older - * clients that don't use the envelope format. - */ - @SuppressWarnings("unchecked") - private void handleLegacyMessage(String clientId, String json, MessageController messageController, - ClientConnectionService clientConnectionService) { - try { - Map payload = objectMapper.readValue(json, Map.class); - - // Check if this is a login request (has email, password, clientId) - if (payload.containsKey("email") && payload.containsKey("password") && payload.containsKey("clientId")) { - AppLoginRequest loginRequest = objectMapper.convertValue(payload, AppLoginRequest.class); - messageController.handleAppLogin(loginRequest); - return; - } - - // Check if this is a pong response - if ("pong".equals(payload.get("type"))) { - String pongClientId = clientId != null ? clientId : (String) payload.get("clientId"); - if (pongClientId != null) { - clientConnectionService.handlePong(pongClientId); - } - return; - } - - // Check if this is a task completion - if (payload.containsKey("taskType") || payload.containsKey("taskId")) { - String taskType = payload.get("taskType") != null ? payload.get("taskType").toString() : null; - messageController.handleTaskCompleted(payload, taskType); - return; - } - - // Check if this is a jobs/assigned request - if (payload.containsKey("appUserId")) { - if (clientId != null) { - payload.put("clientId", clientId); - } - messageController.handleGetAssignedJobs(payload); - return; - } - - } catch (Exception e) { - // Ignore legacy message handling errors - } - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/delivery/AcknowledgmentHandler.java b/src/main/java/de/assecutor/votianlt/messaging/delivery/AcknowledgmentHandler.java deleted file mode 100644 index ca04805..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/delivery/AcknowledgmentHandler.java +++ /dev/null @@ -1,147 +0,0 @@ -package de.assecutor.votianlt.messaging.delivery; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import de.assecutor.votianlt.controller.MessageController; -import de.assecutor.votianlt.dto.AppLoginRequest; -import de.assecutor.votianlt.messaging.model.MessageEnvelope; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.Lazy; -import org.springframework.stereotype.Component; - -import java.util.Map; - -/** - * Handles acknowledgments and routes incoming messages to application layer. - * Acts as a bridge between the messaging layer and the application logic. - */ -@Component -@Slf4j -public class AcknowledgmentHandler { - - private final MessageController messageController; - private final ObjectMapper objectMapper; - - public AcknowledgmentHandler(@Lazy MessageController messageController, ObjectMapper objectMapper) { - this.messageController = messageController; - this.objectMapper = objectMapper; - } - - /** - * Route incoming message envelope to appropriate application handler. Unwraps - * the envelope and delegates to MessageController. - */ - public void routeIncomingMessage(MessageEnvelope envelope) { - try { - String topic = envelope.getTopic(); - Object payload = envelope.getPayload(); - - log.debug("[AckHandler] Routing message {} on topic {}", envelope.getMessageId(), topic); - - // Convert payload to Map for routing - Map payloadMap = objectMapper.convertValue(payload, - new TypeReference>() { - }); - - // Route based on topic pattern - if (topic.matches("/server/.+/task_completed")) { - handleTaskCompleted(payloadMap); - } else if (topic.matches("/server/.+/jobs/assigned")) { - handleJobsAssigned(topic, payloadMap); - } else if (topic.equals("/server/login")) { - handleLogin(payloadMap); - } else if (topic.matches("/server/.+/message")) { - handleIncomingMessage(topic, payloadMap); - } else if (topic.matches("/server/.+/pong")) { - handlePong(topic, payloadMap); - } else { - log.debug("[AckHandler] No route for topic {}", topic); - } - - } catch (Exception e) { - log.error("[AckHandler] Error routing message {}: {}", envelope.getMessageId(), e.getMessage(), e); - } - } - - /** - * Handle task completion message - */ - private void handleTaskCompleted(Map payload) { - try { - Object tt = payload.get("taskType"); - String taskType = tt != null ? tt.toString() : null; - messageController.handleTaskCompleted(payload, taskType); - } catch (Exception e) { - log.error("[AckHandler] Error handling task_completed: {}", e.getMessage(), e); - } - } - - /** - * Handle jobs assigned request - */ - private void handleJobsAssigned(String topic, Map payload) { - try { - // Extract clientId from topic: /server/{clientId}/jobs/assigned - String[] parts = topic.split("/"); - String clientId = parts.length > 2 ? parts[2] : null; - if (clientId != null && !clientId.isBlank()) { - payload.put("clientId", clientId); - } else { - log.warn("[AckHandler] Couldn't extract clientId from topic {} for jobs/assigned", topic); - } - messageController.handleGetAssignedJobs(payload); - } catch (Exception e) { - log.error("[AckHandler] Error handling jobs/assigned: {}", e.getMessage(), e); - } - } - - /** - * Handle login request Topic: /server/login - */ - private void handleLogin(Map payload) { - try { - AppLoginRequest req = objectMapper.convertValue(payload, AppLoginRequest.class); - messageController.handleAppLogin(req); - } catch (Exception e) { - log.error("[AckHandler] Error handling login: {}", e.getMessage(), e); - } - } - - /** - * Handle incoming chat message - */ - private void handleIncomingMessage(String topic, Map payload) { - try { - // Extract clientId from topic: /server/{clientId}/message - String[] parts = topic.split("/"); - String clientId = parts.length > 2 ? parts[2] : null; - if (clientId != null && !clientId.isBlank()) { - payload.put("clientId", clientId); - } else { - log.warn("[AckHandler] Couldn't extract clientId from topic {} for message", topic); - } - messageController.handleIncomingMessage(payload); - } catch (Exception e) { - log.error("[AckHandler] Error handling incoming message: {}", e.getMessage(), e); - } - } - - /** - * Handle pong response from client for connection monitoring - */ - private void handlePong(String topic, Map payload) { - try { - // Extract clientId from topic: /server/{clientId}/pong - String[] parts = topic.split("/"); - String clientId = parts.length > 2 ? parts[2] : null; - if (clientId != null && !clientId.isBlank()) { - payload.put("clientId", clientId); - } else { - log.warn("[AckHandler] Couldn't extract clientId from topic {} for pong", topic); - } - messageController.handlePong(payload); - } catch (Exception e) { - log.error("[AckHandler] Error handling pong: {}", e.getMessage(), e); - } - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/delivery/DeliveryConfig.java b/src/main/java/de/assecutor/votianlt/messaging/delivery/DeliveryConfig.java deleted file mode 100644 index 8231a88..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/delivery/DeliveryConfig.java +++ /dev/null @@ -1,61 +0,0 @@ -package de.assecutor.votianlt.messaging.delivery; - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; - -import java.time.Duration; - -/** - * Configuration for message delivery service. - */ -@Configuration -@ConfigurationProperties(prefix = "app.messaging.delivery") -@Data -public class DeliveryConfig { - - /** - * Maximum number of retry attempts for failed deliveries - */ - private int maxRetries = 3; - - /** - * Initial delay before first retry - */ - private Duration retryInitialDelay = Duration.ofSeconds(5); - - /** - * Maximum delay between retries - */ - private Duration retryMaxDelay = Duration.ofMinutes(5); - - /** - * Backoff multiplier for exponential backoff - */ - private double retryBackoffMultiplier = 2.0; - - /** - * Timeout for waiting for acknowledgment - */ - private Duration ackTimeout = Duration.ofSeconds(30); - - /** - * Default message expiry duration - */ - private Duration messageExpiry = Duration.ofHours(24); - - /** - * Interval for cleanup task (in minutes) - */ - private int cleanupIntervalMinutes = 60; - - /** - * Interval for retry task (in seconds) - */ - private int retryIntervalSeconds = 30; - - /** - * Retention period for acknowledged deliveries (in days) - */ - private int acknowledgedRetentionDays = 7; -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryService.java b/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryService.java deleted file mode 100644 index c6f400a..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryService.java +++ /dev/null @@ -1,135 +0,0 @@ -package de.assecutor.votianlt.messaging.delivery; - -import de.assecutor.votianlt.messaging.model.*; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -/** - * Service for reliable message delivery with acknowledgment tracking. Provides - * guaranteed delivery with retry mechanism and acknowledgment handling. - */ -public interface MessageDeliveryService { - - /** - * Send a message to a specific client with delivery tracking and - * acknowledgment. - * - * @param clientId - * The target client identifier - * @param messageType - * The type of message (e.g., "jobs", "message", "auth", "task") - * @param payload - * The message payload (will be serialized to JSON) - * @param options - * Delivery options (retries, timeout, etc.) - * @return CompletableFuture with delivery receipt - */ - CompletableFuture sendToClient(String clientId, String messageType, Object payload, - DeliveryOptions options); - - /** - * Send a message to a specific client with default delivery options. - * - * @param clientId - * The target client identifier - * @param messageType - * The type of message - * @param payload - * The message payload - * @return CompletableFuture with delivery receipt - */ - default CompletableFuture sendToClient(String clientId, String messageType, Object payload) { - return sendToClient(clientId, messageType, payload, DeliveryOptions.standard()); - } - - /** - * Send a message with delivery tracking and acknowledgment. - * - * @deprecated Use - * {@link #sendToClient(String, String, Object, DeliveryOptions)} - * instead - * - * @param topic - * The destination topic - * @param payload - * The message payload (will be serialized to JSON) - * @param options - * Delivery options (retries, timeout, etc.) - * @return CompletableFuture with delivery receipt - */ - @Deprecated - CompletableFuture sendMessage(String topic, Object payload, DeliveryOptions options); - - /** - * Send a message with default delivery options. - * - * @deprecated Use {@link #sendToClient(String, String, Object)} instead - * - * @param topic - * The destination topic - * @param payload - * The message payload - * @return CompletableFuture with delivery receipt - */ - @Deprecated - default CompletableFuture sendMessage(String topic, Object payload) { - return sendMessage(topic, payload, DeliveryOptions.standard()); - } - - /** - * Handle incoming message envelope from transport layer. Extracts payload and - * routes to application layer. - * - * @param envelope - * The received message envelope - */ - void handleIncomingMessage(MessageEnvelope envelope); - - /** - * Handle acknowledgment from client. Updates delivery status and removes from - * pending queue. - * - * @param ack - * The acknowledgment message - */ - void handleAcknowledgment(AcknowledgmentMessage ack); - - /** - * Get the current delivery status for a message. - * - * @param messageId - * The message ID - * @return Optional containing the delivery status, or empty if not found - */ - Optional getDeliveryStatus(String messageId); - - /** - * Get detailed pending delivery information. - * - * @param messageId - * The message ID - * @return Optional containing the pending delivery, or empty if not found - */ - Optional getPendingDelivery(String messageId); - - /** - * Retry all pending deliveries that are ready for retry. Called by scheduled - * task. - */ - void retryPendingDeliveries(); - - /** - * Retry pending deliveries for a specific client. Called when a client - * reconnects. - * - * @param clientId - * The client identifier - */ - void retryPendingDeliveriesForClient(String clientId); - - /** - * Clean up expired and completed deliveries. Called by scheduled task. - */ - void cleanupOldDeliveries(); -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java b/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java deleted file mode 100644 index 4a09a71..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/delivery/MessageDeliveryServiceImpl.java +++ /dev/null @@ -1,403 +0,0 @@ -package de.assecutor.votianlt.messaging.delivery; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.assecutor.votianlt.messaging.model.*; -import de.assecutor.votianlt.messaging.plugin.PluginManager; -import de.assecutor.votianlt.messaging.plugin.SendOptions; -import de.assecutor.votianlt.repository.PendingDeliveryRepository; -import de.assecutor.votianlt.service.ClientConnectionService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; - -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; - -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Implementation of MessageDeliveryService with reliable delivery guarantees. - */ -@Service -@Slf4j -public class MessageDeliveryServiceImpl implements MessageDeliveryService { - - @Value("${app.messaging.delivery.ack-retry-interval-seconds:5}") - private int ackRetryIntervalSeconds; - - @Value("${app.messaging.delivery.ack-max-retries:4}") - private int ackMaxRetries; - - private final PluginManager pluginManager; - private final PendingDeliveryRepository pendingDeliveryRepository; - private final AcknowledgmentHandler acknowledgmentHandler; - private final DeliveryConfig config; - private final ObjectMapper objectMapper; - private final ClientConnectionService clientConnectionService; - - private ScheduledExecutorService retryScheduler; - - public MessageDeliveryServiceImpl(PluginManager pluginManager, PendingDeliveryRepository pendingDeliveryRepository, - AcknowledgmentHandler acknowledgmentHandler, DeliveryConfig config, ObjectMapper objectMapper, - ClientConnectionService clientConnectionService) { - this.pluginManager = pluginManager; - this.pendingDeliveryRepository = pendingDeliveryRepository; - this.acknowledgmentHandler = acknowledgmentHandler; - this.config = config; - this.objectMapper = objectMapper; - this.clientConnectionService = clientConnectionService; - } - - @PostConstruct - public void startRetryScheduler() { - retryScheduler = Executors.newSingleThreadScheduledExecutor(r -> { - Thread t = new Thread(r, "message-retry-scheduler"); - t.setDaemon(true); - return t; - }); - retryScheduler.scheduleAtFixedRate(this::retryPendingDeliveries, ackRetryIntervalSeconds, - ackRetryIntervalSeconds, TimeUnit.SECONDS); - log.info("[MessageDelivery] Started retry scheduler (interval: {}s, max retries: {})", ackRetryIntervalSeconds, - ackMaxRetries); - } - - @PreDestroy - public void stopRetryScheduler() { - if (retryScheduler != null) { - retryScheduler.shutdown(); - try { - if (!retryScheduler.awaitTermination(5, TimeUnit.SECONDS)) { - retryScheduler.shutdownNow(); - } - } catch (InterruptedException e) { - retryScheduler.shutdownNow(); - Thread.currentThread().interrupt(); - } - log.info("[MessageDelivery] Stopped retry scheduler"); - } - } - - @Override - public CompletableFuture sendToClient(String clientId, String messageType, Object payload, - DeliveryOptions options) { - try { - String destination = clientId + "/" + messageType; - final LocalDateTime expiresAt = options.calculateExpiryTime(); - MessageEnvelope envelope = new MessageEnvelope(destination, payload, options.isRequiresAck(), expiresAt); - final String messageId = envelope.getMessageId(); - - String json = objectMapper.writeValueAsString(envelope); - byte[] envelopeData = json.getBytes(StandardCharsets.UTF_8); - - if (options.isRequiresAck()) { - PendingDelivery pending = new PendingDelivery(messageId, destination, envelopeData, - options.getMaxRetries(), expiresAt); - pendingDeliveryRepository.save(pending); - } - - SendOptions sendOptions = SendOptions.builder().qos(options.getQos()).retained(options.isRetained()) - .build(); - - final boolean requiresAck = options.isRequiresAck(); - final Duration ackTimeout = options.getAckTimeout(); - - log.info("[MessageDelivery] Sending message {} to client {} (type: {})", messageId, clientId, messageType); - - return pluginManager.sendToClient(clientId, messageType, envelopeData, sendOptions).thenApply(v -> { - if (requiresAck) { - updatePendingDeliveryAfterSend(messageId, ackTimeout); - } - return DeliveryReceipt.submitted(messageId, destination, expiresAt); - }).exceptionally(ex -> { - log.error("[MessageDelivery] Failed to send message {}: {}", messageId, ex.getMessage()); - if (requiresAck) { - markPendingDeliveryFailed(messageId, ex.getMessage()); - } - return DeliveryReceipt.failed(messageId, destination); - }); - - } catch (Exception e) { - log.error("[MessageDelivery] Error creating message for client {}: {}", clientId, e.getMessage()); - return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", clientId + "/" + messageType)); - } - } - - @Override - @Deprecated - public CompletableFuture sendMessage(String topic, Object payload, DeliveryOptions options) { - String[] parts = topic.split("/"); - if (parts.length >= 4 && parts[1].equals("client")) { - String clientId = parts[2]; - String messageType = parts[3]; - return sendToClient(clientId, messageType, payload, options); - } - log.warn("[MessageDelivery] Using deprecated sendMessage with topic: {}", topic); - return CompletableFuture.completedFuture(DeliveryReceipt.failed("error", topic)); - } - - @Override - public void handleIncomingMessage(MessageEnvelope envelope) { - try { - log.info("[MessageDelivery] Received message {} on topic {}", envelope.getMessageId(), envelope.getTopic()); - - if (envelope.isRequiresAck()) { - sendAcknowledgment(envelope); - } - - acknowledgmentHandler.routeIncomingMessage(envelope); - - } catch (Exception e) { - log.error("[MessageDelivery] Error handling incoming message {}: {}", envelope.getMessageId(), - e.getMessage()); - } - } - - @Override - public void handleAcknowledgment(AcknowledgmentMessage ack) { - try { - log.info("[MessageDelivery] Received ACK for message {} (status: {})", ack.getMessageId(), ack.getStatus()); - - Optional pendingOpt = pendingDeliveryRepository.findByMessageId(ack.getMessageId()); - - if (pendingOpt.isEmpty()) { - return; - } - - PendingDelivery pending = pendingOpt.get(); - - switch (ack.getStatus()) { - case RECEIVED, PROCESSED -> { - pendingDeliveryRepository.delete(pending); - } - case FAILED -> { - pending.markAsFailed(ack.getErrorMessage()); - pendingDeliveryRepository.save(pending); - log.warn("[MessageDelivery] Message {} failed on client: {}", ack.getMessageId(), - ack.getErrorMessage()); - } - } - - } catch (Exception e) { - log.error("[MessageDelivery] Error handling ACK for message {}: {}", ack.getMessageId(), e.getMessage()); - } - } - - @Override - public Optional getDeliveryStatus(String messageId) { - return pendingDeliveryRepository.findByMessageId(messageId).map(PendingDelivery::getStatus); - } - - @Override - public Optional getPendingDelivery(String messageId) { - return pendingDeliveryRepository.findByMessageId(messageId); - } - - @Override - public void retryPendingDeliveries() { - try { - List readyForRetry = pendingDeliveryRepository - .findByStatusAndNextRetryAtBefore(DeliveryStatus.SENT, LocalDateTime.now()); - - if (readyForRetry.isEmpty()) { - return; - } - - for (PendingDelivery pending : readyForRetry) { - retryDelivery(pending); - } - - } catch (Exception e) { - log.error("[MessageDelivery] Error during retry process: {}", e.getMessage()); - } - } - - @Override - public void retryPendingDeliveriesForClient(String clientId) { - if (clientId == null || clientId.isBlank()) { - return; - } - - try { - List pendingDeliveries = pendingDeliveryRepository - .findByStatusIn(List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT)); - - for (PendingDelivery pending : pendingDeliveries) { - String topic = pending.getTopic(); - if (topic != null && topic.startsWith(clientId + "/")) { - retryDelivery(pending); - } - } - - } catch (Exception e) { - log.error("[MessageDelivery] Error retrying deliveries for client {}: {}", clientId, e.getMessage()); - } - } - - @Override - public void cleanupOldDeliveries() { - try { - LocalDateTime cutoff = LocalDateTime.now().minus(Duration.ofDays(7)); - List oldAcknowledged = pendingDeliveryRepository - .findByStatusAndAcknowledgedAtBefore(DeliveryStatus.ACKNOWLEDGED, cutoff); - - if (!oldAcknowledged.isEmpty()) { - pendingDeliveryRepository.deleteAll(oldAcknowledged); - } - - List expired = pendingDeliveryRepository.findByStatusInAndExpiresAtBefore( - List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT), LocalDateTime.now()); - - for (PendingDelivery pending : expired) { - pending.markAsExpired(); - pendingDeliveryRepository.save(pending); - } - - } catch (Exception e) { - log.error("[MessageDelivery] Error during cleanup: {}", e.getMessage()); - } - } - - private void updatePendingDeliveryAfterSend(String messageId, Duration ackTimeout) { - try { - Optional pendingOpt = pendingDeliveryRepository.findByMessageId(messageId); - if (pendingOpt.isPresent()) { - PendingDelivery pending = pendingOpt.get(); - LocalDateTime nextRetry = LocalDateTime.now().plus(ackTimeout); - pending.markAsSent(nextRetry); - pendingDeliveryRepository.save(pending); - } - } catch (Exception e) { - log.error("[MessageDelivery] Error updating pending delivery {}: {}", messageId, e.getMessage()); - } - } - - private void markPendingDeliveryFailed(String messageId, String reason) { - try { - Optional pendingOpt = pendingDeliveryRepository.findByMessageId(messageId); - if (pendingOpt.isPresent()) { - PendingDelivery pending = pendingOpt.get(); - pending.markAsFailed(reason); - pendingDeliveryRepository.save(pending); - } - } catch (Exception e) { - log.error("[MessageDelivery] Error marking delivery as failed {}: {}", messageId, e.getMessage()); - } - } - - private void retryDelivery(PendingDelivery pending) { - try { - if (pending.isExpired()) { - pending.markAsExpired(); - pendingDeliveryRepository.save(pending); - return; - } - - if (pending.hasReachedMaxRetries()) { - pending.markAsFailed("Max retries reached"); - pendingDeliveryRepository.save(pending); - return; - } - - String topic = pending.getTopic(); - String[] parts = topic.split("/"); - if (parts.length < 2) { - pending.markAsFailed("Invalid topic format"); - pendingDeliveryRepository.save(pending); - return; - } - - String clientId = parts[0]; - String messageType = parts[1]; - - Duration backoffDelay = calculateBackoff(pending.getRetryCount() + 1); - LocalDateTime nextRetry = LocalDateTime.now().plus(backoffDelay); - - if (!clientConnectionService.isClientConnected(clientId)) { - pending.setNextRetryAt(nextRetry); - pendingDeliveryRepository.save(pending); - return; - } - - pending.incrementRetryCount(); - - SendOptions options = SendOptions.reliable(); - pluginManager.sendToClient(clientId, messageType, pending.getEnvelopeData(), options).thenAccept(v -> { - pending.markAsSent(nextRetry); - pendingDeliveryRepository.save(pending); - }).exceptionally(ex -> { - log.error("[MessageDelivery] Retry failed for message {}: {}", pending.getMessageId(), ex.getMessage()); - pending.markAsFailed(ex.getMessage()); - pendingDeliveryRepository.save(pending); - return null; - }); - - } catch (Exception e) { - log.error("[MessageDelivery] Error retrying delivery {}: {}", pending.getMessageId(), e.getMessage()); - } - } - - private void sendAcknowledgment(MessageEnvelope envelope) { - try { - String clientId = extractClientIdFromTopic(envelope.getTopic()); - if (clientId == null) { - return; - } - - AcknowledgmentMessage ack = new AcknowledgmentMessage(envelope.getMessageId(), AckStatus.RECEIVED, - "server"); - - String ackJson = objectMapper.writeValueAsString(ack); - byte[] ackData = ackJson.getBytes(StandardCharsets.UTF_8); - - log.info("[MessageDelivery] Sending ACK for message {} to client {}", envelope.getMessageId(), clientId); - - pluginManager.sendAckToClient(clientId, envelope.getMessageId(), ackData, SendOptions.fireAndForget()) - .exceptionally(ex -> { - log.error("[MessageDelivery] Failed to send ACK for message {}: {}", envelope.getMessageId(), - ex.getMessage()); - return null; - }); - - } catch (Exception e) { - log.error("[MessageDelivery] Error sending ACK for message {}: {}", envelope.getMessageId(), - e.getMessage()); - } - } - - private Duration calculateBackoff(int retryCount) { - long delayMs = (long) (config.getRetryInitialDelay().toMillis() - * Math.pow(config.getRetryBackoffMultiplier(), retryCount - 1)); - long maxDelayMs = config.getRetryMaxDelay().toMillis(); - return Duration.ofMillis(Math.min(delayMs, maxDelayMs)); - } - - private String extractClientIdFromTopic(String topic) { - if (topic == null) { - return null; - } - - if (topic.startsWith("/server/")) { - String[] parts = topic.split("/"); - if (parts.length > 2) { - return parts[2]; - } - } - - if (topic.contains("/")) { - String[] parts = topic.split("/"); - if (parts.length >= 1) { - return parts[0]; - } - } - - return null; - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/delivery/RetryScheduler.java b/src/main/java/de/assecutor/votianlt/messaging/delivery/RetryScheduler.java deleted file mode 100644 index ec3ded1..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/delivery/RetryScheduler.java +++ /dev/null @@ -1,41 +0,0 @@ -package de.assecutor.votianlt.messaging.delivery; - -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -/** - * Scheduled tasks for message delivery retry and cleanup. - */ -@Component -public class RetryScheduler { - - private final MessageDeliveryService deliveryService; - - public RetryScheduler(MessageDeliveryService deliveryService) { - this.deliveryService = deliveryService; - } - - /** - * Retry pending deliveries every 30 seconds (configurable) - */ - @Scheduled(fixedDelayString = "${app.messaging.delivery.retry-interval-seconds:30}000") - public void retryPendingDeliveries() { - try { - deliveryService.retryPendingDeliveries(); - } catch (Exception e) { - // Ignore retry errors - } - } - - /** - * Cleanup old deliveries every hour (configurable) - */ - @Scheduled(fixedDelayString = "${app.messaging.delivery.cleanup-interval-minutes:60}000") - public void cleanupOldDeliveries() { - try { - deliveryService.cleanupOldDeliveries(); - } catch (Exception e) { - // Ignore cleanup errors - } - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/model/AckStatus.java b/src/main/java/de/assecutor/votianlt/messaging/model/AckStatus.java deleted file mode 100644 index 5d3182e..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/model/AckStatus.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.assecutor.votianlt.messaging.model; - -/** - * Status of message acknowledgment from client. - */ -public enum AckStatus { - /** - * Message was received by the client - */ - RECEIVED, - - /** - * Message was successfully processed by the client - */ - PROCESSED, - - /** - * Message processing failed on the client side - */ - FAILED -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/model/AcknowledgmentMessage.java b/src/main/java/de/assecutor/votianlt/messaging/model/AcknowledgmentMessage.java deleted file mode 100644 index 7d2b31b..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/model/AcknowledgmentMessage.java +++ /dev/null @@ -1,62 +0,0 @@ -package de.assecutor.votianlt.messaging.model; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.LocalDateTime; - -/** - * Acknowledgment message sent by clients to confirm message receipt/processing. - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -public class AcknowledgmentMessage { - - /** - * ID of the message being acknowledged - */ - private String messageId; - - /** - * Status of the acknowledgment - */ - private AckStatus status; - - /** - * Timestamp when the acknowledgment was created - */ - private LocalDateTime timestamp; - - /** - * ID of the client sending the acknowledgment - */ - private String clientId; - - /** - * Optional error message if status is FAILED - */ - private String errorMessage; - - /** - * Constructor for successful acknowledgment - */ - public AcknowledgmentMessage(String messageId, AckStatus status, String clientId) { - this.messageId = messageId; - this.status = status; - this.timestamp = LocalDateTime.now(); - this.clientId = clientId; - } - - /** - * Constructor for failed acknowledgment with error message - */ - public AcknowledgmentMessage(String messageId, String clientId, String errorMessage) { - this.messageId = messageId; - this.status = AckStatus.FAILED; - this.timestamp = LocalDateTime.now(); - this.clientId = clientId; - this.errorMessage = errorMessage; - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/model/DeliveryOptions.java b/src/main/java/de/assecutor/votianlt/messaging/model/DeliveryOptions.java deleted file mode 100644 index 51b9ee9..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/model/DeliveryOptions.java +++ /dev/null @@ -1,84 +0,0 @@ -package de.assecutor.votianlt.messaging.model; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.Duration; -import java.time.LocalDateTime; - -/** - * Options for message delivery configuration. - */ -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class DeliveryOptions { - - /** - * Whether this message requires acknowledgment - */ - @Builder.Default - private boolean requiresAck = true; - - /** - * Maximum number of retry attempts - */ - @Builder.Default - private int maxRetries = 3; - - /** - * Timeout for acknowledgment - */ - @Builder.Default - private Duration ackTimeout = Duration.ofSeconds(30); - - /** - * Message expiry duration from now - */ - @Builder.Default - private Duration expiryDuration = Duration.ofHours(24); - - /** - * QoS level for transport (MQTT specific, but kept generic) - */ - @Builder.Default - private int qos = 2; - - /** - * Whether message should be retained by broker - */ - @Builder.Default - private boolean retained = false; - - /** - * Calculate expiry timestamp from duration - */ - public LocalDateTime calculateExpiryTime() { - return LocalDateTime.now().plus(expiryDuration); - } - - /** - * Default options for standard messages - */ - public static DeliveryOptions standard() { - return DeliveryOptions.builder().build(); - } - - /** - * Options for fire-and-forget messages (no acknowledgment required) - */ - public static DeliveryOptions fireAndForget() { - return DeliveryOptions.builder().requiresAck(false).maxRetries(0).build(); - } - - /** - * Options for critical messages with extended retry - */ - public static DeliveryOptions critical() { - return DeliveryOptions.builder().requiresAck(true).maxRetries(5).ackTimeout(Duration.ofMinutes(2)) - .expiryDuration(Duration.ofDays(7)).build(); - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/model/DeliveryReceipt.java b/src/main/java/de/assecutor/votianlt/messaging/model/DeliveryReceipt.java deleted file mode 100644 index 8f02b3f..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/model/DeliveryReceipt.java +++ /dev/null @@ -1,55 +0,0 @@ -package de.assecutor.votianlt.messaging.model; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.LocalDateTime; - -/** - * Receipt returned when a message is submitted for delivery. - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -public class DeliveryReceipt { - - /** - * Unique message identifier - */ - private String messageId; - - /** - * Topic where message was sent - */ - private String topic; - - /** - * When the message was submitted - */ - private LocalDateTime submittedAt; - - /** - * Initial delivery status - */ - private DeliveryStatus status; - - /** - * When the message will expire - */ - private LocalDateTime expiresAt; - - /** - * Create a receipt for a successfully submitted message - */ - public static DeliveryReceipt submitted(String messageId, String topic, LocalDateTime expiresAt) { - return new DeliveryReceipt(messageId, topic, LocalDateTime.now(), DeliveryStatus.PENDING, expiresAt); - } - - /** - * Create a receipt for a failed submission - */ - public static DeliveryReceipt failed(String messageId, String topic) { - return new DeliveryReceipt(messageId, topic, LocalDateTime.now(), DeliveryStatus.FAILED, null); - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/model/DeliveryStatus.java b/src/main/java/de/assecutor/votianlt/messaging/model/DeliveryStatus.java deleted file mode 100644 index b923142..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/model/DeliveryStatus.java +++ /dev/null @@ -1,31 +0,0 @@ -package de.assecutor.votianlt.messaging.model; - -/** - * Status of a message delivery attempt. - */ -public enum DeliveryStatus { - /** - * Message is queued but not yet sent - */ - PENDING, - - /** - * Message has been sent to the transport layer - */ - SENT, - - /** - * Client has acknowledged receipt of the message - */ - ACKNOWLEDGED, - - /** - * Delivery failed after all retry attempts - */ - FAILED, - - /** - * Message expired before delivery could be confirmed - */ - EXPIRED -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/model/MessageEnvelope.java b/src/main/java/de/assecutor/votianlt/messaging/model/MessageEnvelope.java deleted file mode 100644 index 9ceecda..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/model/MessageEnvelope.java +++ /dev/null @@ -1,101 +0,0 @@ -package de.assecutor.votianlt.messaging.model; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -/** - * Envelope that wraps all messages sent through the messaging system. Contains - * metadata for delivery tracking and acknowledgment. This is a DTO class - not - * persisted to MongoDB. - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -@JsonIgnoreProperties(ignoreUnknown = true) -public class MessageEnvelope { - - /** - * Unique identifier for this message (UUID) - */ - private String messageId; - - /** - * Timestamp when the envelope was created - */ - private LocalDateTime timestamp; - - /** - * Target topic for this message - */ - private String topic; - - /** - * The actual message payload (can be any serializable object) - */ - private Object payload; - - /** - * Whether this message requires acknowledgment from the receiver - */ - private boolean requiresAck; - - /** - * Number of times this message has been retried - */ - private int retryCount; - - /** - * When this message expires and should no longer be delivered - */ - private LocalDateTime expiresAt; - - /** - * Additional metadata for the message - */ - private Map metadata; - - /** - * Constructor for creating a new envelope with payload - */ - public MessageEnvelope(String topic, Object payload, boolean requiresAck, LocalDateTime expiresAt) { - this.messageId = UUID.randomUUID().toString(); - this.timestamp = LocalDateTime.now(); - this.topic = topic; - this.payload = payload; - this.requiresAck = requiresAck; - this.retryCount = 0; - this.expiresAt = expiresAt; - this.metadata = new HashMap<>(); - } - - /** - * Check if this message has expired - */ - public boolean isExpired() { - return expiresAt != null && LocalDateTime.now().isAfter(expiresAt); - } - - /** - * Increment the retry counter - */ - public void incrementRetryCount() { - this.retryCount++; - } - - /** - * Add metadata to the envelope - */ - public void addMetadata(String key, String value) { - if (this.metadata == null) { - this.metadata = new HashMap<>(); - } - this.metadata.put(key, value); - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/model/PendingDelivery.java b/src/main/java/de/assecutor/votianlt/messaging/model/PendingDelivery.java deleted file mode 100644 index 9853839..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/model/PendingDelivery.java +++ /dev/null @@ -1,210 +0,0 @@ -package de.assecutor.votianlt.messaging.model; - -import com.fasterxml.jackson.annotation.JsonGetter; -import com.fasterxml.jackson.annotation.JsonIgnore; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.bson.types.ObjectId; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.index.Indexed; -import org.springframework.data.mongodb.core.mapping.Document; -import org.springframework.data.mongodb.core.mapping.Field; - -import java.time.LocalDateTime; - -/** - * Represents a message delivery that is pending acknowledgment. Stored in - * MongoDB for retry and tracking purposes. - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -@Document(collection = "pending_deliveries") -public class PendingDelivery { - - @Id - @JsonIgnore - private ObjectId id; - - /** - * Unique message identifier - */ - @Field("message_id") - @Indexed(unique = true) - private String messageId; - - /** - * Target topic for this message - */ - @Field("topic") - private String topic; - - /** - * Serialized envelope data (JSON bytes) - */ - @Field("envelope_data") - private byte[] envelopeData; - - /** - * Current delivery status - */ - @Field("status") - @Indexed - private DeliveryStatus status; - - /** - * When the delivery record was created - */ - @Field("created_at") - private LocalDateTime createdAt; - - /** - * When the message was last sent - */ - @Field("sent_at") - private LocalDateTime sentAt; - - /** - * When acknowledgment was received - */ - @Field("acknowledged_at") - private LocalDateTime acknowledgedAt; - - /** - * Number of retry attempts made - */ - @Field("retry_count") - private int retryCount; - - /** - * Maximum number of retries allowed - */ - @Field("max_retries") - private int maxRetries; - - /** - * When the next retry should be attempted - */ - @Field("next_retry_at") - @Indexed - private LocalDateTime nextRetryAt; - - /** - * When this delivery expires - */ - @Field("expires_at") - @Indexed - private LocalDateTime expiresAt; - - /** - * Reason for failure (if status is FAILED) - */ - @Field("failure_reason") - private String failureReason; - - /** - * Client ID (extracted from topic if available) - */ - @Field("client_id") - private String clientId; - - /** - * Constructor for new pending delivery - */ - public PendingDelivery(String messageId, String topic, byte[] envelopeData, int maxRetries, - LocalDateTime expiresAt) { - this.messageId = messageId; - this.topic = topic; - this.envelopeData = envelopeData; - this.status = DeliveryStatus.PENDING; - this.createdAt = LocalDateTime.now(); - this.retryCount = 0; - this.maxRetries = maxRetries; - this.expiresAt = expiresAt; - this.clientId = extractClientIdFromTopic(topic); - } - - /** - * Mark as sent and schedule next retry - */ - public void markAsSent(LocalDateTime nextRetryAt) { - this.status = DeliveryStatus.SENT; - this.sentAt = LocalDateTime.now(); - this.nextRetryAt = nextRetryAt; - } - - /** - * Mark as acknowledged - */ - public void markAsAcknowledged() { - this.status = DeliveryStatus.ACKNOWLEDGED; - this.acknowledgedAt = LocalDateTime.now(); - } - - /** - * Mark as failed with reason - */ - public void markAsFailed(String reason) { - this.status = DeliveryStatus.FAILED; - this.failureReason = reason; - } - - /** - * Mark as expired - */ - public void markAsExpired() { - this.status = DeliveryStatus.EXPIRED; - this.failureReason = "Message expired before delivery could be confirmed"; - } - - /** - * Increment retry count - */ - public void incrementRetryCount() { - this.retryCount++; - } - - /** - * Check if max retries reached - */ - public boolean hasReachedMaxRetries() { - return retryCount >= maxRetries; - } - - /** - * Check if expired - */ - public boolean isExpired() { - return expiresAt != null && LocalDateTime.now().isAfter(expiresAt); - } - - /** - * Check if ready for retry - */ - public boolean isReadyForRetry() { - return status == DeliveryStatus.SENT && nextRetryAt != null && LocalDateTime.now().isAfter(nextRetryAt) - && !hasReachedMaxRetries() && !isExpired(); - } - - /** - * Extract client ID from topic pattern /client/{clientId}/... - */ - private String extractClientIdFromTopic(String topic) { - if (topic != null && topic.startsWith("/client/")) { - String[] parts = topic.split("/"); - if (parts.length > 2) { - return parts[2]; - } - } - return null; - } - - /** - * Returns the ObjectId as string for JSON serialization - */ - @JsonGetter("id") - public String getIdAsString() { - return id != null ? id.toString() : null; - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/plugin/ConnectionStateEvent.java b/src/main/java/de/assecutor/votianlt/messaging/plugin/ConnectionStateEvent.java deleted file mode 100644 index 8263100..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/plugin/ConnectionStateEvent.java +++ /dev/null @@ -1,107 +0,0 @@ -package de.assecutor.votianlt.messaging.plugin; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.LocalDateTime; - -/** - * Event representing a connection state change. - */ -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class ConnectionStateEvent { - - /** - * Connection state - */ - private ConnectionState state; - - /** - * Previous connection state - */ - private ConnectionState previousState; - - /** - * Timestamp of the state change - */ - @Builder.Default - private LocalDateTime timestamp = LocalDateTime.now(); - - /** - * Optional error message if state is ERROR or DISCONNECTED - */ - private String errorMessage; - - /** - * Optional exception if state is ERROR - */ - private Throwable exception; - - /** - * Plugin that generated this event - */ - private String pluginName; - - /** - * Connection states - */ - public enum ConnectionState { - /** - * Plugin is initializing - */ - INITIALIZING, - - /** - * Plugin is connecting to the transport - */ - CONNECTING, - - /** - * Plugin is connected and ready - */ - CONNECTED, - - /** - * Plugin is disconnecting - */ - DISCONNECTING, - - /** - * Plugin is disconnected - */ - DISCONNECTED, - - /** - * Plugin encountered an error - */ - ERROR, - - /** - * Plugin is reconnecting after a failure - */ - RECONNECTING - } - - /** - * Check if the connection is active. - * - * @return true if connected - */ - public boolean isConnected() { - return state == ConnectionState.CONNECTED; - } - - /** - * Check if there was an error. - * - * @return true if state is ERROR - */ - public boolean isError() { - return state == ConnectionState.ERROR; - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/plugin/MessagingPlugin.java b/src/main/java/de/assecutor/votianlt/messaging/plugin/MessagingPlugin.java deleted file mode 100644 index 0c422cb..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/plugin/MessagingPlugin.java +++ /dev/null @@ -1,182 +0,0 @@ -package de.assecutor.votianlt.messaging.plugin; - -import java.util.concurrent.CompletableFuture; - -/** - * Interface for messaging transport plugins. Plugins implement specific - * transport protocols (MQTT, WebSocket, gRPC, etc.) and provide a unified - * interface for the messaging layer. - * - * The plugin is responsible for managing the internal topic/channel structure. - * The messaging layer only uses clientId and messageType as identifiers. - */ -public interface MessagingPlugin { - - /** - * Initialize the plugin with configuration. Called once during application - * startup. - * - * @param config - * Plugin-specific configuration - * @throws PluginException - * if initialization fails - */ - void init(PluginConfig config) throws PluginException; - - /** - * Shutdown the plugin and release resources. Called during application - * shutdown. - * - * @throws PluginException - * if shutdown fails - */ - void exit() throws PluginException; - - /** - * Callback when connection state changes. The plugin should call this method - * when the underlying transport connection state changes (connected, - * disconnected, error). - * - * @param listener - * Connection state listener - */ - void setConnectionListener(ConnectionStateListener listener); - - /** - * Send a message to a specific client. The plugin is responsible for - * determining the correct topic/channel based on the messageType. - * - * @param clientId - * Target client identifier - * @param messageType - * Type of message (e.g., "jobs", "message", "auth", "task") - * @param payload - * Message payload as byte array - * @param options - * Transport-specific options - * @return CompletableFuture that completes when message is sent - * @throws PluginException - * if sending fails - */ - CompletableFuture sendToClient(String clientId, String messageType, byte[] payload, SendOptions options) - throws PluginException; - - /** - * Send an acknowledgment to a specific client. The plugin is responsible for - * determining the correct ACK topic/channel. - * - * @param clientId - * Target client identifier - * @param messageId - * Message ID being acknowledged - * @param payload - * ACK payload as byte array - * @param options - * Transport-specific options - * @return CompletableFuture that completes when ACK is sent - * @throws PluginException - * if sending fails - */ - CompletableFuture sendAckToClient(String clientId, String messageId, byte[] payload, SendOptions options) - throws PluginException; - - /** - * Register a handler for incoming messages of a specific type from clients. The - * plugin is responsible for subscribing to the appropriate topics/channels. - * - * @param messageType - * Type of message to handle (e.g., "task_completed", "message", - * "jobs/assigned", "login") - * @param handler - * Message handler to be called when a message is received - * @throws PluginException - * if registration fails - */ - void registerMessageHandler(String messageType, ClientMessageHandler handler) throws PluginException; - - /** - * Register a handler for incoming acknowledgments from clients. The plugin is - * responsible for subscribing to the appropriate ACK topics/channels. - * - * @param handler - * ACK handler to be called when an ACK is received - * @throws PluginException - * if registration fails - */ - void registerAckHandler(AckHandler handler) throws PluginException; - - /** - * Check if the plugin is currently connected. - * - * @return true if connected, false otherwise - */ - boolean isConnected(); - - /** - * Get the plugin name/type identifier. - * - * @return Plugin name (e.g., "mqtt", "websocket", "grpc") - */ - String getPluginName(); - - /** - * Get plugin version. - * - * @return Plugin version string - */ - String getPluginVersion(); - - /** - * Get plugin metadata/information. - * - * @return Plugin metadata - */ - PluginMetadata getMetadata(); - - /** - * Callback interface for connection state changes. - */ - @FunctionalInterface - interface ConnectionStateListener { - /** - * Called when connection state changes. - * - * @param event - * Connection state event - */ - void onConnectionStateChanged(ConnectionStateEvent event); - } - - /** - * Handler for received messages from clients. Includes the clientId extracted - * from the topic/channel. - */ - @FunctionalInterface - interface ClientMessageHandler { - /** - * Called when a message is received from a client. - * - * @param clientId - * Client identifier extracted from the topic/channel - * @param payload - * Message payload as byte array - */ - void onMessageReceived(String clientId, byte[] payload); - } - - /** - * Handler for received acknowledgments from clients. - */ - @FunctionalInterface - interface AckHandler { - /** - * Called when an ACK is received from a client. - * - * @param messageId - * Message ID being acknowledged - * @param payload - * ACK payload as byte array - */ - void onAckReceived(String messageId, byte[] payload); - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginConfig.java b/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginConfig.java deleted file mode 100644 index 9f7dcc1..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginConfig.java +++ /dev/null @@ -1,141 +0,0 @@ -package de.assecutor.votianlt.messaging.plugin; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.HashMap; -import java.util.Map; - -/** - * Configuration for messaging plugins. Provides a flexible key-value store for - * plugin-specific settings. - */ -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class PluginConfig { - - /** - * Plugin-specific properties - */ - @Builder.Default - private Map properties = new HashMap<>(); - - /** - * Get a string property. - * - * @param key - * Property key - * @return Property value or null if not found - */ - public String getString(String key) { - Object value = properties.get(key); - return value != null ? value.toString() : null; - } - - /** - * Get a string property with default value. - * - * @param key - * Property key - * @param defaultValue - * Default value if property not found - * @return Property value or default - */ - public String getString(String key, String defaultValue) { - String value = getString(key); - return value != null ? value : defaultValue; - } - - /** - * Get an integer property. - * - * @param key - * Property key - * @return Property value or null if not found - */ - public Integer getInt(String key) { - Object value = properties.get(key); - if (value instanceof Integer) { - return (Integer) value; - } else if (value instanceof String) { - try { - return Integer.parseInt((String) value); - } catch (NumberFormatException e) { - return null; - } - } - return null; - } - - /** - * Get an integer property with default value. - * - * @param key - * Property key - * @param defaultValue - * Default value if property not found - * @return Property value or default - */ - public int getInt(String key, int defaultValue) { - Integer value = getInt(key); - return value != null ? value : defaultValue; - } - - /** - * Get a boolean property. - * - * @param key - * Property key - * @return Property value or null if not found - */ - public Boolean getBoolean(String key) { - Object value = properties.get(key); - if (value instanceof Boolean) { - return (Boolean) value; - } else if (value instanceof String) { - return Boolean.parseBoolean((String) value); - } - return null; - } - - /** - * Get a boolean property with default value. - * - * @param key - * Property key - * @param defaultValue - * Default value if property not found - * @return Property value or default - */ - public boolean getBoolean(String key, boolean defaultValue) { - Boolean value = getBoolean(key); - return value != null ? value : defaultValue; - } - - /** - * Set a property. - * - * @param key - * Property key - * @param value - * Property value - */ - public void setProperty(String key, Object value) { - properties.put(key, value); - } - - /** - * Check if a property exists. - * - * @param key - * Property key - * @return true if property exists - */ - public boolean hasProperty(String key) { - return properties.containsKey(key); - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginException.java b/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginException.java deleted file mode 100644 index 1362bbe..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginException.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.assecutor.votianlt.messaging.plugin; - -/** - * Exception thrown by messaging plugins. - */ -public class PluginException extends Exception { - - public PluginException(String message) { - super(message); - } - - public PluginException(String message, Throwable cause) { - super(message, cause); - } - - public PluginException(Throwable cause) { - super(cause); - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginManager.java b/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginManager.java deleted file mode 100644 index 76a1b2d..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginManager.java +++ /dev/null @@ -1,271 +0,0 @@ -package de.assecutor.votianlt.messaging.plugin; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import jakarta.annotation.PreDestroy; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -/** - * Manager for messaging plugins. Handles plugin lifecycle, registration, and - * delegation. - */ -@Component -@Slf4j -public class PluginManager { - - private MessagingPlugin activePlugin; - private final List connectionHistory = new ArrayList<>(); - private final List stateListeners = new ArrayList<>(); - - /** - * Initialize and activate a plugin. - * - * @param plugin - * Plugin to activate - * @param config - * Plugin configuration - * @throws PluginException - * if initialization fails - */ - public void activatePlugin(MessagingPlugin plugin, PluginConfig config) throws PluginException { - log.info("[PluginManager] Activating plugin: {}", plugin.getPluginName()); - - // Shutdown existing plugin if any - if (activePlugin != null) { - log.info("[PluginManager] Shutting down existing plugin: {}", activePlugin.getPluginName()); - try { - activePlugin.exit(); - } catch (Exception e) { - log.error("[PluginManager] Error shutting down existing plugin: {}", e.getMessage(), e); - } - } - - // Set connection listener - plugin.setConnectionListener(event -> { - String previousState = event.getPreviousState() != null ? event.getPreviousState().toString() : "NONE"; - log.info("[PluginManager] Connection state changed: {} -> {}", previousState, event.getState()); - connectionHistory.add(event); - notifyStateListeners(event); - }); - - // Initialize plugin - plugin.init(config); - activePlugin = plugin; - - log.info("[PluginManager] Plugin activated: {} v{}", plugin.getPluginName(), plugin.getPluginVersion()); - } - - /** - * Get the currently active plugin. - * - * @return Active plugin or empty if none - */ - public Optional getActivePlugin() { - return Optional.ofNullable(activePlugin); - } - - /** - * Send a message to a specific client via the active plugin. - * - * @param clientId - * Target client identifier - * @param messageType - * Type of message (e.g., "jobs", "message", "auth", "task") - * @param payload - * Message payload - * @param options - * Send options - * @return CompletableFuture that completes when message is sent - * @throws PluginException - * if no plugin is active or sending fails - */ - public CompletableFuture sendToClient(String clientId, String messageType, byte[] payload, - SendOptions options) throws PluginException { - if (activePlugin == null) { - return CompletableFuture.failedFuture(new PluginException("No active plugin")); - } - - if (!activePlugin.isConnected()) { - return CompletableFuture.failedFuture(new PluginException("Plugin is not connected")); - } - - return activePlugin.sendToClient(clientId, messageType, payload, options); - } - - /** - * Send an acknowledgment to a specific client via the active plugin. - * - * @param clientId - * Target client identifier - * @param messageId - * Message ID being acknowledged - * @param payload - * ACK payload - * @param options - * Send options - * @return CompletableFuture that completes when ACK is sent - * @throws PluginException - * if no plugin is active or sending fails - */ - public CompletableFuture sendAckToClient(String clientId, String messageId, byte[] payload, - SendOptions options) throws PluginException { - if (activePlugin == null) { - return CompletableFuture.failedFuture(new PluginException("No active plugin")); - } - - if (!activePlugin.isConnected()) { - return CompletableFuture.failedFuture(new PluginException("Plugin is not connected")); - } - - return activePlugin.sendAckToClient(clientId, messageId, payload, options); - } - - /** - * Register a handler for incoming messages of a specific type from clients. - * - * @param messageType - * Type of message to handle - * @param handler - * Message handler - * @throws PluginException - * if no plugin is active or registration fails - */ - public void registerMessageHandler(String messageType, MessagingPlugin.ClientMessageHandler handler) - throws PluginException { - if (activePlugin == null) { - throw new PluginException("No active plugin"); - } - - activePlugin.registerMessageHandler(messageType, handler); - } - - /** - * Register a handler for incoming acknowledgments from clients. - * - * @param handler - * ACK handler - * @throws PluginException - * if no plugin is active or registration fails - */ - public void registerAckHandler(MessagingPlugin.AckHandler handler) throws PluginException { - if (activePlugin == null) { - throw new PluginException("No active plugin"); - } - - activePlugin.registerAckHandler(handler); - } - - /** - * Check if the active plugin is connected. - * - * @return true if connected, false otherwise - */ - public boolean isConnected() { - return activePlugin != null && activePlugin.isConnected(); - } - - /** - * Get metadata of the active plugin. - * - * @return Plugin metadata or empty if no plugin is active - */ - public Optional getActivePluginMetadata() { - return Optional.ofNullable(activePlugin).map(MessagingPlugin::getMetadata); - } - - /** - * Get connection history. - * - * @return List of connection state events - */ - public List getConnectionHistory() { - return new ArrayList<>(connectionHistory); - } - - /** - * Get the last connection state event. - * - * @return Last connection state event or empty if none - */ - public Optional getLastConnectionState() { - if (connectionHistory.isEmpty()) { - return Optional.empty(); - } - return Optional.of(connectionHistory.get(connectionHistory.size() - 1)); - } - - /** - * Add a plugin state listener. - * - * @param listener - * State listener - */ - public void addStateListener(PluginStateListener listener) { - stateListeners.add(listener); - } - - /** - * Remove a plugin state listener. - * - * @param listener - * State listener - */ - public void removeStateListener(PluginStateListener listener) { - stateListeners.remove(listener); - } - - /** - * Notify all state listeners of a connection state change. - * - * @param event - * Connection state event - */ - private void notifyStateListeners(ConnectionStateEvent event) { - for (PluginStateListener listener : stateListeners) { - try { - listener.onConnectionStateChanged(event); - } catch (Exception e) { - log.error("[PluginManager] Error in state listener: {}", e.getMessage(), e); - } - } - } - - /** - * Shutdown the plugin manager and active plugin. - */ - @PreDestroy - public void shutdown() { - log.info("[PluginManager] Shutting down plugin manager"); - - if (activePlugin != null) { - try { - activePlugin.exit(); - log.info("[PluginManager] Active plugin shut down successfully"); - } catch (Exception e) { - log.error("[PluginManager] Error shutting down active plugin: {}", e.getMessage(), e); - } - activePlugin = null; - } - - stateListeners.clear(); - connectionHistory.clear(); - } - - /** - * Listener interface for plugin state changes. - */ - @FunctionalInterface - public interface PluginStateListener { - /** - * Called when plugin connection state changes. - * - * @param event - * Connection state event - */ - void onConnectionStateChanged(ConnectionStateEvent event); - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginMetadata.java b/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginMetadata.java deleted file mode 100644 index 54a72cc..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/plugin/PluginMetadata.java +++ /dev/null @@ -1,92 +0,0 @@ -package de.assecutor.votianlt.messaging.plugin; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.ArrayList; -import java.util.List; - -/** - * Metadata about a messaging plugin. - */ -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class PluginMetadata { - - /** - * Plugin name - */ - private String name; - - /** - * Plugin version - */ - private String version; - - /** - * Plugin description - */ - private String description; - - /** - * Plugin author/vendor - */ - private String author; - - /** - * Supported features - */ - @Builder.Default - private List supportedFeatures = new ArrayList<>(); - - /** - * Whether the plugin supports wildcards in topic patterns - */ - @Builder.Default - private boolean supportsWildcards = false; - - /** - * Whether the plugin supports retained messages - */ - @Builder.Default - private boolean supportsRetainedMessages = false; - - /** - * Whether the plugin supports QoS levels - */ - @Builder.Default - private boolean supportsQos = false; - - /** - * Maximum QoS level supported (0, 1, 2) - */ - @Builder.Default - private int maxQosLevel = 0; - - /** - * Check if a feature is supported. - * - * @param feature - * Feature name - * @return true if supported - */ - public boolean supportsFeature(String feature) { - return supportedFeatures.contains(feature); - } - - /** - * Add a supported feature. - * - * @param feature - * Feature name - */ - public void addSupportedFeature(String feature) { - if (!supportedFeatures.contains(feature)) { - supportedFeatures.add(feature); - } - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/plugin/ReceivedMessage.java b/src/main/java/de/assecutor/votianlt/messaging/plugin/ReceivedMessage.java deleted file mode 100644 index a44eba1..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/plugin/ReceivedMessage.java +++ /dev/null @@ -1,84 +0,0 @@ -package de.assecutor.votianlt.messaging.plugin; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; - -/** - * Represents a message received from a messaging plugin. - */ -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class ReceivedMessage { - - /** - * Topic/channel the message was received on - */ - private String topic; - - /** - * Message payload - */ - private byte[] payload; - - /** - * Quality of Service level (if applicable) - */ - private int qos; - - /** - * Whether the message was retained - */ - private boolean retained; - - /** - * Timestamp when message was received - */ - @Builder.Default - private LocalDateTime receivedAt = LocalDateTime.now(); - - /** - * Additional metadata from the transport - */ - @Builder.Default - private Map metadata = new HashMap<>(); - - /** - * Get metadata value. - * - * @param key - * Metadata key - * @return Metadata value or null - */ - public Object getMetadata(String key) { - return metadata.get(key); - } - - /** - * Set metadata value. - * - * @param key - * Metadata key - * @param value - * Metadata value - */ - public void setMetadata(String key, Object value) { - metadata.put(key, value); - } - - /** - * Get payload as UTF-8 string. - * - * @return Payload as string - */ - public String getPayloadAsString() { - return payload != null ? new String(payload, java.nio.charset.StandardCharsets.UTF_8) : null; - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/plugin/SendOptions.java b/src/main/java/de/assecutor/votianlt/messaging/plugin/SendOptions.java deleted file mode 100644 index 8bb064c..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/plugin/SendOptions.java +++ /dev/null @@ -1,99 +0,0 @@ -package de.assecutor.votianlt.messaging.plugin; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.HashMap; -import java.util.Map; - -/** - * Options for sending messages via plugins. Provides transport-agnostic options - * with extensibility for plugin-specific settings. - */ -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class SendOptions { - - /** - * Quality of Service level (0, 1, 2 for MQTT-like transports) - */ - @Builder.Default - private int qos = 1; - - /** - * Whether the message should be retained by the broker/server - */ - @Builder.Default - private boolean retained = false; - - /** - * Message priority (if supported by transport) - */ - @Builder.Default - private int priority = 0; - - /** - * Message expiry time in seconds (if supported by transport) - */ - private Long expirySeconds; - - /** - * Additional plugin-specific options - */ - @Builder.Default - private Map additionalOptions = new HashMap<>(); - - /** - * Get an additional option. - * - * @param key - * Option key - * @return Option value or null - */ - public Object getAdditionalOption(String key) { - return additionalOptions.get(key); - } - - /** - * Set an additional option. - * - * @param key - * Option key - * @param value - * Option value - */ - public void setAdditionalOption(String key, Object value) { - additionalOptions.put(key, value); - } - - /** - * Create default send options. - * - * @return Default options - */ - public static SendOptions defaults() { - return SendOptions.builder().build(); - } - - /** - * Create options for fire-and-forget messages. - * - * @return Fire-and-forget options - */ - public static SendOptions fireAndForget() { - return SendOptions.builder().qos(0).retained(false).build(); - } - - /** - * Create options for reliable delivery. - * - * @return Reliable delivery options - */ - public static SendOptions reliable() { - return SendOptions.builder().qos(2).retained(false).build(); - } -} diff --git a/src/main/java/de/assecutor/votianlt/messaging/plugin/mqtt/MqttMessagingPlugin.java b/src/main/java/de/assecutor/votianlt/messaging/plugin/mqtt/MqttMessagingPlugin.java deleted file mode 100644 index 1894c17..0000000 --- a/src/main/java/de/assecutor/votianlt/messaging/plugin/mqtt/MqttMessagingPlugin.java +++ /dev/null @@ -1,416 +0,0 @@ -package de.assecutor.votianlt.messaging.plugin.mqtt; - -import com.hivemq.client.mqtt.MqttClient; -import com.hivemq.client.mqtt.datatypes.MqttQos; -import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; -import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; -import de.assecutor.votianlt.messaging.plugin.*; -import de.assecutor.votianlt.messaging.plugin.ConnectionStateEvent.ConnectionState; -import lombok.extern.slf4j.Slf4j; - -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; - -/** - * MQTT implementation of the MessagingPlugin interface. Uses HiveMQ MQTT 5 - * client for communication. - * - * Topic Structure (managed internally): - Server -> Client: - * /client/{clientId}/{messageType} - Client -> Server: - * /server/{clientId}/{messageType} - ACK Server -> Client: - * /client/{clientId}/ack (messageId in payload) - ACK Client -> Server: - * /server/{clientId}/ack (messageId in payload) - */ -@Slf4j -public class MqttMessagingPlugin implements MessagingPlugin { - - private static final String PLUGIN_NAME = "mqtt"; - private static final String PLUGIN_VERSION = "2.0.0"; - - // Topic templates - private static final String TOPIC_TO_CLIENT = "/client/%s/%s"; // /client/{clientId}/{messageType} - private static final String TOPIC_ACK_TO_CLIENT = "/client/%s/ack"; // /client/{clientId}/ack (messageId in payload) - - // Subscription patterns - private static final String PATTERN_FROM_CLIENT = "/server/+/%s"; // /server/+/{messageType} - private static final String PATTERN_ACK_FROM_CLIENT = "/server/+/ack"; // /server/+/ack - - private Mqtt5AsyncClient mqttClient; - private ConnectionStateListener connectionListener; - private final Map messageHandlers = new ConcurrentHashMap<>(); - private AckHandler ackHandler; - private volatile boolean connected = false; - - // Configuration keys - private static final String CONFIG_BROKER_HOST = "broker.host"; - private static final String CONFIG_BROKER_PORT = "broker.port"; - private static final String CONFIG_USERNAME = "username"; - private static final String CONFIG_PASSWORD = "password"; - private static final String CONFIG_CLIENT_ID = "client.id"; - private static final String CONFIG_CLEAN_START = "clean.start"; - private static final String CONFIG_CONNECTION_TIMEOUT = "connection.timeout.seconds"; - private static final String CONFIG_KEEP_ALIVE = "keep.alive.seconds"; - - @Override - public void init(PluginConfig config) throws PluginException { - try { - notifyConnectionState(ConnectionState.INITIALIZING, null); - - // Extract configuration - String brokerHost = config.getString(CONFIG_BROKER_HOST, "localhost"); - int brokerPort = config.getInt(CONFIG_BROKER_PORT, 1883); - String username = config.getString(CONFIG_USERNAME); - String password = config.getString(CONFIG_PASSWORD); - String clientId = config.getString(CONFIG_CLIENT_ID, "votianlt-" + UUID.randomUUID()); - boolean cleanStart = config.getBoolean(CONFIG_CLEAN_START, true); - int connectionTimeout = config.getInt(CONFIG_CONNECTION_TIMEOUT, 60); - int keepAlive = config.getInt(CONFIG_KEEP_ALIVE, 60); - - // Build MQTT client - var clientBuilder = MqttClient.builder().useMqttVersion5().identifier(clientId).serverHost(brokerHost) - .serverPort(brokerPort).automaticReconnect().initialDelay(1, java.util.concurrent.TimeUnit.SECONDS) - .maxDelay(30, java.util.concurrent.TimeUnit.SECONDS).applyAutomaticReconnect(); - - mqttClient = clientBuilder.buildAsync(); - - // Build connect options - var connectBuilder = com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect.builder() - .cleanStart(cleanStart).keepAlive(keepAlive); - - if (username != null && password != null) { - connectBuilder.simpleAuth().username(username).password(password.getBytes(StandardCharsets.UTF_8)) - .applySimpleAuth(); - } - - // Connect asynchronously - notifyConnectionState(ConnectionState.CONNECTING, null); - - mqttClient.connect(connectBuilder.build()) - .orTimeout(connectionTimeout, java.util.concurrent.TimeUnit.SECONDS) - .whenComplete((connAck, throwable) -> { - if (throwable != null) { - String errorMsg = String.format("Connection to %s:%d failed: %s", brokerHost, brokerPort, - throwable.getMessage()); - log.error("[MQTT] Connection failed: {}", errorMsg); - connected = false; - notifyConnectionState(ConnectionState.ERROR, errorMsg); - } else { - log.info("[MQTT] Server connected to {}:{}", brokerHost, brokerPort); - connected = true; - setupGlobalMessageHandler(); - notifyConnectionState(ConnectionState.CONNECTED, null); - } - }); - - } catch (Exception e) { - log.error("[MQTT] Initialization failed: {}", e.getMessage(), e); - throw new PluginException("Failed to initialize MQTT plugin", e); - } - } - - @Override - public void exit() throws PluginException { - try { - notifyConnectionState(ConnectionState.DISCONNECTING, null); - - if (mqttClient != null) { - var clientState = mqttClient.getState(); - if (clientState.isConnected()) { - try { - mqttClient.disconnect().join(); - } catch (Exception disconnectEx) { - // Client may already be disconnected - } - } - } - - connected = false; - messageHandlers.clear(); - ackHandler = null; - notifyConnectionState(ConnectionState.DISCONNECTED, null); - - } catch (Exception e) { - log.error("[MQTT] Shutdown failed: {}", e.getMessage()); - connected = false; - messageHandlers.clear(); - ackHandler = null; - } - } - - @Override - public void setConnectionListener(ConnectionStateListener listener) { - this.connectionListener = listener; - } - - @Override - public CompletableFuture sendToClient(String clientId, String messageType, byte[] payload, - SendOptions options) throws PluginException { - if (!connected) { - return CompletableFuture.failedFuture(new PluginException("MQTT client is not connected")); - } - - String topic = String.format(TOPIC_TO_CLIENT, clientId, messageType); - String json = new String(payload, StandardCharsets.UTF_8); - log.info("[MQTT OUT] {} -> {}", topic, json); - - return sendToTopic(topic, payload, options); - } - - @Override - public CompletableFuture sendAckToClient(String clientId, String messageId, byte[] payload, - SendOptions options) throws PluginException { - if (!connected) { - return CompletableFuture.failedFuture(new PluginException("MQTT client is not connected")); - } - - String topic = String.format(TOPIC_ACK_TO_CLIENT, clientId); - return sendToTopic(topic, payload, options); - } - - @Override - public void registerMessageHandler(String messageType, ClientMessageHandler handler) throws PluginException { - if (!connected) { - throw new PluginException("MQTT client is not connected"); - } - - messageHandlers.put(messageType, handler); - - // Special case for login: subscribe to /server/login (without clientId) - if ("login".equals(messageType)) { - String loginTopic = "/server/login"; - mqttClient.subscribeWith().topicFilter(loginTopic).qos(MqttQos.EXACTLY_ONCE).send() - .whenComplete((subAck, throwable) -> { - if (throwable != null) { - log.error("[MQTT] Subscription to {} failed: {}", loginTopic, throwable.getMessage()); - messageHandlers.remove(messageType); - } - }); - } else { - // Standard pattern: /server/+/{messageType} - String topicPattern = String.format(PATTERN_FROM_CLIENT, messageType); - mqttClient.subscribeWith().topicFilter(topicPattern).qos(MqttQos.EXACTLY_ONCE).send() - .whenComplete((subAck, throwable) -> { - if (throwable != null) { - log.error("[MQTT] Subscription to {} failed: {}", topicPattern, throwable.getMessage()); - messageHandlers.remove(messageType); - } - }); - } - } - - @Override - public void registerAckHandler(AckHandler handler) throws PluginException { - if (!connected) { - throw new PluginException("MQTT client is not connected"); - } - - this.ackHandler = handler; - - // Subscribe to ACK topic pattern - mqttClient.subscribeWith().topicFilter(PATTERN_ACK_FROM_CLIENT).qos(MqttQos.EXACTLY_ONCE).send() - .whenComplete((subAck, throwable) -> { - if (throwable != null) { - log.error("[MQTT] Subscription to {} failed: {}", PATTERN_ACK_FROM_CLIENT, - throwable.getMessage()); - this.ackHandler = null; - } - }); - } - - @Override - public boolean isConnected() { - return connected; - } - - @Override - public String getPluginName() { - return PLUGIN_NAME; - } - - @Override - public String getPluginVersion() { - return PLUGIN_VERSION; - } - - @Override - public PluginMetadata getMetadata() { - return PluginMetadata.builder().name(PLUGIN_NAME).version(PLUGIN_VERSION) - .description("MQTT v5 messaging plugin using HiveMQ client").supportsWildcards(true) - .supportsRetainedMessages(true).supportsQos(true).maxQosLevel(2).build(); - } - - /** - * Setup global message handler to route incoming messages to registered - * handlers. - */ - private void setupGlobalMessageHandler() { - mqttClient.publishes(com.hivemq.client.mqtt.MqttGlobalPublishFilter.ALL, publish -> { - handleIncomingMessage(publish); - }); - } - - /** - * Handle incoming MQTT message and route to appropriate handler. - */ - private void handleIncomingMessage(Mqtt5Publish publish) { - String topic = publish.getTopic().toString(); - byte[] payload = publish.getPayloadAsBytes(); - String json = new String(payload, StandardCharsets.UTF_8); - - // Log incoming message with topic and JSON - log.info("[MQTT IN] {} <- {}", topic, json); - - try { - // Check if it's an ACK message (topic ends with /ack) - if (topic.startsWith("/server/") && topic.endsWith("/ack")) { - handleAckMessage(topic, payload); - } - // Check if it's a client message - else if (topic.startsWith("/server/")) { - handleClientMessage(topic, payload); - } - } catch (Exception e) { - log.error("[MQTT] Error handling message on topic {}: {}", topic, e.getMessage()); - } - } - - /** - * Handle ACK message from client. Topic format: /server/{clientId}/ack - * (messageId in payload) - */ - private void handleAckMessage(String topic, byte[] payload) { - if (ackHandler == null) { - return; - } - - // Extract clientId from topic: /server/{clientId}/ack - String[] parts = topic.split("/"); - if (parts.length >= 4) { - // Extract messageId from payload - String payloadStr = new String(payload, StandardCharsets.UTF_8); - String messageId = extractMessageIdFromPayload(payloadStr); - - if (messageId != null) { - ackHandler.onAckReceived(messageId, payload); - } - } - } - - /** - * Extract messageId from ACK payload. Expected payload format: JSON with - * "messageId" field, e.g., {"messageId": "abc-123"} or plain messageId string. - */ - private String extractMessageIdFromPayload(String payload) { - if (payload == null || payload.isBlank()) { - return null; - } - - payload = payload.trim(); - - // Try to extract from JSON format: {"messageId": "..."} - if (payload.startsWith("{")) { - // Simple JSON parsing for messageId field - int keyIndex = payload.indexOf("\"messageId\""); - if (keyIndex == -1) { - keyIndex = payload.indexOf("'messageId'"); - } - if (keyIndex >= 0) { - int colonIndex = payload.indexOf(":", keyIndex); - if (colonIndex >= 0) { - int valueStart = payload.indexOf("\"", colonIndex); - if (valueStart == -1) { - valueStart = payload.indexOf("'", colonIndex); - } - if (valueStart >= 0) { - char quote = payload.charAt(valueStart); - int valueEnd = payload.indexOf(quote, valueStart + 1); - if (valueEnd > valueStart) { - return payload.substring(valueStart + 1, valueEnd); - } - } - } - } - } - - // If not JSON, treat the entire payload as the messageId - return payload; - } - - /** - * Handle client message. Topic format: /server/{clientId}/{messageType} or - * /server/{messageType} (for login) messageType can contain slashes, e.g., - * "jobs/assigned" - */ - private void handleClientMessage(String topic, byte[] payload) { - // Extract clientId and messageType from topic - String[] parts = topic.split("/"); - - // Handle /server/login (without clientId) - if (parts.length == 3 && "login".equals(parts[2])) { - String messageType = parts[2]; - ClientMessageHandler handler = messageHandlers.get(messageType); - if (handler != null) { - handler.onMessageReceived(null, payload); - } - return; - } - - // Handle /server/{clientId}/{messageType} where messageType can contain slashes - if (parts.length >= 4) { - String clientId = parts[2]; - // Join all parts from index 3 onwards to form the full messageType - // e.g., /server/clientId/jobs/assigned -> messageType = "jobs/assigned" - String messageType = String.join("/", java.util.Arrays.copyOfRange(parts, 3, parts.length)); - - ClientMessageHandler handler = messageHandlers.get(messageType); - if (handler != null) { - handler.onMessageReceived(clientId, payload); - } - } - } - - /** - * Send message to a specific MQTT topic. - */ - private CompletableFuture sendToTopic(String topic, byte[] payload, SendOptions options) { - try { - var publishBuilder = Mqtt5Publish.builder().topic(topic).payload(payload).qos(mapQos(options.getQos())) - .retain(options.isRetained()); - - return mqttClient.publish(publishBuilder.build()).thenApply(publishResult -> null); - } catch (Exception e) { - log.error("[MQTT] Failed to publish to topic {}: {}", topic, e.getMessage()); - return CompletableFuture.failedFuture(new PluginException("Failed to publish message", e)); - } - } - - /** - * Map QoS level to MQTT QoS. - */ - private MqttQos mapQos(int qos) { - return switch (qos) { - case 0 -> MqttQos.AT_MOST_ONCE; - case 1 -> MqttQos.AT_LEAST_ONCE; - case 2 -> MqttQos.EXACTLY_ONCE; - default -> MqttQos.AT_LEAST_ONCE; - }; - } - - /** - * Notify connection state listener. - */ - private void notifyConnectionState(ConnectionState state, String message) { - if (connectionListener != null) { - ConnectionStateEvent event = ConnectionStateEvent.builder().state(state).previousState(null) - .errorMessage(message).pluginName(PLUGIN_NAME).build(); - try { - connectionListener.onConnectionStateChanged(event); - } catch (Exception e) { - log.error("[MQTT] Error in connection listener: {}", e.getMessage()); - } - } - } -} diff --git a/src/main/java/de/assecutor/votianlt/model/PendingMqttMessage.java b/src/main/java/de/assecutor/votianlt/model/PendingMqttMessage.java deleted file mode 100644 index 3af6c27..0000000 --- a/src/main/java/de/assecutor/votianlt/model/PendingMqttMessage.java +++ /dev/null @@ -1,56 +0,0 @@ -package de.assecutor.votianlt.model; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.bson.types.ObjectId; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.mapping.Document; -import org.springframework.data.mongodb.core.mapping.Field; - -import java.time.LocalDateTime; - -@Data -@NoArgsConstructor -@AllArgsConstructor -@Document(collection = "pending_mqtt_messages") -public class PendingMqttMessage { - - @Id - private ObjectId id; - - @Field("topic") - private String topic; - - @Field("payload") - private byte[] payload; - - @Field("qos") - private int qos; - - @Field("retained") - private boolean retained; - - @Field("created_at") - private LocalDateTime createdAt; - - @Field("retry_count") - private int retryCount = 0; - - @Field("last_retry_at") - private LocalDateTime lastRetryAt; - - public PendingMqttMessage(String topic, byte[] payload, int qos, boolean retained) { - this.topic = topic; - this.payload = payload; - this.qos = qos; - this.retained = retained; - this.createdAt = LocalDateTime.now(); - this.retryCount = 0; - } - - public void incrementRetryCount() { - this.retryCount++; - this.lastRetryAt = LocalDateTime.now(); - } -} \ No newline at end of file diff --git a/src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java b/src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java deleted file mode 100644 index 59bcb00..0000000 --- a/src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java +++ /dev/null @@ -1,68 +0,0 @@ -package de.assecutor.votianlt.mqtt; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService; -import de.assecutor.votianlt.messaging.model.DeliveryOptions; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; -import org.springframework.context.annotation.Lazy; - -/** - * Simple MQTT publishing helper to send JSON payloads. - * - * This implementation now uses MessageDeliveryService for reliable delivery - * with acknowledgment tracking and retry mechanism. - * - * Note: In environments where Spring Integration MQTT is unavailable (e.g., - * offline CI), this implementation degrades to a no-op publisher that logs the - * intended message. - */ -public interface MqttPublisher { - void publishAsJson(String topic, Object payload); - - void publishAsJson(String topic, Object payload, boolean retained); -} - -@Component -@Slf4j -class MqttPublisherImpl implements MqttPublisher { - - private final ObjectMapper objectMapper; - private final MessageDeliveryService deliveryService; - - public MqttPublisherImpl(@Lazy MessageDeliveryService deliveryService, ObjectMapper objectMapper) { - this.deliveryService = deliveryService; - this.objectMapper = objectMapper; - } - - @Override - public void publishAsJson(String topic, Object payload) { - publishAsJson(topic, payload, false); - } - - @Override - public void publishAsJson(String topic, Object payload, boolean retained) { - try { - // Parse topic to extract clientId and messageType - // Expected format: /client/{clientId}/{messageType} - String[] parts = topic.split("/"); - if (parts.length < 4 || !"client".equals(parts[1])) { - log.warn("Invalid topic format: {}. Expected /client/{clientId}/{messageType}", topic); - return; - } - String clientId = parts[2]; - String messageType = parts[3]; - - // Use MessageDeliveryService for reliable delivery - DeliveryOptions options = DeliveryOptions.builder().requiresAck(true).retained(retained).build(); - - deliveryService.sendToClient(clientId, messageType, payload, options).exceptionally(ex -> { - log.error("[MQTT] Failed to deliver to {}: {}", topic, ex.getMessage()); - return null; - }); - - } catch (Exception e) { - log.error("[MQTT] Failed to publish to {}: {}", topic, e.getMessage()); - } - } -} diff --git a/src/main/java/de/assecutor/votianlt/pages/service/AddJobService.java b/src/main/java/de/assecutor/votianlt/pages/service/AddJobService.java index cb7b664..13ea44f 100644 --- a/src/main/java/de/assecutor/votianlt/pages/service/AddJobService.java +++ b/src/main/java/de/assecutor/votianlt/pages/service/AddJobService.java @@ -5,7 +5,7 @@ import de.assecutor.votianlt.model.CargoItem; import de.assecutor.votianlt.model.Job; import de.assecutor.votianlt.model.JobStatus; import de.assecutor.votianlt.model.task.BaseTask; -import de.assecutor.votianlt.mqtt.MqttPublisher; +import de.assecutor.votianlt.messaging.MessagingPublisher; import de.assecutor.votianlt.repository.JobRepository; import de.assecutor.votianlt.repository.TaskRepository; import de.assecutor.votianlt.security.SecurityService; @@ -36,7 +36,7 @@ public class AddJobService { private final JobHistoryService jobHistoryService; private final EmailService emailService; private final ClientConnectionService clientConnectionService; - private final MqttPublisher mqttPublisher; + private final MessagingPublisher messagingPublisher; /** * Speichert einen neuen Auftrag samt CargoItems und Tasks @@ -123,7 +123,7 @@ public class AddJobService { e.getMessage()); } - // MQTT-Benachrichtigung an Client senden, wenn online + // Benachrichtigung an Client senden, wenn online notifyClientJobCreated(savedJob); log.info("Auftrag erfolgreich gespeichert: {}", savedJob.getJobNumber()); @@ -182,7 +182,7 @@ public class AddJobService { } /** - * Sendet den neu erstellten Job per MQTT an den zugewiesenen Client, falls dieser + * Sendet den neu erstellten Job per WebSocket an den zugewiesenen Client, falls dieser * online ist. */ private void notifyClientJobCreated(Job job) { @@ -208,10 +208,9 @@ public class AddJobService { // Erstelle DTO mit allen Daten JobWithRelatedDataDTO jobData = new JobWithRelatedDataDTO(job, cargoItems, tasks); - String topic = "/client/" + appUserId + "/job_created"; - log.info("[JOB] Sending job_created to {}: jobId={}, jobNumber={}", topic, job.getId().toHexString(), + log.info("[JOB] Sending job_created to {}: jobId={}, jobNumber={}", appUserId, job.getId().toHexString(), job.getJobNumber()); - mqttPublisher.publishAsJson(topic, jobData, false); + messagingPublisher.publishAsJson(appUserId, "job_created", jobData); } catch (Exception e) { log.warn("[JOB] Failed to send job_created notification: {}", e.getMessage()); } diff --git a/src/main/java/de/assecutor/votianlt/pages/view/AdminDashboardView.java b/src/main/java/de/assecutor/votianlt/pages/view/AdminDashboardView.java index 1032ec3..3f6f27e 100644 --- a/src/main/java/de/assecutor/votianlt/pages/view/AdminDashboardView.java +++ b/src/main/java/de/assecutor/votianlt/pages/view/AdminDashboardView.java @@ -40,7 +40,6 @@ public class AdminDashboardView extends Main { private final BarcodeRepository barcodeRepository; private final SignatureRepository signatureRepository; private final CommentRepository commentRepository; - private final PendingMqttMessageRepository pendingMqttMessageRepository; private final Div statisticsContainer; @@ -48,8 +47,7 @@ public class AdminDashboardView extends Main { public AdminDashboardView(JobRepository jobRepository, TaskRepository taskRepository, UserRepository userRepository, AppUserRepository appUserRepository, CargoItemRepository cargoItemRepository, PhotoRepository photoRepository, BarcodeRepository barcodeRepository, - SignatureRepository signatureRepository, CommentRepository commentRepository, - PendingMqttMessageRepository pendingMqttMessageRepository) { + SignatureRepository signatureRepository, CommentRepository commentRepository) { this.jobRepository = jobRepository; this.taskRepository = taskRepository; @@ -60,7 +58,6 @@ public class AdminDashboardView extends Main { this.barcodeRepository = barcodeRepository; this.signatureRepository = signatureRepository; this.commentRepository = commentRepository; - this.pendingMqttMessageRepository = pendingMqttMessageRepository; setSizeFull(); addClassNames(LumoUtility.BoxSizing.BORDER, LumoUtility.Display.FLEX, LumoUtility.FlexDirection.COLUMN, @@ -291,11 +288,8 @@ public class AdminDashboardView extends Main { cards.add(createStatCard("Datenbank", "Fehler", VaadinIcon.DATABASE, "red")); } - // Pending MQTT messages - long pendingMqttMessages = pendingMqttMessageRepository.count(); - String mqttStatus = pendingMqttMessages == 0 ? "OK" : "Warteschlange: " + pendingMqttMessages; - String mqttColor = pendingMqttMessages == 0 ? "green" : "orange"; - cards.add(createStatCard("MQTT", mqttStatus, VaadinIcon.CONNECT, mqttColor)); + // Messaging status + cards.add(createStatCard("WebSocket", "Aktiv", VaadinIcon.CONNECT, "green")); // System uptime (placeholder) cards.add(createStatCard("Anwendung", "Läuft", VaadinIcon.HEART, "green")); diff --git a/src/main/java/de/assecutor/votianlt/pages/view/ShowJobsView.java b/src/main/java/de/assecutor/votianlt/pages/view/ShowJobsView.java index 5061975..03cfabb 100644 --- a/src/main/java/de/assecutor/votianlt/pages/view/ShowJobsView.java +++ b/src/main/java/de/assecutor/votianlt/pages/view/ShowJobsView.java @@ -20,7 +20,7 @@ import com.vaadin.flow.router.PageTitle; import com.vaadin.flow.router.Route; import de.assecutor.votianlt.model.Job; import de.assecutor.votianlt.model.JobStatus; -import de.assecutor.votianlt.mqtt.MqttPublisher; +import de.assecutor.votianlt.messaging.MessagingPublisher; import de.assecutor.votianlt.util.DateTimeFormatUtil; import de.assecutor.votianlt.repository.JobRepository; import de.assecutor.votianlt.security.SecurityService; @@ -47,18 +47,18 @@ public class ShowJobsView extends VerticalLayout { private final JobHistoryService jobHistoryService; private final SecurityService securityService; private final ClientConnectionService clientConnectionService; - private final MqttPublisher mqttPublisher; + private final MessagingPublisher messagingPublisher; private final Grid grid = new Grid<>(Job.class, false); @Autowired public ShowJobsView(JobRepository jobRepository, JobHistoryService jobHistoryService, SecurityService securityService, ClientConnectionService clientConnectionService, - MqttPublisher mqttPublisher) { + MessagingPublisher messagingPublisher) { this.jobRepository = jobRepository; this.jobHistoryService = jobHistoryService; this.securityService = securityService; this.clientConnectionService = clientConnectionService; - this.mqttPublisher = mqttPublisher; + this.messagingPublisher = messagingPublisher; setSizeFull(); setPadding(true); setSpacing(true); @@ -230,9 +230,8 @@ public class ShowJobsView extends VerticalLayout { "jobNumber", job.getJobNumber() != null ? job.getJobNumber() : "", "deletedAt", LocalDateTime.now().toString()); - String topic = "/client/" + appUserId + "/job_deleted"; - log.info("[JOB] Sending job_deleted to {}: {}", topic, payload); - mqttPublisher.publishAsJson(topic, payload, false); + log.info("[JOB] Sending job_deleted to {}: {}", appUserId, payload); + messagingPublisher.publishAsJson(appUserId, "job_deleted", payload); } private void loadData() { diff --git a/src/main/java/de/assecutor/votianlt/repository/PendingDeliveryRepository.java b/src/main/java/de/assecutor/votianlt/repository/PendingDeliveryRepository.java deleted file mode 100644 index 5310f6c..0000000 --- a/src/main/java/de/assecutor/votianlt/repository/PendingDeliveryRepository.java +++ /dev/null @@ -1,69 +0,0 @@ -package de.assecutor.votianlt.repository; - -import de.assecutor.votianlt.messaging.model.DeliveryStatus; -import de.assecutor.votianlt.messaging.model.PendingDelivery; -import org.bson.types.ObjectId; -import org.springframework.data.mongodb.repository.MongoRepository; -import org.springframework.stereotype.Repository; - -import java.time.LocalDateTime; -import java.util.List; -import java.util.Optional; - -/** - * Repository for PendingDelivery entities. - */ -@Repository -public interface PendingDeliveryRepository extends MongoRepository { - - /** - * Find pending delivery by message ID - */ - Optional findByMessageId(String messageId); - - /** - * Find all deliveries with a specific status - */ - List findByStatus(DeliveryStatus status); - - /** - * Find deliveries ready for retry (status = SENT and nextRetryAt is in the - * past) - */ - List findByStatusAndNextRetryAtBefore(DeliveryStatus status, LocalDateTime dateTime); - - /** - * Find acknowledged deliveries older than specified time - */ - List findByStatusAndAcknowledgedAtBefore(DeliveryStatus status, LocalDateTime dateTime); - - /** - * Find deliveries with specific statuses that have expired - */ - List findByStatusInAndExpiresAtBefore(List statuses, LocalDateTime dateTime); - - /** - * Find deliveries with specific statuses - */ - List findByStatusIn(List statuses); - - /** - * Find all deliveries for a specific client - */ - List findByClientId(String clientId); - - /** - * Find all deliveries for a specific topic - */ - List findByTopic(String topic); - - /** - * Count deliveries by status - */ - long countByStatus(DeliveryStatus status); - - /** - * Delete deliveries older than specified time - */ - void deleteByCreatedAtBefore(LocalDateTime dateTime); -} diff --git a/src/main/java/de/assecutor/votianlt/repository/PendingMqttMessageRepository.java b/src/main/java/de/assecutor/votianlt/repository/PendingMqttMessageRepository.java deleted file mode 100644 index bed2c28..0000000 --- a/src/main/java/de/assecutor/votianlt/repository/PendingMqttMessageRepository.java +++ /dev/null @@ -1,33 +0,0 @@ -package de.assecutor.votianlt.repository; - -import de.assecutor.votianlt.model.PendingMqttMessage; -import org.bson.types.ObjectId; -import org.springframework.data.mongodb.repository.MongoRepository; -import org.springframework.stereotype.Repository; - -import java.time.LocalDateTime; -import java.util.List; - -@Repository -public interface PendingMqttMessageRepository extends MongoRepository { - - /** - * Find all pending messages ordered by creation time (oldest first) - */ - List findAllByOrderByCreatedAtAsc(); - - /** - * Find messages that haven't been retried for a while (for cleanup) - */ - List findByLastRetryAtBeforeOrLastRetryAtIsNull(LocalDateTime before); - - /** - * Count pending messages - */ - long count(); - - /** - * Delete messages older than specified date (for cleanup) - */ - void deleteByCreatedAtBefore(LocalDateTime before); -} \ No newline at end of file diff --git a/src/main/java/de/assecutor/votianlt/security/SecurityConfig.java b/src/main/java/de/assecutor/votianlt/security/SecurityConfig.java index 43fb777..1724e80 100644 --- a/src/main/java/de/assecutor/votianlt/security/SecurityConfig.java +++ b/src/main/java/de/assecutor/votianlt/security/SecurityConfig.java @@ -27,11 +27,13 @@ public class SecurityConfig extends VaadinWebSecurity { new AntPathRequestMatcher("/frontend/**"), new AntPathRequestMatcher("/webjars/**"), new AntPathRequestMatcher("/h2-console/**"), new AntPathRequestMatcher("/frontend-es5/**", "/frontend-es6/**"), - new AntPathRequestMatcher("/mcp/**")) + new AntPathRequestMatcher("/mcp/**"), + new AntPathRequestMatcher("/ws/**")) .permitAll()); // Standard-CSRF-Konfiguration - http.csrf(csrf -> csrf.ignoringRequestMatchers(new AntPathRequestMatcher("/h2-console/**"))); + http.csrf(csrf -> csrf.ignoringRequestMatchers(new AntPathRequestMatcher("/h2-console/**"), + new AntPathRequestMatcher("/ws/**"))); // Delegiere die Basis-Konfiguration an VaadinWebSecurity // Dies fügt automatisch .anyRequest().authenticated() hinzu diff --git a/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java b/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java index 7d1c79a..9e8065c 100644 --- a/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java +++ b/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java @@ -1,248 +1,46 @@ package de.assecutor.votianlt.service; -import com.fasterxml.jackson.databind.ObjectMapper; -import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService; -import de.assecutor.votianlt.messaging.plugin.PluginManager; -import de.assecutor.votianlt.messaging.plugin.SendOptions; +import de.assecutor.votianlt.messaging.WebSocketService; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Lazy; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - /** - * Service for managing client connections via Ping/Pong mechanism. Tracks - * connected clients and periodically checks their connectivity. + * Service for managing client connections. Connection state is determined + * directly from the WebSocket session lifecycle. */ @Service @Slf4j public class ClientConnectionService { - /** - * Represents the connection state of a client. - */ - public record ClientState(String clientId, String userId, boolean connected, Instant lastPingSent, - Instant lastPongReceived, Instant connectedAt) { - public ClientState withPingSent(Instant pingSent) { - return new ClientState(clientId, userId, connected, pingSent, lastPongReceived, connectedAt); - } + private final WebSocketService webSocketService; - public ClientState withPongReceived(Instant pongReceived) { - return new ClientState(clientId, userId, true, lastPingSent, pongReceived, connectedAt); - } - - public ClientState withConnected(boolean isConnected) { - return new ClientState(clientId, userId, isConnected, lastPingSent, lastPongReceived, connectedAt); - } - } - - private final Map connectedClients = new ConcurrentHashMap<>(); - private final PluginManager pluginManager; - private final ObjectMapper objectMapper; - private final MessageDeliveryService messageDeliveryService; - - @Value("${app.client.ping.interval-seconds:15}") - private int pingIntervalSeconds; - - @Value("${app.client.ping.timeout-seconds:5}") - private int pingTimeoutSeconds; - - public ClientConnectionService(PluginManager pluginManager, ObjectMapper objectMapper, - @Lazy MessageDeliveryService messageDeliveryService) { - this.pluginManager = pluginManager; - this.objectMapper = objectMapper; - this.messageDeliveryService = messageDeliveryService; + public ClientConnectionService(WebSocketService webSocketService) { + this.webSocketService = webSocketService; } /** - * Registers a client as connected after successful login. + * Called after successful login. * - * @param clientId - * The unique client identifier - * @param userId - * The user ID associated with this client + * @param appUserId + * The app user ID (MongoDB ObjectId) */ - public void registerClient(String clientId, String userId) { - if (clientId == null || clientId.isBlank()) { + public void registerClient(String appUserId) { + if (appUserId == null || appUserId.isBlank()) { return; } - ClientState previousState = connectedClients.get(clientId); - boolean wasDisconnected = previousState != null && !previousState.connected(); - - Instant now = Instant.now(); - ClientState state = new ClientState(clientId, userId, true, null, now, now); - connectedClients.put(clientId, state); - log.info("[CLIENT] Connected: {}", clientId); - - // If client was previously disconnected, retry pending messages - if (wasDisconnected) { - messageDeliveryService.retryPendingDeliveriesForClient(clientId); - } + log.info("[CLIENT] Registered: {}", appUserId); } /** - * Unregisters a client (e.g., on explicit logout). + * Checks if a client is currently connected via WebSocket. * - * @param clientId - * The client identifier to unregister + * @param appUserId + * The app user ID + * @return true if the client has an active WebSocket session */ - public void unregisterClient(String clientId) { - ClientState removed = connectedClients.remove(clientId); - if (removed != null) { - log.info("[CLIENT] Disconnected: {}", clientId); - } - } - - /** - * Handles a pong response from a client. Searches by both clientId and userId - * since pong is sent to /server/{userId}/pong. - * - * @param id - * The client or user identifier that sent the pong - */ - public void handlePong(String id) { - if (id == null || id.isBlank()) { - return; - } - - // First try direct lookup by clientId - String clientId = id; - ClientState state = connectedClients.get(id); - - // If not found, search by userId - if (state == null) { - for (Map.Entry entry : connectedClients.entrySet()) { - if (id.equals(entry.getValue().userId())) { - clientId = entry.getKey(); - state = entry.getValue(); - break; - } - } - } - - if (state != null) { - log.info("[PONG] Received from {}", clientId); - boolean wasDisconnected = !state.connected(); - ClientState updatedState = state.withPongReceived(Instant.now()); - connectedClients.put(clientId, updatedState); - - // If client was disconnected and is now reconnected, retry pending messages - if (wasDisconnected) { - messageDeliveryService.retryPendingDeliveriesForClient(clientId); - } - } - } - - /** - * Checks if a client is currently connected. Searches by both clientId and - * userId. - * - * @param id - * The client or user identifier - * @return true if the client is connected - */ - public boolean isClientConnected(String id) { - if (id == null || id.isBlank()) { - return false; - } - // First try direct lookup by clientId - ClientState state = connectedClients.get(id); - if (state != null && state.connected()) { - return true; - } - // Then search by userId - return connectedClients.values().stream().anyMatch(s -> s.connected() && id.equals(s.userId())); - } - - /** - * Gets all connected client IDs. - * - * @return Set of connected client IDs - */ - public Set getConnectedClientIds() { - return connectedClients.entrySet().stream().filter(e -> e.getValue().connected()).map(Map.Entry::getKey) - .collect(java.util.stream.Collectors.toSet()); - } - - /** - * Gets the connection state for a specific client. - * - * @param clientId - * The client identifier - * @return ClientState or null if not found - */ - public ClientState getClientState(String clientId) { - return connectedClients.get(clientId); - } - - /** - * Scheduled task to send pings to all connected clients. Runs based on the - * configured interval (app.client.ping.interval-seconds). - */ - @Scheduled(fixedRateString = "${app.client.ping.interval-seconds:15}000") - public void sendPingsToAllClients() { - if (!pluginManager.isConnected() || getConnectedClientCount() == 0) { - return; - } - - Instant now = Instant.now(); - - for (Map.Entry entry : connectedClients.entrySet()) { - String clientId = entry.getKey(); - ClientState state = entry.getValue(); - - // Check if previous ping timed out - if (state.lastPingSent() != null && state.connected()) { - Instant expectedPongBy = state.lastPingSent().plusSeconds(pingTimeoutSeconds); - boolean pongReceivedAfterPing = state.lastPongReceived() != null - && state.lastPongReceived().isAfter(state.lastPingSent()); - - if (now.isAfter(expectedPongBy) && !pongReceivedAfterPing) { - // Client did not respond in time - mark as disconnected - ClientState disconnectedState = state.withConnected(false); - connectedClients.put(clientId, disconnectedState); - log.info("[CLIENT] Timeout: {}", clientId); - continue; - } - } - - // Send ping to connected clients (use userId for topic) - if (state.connected() && state.userId() != null) { - sendPing(state.userId()); - ClientState updatedState = state.withPingSent(now); - connectedClients.put(clientId, updatedState); - } - } - } - - /** - * Sends a ping message to a specific user. - * - * @param userId - * The target user ID (MongoDB ObjectId) - */ - private void sendPing(String userId) { - try { - Map pingPayload = Map.of("type", "ping", "timestamp", Instant.now().toEpochMilli()); - - String json = objectMapper.writeValueAsString(pingPayload); - byte[] payload = json.getBytes(StandardCharsets.UTF_8); - - SendOptions options = SendOptions.builder().qos(1).retained(false).build(); - - log.info("[PING] Sent to {}", userId); - pluginManager.sendToClient(userId, "ping", payload, options); - - } catch (Exception e) { - log.error("[PING] Error sending to {}: {}", userId, e.getMessage()); - } + public boolean isClientConnected(String appUserId) { + return webSocketService.isClientConnected(appUserId); } /** @@ -251,15 +49,6 @@ public class ClientConnectionService { * @return Number of connected clients */ public int getConnectedClientCount() { - return (int) connectedClients.values().stream().filter(ClientState::connected).count(); - } - - /** - * Gets the total number of registered clients (connected and disconnected). - * - * @return Total number of registered clients - */ - public int getTotalClientCount() { - return connectedClients.size(); + return webSocketService.getConnectedClientCount(); } } diff --git a/src/main/java/de/assecutor/votianlt/service/MessageService.java b/src/main/java/de/assecutor/votianlt/service/MessageService.java index d14d344..364d001 100644 --- a/src/main/java/de/assecutor/votianlt/service/MessageService.java +++ b/src/main/java/de/assecutor/votianlt/service/MessageService.java @@ -8,7 +8,7 @@ import de.assecutor.votianlt.model.MessageType; import de.assecutor.votianlt.dto.ChatMessageInboundPayload; import de.assecutor.votianlt.dto.ChatMessageOutboundPayload; import de.assecutor.votianlt.event.MessageReceivedEvent; -import de.assecutor.votianlt.mqtt.MqttPublisher; +import de.assecutor.votianlt.messaging.MessagingPublisher; import de.assecutor.votianlt.repository.JobRepository; import de.assecutor.votianlt.repository.MessageRepository; import lombok.extern.slf4j.Slf4j; @@ -26,14 +26,14 @@ public class MessageService { private final MessageRepository messageRepository; private final JobRepository jobRepository; - private final MqttPublisher mqttPublisher; + private final MessagingPublisher messagingPublisher; private final ApplicationEventPublisher eventPublisher; - public MessageService(MessageRepository messageRepository, JobRepository jobRepository, MqttPublisher mqttPublisher, + public MessageService(MessageRepository messageRepository, JobRepository jobRepository, MessagingPublisher messagingPublisher, ApplicationEventPublisher eventPublisher) { this.messageRepository = messageRepository; this.jobRepository = jobRepository; - this.mqttPublisher = mqttPublisher; + this.messagingPublisher = messagingPublisher; this.eventPublisher = eventPublisher; } @@ -45,7 +45,7 @@ public class MessageService { } /** - * Send a general message to a client via MQTT + * Send a general message to a client via WebSocket * * @param content * Message content @@ -59,12 +59,12 @@ public class MessageService { public Message sendGeneralMessageToClient(String content, String receiver, MessageContentType contentType) { Message message = new Message(content, receiver, MessageOrigin.SERVER, contentType); message = saveMessage(message); - publishMessageToMqtt(message, receiver); + publishMessage(message, receiver); return message; } /** - * Send a job-related message to a client via MQTT + * Send a job-related message to a client via WebSocket * * @param content * Message content @@ -85,7 +85,7 @@ public class MessageService { Message message = new Message(content, receiver, MessageOrigin.SERVER, contentType, context.jobId(), context.jobNumber()); message = saveMessage(message); - publishMessageToMqtt(message, receiver); + publishMessage(message, receiver); return message; } @@ -111,15 +111,14 @@ public class MessageService { } /** - * Publish message to MQTT topic for the receiver + * Publish message to topic for the receiver */ - private void publishMessageToMqtt(Message message, String receiver) { + private void publishMessage(Message message, String receiver) { try { - String topic = "/client/" + receiver + "/message"; ChatMessageOutboundPayload payload = ChatMessageOutboundPayload.fromMessage(message); - mqttPublisher.publishAsJson(topic, payload, false); + messagingPublisher.publishAsJson(receiver, "message", payload); } catch (Exception e) { - log.error("[MQTT] Error publishing message: {}", e.getMessage()); + log.error("[Messaging] Error publishing message: {}", e.getMessage()); } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 1c53cd3..82fcbcf 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -59,45 +59,11 @@ spring.jackson.default-property-inclusion=non_null # 2FA Configuration (global toggle - individual users can disable in their profile) app.security.two-factor.enabled=true -# Message Delivery Layer Configuration -app.messaging.delivery.max-retries=3 -app.messaging.delivery.retry-initial-delay=5s -app.messaging.delivery.retry-max-delay=5m -app.messaging.delivery.retry-backoff-multiplier=2.0 -app.messaging.delivery.ack-timeout=30s -app.messaging.delivery.message-expiry=24h -app.messaging.delivery.cleanup-interval-minutes=60 -app.messaging.delivery.retry-interval-seconds=30 -app.messaging.delivery.acknowledged-retention-days=7 -app.messaging.delivery.ack-retry-interval-seconds=5 -app.messaging.delivery.ack-max-retries=4 - -# Messaging Plugin Configuration -app.messaging.plugin.type=mqtt -app.messaging.plugin.mqtt.broker.host=mqtt-2.assecutor.de -app.messaging.plugin.mqtt.broker.port=42099 -app.messaging.plugin.mqtt.username=app -app.messaging.plugin.mqtt.password=apppwd -app.messaging.plugin.mqtt.client.id=votianlt-server - -# Client Connection Monitoring (Ping/Pong) -# Server sends ping to: /client/{clientId}/ping -# Client responds to: /server/{clientId}/pong -# -# Ping JSON (Server -> Client): -# { -# "type": "ping", -# "timestamp": 1702835000000 -# } -# -# Pong JSON (Client -> Server): -# { -# "type": "pong", -# "timestamp": 1702835000000 -# } -# -app.client.ping.interval-seconds=15 -app.client.ping.timeout-seconds=5 +# WebSocket Configuration +app.messaging.websocket.path=/ws/messaging +app.messaging.websocket.max-text-message-size=65536 +app.messaging.websocket.max-session-idle-timeout=300000 +app.messaging.websocket.allowed-origins=* # Application Version - automatically set from pom.xml during build app.version=@project.version@