Erweiterungen

This commit is contained in:
2026-02-10 11:57:04 +01:00
parent 47938dda70
commit dff69aa4f9
10 changed files with 567 additions and 22 deletions

View File

@@ -4,6 +4,7 @@ import de.assecutor.votianlt.controller.MessageController;
import de.assecutor.votianlt.dto.AppLoginRequest; import de.assecutor.votianlt.dto.AppLoginRequest;
import de.assecutor.votianlt.dto.AppLoginResponse; import de.assecutor.votianlt.dto.AppLoginResponse;
import de.assecutor.votianlt.service.ClientConnectionService; import de.assecutor.votianlt.service.ClientConnectionService;
import de.assecutor.votianlt.service.LocationService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@@ -39,8 +40,9 @@ public class MessagingConfig {
MessageController messageController = event.getApplicationContext().getBean(MessageController.class); MessageController messageController = event.getApplicationContext().getBean(MessageController.class);
ClientConnectionService clientConnectionService = event.getApplicationContext() ClientConnectionService clientConnectionService = event.getApplicationContext()
.getBean(ClientConnectionService.class); .getBean(ClientConnectionService.class);
LocationService locationService = event.getApplicationContext().getBean(LocationService.class);
setupSubscriptions(messageController, clientConnectionService); setupSubscriptions(messageController, clientConnectionService, locationService);
log.info("[Messaging] Message routing configured"); log.info("[Messaging] Message routing configured");
@@ -54,7 +56,7 @@ public class MessagingConfig {
* Setup message subscriptions on the WebSocket service. * Setup message subscriptions on the WebSocket service.
*/ */
private void setupSubscriptions(MessageController messageController, private void setupSubscriptions(MessageController messageController,
ClientConnectionService clientConnectionService) { ClientConnectionService clientConnectionService, LocationService locationService) {
// Login handler: authenticate and register session // Login handler: authenticate and register session
webSocketService.registerMessageHandler("login", (wsSessionId, payload) -> { webSocketService.registerMessageHandler("login", (wsSessionId, payload) -> {
handleLoginMessage(wsSessionId, payload, messageController, clientConnectionService); handleLoginMessage(wsSessionId, payload, messageController, clientConnectionService);
@@ -68,17 +70,27 @@ public class MessagingConfig {
}); });
}); });
// Jobs assigned handler
webSocketService.registerMessageHandler("jobs/assigned", (appUserId, payload) -> {
messageController.handleGetAssignedJobs(appUserId);
});
// Chat message handler // Chat message handler
webSocketService.registerMessageHandler("message", (appUserId, payload) -> { webSocketService.registerMessageHandler("message", (appUserId, payload) -> {
handlePayload(payload, payloadMap -> { handlePayload(payload, payloadMap -> {
messageController.handleIncomingMessage(appUserId, payloadMap); messageController.handleIncomingMessage(appUserId, payloadMap);
}); });
}); });
// Buffer flushed handler - client is ready to receive pending messages
webSocketService.registerMessageHandler("buffer_flushed", (appUserId, payload) -> {
handlePayload(payload, payloadMap -> {
int messageCount = extractMessageCount(payloadMap);
clientConnectionService.onBufferFlushed(appUserId, messageCount);
});
});
// Location handler - client sends position updates
webSocketService.registerMessageHandler("location", (appUserId, payload) -> {
handlePayload(payload, payloadMap -> {
locationService.savePosition(appUserId, payloadMap);
});
});
} }
/** /**
@@ -98,12 +110,19 @@ public class MessagingConfig {
if (response.isSuccess()) { if (response.isSuccess()) {
String appUserId = response.getAppUserId(); String appUserId = response.getAppUserId();
webSocketService.registerAuthenticatedSession(wsSessionId, appUserId); webSocketService.registerAuthenticatedSession(wsSessionId, appUserId);
clientConnectionService.registerClient(appUserId);
// Send success response to the now-authenticated session // Send success response to the now-authenticated session
Map<String, Object> authResponse = Map.of("success", true, "message", response.getMessage()); // locationTrackingEnabled: true = client should send position updates
Map<String, Object> authResponse = Map.of(
"success", true,
"message", response.getMessage(),
"locationTrackingEnabled", true);
byte[] responseBytes = objectMapper.writeValueAsBytes(authResponse); byte[] responseBytes = objectMapper.writeValueAsBytes(authResponse);
webSocketService.sendToClient(appUserId, "auth", responseBytes); webSocketService.sendToClient(appUserId, "auth", responseBytes);
// Register client - pending messages and jobs will be sent after
// client confirms buffer_flushed
clientConnectionService.registerClient(appUserId);
} else { } else {
// Send failure response to the pending session // Send failure response to the pending session
Map<String, Object> authResponse = Map.of("success", false, "message", response.getMessage()); Map<String, Object> authResponse = Map.of("success", false, "message", response.getMessage());
@@ -115,6 +134,21 @@ public class MessagingConfig {
} }
} }
/**
* Extract message count from buffer_flushed payload.
*/
private int extractMessageCount(Map<String, Object> payloadMap) {
try {
Object countObj = payloadMap.get("messageCount");
if (countObj instanceof Number) {
return ((Number) countObj).intValue();
}
return 0;
} catch (Exception e) {
return 0;
}
}
/** /**
* Parse payload bytes to a Map and pass to the consumer. * Parse payload bytes to a Map and pass to the consumer.
*/ */

