From dff69aa4f9dc969abe9cbac08a34fea6962e5590 Mon Sep 17 00:00:00 2001 From: Sven Carstensen Date: Tue, 10 Feb 2026 11:57:04 +0100 Subject: [PATCH] Erweiterungen --- .../votianlt/messaging/MessagingConfig.java | 52 +++++-- .../votianlt/model/LocationPosition.java | 92 +++++++++++ .../de/assecutor/votianlt/model/Message.java | 29 ++++ .../votianlt/model/MessageDeliveryStatus.java | 25 +++ .../votianlt/pages/view/JobSummaryView.java | 7 +- .../LocationPositionRepository.java | 41 +++++ .../repository/MessageRepository.java | 14 ++ .../service/ClientConnectionService.java | 89 ++++++++++- .../votianlt/service/LocationService.java | 144 ++++++++++++++++++ .../votianlt/service/MessageService.java | 96 +++++++++++- 10 files changed, 567 insertions(+), 22 deletions(-) create mode 100644 src/main/java/de/assecutor/votianlt/model/LocationPosition.java create mode 100644 src/main/java/de/assecutor/votianlt/model/MessageDeliveryStatus.java create mode 100644 src/main/java/de/assecutor/votianlt/repository/LocationPositionRepository.java create mode 100644 src/main/java/de/assecutor/votianlt/service/LocationService.java diff --git a/src/main/java/de/assecutor/votianlt/messaging/MessagingConfig.java b/src/main/java/de/assecutor/votianlt/messaging/MessagingConfig.java index 2343c7d..7c4d80e 100644 --- a/src/main/java/de/assecutor/votianlt/messaging/MessagingConfig.java +++ b/src/main/java/de/assecutor/votianlt/messaging/MessagingConfig.java @@ -4,6 +4,7 @@ import de.assecutor.votianlt.controller.MessageController; import de.assecutor.votianlt.dto.AppLoginRequest; import de.assecutor.votianlt.dto.AppLoginResponse; import de.assecutor.votianlt.service.ClientConnectionService; +import de.assecutor.votianlt.service.LocationService; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.annotation.Configuration; @@ -39,8 +40,9 @@ public class MessagingConfig { MessageController messageController = event.getApplicationContext().getBean(MessageController.class); ClientConnectionService clientConnectionService = event.getApplicationContext() .getBean(ClientConnectionService.class); + LocationService locationService = event.getApplicationContext().getBean(LocationService.class); - setupSubscriptions(messageController, clientConnectionService); + setupSubscriptions(messageController, clientConnectionService, locationService); log.info("[Messaging] Message routing configured"); @@ -54,7 +56,7 @@ public class MessagingConfig { * Setup message subscriptions on the WebSocket service. */ private void setupSubscriptions(MessageController messageController, - ClientConnectionService clientConnectionService) { + ClientConnectionService clientConnectionService, LocationService locationService) { // Login handler: authenticate and register session webSocketService.registerMessageHandler("login", (wsSessionId, payload) -> { handleLoginMessage(wsSessionId, payload, messageController, clientConnectionService); @@ -68,17 +70,27 @@ public class MessagingConfig { }); }); - // Jobs assigned handler - webSocketService.registerMessageHandler("jobs/assigned", (appUserId, payload) -> { - messageController.handleGetAssignedJobs(appUserId); - }); - // Chat message handler webSocketService.registerMessageHandler("message", (appUserId, payload) -> { handlePayload(payload, payloadMap -> { messageController.handleIncomingMessage(appUserId, payloadMap); }); }); + + // Buffer flushed handler - client is ready to receive pending messages + webSocketService.registerMessageHandler("buffer_flushed", (appUserId, payload) -> { + handlePayload(payload, payloadMap -> { + int messageCount = extractMessageCount(payloadMap); + clientConnectionService.onBufferFlushed(appUserId, messageCount); + }); + }); + + // Location handler - client sends position updates + webSocketService.registerMessageHandler("location", (appUserId, payload) -> { + handlePayload(payload, payloadMap -> { + locationService.savePosition(appUserId, payloadMap); + }); + }); } /** @@ -98,12 +110,19 @@ public class MessagingConfig { if (response.isSuccess()) { String appUserId = response.getAppUserId(); webSocketService.registerAuthenticatedSession(wsSessionId, appUserId); - clientConnectionService.registerClient(appUserId); // Send success response to the now-authenticated session - Map authResponse = Map.of("success", true, "message", response.getMessage()); + // locationTrackingEnabled: true = client should send position updates + Map authResponse = Map.of( + "success", true, + "message", response.getMessage(), + "locationTrackingEnabled", true); byte[] responseBytes = objectMapper.writeValueAsBytes(authResponse); webSocketService.sendToClient(appUserId, "auth", responseBytes); + + // Register client - pending messages and jobs will be sent after + // client confirms buffer_flushed + clientConnectionService.registerClient(appUserId); } else { // Send failure response to the pending session Map authResponse = Map.of("success", false, "message", response.getMessage()); @@ -115,6 +134,21 @@ public class MessagingConfig { } } + /** + * Extract message count from buffer_flushed payload. + */ + private int extractMessageCount(Map payloadMap) { + try { + Object countObj = payloadMap.get("messageCount"); + if (countObj instanceof Number) { + return ((Number) countObj).intValue(); + } + return 0; + } catch (Exception e) { + return 0; + } + } + /** * Parse payload bytes to a Map and pass to the consumer. */ diff --git a/src/main/java/de/assecutor/votianlt/model/LocationPosition.java b/src/main/java/de/assecutor/votianlt/model/LocationPosition.java new file mode 100644 index 0000000..03e8803 --- /dev/null +++ b/src/main/java/de/assecutor/votianlt/model/LocationPosition.java @@ -0,0 +1,92 @@ +package de.assecutor.votianlt.model; + +import lombok.Data; +import lombok.NoArgsConstructor; +import org.bson.types.ObjectId; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.index.Indexed; +import org.springframework.data.mongodb.core.mapping.Document; +import org.springframework.data.mongodb.core.mapping.Field; + +import java.time.Instant; + +/** + * Represents a GPS position reported by a mobile client. + */ +@Data +@NoArgsConstructor +@Document(collection = "location_positions") +public class LocationPosition { + + @Id + private ObjectId id; + + /** + * AppUser ID (clientId) - the user who sent this position + */ + @Field("app_user_id") + @Indexed + private String appUserId; + + /** + * Latitude in decimal degrees + */ + @Field("latitude") + private Double latitude; + + /** + * Longitude in decimal degrees + */ + @Field("longitude") + private Double longitude; + + /** + * Accuracy of the position in meters + */ + @Field("accuracy") + private Double accuracy; + + /** + * Altitude in meters above sea level (optional) + */ + @Field("altitude") + private Double altitude; + + /** + * Speed in meters per second (optional) + */ + @Field("speed") + private Double speed; + + /** + * Heading in degrees (0-360) (optional) + */ + @Field("heading") + private Double heading; + + /** + * Timestamp when the position was reported (from client) + */ + @Field("timestamp") + private Instant timestamp; + + /** + * Timestamp when the position was received by the server + */ + @Field("received_at") + @Indexed(expireAfterSeconds = 3600) // TTL index: auto-delete after 60 minutes + private Instant receivedAt; + + public LocationPosition(String appUserId, Double latitude, Double longitude, Double accuracy, + Double altitude, Double speed, Double heading, Instant timestamp) { + this.appUserId = appUserId; + this.latitude = latitude; + this.longitude = longitude; + this.accuracy = accuracy; + this.altitude = altitude; + this.speed = speed; + this.heading = heading; + this.timestamp = timestamp; + this.receivedAt = Instant.now(); + } +} diff --git a/src/main/java/de/assecutor/votianlt/model/Message.java b/src/main/java/de/assecutor/votianlt/model/Message.java index ee8f651..1f5da37 100644 --- a/src/main/java/de/assecutor/votianlt/model/Message.java +++ b/src/main/java/de/assecutor/votianlt/model/Message.java @@ -85,6 +85,12 @@ public class Message { @Field("read_at") private LocalDateTime readAt; + /** + * Delivery status: NOTSEND (failed to deliver), SEND (successfully delivered) + */ + @Field("delivery_status") + private MessageDeliveryStatus deliveryStatus; + /** * Constructor for general messages */ @@ -126,6 +132,27 @@ public class Message { this.readAt = LocalDateTime.now(); } + /** + * Mark the message as sent (successfully delivered) + */ + public void markAsSent() { + this.deliveryStatus = MessageDeliveryStatus.SEND; + } + + /** + * Mark the message as not sent (delivery failed) + */ + public void markAsNotSent() { + this.deliveryStatus = MessageDeliveryStatus.NOTSEND; + } + + /** + * Check if message was successfully delivered + */ + public boolean isDelivered() { + return deliveryStatus == MessageDeliveryStatus.SEND; + } + /** * Returns the ObjectId as string for JSON serialization */ @@ -157,5 +184,7 @@ public class Message { this.createdAt = LocalDateTime.now(); this.isRead = false; this.contentType = contentType != null ? contentType : MessageContentType.TEXT; + // Server messages start as NOTSEND until confirmed delivered + this.deliveryStatus = (origin == MessageOrigin.SERVER) ? MessageDeliveryStatus.NOTSEND : null; } } diff --git a/src/main/java/de/assecutor/votianlt/model/MessageDeliveryStatus.java b/src/main/java/de/assecutor/votianlt/model/MessageDeliveryStatus.java new file mode 100644 index 0000000..c03541c --- /dev/null +++ b/src/main/java/de/assecutor/votianlt/model/MessageDeliveryStatus.java @@ -0,0 +1,25 @@ +package de.assecutor.votianlt.model; + +/** + * Delivery status for messages sent to clients. + * Tracks whether a message was successfully delivered via WebSocket. + */ +public enum MessageDeliveryStatus { + NOTSEND("Nicht gesendet"), + SEND("Gesendet"); + + private final String displayName; + + MessageDeliveryStatus(String displayName) { + this.displayName = displayName; + } + + public String getDisplayName() { + return displayName; + } + + @Override + public String toString() { + return displayName; + } +} diff --git a/src/main/java/de/assecutor/votianlt/pages/view/JobSummaryView.java b/src/main/java/de/assecutor/votianlt/pages/view/JobSummaryView.java index 229acac..202db8c 100644 --- a/src/main/java/de/assecutor/votianlt/pages/view/JobSummaryView.java +++ b/src/main/java/de/assecutor/votianlt/pages/view/JobSummaryView.java @@ -377,9 +377,10 @@ public class JobSummaryView extends Main implements HasUrlParameter { } private String dimString(CargoItem ci) { - String len = ci.getLengthMm() != null ? ci.getLengthMm().intValue() + " mm" : ""; - String wid = ci.getWidthMm() != null ? ci.getWidthMm().intValue() + " mm" : ""; - String hei = ci.getHeightMm() != null ? ci.getHeightMm().intValue() + " mm" : ""; + // Values are stored in cm (not mm), so display directly without division + String len = ci.getLengthMm() != null ? ci.getLengthMm().intValue() + " cm" : ""; + String wid = ci.getWidthMm() != null ? ci.getWidthMm().intValue() + " cm" : ""; + String hei = ci.getHeightMm() != null ? ci.getHeightMm().intValue() + " cm" : ""; String combined = String.join(" x ", java.util.stream.Stream.of(len, wid, hei).filter(s -> !s.isBlank()).toList()); return combined.isBlank() ? "" : combined; diff --git a/src/main/java/de/assecutor/votianlt/repository/LocationPositionRepository.java b/src/main/java/de/assecutor/votianlt/repository/LocationPositionRepository.java new file mode 100644 index 0000000..cae40ab --- /dev/null +++ b/src/main/java/de/assecutor/votianlt/repository/LocationPositionRepository.java @@ -0,0 +1,41 @@ +package de.assecutor.votianlt.repository; + +import de.assecutor.votianlt.model.LocationPosition; +import org.bson.types.ObjectId; +import org.springframework.data.mongodb.repository.MongoRepository; +import org.springframework.stereotype.Repository; + +import java.time.Instant; +import java.util.List; + +/** + * Repository for location positions reported by mobile clients. + */ +@Repository +public interface LocationPositionRepository extends MongoRepository { + + /** + * Find all positions for a specific app user, ordered by timestamp descending + */ + List findByAppUserIdOrderByTimestampDesc(String appUserId); + + /** + * Find all positions received after a specific time + */ + List findByReceivedAtAfterOrderByTimestampDesc(Instant receivedAt); + + /** + * Find all positions for a user received after a specific time + */ + List findByAppUserIdAndReceivedAtAfterOrderByTimestampDesc(String appUserId, Instant receivedAt); + + /** + * Delete all positions older than the specified timestamp + */ + void deleteByReceivedAtBefore(Instant cutoff); + + /** + * Find latest position for a specific app user + */ + List findTop1ByAppUserIdOrderByTimestampDesc(String appUserId); +} diff --git a/src/main/java/de/assecutor/votianlt/repository/MessageRepository.java b/src/main/java/de/assecutor/votianlt/repository/MessageRepository.java index 3727993..5b6d2f3 100644 --- a/src/main/java/de/assecutor/votianlt/repository/MessageRepository.java +++ b/src/main/java/de/assecutor/votianlt/repository/MessageRepository.java @@ -1,6 +1,7 @@ package de.assecutor.votianlt.repository; import de.assecutor.votianlt.model.Message; +import de.assecutor.votianlt.model.MessageDeliveryStatus; import de.assecutor.votianlt.model.MessageOrigin; import de.assecutor.votianlt.model.MessageType; import org.bson.types.ObjectId; @@ -58,4 +59,17 @@ public interface MessageRepository extends MongoRepository { * Find all messages (for admin/overview), ordered by creation time descending */ List findAllByOrderByCreatedAtDesc(); + + /** + * Find all messages with a specific delivery status for a receiver + */ + List findByReceiverAndDeliveryStatusOrderByCreatedAtAsc(String receiver, + MessageDeliveryStatus deliveryStatus); + + /** + * Find all undelivered messages (NOTSEND status) for a receiver + */ + default List findUndeliveredMessagesForReceiver(String receiver) { + return findByReceiverAndDeliveryStatusOrderByCreatedAtAsc(receiver, MessageDeliveryStatus.NOTSEND); + } } diff --git a/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java b/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java index 9e8065c..1fcb895 100644 --- a/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java +++ b/src/main/java/de/assecutor/votianlt/service/ClientConnectionService.java @@ -1,9 +1,15 @@ package de.assecutor.votianlt.service; +import de.assecutor.votianlt.controller.MessageController; import de.assecutor.votianlt.messaging.WebSocketService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** * Service for managing client connections. Connection state is determined * directly from the WebSocket session lifecycle. @@ -13,13 +19,22 @@ import org.springframework.stereotype.Service; public class ClientConnectionService { private final WebSocketService webSocketService; + private final MessageService messageService; + private final MessageController messageController; - public ClientConnectionService(WebSocketService webSocketService) { + // Track clients that are connected but not yet ready (buffer not flushed) + private final Map pendingClients = new ConcurrentHashMap<>(); + + public ClientConnectionService(WebSocketService webSocketService, @Lazy MessageService messageService, + @Lazy MessageController messageController) { this.webSocketService = webSocketService; + this.messageService = messageService; + this.messageController = messageController; } /** - * Called after successful login. + * Called after successful login. Client is registered but messages/jobs are + * only sent after buffer_flushed confirmation. * * @param appUserId * The app user ID (MongoDB ObjectId) @@ -29,7 +44,69 @@ public class ClientConnectionService { return; } - log.info("[CLIENT] Registered: {}", appUserId); + log.info("[CLIENT] Registered: {} (waiting for buffer_flushed)", appUserId); + + // Store as pending - messages and jobs will be sent after buffer_flushed + pendingClients.put(appUserId, new PendingClient(appUserId, Instant.now())); + } + + /** + * Called when client confirms that its buffer has been flushed and it's ready + * to receive messages. + * + * @param appUserId + * The app user ID + * @param messageCount + * Number of messages client processed (for logging) + */ + public void onBufferFlushed(String appUserId, int messageCount) { + if (appUserId == null || appUserId.isBlank()) { + return; + } + + PendingClient pending = pendingClients.remove(appUserId); + if (pending == null) { + log.debug("[CLIENT] Buffer flushed for {} but not in pending list", appUserId); + } else { + log.info("[CLIENT] Buffer flushed for {} (processed {} messages), sending pending data", appUserId, + messageCount); + } + + // Now send pending messages and jobs + sendPendingMessages(appUserId); + sendAssignedJobs(appUserId); + } + + /** + * Send pending messages to a client. + * + * @param appUserId + * The app user ID + */ + private void sendPendingMessages(String appUserId) { + try { + int sentCount = messageService.sendPendingMessages(appUserId); + if (sentCount > 0) { + log.info("[CLIENT] Sent {} pending messages to {}", sentCount, appUserId); + } + } catch (Exception e) { + log.error("[CLIENT] Error sending pending messages to {}: {}", appUserId, e.getMessage()); + } + } + + /** + * Send assigned jobs to a client. + * + * @param appUserId + * The app user ID + */ + private void sendAssignedJobs(String appUserId) { + try { + messageController.handleGetAssignedJobs(appUserId); + log.debug("[CLIENT] Sent assigned jobs to {}", appUserId); + } catch (Exception e) { + log.error("[CLIENT] Error sending assigned jobs to {}: {}", appUserId, e.getMessage()); + } } /** @@ -51,4 +128,10 @@ public class ClientConnectionService { public int getConnectedClientCount() { return webSocketService.getConnectedClientCount(); } + + /** + * Internal record to track pending clients waiting for buffer flush. + */ + private record PendingClient(String appUserId, Instant registeredAt) { + } } diff --git a/src/main/java/de/assecutor/votianlt/service/LocationService.java b/src/main/java/de/assecutor/votianlt/service/LocationService.java new file mode 100644 index 0000000..2454750 --- /dev/null +++ b/src/main/java/de/assecutor/votianlt/service/LocationService.java @@ -0,0 +1,144 @@ +package de.assecutor.votianlt.service; + +import de.assecutor.votianlt.model.LocationPosition; +import de.assecutor.votianlt.repository.LocationPositionRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; + +/** + * Service for handling location positions reported by mobile clients. + * Automatically cleans up positions older than 60 minutes. + */ +@Service +@Slf4j +public class LocationService { + + private final LocationPositionRepository locationPositionRepository; + + public LocationService(LocationPositionRepository locationPositionRepository) { + this.locationPositionRepository = locationPositionRepository; + } + + /** + * Save a position reported by a client. + * + * @param appUserId + * The app user ID + * @param payload + * The position payload from the client + */ + public void savePosition(String appUserId, Map payload) { + try { + Double latitude = extractDouble(payload.get("latitude")); + Double longitude = extractDouble(payload.get("longitude")); + + if (latitude == null || longitude == null) { + log.warn("[Location] Missing latitude or longitude from {}", appUserId); + return; + } + + Double accuracy = extractDouble(payload.get("accuracy")); + Double altitude = extractDouble(payload.get("altitude")); + Double speed = extractDouble(payload.get("speed")); + Double heading = extractDouble(payload.get("heading")); + Instant timestamp = extractInstant(payload.get("timestamp")); + + LocationPosition position = new LocationPosition( + appUserId, latitude, longitude, accuracy, altitude, speed, heading, timestamp); + + locationPositionRepository.save(position); + log.debug("[Location] Saved position for {}: lat={}, lon={}", appUserId, latitude, longitude); + + } catch (Exception e) { + log.error("[Location] Error saving position for {}: {}", appUserId, e.getMessage()); + } + } + + /** + * Get the latest position for a specific app user. + * + * @param appUserId + * The app user ID + * @return The latest position or null if none found + */ + public LocationPosition getLatestPosition(String appUserId) { + List positions = locationPositionRepository.findTop1ByAppUserIdOrderByTimestampDesc(appUserId); + return positions.isEmpty() ? null : positions.get(0); + } + + /** + * Get all positions for a specific app user from the last N minutes. + * + * @param appUserId + * The app user ID + * @param minutes + * Number of minutes to look back + * @return List of positions + */ + public List getRecentPositions(String appUserId, int minutes) { + Instant cutoff = Instant.now().minus(minutes, ChronoUnit.MINUTES); + return locationPositionRepository.findByAppUserIdAndReceivedAtAfterOrderByTimestampDesc(appUserId, cutoff); + } + + /** + * Cleanup old positions. Runs every 5 minutes. + * Note: Positions also have a TTL index that auto-deletes after 60 minutes, + * but this scheduled cleanup ensures immediate removal and logging. + */ + @Scheduled(fixedRate = 300000) // 5 minutes + public void cleanupOldPositions() { + try { + Instant cutoff = Instant.now().minus(60, ChronoUnit.MINUTES); + long countBefore = locationPositionRepository.count(); + locationPositionRepository.deleteByReceivedAtBefore(cutoff); + long countAfter = locationPositionRepository.count(); + long deleted = countBefore - countAfter; + + if (deleted > 0) { + log.info("[Location] Cleaned up {} positions older than 60 minutes", deleted); + } + } catch (Exception e) { + log.error("[Location] Error during cleanup: {}", e.getMessage()); + } + } + + /** + * Extract Double from various number types. + */ + private Double extractDouble(Object value) { + if (value == null) { + return null; + } + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + try { + return Double.parseDouble(value.toString()); + } catch (NumberFormatException e) { + return null; + } + } + + /** + * Extract Instant from various formats. + */ + private Instant extractInstant(Object value) { + if (value == null) { + return Instant.now(); + } + if (value instanceof Instant) { + return (Instant) value; + } + try { + return Instant.parse(value.toString()); + } catch (Exception e) { + return Instant.now(); + } + } +} diff --git a/src/main/java/de/assecutor/votianlt/service/MessageService.java b/src/main/java/de/assecutor/votianlt/service/MessageService.java index 364d001..4dd2924 100644 --- a/src/main/java/de/assecutor/votianlt/service/MessageService.java +++ b/src/main/java/de/assecutor/votianlt/service/MessageService.java @@ -1,5 +1,6 @@ package de.assecutor.votianlt.service; +import com.fasterxml.jackson.databind.ObjectMapper; import de.assecutor.votianlt.model.Job; import de.assecutor.votianlt.model.Message; import de.assecutor.votianlt.model.MessageContentType; @@ -8,7 +9,7 @@ import de.assecutor.votianlt.model.MessageType; import de.assecutor.votianlt.dto.ChatMessageInboundPayload; import de.assecutor.votianlt.dto.ChatMessageOutboundPayload; import de.assecutor.votianlt.event.MessageReceivedEvent; -import de.assecutor.votianlt.messaging.MessagingPublisher; +import de.assecutor.votianlt.messaging.WebSocketService; import de.assecutor.votianlt.repository.JobRepository; import de.assecutor.votianlt.repository.MessageRepository; import lombok.extern.slf4j.Slf4j; @@ -16,6 +17,7 @@ import org.bson.types.ObjectId; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -26,15 +28,17 @@ public class MessageService { private final MessageRepository messageRepository; private final JobRepository jobRepository; - private final MessagingPublisher messagingPublisher; private final ApplicationEventPublisher eventPublisher; + private final WebSocketService webSocketService; + private final ObjectMapper objectMapper; - public MessageService(MessageRepository messageRepository, JobRepository jobRepository, MessagingPublisher messagingPublisher, - ApplicationEventPublisher eventPublisher) { + public MessageService(MessageRepository messageRepository, JobRepository jobRepository, + ApplicationEventPublisher eventPublisher, WebSocketService webSocketService, ObjectMapper objectMapper) { this.messageRepository = messageRepository; this.jobRepository = jobRepository; - this.messagingPublisher = messagingPublisher; this.eventPublisher = eventPublisher; + this.webSocketService = webSocketService; + this.objectMapper = objectMapper; } /** @@ -111,17 +115,95 @@ public class MessageService { } /** - * Publish message to topic for the receiver + * Publish message to topic for the receiver. + * Only sends if client is connected, otherwise keeps NOTSEND status. */ private void publishMessage(Message message, String receiver) { try { + // Check if client is connected before attempting to send + if (!webSocketService.isClientConnected(receiver)) { + log.debug("[Messaging] Client {} not connected, message {} saved as NOTSEND", receiver, + message.getIdAsString()); + // Message already has NOTSEND status by default + return; + } + ChatMessageOutboundPayload payload = ChatMessageOutboundPayload.fromMessage(message); - messagingPublisher.publishAsJson(receiver, "message", payload); + byte[] data = objectMapper.writeValueAsString(payload).getBytes(StandardCharsets.UTF_8); + + // Use WebSocketService directly to get CompletableFuture for delivery tracking + webSocketService.sendToClient(receiver, "message", data) + .thenRun(() -> { + // Success: mark as sent + message.markAsSent(); + messageRepository.save(message); + log.debug("[Messaging] Message {} delivered to client {}, marked as SEND", + message.getIdAsString(), receiver); + }) + .exceptionally(ex -> { + // Failed to deliver: keep NOTSEND status + log.debug("[Messaging] Failed to deliver message {} to client {}: {}", + message.getIdAsString(), receiver, ex.getMessage()); + return null; + }); + } catch (Exception e) { log.error("[Messaging] Error publishing message: {}", e.getMessage()); } } + /** + * Get all undelivered messages (NOTSEND status) for a receiver + */ + public List getUndeliveredMessagesForReceiver(String receiver) { + return messageRepository.findUndeliveredMessagesForReceiver(receiver); + } + + /** + * Send pending messages to a client that just connected. + * Called after successful authentication. + * + * @param receiver + * AppUser ID (clientId) + * @return Number of messages sent + */ + public int sendPendingMessages(String receiver) { + if (!webSocketService.isClientConnected(receiver)) { + log.debug("[Messaging] Cannot send pending messages, client {} not connected", receiver); + return 0; + } + + List pendingMessages = getUndeliveredMessagesForReceiver(receiver); + if (pendingMessages.isEmpty()) { + return 0; + } + + int sentCount = 0; + for (Message message : pendingMessages) { + try { + ChatMessageOutboundPayload payload = ChatMessageOutboundPayload.fromMessage(message); + byte[] data = objectMapper.writeValueAsString(payload).getBytes(StandardCharsets.UTF_8); + + webSocketService.sendToClient(receiver, "message", data) + .thenRun(() -> { + message.markAsSent(); + messageRepository.save(message); + }) + .exceptionally(ex -> { + log.error("[Messaging] Failed to send pending message {}: {}", + message.getIdAsString(), ex.getMessage()); + return null; + }); + sentCount++; + } catch (Exception e) { + log.error("[Messaging] Failed to send pending message {}: {}", message.getIdAsString(), e.getMessage()); + } + } + + log.info("[Messaging] Sent {} pending messages to client {}", sentCount, receiver); + return sentCount; + } + /** * Get all messages for a specific receiver */