Erweiterungen
This commit is contained in:
@@ -0,0 +1,31 @@
|
||||
package de.assecutor.votianlt.config;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
|
||||
/**
|
||||
* Jackson configuration for consistent JSON serialization across the application.
|
||||
* Ensures all date/time fields are serialized as ISO 8601 strings.
|
||||
*/
|
||||
@Configuration
|
||||
public class JacksonConfig {
|
||||
|
||||
/**
|
||||
* Creates a configured ObjectMapper bean that serializes dates as ISO 8601 strings.
|
||||
* This bean is used throughout the application for JSON serialization.
|
||||
*/
|
||||
@Bean
|
||||
@Primary
|
||||
public ObjectMapper objectMapper() {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.registerModule(new JavaTimeModule());
|
||||
// Serialize dates as ISO 8601 strings instead of timestamps/arrays
|
||||
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
|
||||
return objectMapper;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@ import de.assecutor.votianlt.service.MessageService;
|
||||
import de.assecutor.votianlt.model.JobStatus;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import de.assecutor.votianlt.mqtt.MqttPublisher;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@@ -72,12 +71,13 @@ public class MessageController {
|
||||
private final EmailService emailService;
|
||||
private final MessageService messageService;
|
||||
private final UserService userService;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public MessageController(MqttPublisher mqttPublisher, AppUserRepository appUserRepository,
|
||||
AppUserService appUserService, JobRepository jobRepository, CargoItemRepository cargoItemRepository,
|
||||
TaskRepository taskRepository, PhotoRepository photoRepository, BarcodeRepository barcodeRepository,
|
||||
SignatureRepository signatureRepository, CommentRepository commentRepository, JobHistoryService jobHistoryService,
|
||||
EmailService emailService, MessageService messageService, UserService userService) {
|
||||
EmailService emailService, MessageService messageService, UserService userService, ObjectMapper objectMapper) {
|
||||
this.mqttPublisher = mqttPublisher;
|
||||
this.appUserRepository = appUserRepository;
|
||||
this.appUserService = appUserService;
|
||||
@@ -92,6 +92,7 @@ public class MessageController {
|
||||
this.emailService = emailService;
|
||||
this.messageService = messageService;
|
||||
this.userService = userService;
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -199,8 +200,6 @@ public class MessageController {
|
||||
// Log complete JSON for debugging
|
||||
log.debug("About to serialize {} jobs to JSON for logging", jobsWithRelatedData.size());
|
||||
try {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.registerModule(new JavaTimeModule());
|
||||
String jsonOutput = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobsWithRelatedData);
|
||||
log.info("=== COMPLETE JSON RESPONSE FOR MQTT CLIENT ===");
|
||||
log.info("AppUserId: {}", appUserId);
|
||||
|
||||
@@ -12,7 +12,6 @@ import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.event.EventListener;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
@@ -45,10 +44,9 @@ public class PluginMessagingConfig {
|
||||
private final PluginManager pluginManager;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public PluginMessagingConfig(PluginManager pluginManager) {
|
||||
public PluginMessagingConfig(PluginManager pluginManager, ObjectMapper objectMapper) {
|
||||
this.pluginManager = pluginManager;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.registerModule(new JavaTimeModule());
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -143,7 +141,11 @@ public class PluginMessagingConfig {
|
||||
pluginManager.registerAckHandler((messageId, payload) -> {
|
||||
try {
|
||||
String json = new String(payload, StandardCharsets.UTF_8);
|
||||
AcknowledgmentMessage ack = objectMapper.readValue(json, AcknowledgmentMessage.class);
|
||||
log.info("[PluginMessagingConfig] Received ACK JSON: {}", json);
|
||||
|
||||
// ACK messages are wrapped in MessageEnvelope
|
||||
MessageEnvelope envelope = objectMapper.readValue(json, MessageEnvelope.class);
|
||||
AcknowledgmentMessage ack = objectMapper.convertValue(envelope.getPayload(), AcknowledgmentMessage.class);
|
||||
deliveryService.handleAcknowledgment(ack);
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginMessagingConfig] Error handling ACK message: {}", e.getMessage(), e);
|
||||
@@ -177,6 +179,7 @@ public class PluginMessagingConfig {
|
||||
private void handleEnvelopedMessage(String clientId, byte[] payload, MessageDeliveryService deliveryService) {
|
||||
try {
|
||||
String json = new String(payload, StandardCharsets.UTF_8);
|
||||
log.info("[PluginMessagingConfig] Received JSON from client {}: {}", clientId, json);
|
||||
|
||||
// Try to parse as envelope first
|
||||
try {
|
||||
@@ -184,7 +187,7 @@ public class PluginMessagingConfig {
|
||||
deliveryService.handleIncomingMessage(envelope);
|
||||
} catch (Exception e) {
|
||||
// If not an envelope, it might be a legacy message - log and skip
|
||||
log.debug("[PluginMessagingConfig] Received non-enveloped message from client {}, skipping", clientId);
|
||||
log.warn("[PluginMessagingConfig] Received non-enveloped message from client {}, skipping. JSON: {}", clientId, json);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[PluginMessagingConfig] Error handling enveloped message from client {}: {}", clientId, e.getMessage(), e);
|
||||
|
||||
@@ -2,7 +2,6 @@ package de.assecutor.votianlt.messaging.delivery;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import de.assecutor.votianlt.controller.MessageController;
|
||||
import de.assecutor.votianlt.dto.AppLoginRequest;
|
||||
import de.assecutor.votianlt.messaging.model.MessageEnvelope;
|
||||
@@ -23,10 +22,9 @@ public class AcknowledgmentHandler {
|
||||
private final MessageController messageController;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public AcknowledgmentHandler(@Lazy MessageController messageController) {
|
||||
public AcknowledgmentHandler(@Lazy MessageController messageController, ObjectMapper objectMapper) {
|
||||
this.messageController = messageController;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.registerModule(new JavaTimeModule());
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package de.assecutor.votianlt.messaging.delivery;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import de.assecutor.votianlt.messaging.model.*;
|
||||
import de.assecutor.votianlt.messaging.plugin.PluginManager;
|
||||
import de.assecutor.votianlt.messaging.plugin.SendOptions;
|
||||
@@ -36,14 +35,14 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
PendingDeliveryRepository pendingDeliveryRepository,
|
||||
MessageEnvelopeRepository envelopeRepository,
|
||||
AcknowledgmentHandler acknowledgmentHandler,
|
||||
DeliveryConfig config) {
|
||||
DeliveryConfig config,
|
||||
ObjectMapper objectMapper) {
|
||||
this.pluginManager = pluginManager;
|
||||
this.pendingDeliveryRepository = pendingDeliveryRepository;
|
||||
this.envelopeRepository = envelopeRepository;
|
||||
this.acknowledgmentHandler = acknowledgmentHandler;
|
||||
this.config = config;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.registerModule(new JavaTimeModule());
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -64,6 +63,7 @@ public class MessageDeliveryServiceImpl implements MessageDeliveryService {
|
||||
// Serialize envelope to JSON
|
||||
String json = objectMapper.writeValueAsString(envelope);
|
||||
byte[] envelopeData = json.getBytes(StandardCharsets.UTF_8);
|
||||
log.info("[MessageDelivery] Sending JSON to client {} (type: {}): {}", clientId, messageType, json);
|
||||
|
||||
// Create pending delivery record if acknowledgment is required
|
||||
if (options.isRequiresAck()) {
|
||||
|
||||
@@ -21,8 +21,8 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
* Topic Structure (managed internally):
|
||||
* - Server -> Client: /client/{clientId}/{messageType}
|
||||
* - Client -> Server: /server/{clientId}/{messageType}
|
||||
* - ACK Server -> Client: /client/{clientId}/{messageId}/ack
|
||||
* - ACK Client -> Server: /server/{messageId}/ack
|
||||
* - ACK Server -> Client: /client/{clientId}/ack (messageId in payload)
|
||||
* - ACK Client -> Server: /server/{clientId}/ack (messageId in payload)
|
||||
*/
|
||||
@Slf4j
|
||||
public class MqttMessagingPlugin implements MessagingPlugin {
|
||||
@@ -33,8 +33,8 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
||||
// Topic templates
|
||||
private static final String TOPIC_TO_CLIENT = "/client/%s/%s"; // /client/{clientId}/{messageType}
|
||||
private static final String TOPIC_FROM_CLIENT = "/server/%s/%s"; // /server/{clientId}/{messageType}
|
||||
private static final String TOPIC_ACK_TO_CLIENT = "/client/%s/%s/ack"; // /client/{clientId}/{messageId}/ack
|
||||
private static final String TOPIC_ACK_FROM_CLIENT = "/server/%s/ack"; // /server/{messageId}/ack
|
||||
private static final String TOPIC_ACK_TO_CLIENT = "/client/%s/ack"; // /client/{clientId}/ack (messageId in payload)
|
||||
private static final String TOPIC_ACK_FROM_CLIENT = "/server/%s/ack"; // /server/{clientId}/ack (messageId in payload)
|
||||
|
||||
// Subscription patterns
|
||||
private static final String PATTERN_FROM_CLIENT = "/server/+/%s"; // /server/+/{messageType}
|
||||
@@ -205,7 +205,7 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
||||
return CompletableFuture.failedFuture(new PluginException("MQTT client is not connected"));
|
||||
}
|
||||
|
||||
String topic = String.format(TOPIC_ACK_TO_CLIENT, clientId, messageId);
|
||||
String topic = String.format(TOPIC_ACK_TO_CLIENT, clientId);
|
||||
log.debug("[MqttPlugin] Sending ACK to client {} for message {} on topic: {}", clientId, messageId, topic);
|
||||
|
||||
return sendToTopic(topic, payload, options);
|
||||
@@ -351,7 +351,7 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
||||
|
||||
/**
|
||||
* Handle ACK message from client.
|
||||
* Topic format: /server/{messageId}/ack
|
||||
* Topic format: /server/{clientId}/ack (messageId in payload)
|
||||
*/
|
||||
private void handleAckMessage(String topic, byte[] payload) {
|
||||
if (ackHandler == null) {
|
||||
@@ -359,20 +359,71 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
||||
return;
|
||||
}
|
||||
|
||||
// Extract messageId from topic: /server/{messageId}/ack
|
||||
// Extract clientId from topic: /server/{clientId}/ack
|
||||
String[] parts = topic.split("/");
|
||||
if (parts.length >= 4) {
|
||||
String messageId = parts[2]; // messageId is at index 2
|
||||
log.debug("[MqttPlugin] Routing ACK for message: {}", messageId);
|
||||
ackHandler.onAckReceived(messageId, payload);
|
||||
String clientId = parts[2]; // clientId is at index 2
|
||||
|
||||
// Extract messageId from payload
|
||||
String payloadStr = new String(payload, StandardCharsets.UTF_8);
|
||||
String messageId = extractMessageIdFromPayload(payloadStr);
|
||||
|
||||
if (messageId != null) {
|
||||
log.debug("[MqttPlugin] Routing ACK for message: {} from client: {}", messageId, clientId);
|
||||
ackHandler.onAckReceived(messageId, payload);
|
||||
} else {
|
||||
log.warn("[MqttPlugin] Could not extract messageId from ACK payload: {}", payloadStr);
|
||||
}
|
||||
} else {
|
||||
log.warn("[MqttPlugin] Invalid ACK topic format: {}", topic);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract messageId from ACK payload.
|
||||
* Expected payload format: JSON with "messageId" field, e.g., {"messageId": "abc-123"}
|
||||
* or plain messageId string.
|
||||
*/
|
||||
private String extractMessageIdFromPayload(String payload) {
|
||||
if (payload == null || payload.isBlank()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
payload = payload.trim();
|
||||
|
||||
// Try to extract from JSON format: {"messageId": "..."}
|
||||
if (payload.startsWith("{")) {
|
||||
// Simple JSON parsing for messageId field
|
||||
int keyIndex = payload.indexOf("\"messageId\"");
|
||||
if (keyIndex == -1) {
|
||||
keyIndex = payload.indexOf("'messageId'");
|
||||
}
|
||||
if (keyIndex >= 0) {
|
||||
int colonIndex = payload.indexOf(":", keyIndex);
|
||||
if (colonIndex >= 0) {
|
||||
int valueStart = payload.indexOf("\"", colonIndex);
|
||||
if (valueStart == -1) {
|
||||
valueStart = payload.indexOf("'", colonIndex);
|
||||
}
|
||||
if (valueStart >= 0) {
|
||||
char quote = payload.charAt(valueStart);
|
||||
int valueEnd = payload.indexOf(quote, valueStart + 1);
|
||||
if (valueEnd > valueStart) {
|
||||
return payload.substring(valueStart + 1, valueEnd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If not JSON, treat the entire payload as the messageId
|
||||
return payload;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle client message.
|
||||
* Topic format: /server/{clientId}/{messageType} or /server/{messageType} (for login)
|
||||
* messageType can contain slashes, e.g., "jobs/assigned"
|
||||
*/
|
||||
private void handleClientMessage(String topic, byte[] payload) {
|
||||
// Extract clientId and messageType from topic
|
||||
@@ -391,10 +442,12 @@ public class MqttMessagingPlugin implements MessagingPlugin {
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle /server/{clientId}/{messageType}
|
||||
// Handle /server/{clientId}/{messageType} where messageType can contain slashes
|
||||
if (parts.length >= 4) {
|
||||
String clientId = parts[2];
|
||||
String messageType = parts[3];
|
||||
// Join all parts from index 3 onwards to form the full messageType
|
||||
// e.g., /server/clientId/jobs/assigned -> messageType = "jobs/assigned"
|
||||
String messageType = String.join("/", java.util.Arrays.copyOfRange(parts, 3, parts.length));
|
||||
|
||||
ClientMessageHandler handler = messageHandlers.get(messageType);
|
||||
if (handler != null) {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package de.assecutor.votianlt.mqtt;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import de.assecutor.votianlt.messaging.delivery.MessageDeliveryService;
|
||||
import de.assecutor.votianlt.messaging.model.DeliveryOptions;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -31,10 +30,9 @@ class MqttPublisherImpl implements MqttPublisher {
|
||||
private final ObjectMapper objectMapper;
|
||||
private final MessageDeliveryService deliveryService;
|
||||
|
||||
public MqttPublisherImpl(@Lazy MessageDeliveryService deliveryService) {
|
||||
public MqttPublisherImpl(@Lazy MessageDeliveryService deliveryService, ObjectMapper objectMapper) {
|
||||
this.deliveryService = deliveryService;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper.registerModule(new JavaTimeModule());
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -19,9 +19,9 @@ public class JobHistoryService {
|
||||
private final JobHistoryRepository jobHistoryRepository;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public JobHistoryService(JobHistoryRepository jobHistoryRepository) {
|
||||
public JobHistoryService(JobHistoryRepository jobHistoryRepository, ObjectMapper objectMapper) {
|
||||
this.jobHistoryRepository = jobHistoryRepository;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user