View File

@@ -0,0 +1,92 @@
package de.assecutor.votianlt.model;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
import java.time.Instant;
/**
* Represents a GPS position reported by a mobile client.
*/
@Data
@NoArgsConstructor
@Document(collection = "location_positions")
public class LocationPosition {
@Id
private ObjectId id;
/**
* AppUser ID (clientId) - the user who sent this position
*/
@Field("app_user_id")
@Indexed
private String appUserId;
/**
* Latitude in decimal degrees
*/
@Field("latitude")
private Double latitude;
/**
* Longitude in decimal degrees
*/
@Field("longitude")
private Double longitude;
/**
* Accuracy of the position in meters
*/
@Field("accuracy")
private Double accuracy;
/**
* Altitude in meters above sea level (optional)
*/
@Field("altitude")
private Double altitude;
/**
* Speed in meters per second (optional)
*/
@Field("speed")
private Double speed;
/**
* Heading in degrees (0-360) (optional)
*/
@Field("heading")
private Double heading;
/**
* Timestamp when the position was reported (from client)
*/
@Field("timestamp")
private Instant timestamp;
/**
* Timestamp when the position was received by the server
*/
@Field("received_at")
@Indexed(expireAfterSeconds = 3600) // TTL index: auto-delete after 60 minutes
private Instant receivedAt;
public LocationPosition(String appUserId, Double latitude, Double longitude, Double accuracy,
Double altitude, Double speed, Double heading, Instant timestamp) {
this.appUserId = appUserId;
this.latitude = latitude;
this.longitude = longitude;
this.accuracy = accuracy;
this.altitude = altitude;
this.speed = speed;
this.heading = heading;
this.timestamp = timestamp;
this.receivedAt = Instant.now();
}
}

View File

