mongodb viewer
This commit is contained in:
Binary file not shown.
@@ -25,6 +25,7 @@ import de.assecutor.votianlt.model.Comment;
|
||||
import de.assecutor.votianlt.service.JobHistoryService;
|
||||
import de.assecutor.votianlt.service.EmailService;
|
||||
import de.assecutor.votianlt.service.MessageService;
|
||||
import de.assecutor.votianlt.service.ClientConnectionService;
|
||||
import de.assecutor.votianlt.model.JobStatus;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
@@ -72,12 +73,14 @@ public class MessageController {
|
||||
private final MessageService messageService;
|
||||
private final UserService userService;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final ClientConnectionService clientConnectionService;
|
||||
|
||||
public MessageController(MqttPublisher mqttPublisher, AppUserRepository appUserRepository,
|
||||
AppUserService appUserService, JobRepository jobRepository, CargoItemRepository cargoItemRepository,
|
||||
TaskRepository taskRepository, PhotoRepository photoRepository, BarcodeRepository barcodeRepository,
|
||||
SignatureRepository signatureRepository, CommentRepository commentRepository, JobHistoryService jobHistoryService,
|
||||
EmailService emailService, MessageService messageService, UserService userService, ObjectMapper objectMapper) {
|
||||
EmailService emailService, MessageService messageService, UserService userService, ObjectMapper objectMapper,
|
||||
ClientConnectionService clientConnectionService) {
|
||||
this.mqttPublisher = mqttPublisher;
|
||||
this.appUserRepository = appUserRepository;
|
||||
this.appUserService = appUserService;
|
||||
@@ -93,6 +96,7 @@ public class MessageController {
|
||||
this.messageService = messageService;
|
||||
this.userService = userService;
|
||||
this.objectMapper = objectMapper;
|
||||
this.clientConnectionService = clientConnectionService;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -123,6 +127,8 @@ public class MessageController {
|
||||
response = new AppLoginResponse(true, "Anmeldung erfolgreich", null, null, user.getIdAsString());
|
||||
// Store clientId mapping for this user session
|
||||
storeClientIdMapping(user.getIdAsString(), request.getClientId());
|
||||
// Register client for ping/pong monitoring
|
||||
clientConnectionService.registerClient(request.getClientId(), user.getIdAsString());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -172,7 +178,15 @@ public class MessageController {
|
||||
|
||||
// Find jobs assigned to this app user
|
||||
List<Job> assignedJobs = jobRepository.findByAppUser(appUserId);
|
||||
log.debug("Found {} jobs for appUserId: {}", assignedJobs.size(), appUserId);
|
||||
log.info("Found {} jobs for appUserId: {}", assignedJobs.size(), appUserId);
|
||||
|
||||
// Debug: Log all jobs and their app_user values to diagnose assignment issues
|
||||
List<Job> allJobs = jobRepository.findAll();
|
||||
log.info("DEBUG: Total jobs in database: {}", allJobs.size());
|
||||
for (Job job : allJobs) {
|
||||
log.info("DEBUG: Job {} (number: {}) has app_user='{}', digitalProcessing={}",
|
||||
job.getIdAsString(), job.getJobNumber(), job.getAppUser(), job.isDigitalProcessing());
|
||||
}
|
||||
|
||||
// For each job, fetch related cargo items and tasks (ordered by task order)
|
||||
List<JobWithRelatedDataDTO> jobsWithRelatedData = assignedJobs.stream().map(job -> {
|
||||
@@ -608,6 +622,22 @@ public class MessageController {
|
||||
return clientIdUserMapping.get(clientId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle pong response from a client.
|
||||
* Client sends to /server/{clientId}/pong with payload { timestamp }.
|
||||
* Used for connection monitoring.
|
||||
*/
|
||||
public void handlePong(Map<String, Object> payload) {
|
||||
String clientId = payload.get("clientId") != null ? payload.get("clientId").toString() : null;
|
||||
log.debug("Received pong from client: {}", clientId);
|
||||
|
||||
if (clientId != null && !clientId.isBlank()) {
|
||||
clientConnectionService.handlePong(clientId);
|
||||
} else {
|
||||
log.warn("Received pong without clientId in payload");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming message from a client via MQTT.
|
||||
* Client sends to /server/{clientId}/message with payload:
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
package de.assecutor.votianlt.messaging.config;
|
||||
|
||||
import de.assecutor.votianlt.controller.MessageController;
|
||||
import de.assecutor.votianlt.dto.AppLoginRequest;
|
||||
import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService;
|
||||
import de.assecutor.votianlt.messaging.model.AcknowledgmentMessage;
|
||||
import de.assecutor.votianlt.messaging.model.MessageEnvelope;
|
||||
import de.assecutor.votianlt.messaging.plugin.*;
|
||||
import de.assecutor.votianlt.messaging.plugin.mqtt.MqttMessagingPlugin;
|
||||
import de.assecutor.votianlt.service.ClientConnectionService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
@@ -14,6 +17,7 @@ import org.springframework.context.event.EventListener;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Configuration for the plugin-based messaging system.
|
||||
@@ -61,8 +65,10 @@ public class PluginMessagingConfig {
|
||||
MessagingPlugin plugin = createPlugin(pluginType);
|
||||
PluginConfig config = createPluginConfig(pluginType);
|
||||
|
||||
// Get MessageDeliveryService from context (after all beans are created)
|
||||
// Get beans from context (after all beans are created)
|
||||
MessageDeliveryService deliveryService = event.getApplicationContext().getBean(MessageDeliveryService.class);
|
||||
MessageController messageController = event.getApplicationContext().getBean(MessageController.class);
|
||||
ClientConnectionService clientConnectionService = event.getApplicationContext().getBean(ClientConnectionService.class);
|
||||
|
||||
// Set up a listener to subscribe when connected
|
||||
log.info("[PluginMessagingConfig] Adding state listener");
|
||||
@@ -72,7 +78,7 @@ public class PluginMessagingConfig {
|
||||
if (stateEvent.isConnected()) {
|
||||
log.info("[PluginMessagingConfig] Plugin connected, setting up subscriptions");
|
||||
try {
|
||||
setupSubscriptions(deliveryService);
|
||||
setupSubscriptions(deliveryService, messageController, clientConnectionService);
|
||||
log.info("[PluginMessagingConfig] Subscriptions setup completed");
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginMessagingConfig] Error setting up subscriptions: {}", e.getMessage(), e);
|
||||
@@ -133,7 +139,9 @@ public class PluginMessagingConfig {
|
||||
/**
|
||||
* Setup message subscriptions using the new plugin API.
|
||||
*/
|
||||
private void setupSubscriptions(MessageDeliveryService deliveryService) {
|
||||
private void setupSubscriptions(MessageDeliveryService deliveryService,
|
||||
MessageController messageController,
|
||||
ClientConnectionService clientConnectionService) {
|
||||
log.info("[PluginMessagingConfig] Setting up message subscriptions");
|
||||
|
||||
try {
|
||||
@@ -157,12 +165,13 @@ public class PluginMessagingConfig {
|
||||
"task_completed",
|
||||
"jobs/assigned",
|
||||
"message",
|
||||
"login"
|
||||
"login",
|
||||
"pong"
|
||||
};
|
||||
|
||||
for (String messageType : messageTypes) {
|
||||
pluginManager.registerMessageHandler(messageType, (clientId, payload) ->
|
||||
handleEnvelopedMessage(clientId, payload, deliveryService));
|
||||
handleEnvelopedMessage(clientId, payload, deliveryService, messageController, clientConnectionService));
|
||||
}
|
||||
|
||||
log.info("[PluginMessagingConfig] Message subscriptions initialized");
|
||||
@@ -175,8 +184,10 @@ public class PluginMessagingConfig {
|
||||
|
||||
/**
|
||||
* Handle incoming enveloped message.
|
||||
* Supports both new envelope format and legacy format for backwards compatibility.
|
||||
*/
|
||||
private void handleEnvelopedMessage(String clientId, byte[] payload, MessageDeliveryService deliveryService) {
|
||||
private void handleEnvelopedMessage(String clientId, byte[] payload, MessageDeliveryService deliveryService,
|
||||
MessageController messageController, ClientConnectionService clientConnectionService) {
|
||||
try {
|
||||
String json = new String(payload, StandardCharsets.UTF_8);
|
||||
log.info("[PluginMessagingConfig] Received JSON from client {}: {}", clientId, json);
|
||||
@@ -184,13 +195,75 @@ public class PluginMessagingConfig {
|
||||
// Try to parse as envelope first
|
||||
try {
|
||||
MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class);
|
||||
deliveryService.handleIncomingMessage(envelope);
|
||||
// Valid envelope - check if it has required fields
|
||||
if (envelope.getMessageId() != null && envelope.getTopic() != null) {
|
||||
deliveryService.handleIncomingMessage(envelope);
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// If not an envelope, it might be a legacy message - log and skip
|
||||
log.warn("[PluginMessagingConfig] Received non-enveloped message from client {}, skipping. JSON: {}", clientId, json);
|
||||
// Not a valid envelope, try legacy format
|
||||
log.debug("[PluginMessagingConfig] Message is not an envelope, trying legacy format");
|
||||
}
|
||||
|
||||
// Handle legacy format (direct payload without envelope)
|
||||
handleLegacyMessage(clientId, json, messageController, clientConnectionService);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginMessagingConfig] Error handling enveloped message from client {}: {}", clientId, e.getMessage(), e);
|
||||
log.error("[PluginMessagingConfig] Error handling message from client {}: {}", clientId, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle legacy message format (without envelope wrapper).
|
||||
* This supports older clients that don't use the envelope format.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private void handleLegacyMessage(String clientId, String json,
|
||||
MessageController messageController, ClientConnectionService clientConnectionService) {
|
||||
try {
|
||||
Map<String, Object> payload = objectMapper.readValue(json, Map.class);
|
||||
log.info("[PluginMessagingConfig] Processing legacy message from client {}: {}", clientId, payload);
|
||||
|
||||
// Check if this is a login request (has email, password, clientId)
|
||||
if (payload.containsKey("email") && payload.containsKey("password") && payload.containsKey("clientId")) {
|
||||
AppLoginRequest loginRequest = objectMapper.convertValue(payload, AppLoginRequest.class);
|
||||
log.info("[PluginMessagingConfig] Routing legacy login request for email: {}", loginRequest.getEmail());
|
||||
messageController.handleAppLogin(loginRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if this is a pong response
|
||||
if ("pong".equals(payload.get("type"))) {
|
||||
String pongClientId = clientId != null ? clientId : (String) payload.get("clientId");
|
||||
if (pongClientId != null) {
|
||||
log.debug("[PluginMessagingConfig] Routing legacy pong from client: {}", pongClientId);
|
||||
clientConnectionService.handlePong(pongClientId);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if this is a task completion
|
||||
if (payload.containsKey("taskType") || payload.containsKey("taskId")) {
|
||||
String taskType = payload.get("taskType") != null ? payload.get("taskType").toString() : null;
|
||||
log.info("[PluginMessagingConfig] Routing legacy task_completed from client: {}", clientId);
|
||||
messageController.handleTaskCompleted(payload, taskType);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if this is a jobs/assigned request
|
||||
if (payload.containsKey("appUserId")) {
|
||||
if (clientId != null) {
|
||||
payload.put("clientId", clientId);
|
||||
}
|
||||
log.info("[PluginMessagingConfig] Routing legacy jobs/assigned request from client: {}", clientId);
|
||||
messageController.handleGetAssignedJobs(payload);
|
||||
return;
|
||||
}
|
||||
|
||||
log.warn("[PluginMessagingConfig] Unknown legacy message format from client {}: {}", clientId, json);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginMessagingConfig] Error handling legacy message from client {}: {}", clientId, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,6 +51,8 @@ public class AcknowledgmentHandler {
|
||||
handleLogin(payloadMap);
|
||||
} else if (topic.matches("/server/.+/message")) {
|
||||
handleIncomingMessage(topic, payloadMap);
|
||||
} else if (topic.matches("/server/.+/pong")) {
|
||||
handlePong(topic, payloadMap);
|
||||
} else {
|
||||
log.debug("[AckHandler] No route for topic {}", topic);
|
||||
}
|
||||
@@ -124,5 +126,24 @@ public class AcknowledgmentHandler {
|
||||
log.error("[AckHandler] Error handling incoming message: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle pong response from client for connection monitoring
|
||||
*/
|
||||
private void handlePong(String topic, Map<String, Object> payload) {
|
||||
try {
|
||||
// Extract clientId from topic: /server/{clientId}/pong
|
||||
String[] parts = topic.split("/");
|
||||
String clientId = parts.length > 2 ? parts[2] : null;
|
||||
if (clientId != null && !clientId.isBlank()) {
|
||||
payload.put("clientId", clientId);
|
||||
} else {
|
||||
log.warn("[AckHandler] Couldn't extract clientId from topic {} for pong", topic);
|
||||
}
|
||||
messageController.handlePong(payload);
|
||||
} catch (Exception e) {
|
||||
log.error("[AckHandler] Error handling pong: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -97,6 +97,14 @@ public interface MessageDeliveryService {
|
||||
*/
|
||||
void retryPendingDeliveries();
|
||||
|
||||
/**
|
||||
* Retry pending deliveries for a specific client.
|
||||
* Called when a client reconnects.
|
||||
*
|
||||
* @param clientId The client identifier
|
||||
*/
|
||||
void retryPendingDeliveriesForClient(String clientId);
|
||||
|
||||
/**
|
||||
* Clean up expired and completed deliveries.
|
||||
* Called by scheduled task.
|
||||
|
||||
@@ -6,6 +6,7 @@ import de.assecutor.votianlt.messaging.plugin.PluginManager;
|
||||
import de.assecutor.votianlt.messaging.plugin.SendOptions;
|
||||
import de.assecutor.votianlt.repository.MessageEnvelopeRepository;
|
||||
import de.assecutor.votianlt.repository.PendingDeliveryRepository;
|
||||
import de.assecutor.votianlt.service.ClientConnectionService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -45,6 +46,7 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
private final AcknowledgmentHandler acknowledgmentHandler;
|
||||
private final DeliveryConfig config;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final ClientConnectionService clientConnectionService;
|
||||
|
||||
// In-memory tracking of messages awaiting acknowledgment
|
||||
private final Map<String, PendingAckMessage> pendingAckMessages = new ConcurrentHashMap<>();
|
||||
@@ -77,13 +79,15 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
MessageEnvelopeRepository envelopeRepository,
|
||||
AcknowledgmentHandler acknowledgmentHandler,
|
||||
DeliveryConfig config,
|
||||
ObjectMapper objectMapper) {
|
||||
ObjectMapper objectMapper,
|
||||
ClientConnectionService clientConnectionService) {
|
||||
this.pluginManager = pluginManager;
|
||||
this.pendingDeliveryRepository = pendingDeliveryRepository;
|
||||
this.envelopeRepository = envelopeRepository;
|
||||
this.acknowledgmentHandler = acknowledgmentHandler;
|
||||
this.config = config;
|
||||
this.objectMapper = objectMapper;
|
||||
this.clientConnectionService = clientConnectionService;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
@@ -133,6 +137,13 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
List<String> toRemove = new ArrayList<>();
|
||||
|
||||
for (PendingAckMessage pending : pendingAckMessages.values()) {
|
||||
// Only retry if client is connected
|
||||
if (!clientConnectionService.isClientConnected(pending.clientId)) {
|
||||
log.debug("[MessageDelivery] Skipping retry for message {} - client {} is not connected",
|
||||
pending.messageId, pending.clientId);
|
||||
continue;
|
||||
}
|
||||
|
||||
pending.retryCount++;
|
||||
|
||||
if (pending.retryCount > ackMaxRetries) {
|
||||
@@ -351,6 +362,59 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void retryPendingDeliveriesForClient(String clientId) {
|
||||
if (clientId == null || clientId.isBlank()) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[MessageDelivery] Client {} reconnected - retrying pending messages", clientId);
|
||||
|
||||
// Retry in-memory pending ACK messages for this client
|
||||
int inMemoryCount = 0;
|
||||
for (PendingAckMessage pending : pendingAckMessages.values()) {
|
||||
if (clientId.equals(pending.clientId)) {
|
||||
inMemoryCount++;
|
||||
try {
|
||||
SendOptions sendOptions = SendOptions.reliable();
|
||||
pluginManager.sendToClient(pending.clientId, pending.messageType, pending.envelopeData, sendOptions)
|
||||
.thenAccept(v -> log.info("[MessageDelivery] Reconnect retry sent for message {} to client {}",
|
||||
pending.messageId, clientId))
|
||||
.exceptionally(ex -> {
|
||||
log.error("[MessageDelivery] Reconnect retry failed for message {}: {}",
|
||||
pending.messageId, ex.getMessage());
|
||||
return null;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error during reconnect retry for message {}: {}",
|
||||
pending.messageId, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Retry database pending deliveries for this client
|
||||
try {
|
||||
List<PendingDelivery> pendingDeliveries = pendingDeliveryRepository
|
||||
.findByStatusIn(List.of(DeliveryStatus.PENDING, DeliveryStatus.SENT));
|
||||
|
||||
int dbCount = 0;
|
||||
for (PendingDelivery pending : pendingDeliveries) {
|
||||
String topic = pending.getTopic();
|
||||
if (topic != null && topic.startsWith(clientId + "/")) {
|
||||
dbCount++;
|
||||
retryDelivery(pending);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("[MessageDelivery] Triggered retry for client {}: {} in-memory, {} from database",
|
||||
clientId, inMemoryCount, dbCount);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[MessageDelivery] Error retrying database deliveries for client {}: {}",
|
||||
clientId, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanupOldDeliveries() {
|
||||
try {
|
||||
@@ -460,6 +524,13 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
String clientId = parts[0];
|
||||
String messageType = parts[1];
|
||||
|
||||
// Only retry if client is connected
|
||||
if (!clientConnectionService.isClientConnected(clientId)) {
|
||||
log.debug("[MessageDelivery] Skipping retry for message {} - client {} is not connected",
|
||||
pending.getMessageId(), clientId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Send via plugin manager
|
||||
SendOptions options = SendOptions.reliable();
|
||||
pluginManager.sendToClient(clientId, messageType, pending.getEnvelopeData(), options)
|
||||
|
||||
@@ -86,8 +86,12 @@ public interface JobRepository extends MongoRepository<Job, ObjectId> {
|
||||
long countByIsDraftTrue();
|
||||
|
||||
/**
|
||||
* Findet alle Aufträge, die einem bestimmten App-Nutzer zugewiesen sind
|
||||
* Findet alle nicht abgeschlossenen Aufträge, die einem bestimmten App-Nutzer zugewiesen sind.
|
||||
* Excludes jobs with status COMPLETED or CANCELLED.
|
||||
* Uses explicit query because @Field("app_user") annotation is not always
|
||||
* respected by Spring Data MongoDB query derivation.
|
||||
*/
|
||||
@Query("{'app_user': ?0, 'status': {'$nin': ['COMPLETED', 'CANCELLED']}}")
|
||||
List<Job> findByAppUser(String appUser);
|
||||
|
||||
/**
|
||||
|
||||
@@ -41,6 +41,11 @@ public interface PendingDeliveryRepository extends MongoRepository<PendingDelive
|
||||
*/
|
||||
List<PendingDelivery> findByStatusInAndExpiresAtBefore(List<DeliveryStatus> statuses, LocalDateTime dateTime);
|
||||
|
||||
/**
|
||||
* Find deliveries with specific statuses
|
||||
*/
|
||||
List<PendingDelivery> findByStatusIn(List<DeliveryStatus> statuses);
|
||||
|
||||
/**
|
||||
* Find all deliveries for a specific client
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,276 @@
|
||||
package de.assecutor.votianlt.service;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService;
|
||||
import de.assecutor.votianlt.messaging.plugin.PluginManager;
|
||||
import de.assecutor.votianlt.messaging.plugin.SendOptions;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* Service for managing client connections via Ping/Pong mechanism.
|
||||
* Tracks connected clients and periodically checks their connectivity.
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class ClientConnectionService {
|
||||
|
||||
/**
|
||||
* Represents the connection state of a client.
|
||||
*/
|
||||
public record ClientState(
|
||||
String clientId,
|
||||
String userId,
|
||||
boolean connected,
|
||||
Instant lastPingSent,
|
||||
Instant lastPongReceived,
|
||||
Instant connectedAt
|
||||
) {
|
||||
public ClientState withPingSent(Instant pingSent) {
|
||||
return new ClientState(clientId, userId, connected, pingSent, lastPongReceived, connectedAt);
|
||||
}
|
||||
|
||||
public ClientState withPongReceived(Instant pongReceived) {
|
||||
return new ClientState(clientId, userId, true, lastPingSent, pongReceived, connectedAt);
|
||||
}
|
||||
|
||||
public ClientState withConnected(boolean isConnected) {
|
||||
return new ClientState(clientId, userId, isConnected, lastPingSent, lastPongReceived, connectedAt);
|
||||
}
|
||||
}
|
||||
|
||||
private final Map<String, ClientState> connectedClients = new ConcurrentHashMap<>();
|
||||
private final PluginManager pluginManager;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final MessageDeliveryService messageDeliveryService;
|
||||
|
||||
@Value("${app.client.ping.interval-seconds:15}")
|
||||
private int pingIntervalSeconds;
|
||||
|
||||
@Value("${app.client.ping.timeout-seconds:5}")
|
||||
private int pingTimeoutSeconds;
|
||||
|
||||
public ClientConnectionService(PluginManager pluginManager, ObjectMapper objectMapper,
|
||||
@Lazy MessageDeliveryService messageDeliveryService) {
|
||||
this.pluginManager = pluginManager;
|
||||
this.objectMapper = objectMapper;
|
||||
this.messageDeliveryService = messageDeliveryService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a client as connected after successful login.
|
||||
*
|
||||
* @param clientId The unique client identifier
|
||||
* @param userId The user ID associated with this client
|
||||
*/
|
||||
public void registerClient(String clientId, String userId) {
|
||||
if (clientId == null || clientId.isBlank()) {
|
||||
log.warn("Cannot register client with null or blank clientId");
|
||||
return;
|
||||
}
|
||||
|
||||
ClientState previousState = connectedClients.get(clientId);
|
||||
boolean wasDisconnected = previousState != null && !previousState.connected();
|
||||
|
||||
Instant now = Instant.now();
|
||||
ClientState state = new ClientState(clientId, userId, true, null, now, now);
|
||||
connectedClients.put(clientId, state);
|
||||
log.info("[ClientConnectionService] Client registered: clientId={}, userId={}, totalClients={}",
|
||||
clientId, userId, connectedClients.size());
|
||||
|
||||
// If client was previously disconnected, retry pending messages
|
||||
if (wasDisconnected) {
|
||||
log.info("Client {} re-registered after disconnect - triggering pending message retry", clientId);
|
||||
messageDeliveryService.retryPendingDeliveriesForClient(clientId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregisters a client (e.g., on explicit logout).
|
||||
*
|
||||
* @param clientId The client identifier to unregister
|
||||
*/
|
||||
public void unregisterClient(String clientId) {
|
||||
ClientState removed = connectedClients.remove(clientId);
|
||||
if (removed != null) {
|
||||
log.info("Client unregistered: clientId={}, userId={}", clientId, removed.userId());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles a pong response from a client.
|
||||
*
|
||||
* @param clientId The client that sent the pong
|
||||
*/
|
||||
public void handlePong(String clientId) {
|
||||
if (clientId == null || clientId.isBlank()) {
|
||||
log.warn("Received pong from null or blank clientId");
|
||||
return;
|
||||
}
|
||||
|
||||
ClientState state = connectedClients.get(clientId);
|
||||
if (state != null) {
|
||||
boolean wasDisconnected = !state.connected();
|
||||
ClientState updatedState = state.withPongReceived(Instant.now());
|
||||
connectedClients.put(clientId, updatedState);
|
||||
log.debug("Pong received from client: clientId={}", clientId);
|
||||
|
||||
// If client was disconnected and is now reconnected, retry pending messages
|
||||
if (wasDisconnected) {
|
||||
log.info("Client {} reconnected via pong - triggering pending message retry", clientId);
|
||||
messageDeliveryService.retryPendingDeliveriesForClient(clientId);
|
||||
}
|
||||
} else {
|
||||
log.warn("Received pong from unknown client: clientId={}", clientId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a client is currently connected.
|
||||
*
|
||||
* @param clientId The client identifier
|
||||
* @return true if the client is connected
|
||||
*/
|
||||
public boolean isClientConnected(String clientId) {
|
||||
ClientState state = connectedClients.get(clientId);
|
||||
return state != null && state.connected();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets all connected client IDs.
|
||||
*
|
||||
* @return Set of connected client IDs
|
||||
*/
|
||||
public Set<String> getConnectedClientIds() {
|
||||
return connectedClients.entrySet().stream()
|
||||
.filter(e -> e.getValue().connected())
|
||||
.map(Map.Entry::getKey)
|
||||
.collect(java.util.stream.Collectors.toSet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection state for a specific client.
|
||||
*
|
||||
* @param clientId The client identifier
|
||||
* @return ClientState or null if not found
|
||||
*/
|
||||
public ClientState getClientState(String clientId) {
|
||||
return connectedClients.get(clientId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Scheduled task to send pings to all connected clients.
|
||||
* Runs based on the configured interval (app.client.ping.interval-seconds).
|
||||
*/
|
||||
@Scheduled(fixedRateString = "${app.client.ping.interval-seconds:15}000")
|
||||
public void sendPingsToAllClients() {
|
||||
log.info("[ClientConnectionService] Ping cycle started - pluginConnected={}, connectedClients={}",
|
||||
pluginManager.isConnected(), connectedClients.size());
|
||||
|
||||
if (!pluginManager.isConnected()) {
|
||||
log.info("[ClientConnectionService] Plugin not connected, skipping ping cycle");
|
||||
return;
|
||||
}
|
||||
|
||||
if (connectedClients.isEmpty()) {
|
||||
log.info("[ClientConnectionService] No connected clients, skipping ping cycle");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[ClientConnectionService] Starting ping cycle for {} clients", connectedClients.size());
|
||||
Instant now = Instant.now();
|
||||
|
||||
for (Map.Entry<String, ClientState> entry : connectedClients.entrySet()) {
|
||||
String clientId = entry.getKey();
|
||||
ClientState state = entry.getValue();
|
||||
|
||||
// Check if previous ping timed out
|
||||
if (state.lastPingSent() != null && state.connected()) {
|
||||
Instant expectedPongBy = state.lastPingSent().plusSeconds(pingTimeoutSeconds);
|
||||
boolean pongReceivedAfterPing = state.lastPongReceived() != null
|
||||
&& state.lastPongReceived().isAfter(state.lastPingSent());
|
||||
|
||||
if (now.isAfter(expectedPongBy) && !pongReceivedAfterPing) {
|
||||
// Client did not respond in time - mark as disconnected
|
||||
ClientState disconnectedState = state.withConnected(false);
|
||||
connectedClients.put(clientId, disconnectedState);
|
||||
log.warn("Client timed out, marking as disconnected: clientId={}, userId={}",
|
||||
clientId, state.userId());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Send ping to connected clients
|
||||
if (state.connected()) {
|
||||
log.info("[ClientConnectionService] Sending ping to client: {}", clientId);
|
||||
sendPing(clientId);
|
||||
ClientState updatedState = state.withPingSent(now);
|
||||
connectedClients.put(clientId, updatedState);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a ping message to a specific client.
|
||||
*
|
||||
* @param clientId The target client
|
||||
*/
|
||||
private void sendPing(String clientId) {
|
||||
try {
|
||||
Map<String, Object> pingPayload = Map.of(
|
||||
"type", "ping",
|
||||
"timestamp", Instant.now().toEpochMilli()
|
||||
);
|
||||
|
||||
String json = objectMapper.writeValueAsString(pingPayload);
|
||||
byte[] payload = json.getBytes(StandardCharsets.UTF_8);
|
||||
log.info("[ClientConnectionService] Ping payload for client {}: {}", clientId, json);
|
||||
|
||||
SendOptions options = SendOptions.builder()
|
||||
.qos(1)
|
||||
.retained(false)
|
||||
.build();
|
||||
|
||||
pluginManager.sendToClient(clientId, "ping", payload, options)
|
||||
.whenComplete((result, error) -> {
|
||||
if (error != null) {
|
||||
log.warn("[ClientConnectionService] Failed to send ping to client {}: {}", clientId, error.getMessage());
|
||||
} else {
|
||||
log.info("[ClientConnectionService] Ping sent successfully to client: {}", clientId);
|
||||
}
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Error sending ping to client {}: {}", clientId, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the number of currently connected clients.
|
||||
*
|
||||
* @return Number of connected clients
|
||||
*/
|
||||
public int getConnectedClientCount() {
|
||||
return (int) connectedClients.values().stream()
|
||||
.filter(ClientState::connected)
|
||||
.count();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the total number of registered clients (connected and disconnected).
|
||||
*
|
||||
* @return Total number of registered clients
|
||||
*/
|
||||
public int getTotalClientCount() {
|
||||
return connectedClients.size();
|
||||
}
|
||||
}
|
||||
@@ -20,6 +20,8 @@ spring.mustache.check-template-location=false
|
||||
|
||||
# Launch the default browser when starting the application in development mode
|
||||
vaadin.launch-browser=true
|
||||
# Disable Vaadin Copilot to avoid NullPointerException in dev mode
|
||||
vaadin.copilot.enabled=false
|
||||
# To improve the performance during development.
|
||||
# For more information https://vaadin.com/docs/latest/flow/integrations/spring/configuration#special-configuration-parameters
|
||||
vaadin.allowed-packages=com.vaadin,org.vaadin,de.assecutor.votianlt
|
||||
@@ -78,5 +80,24 @@ app.messaging.plugin.mqtt.username=app
|
||||
app.messaging.plugin.mqtt.password=apppwd
|
||||
app.messaging.plugin.mqtt.client.id=votianlt-server
|
||||
|
||||
# Client Connection Monitoring (Ping/Pong)
|
||||
# Server sends ping to: /client/{clientId}/ping
|
||||
# Client responds to: /server/{clientId}/pong
|
||||
#
|
||||
# Ping JSON (Server -> Client):
|
||||
# {
|
||||
# "type": "ping",
|
||||
# "timestamp": 1702835000000
|
||||
# }
|
||||
#
|
||||
# Pong JSON (Client -> Server):
|
||||
# {
|
||||
# "type": "pong",
|
||||
# "timestamp": 1702835000000
|
||||
# }
|
||||
#
|
||||
app.client.ping.interval-seconds=15
|
||||
app.client.ping.timeout-seconds=5
|
||||
|
||||
# Application Version - automatically set from pom.xml during build
|
||||
app.version=@project.version@
|
||||
Reference in New Issue
Block a user