@@ -85,6 +85,12 @@ public class Message {
@Field("read_at") @Field("read_at")
private LocalDateTime readAt; private LocalDateTime readAt;
/**
* Delivery status: NOTSEND (failed to deliver), SEND (successfully delivered)
*/
@Field("delivery_status")
private MessageDeliveryStatus deliveryStatus;
/** /**
* Constructor for general messages * Constructor for general messages
*/ */
@@ -126,6 +132,27 @@ public class Message {
this.readAt = LocalDateTime.now(); this.readAt = LocalDateTime.now();
} }
/**
* Mark the message as sent (successfully delivered)
*/
public void markAsSent() {
this.deliveryStatus = MessageDeliveryStatus.SEND;
}
/**
* Mark the message as not sent (delivery failed)
*/
public void markAsNotSent() {
this.deliveryStatus = MessageDeliveryStatus.NOTSEND;
}
/**
* Check if message was successfully delivered
*/
public boolean isDelivered() {
return deliveryStatus == MessageDeliveryStatus.SEND;
}
/** /**
* Returns the ObjectId as string for JSON serialization * Returns the ObjectId as string for JSON serialization
*/ */
@@ -157,5 +184,7 @@ public class Message {
this.createdAt = LocalDateTime.now(); this.createdAt = LocalDateTime.now();
this.isRead = false; this.isRead = false;
this.contentType = contentType != null ? contentType : MessageContentType.TEXT; this.contentType = contentType != null ? contentType : MessageContentType.TEXT;
// Server messages start as NOTSEND until confirmed delivered
this.deliveryStatus = (origin == MessageOrigin.SERVER) ? MessageDeliveryStatus.NOTSEND : null;
} }
} }

View File

@@ -0,0 +1,25 @@
package de.assecutor.votianlt.model;
/**
* Delivery status for messages sent to clients.
* Tracks whether a message was successfully delivered via WebSocket.
*/
public enum MessageDeliveryStatus {
NOTSEND("Nicht gesendet"),
SEND("Gesendet");
private final String displayName;
MessageDeliveryStatus(String displayName) {
this.displayName = displayName;
}
public String getDisplayName() {
return displayName;
}
@Override
public String toString() {
return displayName;
}
}

View File

@@ -377,9 +377,10 @@ public class JobSummaryView extends Main implements HasUrlParameter<String> {
} }
private String dimString(CargoItem ci) { private String dimString(CargoItem ci) {
String len = ci.getLengthMm() != null ? ci.getLengthMm().intValue() + " mm" : ""; // Values are stored in cm (not mm), so display directly without division
String wid = ci.getWidthMm() != null ? ci.getWidthMm().intValue() + " mm" : ""; String len = ci.getLengthMm() != null ? ci.getLengthMm().intValue() + " cm" : "";
String hei = ci.getHeightMm() != null ? ci.getHeightMm().intValue() + " mm" : ""; String wid = ci.getWidthMm() != null ? ci.getWidthMm().intValue() + " cm" : "";
String hei = ci.getHeightMm() != null ? ci.getHeightMm().intValue() + " cm" : "";
String combined = String.join(" x ", String combined = String.join(" x ",
java.util.stream.Stream.of(len, wid, hei).filter(s -> !s.isBlank()).toList()); java.util.stream.Stream.of(len, wid, hei).filter(s -> !s.isBlank()).toList());
return combined.isBlank() ? "" : combined; return combined.isBlank() ? "" : combined;

View File

@@ -0,0 +1,41 @@
package de.assecutor.votianlt.repository;
import de.assecutor.votianlt.model.LocationPosition;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
import java.time.Instant;
import java.util.List;
/**
* Repository for location positions reported by mobile clients.
*/
@Repository
public interface LocationPositionRepository extends MongoRepository<LocationPosition, ObjectId> {
/**
* Find all positions for a specific app user, ordered by timestamp descending
*/
List<LocationPosition> findByAppUserIdOrderByTimestampDesc(String appUserId);
/**
* Find all positions received after a specific time
*/
List<LocationPosition> findByReceivedAtAfterOrderByTimestampDesc(Instant receivedAt);
/**
* Find all positions for a user received after a specific time
*/
List<LocationPosition> findByAppUserIdAndReceivedAtAfterOrderByTimestampDesc(String appUserId, Instant receivedAt);
/**
* Delete all positions older than the specified timestamp
*/
void deleteByReceivedAtBefore(Instant cutoff);
/**
* Find latest position for a specific app user
*/
List<LocationPosition> findTop1ByAppUserIdOrderByTimestampDesc(String appUserId);
}

View File

@@ -1,6 +1,7 @@
package de.assecutor.votianlt.repository; package de.assecutor.votianlt.repository;
import de.assecutor.votianlt.model.Message; import de.assecutor.votianlt.model.Message;
import de.assecutor.votianlt.model.MessageDeliveryStatus;
import de.assecutor.votianlt.model.MessageOrigin; import de.assecutor.votianlt.model.MessageOrigin;
import de.assecutor.votianlt.model.MessageType; import de.assecutor.votianlt.model.MessageType;
import org.bson.types.ObjectId; import org.bson.types.ObjectId;
@@ -58,4 +59,17 @@ public interface MessageRepository extends MongoRepository<Message, ObjectId> {
* Find all messages (for admin/overview), ordered by creation time descending * Find all messages (for admin/overview), ordered by creation time descending
*/ */
List<Message> findAllByOrderByCreatedAtDesc(); List<Message> findAllByOrderByCreatedAtDesc();
/**
* Find all messages with a specific delivery status for a receiver
*/
List<Message> findByReceiverAndDeliveryStatusOrderByCreatedAtAsc(String receiver,
MessageDeliveryStatus deliveryStatus);
/**
* Find all undelivered messages (NOTSEND status) for a receiver
*/
default List<Message> findUndeliveredMessagesForReceiver(String receiver) {
return findByReceiverAndDeliveryStatusOrderByCreatedAtAsc(receiver, MessageDeliveryStatus.NOTSEND);
}
} }

View File

@@ -1,9 +1,15 @@
package de.assecutor.votianlt.service; package de.assecutor.votianlt.service;
import de.assecutor.votianlt.controller.MessageController;
import de.assecutor.votianlt.messaging.WebSocketService; import de.assecutor.votianlt.messaging.WebSocketService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* Service for managing client connections. Connection state is determined * Service for managing client connections. Connection state is determined
* directly from the WebSocket session lifecycle. * directly from the WebSocket session lifecycle.
@@ -13,13 +19,22 @@ import org.springframework.stereotype.Service;
public class ClientConnectionService { public class ClientConnectionService {
private final WebSocketService webSocketService; private final WebSocketService webSocketService;
private final MessageService messageService;
private final MessageController messageController;
public ClientConnectionService(WebSocketService webSocketService) { // Track clients that are connected but not yet ready (buffer not flushed)
private final Map<String, PendingClient> pendingClients = new ConcurrentHashMap<>();
public ClientConnectionService(WebSocketService webSocketService, @Lazy MessageService messageService,
@Lazy MessageController messageController) {
this.webSocketService = webSocketService; this.webSocketService = webSocketService;
this.messageService = messageService;
this.messageController = messageController;
} }
/** /**
* Called after successful login. * Called after successful login. Client is registered but messages/jobs are
* only sent after buffer_flushed confirmation.
* *
* @param appUserId * @param appUserId
* The app user ID (MongoDB ObjectId) * The app user ID (MongoDB ObjectId)
@@ -29,7 +44,69 @@ public class ClientConnectionService {
return; return;
} }
log.info("[CLIENT] Registered: {}", appUserId); log.info("[CLIENT] Registered: {} (waiting for buffer_flushed)", appUserId);
// Store as pending - messages and jobs will be sent after buffer_flushed
pendingClients.put(appUserId, new PendingClient(appUserId, Instant.now()));
}
/**
* Called when client confirms that its buffer has been flushed and it's ready
* to receive messages.
*
* @param appUserId
* The app user ID
* @param messageCount
* Number of messages client processed (for logging)
*/
public void onBufferFlushed(String appUserId, int messageCount) {
if (appUserId == null || appUserId.isBlank()) {
return;
}
PendingClient pending = pendingClients.remove(appUserId);
if (pending == null) {
log.debug("[CLIENT] Buffer flushed for {} but not in pending list", appUserId);
} else {
log.info("[CLIENT] Buffer flushed for {} (processed {} messages), sending pending data", appUserId,
messageCount);
}
// Now send pending messages and jobs
sendPendingMessages(appUserId);
sendAssignedJobs(appUserId);
}
/**
* Send pending messages to a client.
*
* @param appUserId
* The app user ID
*/
private void sendPendingMessages(String appUserId) {
try {
int sentCount = messageService.sendPendingMessages(appUserId);
if (sentCount > 0) {
log.info("[CLIENT] Sent {} pending messages to {}", sentCount, appUserId);
}
} catch (Exception e) {
log.error("[CLIENT] Error sending pending messages to {}: {}", appUserId, e.getMessage());
}
}
/**
* Send assigned jobs to a client.
*
* @param appUserId
* The app user ID
*/
private void sendAssignedJobs(String appUserId) {
try {
messageController.handleGetAssignedJobs(appUserId);
log.debug("[CLIENT] Sent assigned jobs to {}", appUserId);
} catch (Exception e) {
log.error("[CLIENT] Error sending assigned jobs to {}: {}", appUserId, e.getMessage());
}
} }
/** /**
@@ -51,4 +128,10 @@ public class ClientConnectionService {
public int getConnectedClientCount() { public int getConnectedClientCount() {
return webSocketService.getConnectedClientCount(); return webSocketService.getConnectedClientCount();
} }
/**
* Internal record to track pending clients waiting for buffer flush.
*/
private record PendingClient(String appUserId, Instant registeredAt) {
}
} }

View File

@@ -0,0 +1,144 @@
package de.assecutor.votianlt.service;
import de.assecutor.votianlt.model.LocationPosition;
import de.assecutor.votianlt.repository.LocationPositionRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
/**
* Service for handling location positions reported by mobile clients.
* Automatically cleans up positions older than 60 minutes.
*/
@Service
@Slf4j
public class LocationService {
private final LocationPositionRepository locationPositionRepository;
public LocationService(LocationPositionRepository locationPositionRepository) {
this.locationPositionRepository = locationPositionRepository;
}
/**
* Save a position reported by a client.
*
* @param appUserId
* The app user ID
* @param payload
* The position payload from the client
*/
public void savePosition(String appUserId, Map<String, Object> payload) {
try {
Double latitude = extractDouble(payload.get("latitude"));
Double longitude = extractDouble(payload.get("longitude"));
if (latitude == null || longitude == null) {
log.warn("[Location] Missing latitude or longitude from {}", appUserId);
return;
}
Double accuracy = extractDouble(payload.get("accuracy"));
Double altitude = extractDouble(payload.get("altitude"));
Double speed = extractDouble(payload.get("speed"));
Double heading = extractDouble(payload.get("heading"));
Instant timestamp = extractInstant(payload.get("timestamp"));
LocationPosition position = new LocationPosition(
appUserId, latitude, longitude, accuracy, altitude, speed, heading, timestamp);
locationPositionRepository.save(position);
log.debug("[Location] Saved position for {}: lat={}, lon={}", appUserId, latitude, longitude);
} catch (Exception e) {
log.error("[Location] Error saving position for {}: {}", appUserId, e.getMessage());
}
}
/**
* Get the latest position for a specific app user.
*
* @param appUserId
* The app user ID
* @return The latest position or null if none found
*/
public LocationPosition getLatestPosition(String appUserId) {
List<LocationPosition> positions = locationPositionRepository.findTop1ByAppUserIdOrderByTimestampDesc(appUserId);
return positions.isEmpty() ? null : positions.get(0);
}
/**
* Get all positions for a specific app user from the last N minutes.
*
* @param appUserId
* The app user ID
* @param minutes
* Number of minutes to look back
* @return List of positions
*/
public List<LocationPosition> getRecentPositions(String appUserId, int minutes) {
Instant cutoff = Instant.now().minus(minutes, ChronoUnit.MINUTES);
return locationPositionRepository.findByAppUserIdAndReceivedAtAfterOrderByTimestampDesc(appUserId, cutoff);
}
/**
* Cleanup old positions. Runs every 5 minutes.
* Note: Positions also have a TTL index that auto-deletes after 60 minutes,
* but this scheduled cleanup ensures immediate removal and logging.
*/
@Scheduled(fixedRate = 300000) // 5 minutes
public void cleanupOldPositions() {
try {
Instant cutoff = Instant.now().minus(60, ChronoUnit.MINUTES);
long countBefore = locationPositionRepository.count();
locationPositionRepository.deleteByReceivedAtBefore(cutoff);
long countAfter = locationPositionRepository.count();
long deleted = countBefore - countAfter;
if (deleted > 0) {
log.info("[Location] Cleaned up {} positions older than 60 minutes", deleted);
}
} catch (Exception e) {
log.error("[Location] Error during cleanup: {}", e.getMessage());
}
}
/**
* Extract Double from various number types.
*/
private Double extractDouble(Object value) {
if (value == null) {
return null;
}
if (value instanceof Number) {
return ((Number) value).doubleValue();
}
try {
return Double.parseDouble(value.toString());
} catch (NumberFormatException e) {
return null;
}
}
/**
* Extract Instant from various formats.
*/
private Instant extractInstant(Object value) {
if (value == null) {
return Instant.now();
}
if (value instanceof Instant) {
return (Instant) value;
}
try {
return Instant.parse(value.toString());
} catch (Exception e) {
return Instant.now();
}
}
}

View File

@@ -1,5 +1,6 @@
package de.assecutor.votianlt.service; package de.assecutor.votianlt.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.assecutor.votianlt.model.Job; import de.assecutor.votianlt.model.Job;
import de.assecutor.votianlt.model.Message; import de.assecutor.votianlt.model.Message;
import de.assecutor.votianlt.model.MessageContentType; import de.assecutor.votianlt.model.MessageContentType;
@@ -8,7 +9,7 @@ import de.assecutor.votianlt.model.MessageType;
import de.assecutor.votianlt.dto.ChatMessageInboundPayload; import de.assecutor.votianlt.dto.ChatMessageInboundPayload;
import de.assecutor.votianlt.dto.ChatMessageOutboundPayload; import de.assecutor.votianlt.dto.ChatMessageOutboundPayload;
import de.assecutor.votianlt.event.MessageReceivedEvent; import de.assecutor.votianlt.event.MessageReceivedEvent;
import de.assecutor.votianlt.messaging.MessagingPublisher; import de.assecutor.votianlt.messaging.WebSocketService;
import de.assecutor.votianlt.repository.JobRepository; import de.assecutor.votianlt.repository.JobRepository;
import de.assecutor.votianlt.repository.MessageRepository; import de.assecutor.votianlt.repository.MessageRepository;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -16,6 +17,7 @@ import org.bson.types.ObjectId;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@@ -26,15 +28,17 @@ public class MessageService {
private final MessageRepository messageRepository; private final MessageRepository messageRepository;
private final JobRepository jobRepository; private final JobRepository jobRepository;
private final MessagingPublisher messagingPublisher;
private final ApplicationEventPublisher eventPublisher; private final ApplicationEventPublisher eventPublisher;
private final WebSocketService webSocketService;
private final ObjectMapper objectMapper;
public MessageService(MessageRepository messageRepository, JobRepository jobRepository, MessagingPublisher messagingPublisher, public MessageService(MessageRepository messageRepository, JobRepository jobRepository,
ApplicationEventPublisher eventPublisher) { ApplicationEventPublisher eventPublisher, WebSocketService webSocketService, ObjectMapper objectMapper) {
this.messageRepository = messageRepository; this.messageRepository = messageRepository;
this.jobRepository = jobRepository; this.jobRepository = jobRepository;
this.messagingPublisher = messagingPublisher;
this.eventPublisher = eventPublisher; this.eventPublisher = eventPublisher;
this.webSocketService = webSocketService;
this.objectMapper = objectMapper;
} }
/** /**
@@ -111,17 +115,95 @@ public class MessageService {
} }
/** /**
* Publish message to topic for the receiver * Publish message to topic for the receiver.
* Only sends if client is connected, otherwise keeps NOTSEND status.
*/ */
private void publishMessage(Message message, String receiver) { private void publishMessage(Message message, String receiver) {
try { try {
// Check if client is connected before attempting to send
if (!webSocketService.isClientConnected(receiver)) {
log.debug("[Messaging] Client {} not connected, message {} saved as NOTSEND", receiver,
message.getIdAsString());
// Message already has NOTSEND status by default
return;
}
ChatMessageOutboundPayload payload = ChatMessageOutboundPayload.fromMessage(message); ChatMessageOutboundPayload payload = ChatMessageOutboundPayload.fromMessage(message);
messagingPublisher.publishAsJson(receiver, "message", payload); byte[] data = objectMapper.writeValueAsString(payload).getBytes(StandardCharsets.UTF_8);
// Use WebSocketService directly to get CompletableFuture for delivery tracking
webSocketService.sendToClient(receiver, "message", data)
.thenRun(() -> {
// Success: mark as sent
message.markAsSent();
messageRepository.save(message);
log.debug("[Messaging] Message {} delivered to client {}, marked as SEND",
message.getIdAsString(), receiver);
})
.exceptionally(ex -> {
// Failed to deliver: keep NOTSEND status
log.debug("[Messaging] Failed to deliver message {} to client {}: {}",
message.getIdAsString(), receiver, ex.getMessage());
return null;
});
} catch (Exception e) { } catch (Exception e) {
log.error("[Messaging] Error publishing message: {}", e.getMessage()); log.error("[Messaging] Error publishing message: {}", e.getMessage());
} }
} }
/**
* Get all undelivered messages (NOTSEND status) for a receiver
*/
public List<Message> getUndeliveredMessagesForReceiver(String receiver) {
return messageRepository.findUndeliveredMessagesForReceiver(receiver);
}
/**
* Send pending messages to a client that just connected.
* Called after successful authentication.
*
* @param receiver
* AppUser ID (clientId)
* @return Number of messages sent
*/
public int sendPendingMessages(String receiver) {
if (!webSocketService.isClientConnected(receiver)) {
log.debug("[Messaging] Cannot send pending messages, client {} not connected", receiver);
return 0;
}
List<Message> pendingMessages = getUndeliveredMessagesForReceiver(receiver);
if (pendingMessages.isEmpty()) {
return 0;
}
int sentCount = 0;
for (Message message : pendingMessages) {
try {
ChatMessageOutboundPayload payload = ChatMessageOutboundPayload.fromMessage(message);
byte[] data = objectMapper.writeValueAsString(payload).getBytes(StandardCharsets.UTF_8);
webSocketService.sendToClient(receiver, "message", data)
.thenRun(() -> {
message.markAsSent();
messageRepository.save(message);
})
.exceptionally(ex -> {
log.error("[Messaging] Failed to send pending message {}: {}",
message.getIdAsString(), ex.getMessage());
return null;
});
sentCount++;
} catch (Exception e) {
log.error("[Messaging] Failed to send pending message {}: {}", message.getIdAsString(), e.getMessage());
}
}
log.info("[Messaging] Sent {} pending messages to client {}", sentCount, receiver);
return sentCount;
}
/** /**
* Get all messages for a specific receiver * Get all messages for a specific receiver
*/ */