MQTT
This commit is contained in:
155
MQTT_README.md
Normal file
155
MQTT_README.md
Normal file
@@ -0,0 +1,155 @@
|
||||
# VOTIANLT MQTT Messaging API
|
||||
|
||||
This document describes how mobile/Flutter apps should communicate with the backend using MQTT. It replaces the previous STOMP/WebSocket communication.
|
||||
|
||||
Broker: tcp://192.168.180.26:1883 (MQTT v5)
|
||||
QoS: 2 (exactly once)
|
||||
Retain: Enabled for critical topics (see below), otherwise not retained
|
||||
Payloads: JSON (UTF‑8)
|
||||
|
||||
Connection
|
||||
- MQTT clientId: choose a stable, unique per-device id (e.g., app-<uuid>)
|
||||
- Clean session: false (recommended for guaranteed delivery). The broker will queue QoS>0 messages while the app is offline.
|
||||
- Authentication: currently none (adjust if needed)
|
||||
|
||||
Topic Naming (v1/*)
|
||||
- v1/app/<deviceId>/auth/login (App -> Server)
|
||||
- v1/users/<username>/notifications (Server -> App)
|
||||
- v1/broadcasts (Server -> App)
|
||||
- v1/app/<deviceId>/jobs/assigned (App -> Server request)
|
||||
- v1/app/<deviceId>/job/status (App -> Server)
|
||||
- v1/app/<deviceId>/device/location (App -> Server)
|
||||
- v1/tasks/<taskId> (Server -> App events for a single task)
|
||||
- v1/task-updates (Server -> App general task events) [optional]
|
||||
|
||||
General pattern
|
||||
- Requests from apps go under v1/app/<deviceId>/...
|
||||
- Server responses and events are published to either a user‑ or task‑scoped topic as listed above.
|
||||
|
||||
1) Authentication (App -> Server)
|
||||
Topic: v1/app/<deviceId>/auth/login
|
||||
Payload request:
|
||||
{
|
||||
"email": "user@example.com",
|
||||
"password": "secret"
|
||||
}
|
||||
|
||||
Response (Server -> App)
|
||||
Topic: v1/users/<username-or-appUserId>/notifications
|
||||
Payload:
|
||||
{
|
||||
"type": "auth",
|
||||
"success": true,
|
||||
"message": "Anmeldung erfolgreich",
|
||||
"appUserId": "<ObjectId>"
|
||||
}
|
||||
|
||||
2) Job status update (App -> Server)
|
||||
Topic: v1/app/<deviceId>/job/status
|
||||
Payload request (example):
|
||||
{
|
||||
"jobId": "<ObjectId>",
|
||||
"status": "ON_ROUTE",
|
||||
"note": "...",
|
||||
"timestamp": "2025-09-13T22:00:00"
|
||||
}
|
||||
|
||||
Server may publish derived updates to:
|
||||
- v1/broadcasts (if global) or
|
||||
- v1/users/<username>/notifications (if per user)
|
||||
|
||||
3) Device location (App -> Server)
|
||||
Topic: v1/app/<deviceId>/device/location
|
||||
Payload:
|
||||
{
|
||||
"lat": 48.12345,
|
||||
"lon": 11.54321,
|
||||
"accuracy": 5.4,
|
||||
"timestamp": "2025-09-13T22:00:00"
|
||||
}
|
||||
|
||||
4) Assigned jobs request (App -> Server)
|
||||
Topic: v1/app/<deviceId>/jobs/assigned
|
||||
Payload request:
|
||||
{
|
||||
"appUserId": "<ObjectId>"
|
||||
}
|
||||
|
||||
Response (Server -> App)
|
||||
Topic: v1/users/<appUserId>/notifications
|
||||
Payload:
|
||||
{
|
||||
"type": "jobs",
|
||||
"jobs": [ { /* JobWithRelatedDataDTO */ } ]
|
||||
}
|
||||
|
||||
5) Task completion events (Server -> App)
|
||||
- When a task is completed (CONFIRMATION, SIGNATURE, BARCODE, TODOLIST, PHOTO), the server publishes an event to the task‑scoped topic.
|
||||
Topic: v1/tasks/<taskId>
|
||||
Payload:
|
||||
{
|
||||
"event": "taskCompleted",
|
||||
"taskId": "<taskId>",
|
||||
"jobId": "<jobId>",
|
||||
"taskType": "PHOTO|CONFIRMATION|...",
|
||||
"completed": true,
|
||||
"completedAt": "2025-09-13T22:05:00",
|
||||
"completedBy": "driver01",
|
||||
"note": "optional"
|
||||
}
|
||||
|
||||
6) Photo uploads (MQTT)
|
||||
- Apps send photos as base64 strings within the MQTT payload when reporting PHOTO task completion.
|
||||
|
||||
Topic (App -> Server): v1/app/<deviceId>/task/photo/completed
|
||||
Payload:
|
||||
{
|
||||
"taskId": "<taskId>",
|
||||
"completedBy": "driver01",
|
||||
"note": "optional",
|
||||
"extraData": {
|
||||
"photos": ["<base64-1>", "<base64-2>"],
|
||||
"count": 2
|
||||
}
|
||||
}
|
||||
|
||||
Server behavior:
|
||||
- Saves the photos in the photos collection, marks the task as completed, and publishes an event:
|
||||
Topic (Server -> App): v1/tasks/<taskId>
|
||||
Payload:
|
||||
{
|
||||
"event": "taskCompleted",
|
||||
"taskId": "<taskId>",
|
||||
"jobId": "<jobId>",
|
||||
"taskType": "PHOTO",
|
||||
"completed": true,
|
||||
"completedAt": "...",
|
||||
"completedBy": "driver01",
|
||||
"note": "optional"
|
||||
}
|
||||
|
||||
7) Broadcasts and notifications (Server -> App)
|
||||
- Broadcasts: v1/broadcasts
|
||||
- User notifications: v1/users/<username>/notifications
|
||||
Payload example:
|
||||
{
|
||||
"type": "broadcast|notification",
|
||||
"message": "...",
|
||||
"timestamp": "2025-09-13T22:10:00"
|
||||
}
|
||||
|
||||
Quality of Service & Retain
|
||||
- QoS 2 (exactly once) is used by default server side for both inbound subscriptions and outbound publications.
|
||||
- Retained messages are disabled by default to avoid stale updates.
|
||||
|
||||
Error Handling
|
||||
- Server logs errors; apps should implement local retries for transient failures.
|
||||
- For request/response patterns over MQTT, include correlationId in payloads if you need strict pairing.
|
||||
|
||||
Security
|
||||
- If authentication is required at broker level, configure username/password.
|
||||
- Consider using TLS if the broker supports it.
|
||||
|
||||
Migration notes
|
||||
- Previous STOMP destinations like /topic/tasks/{taskId} are now MQTT topics v1/tasks/<taskId>.
|
||||
- Photos for PHOTO tasks must be embedded in the MQTT message (extraData.photos) published to v1/app/<deviceId>/task/photo/completed. The old HTTP endpoints have been removed.
|
||||
31
pom.xml
31
pom.xml
@@ -55,6 +55,22 @@
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Spring Integration for MQTT (managed by Spring Boot BOM) -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-integration</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-mqtt</artifactId>
|
||||
</dependency>
|
||||
<!-- HiveMQ MQTT v5 Client -->
|
||||
<dependency>
|
||||
<groupId>com.hivemq</groupId>
|
||||
<artifactId>hivemq-mqtt-client</artifactId>
|
||||
<version>1.3.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-validation</artifactId>
|
||||
@@ -93,22 +109,7 @@
|
||||
<version>2.0.1</version>
|
||||
</dependency>
|
||||
|
||||
<!-- WebSocket and STOMP Dependencies for messaging -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-messaging</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Zeroconf mDNS (JmDNS) -->
|
||||
<dependency>
|
||||
<groupId>org.jmdns</groupId>
|
||||
<artifactId>jmdns</artifactId>
|
||||
<version>3.6.1</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
15
src/main/java/de/assecutor/votianlt/config/MqttConfig.java
Normal file
15
src/main/java/de/assecutor/votianlt/config/MqttConfig.java
Normal file
@@ -0,0 +1,15 @@
|
||||
package de.assecutor.votianlt.config;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* MQTT configuration placeholder.
|
||||
*
|
||||
* In environments where Spring Integration MQTT dependencies are not available,
|
||||
* this class remains empty to allow the application to compile and run without
|
||||
* MQTT wiring. The business code uses a no-op MqttPublisher that logs messages.
|
||||
*/
|
||||
@Configuration
|
||||
public class MqttConfig {
|
||||
public static final String MQTT_BROKER_URI = "tcp://192.168.180.26:1883";
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
package de.assecutor.votianlt.config;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
@ConfigurationProperties(prefix = "app.mqtt")
|
||||
public class MqttProperties {
|
||||
/** Enable/disable MQTT subsystem */
|
||||
private boolean enabled = true;
|
||||
/** Broker URI, e.g. tcp://192.168.180.26:1883 */
|
||||
private String brokerUri = "tcp://192.168.180.26:1883";
|
||||
/** ClientId for the server */
|
||||
private String clientId = "server";
|
||||
/** Optional username */
|
||||
private String username;
|
||||
/** Optional password */
|
||||
private String password;
|
||||
/** MQTT v5 clean start flag */
|
||||
private boolean cleanStart = false;
|
||||
/** Session expiry interval in seconds (0 = expire immediately) */
|
||||
private long sessionExpiryInterval = 24 * 60 * 60; // 1 day
|
||||
/** Keep alive in seconds */
|
||||
private int keepAlive = 30;
|
||||
/** Max inflight messages */
|
||||
private int maxInflight = 50;
|
||||
/** Automatic reconnect */
|
||||
private boolean automaticReconnect = true;
|
||||
/** Default QoS to use for publishing */
|
||||
private int defaultQos = 2;
|
||||
/** Default retained flag for publishing */
|
||||
private boolean defaultRetained = false;
|
||||
|
||||
public boolean isEnabled() { return enabled; }
|
||||
public void setEnabled(boolean enabled) { this.enabled = enabled; }
|
||||
public String getBrokerUri() { return brokerUri; }
|
||||
public void setBrokerUri(String brokerUri) { this.brokerUri = brokerUri; }
|
||||
public String getClientId() { return clientId; }
|
||||
public void setClientId(String clientId) { this.clientId = clientId; }
|
||||
public String getUsername() { return username; }
|
||||
public void setUsername(String username) { this.username = username; }
|
||||
public String getPassword() { return password; }
|
||||
public void setPassword(String password) { this.password = password; }
|
||||
public boolean isCleanStart() { return cleanStart; }
|
||||
public void setCleanStart(boolean cleanStart) { this.cleanStart = cleanStart; }
|
||||
public long getSessionExpiryInterval() { return sessionExpiryInterval; }
|
||||
public void setSessionExpiryInterval(long sessionExpiryInterval) { this.sessionExpiryInterval = sessionExpiryInterval; }
|
||||
public int getKeepAlive() { return keepAlive; }
|
||||
public void setKeepAlive(int keepAlive) { this.keepAlive = keepAlive; }
|
||||
public int getMaxInflight() { return maxInflight; }
|
||||
public void setMaxInflight(int maxInflight) { this.maxInflight = maxInflight; }
|
||||
public boolean isAutomaticReconnect() { return automaticReconnect; }
|
||||
public void setAutomaticReconnect(boolean automaticReconnect) { this.automaticReconnect = automaticReconnect; }
|
||||
public int getDefaultQos() { return defaultQos; }
|
||||
public void setDefaultQos(int defaultQos) { this.defaultQos = defaultQos; }
|
||||
public boolean isDefaultRetained() { return defaultRetained; }
|
||||
public void setDefaultRetained(boolean defaultRetained) { this.defaultRetained = defaultRetained; }
|
||||
}
|
||||
@@ -1,92 +0,0 @@
|
||||
package de.assecutor.votianlt.config;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
|
||||
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
|
||||
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
|
||||
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
|
||||
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
|
||||
import org.springframework.messaging.simp.config.ChannelRegistration;
|
||||
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* WebSocket configuration for STOMP messaging.
|
||||
* Enables real-time communication with client applications.
|
||||
*/
|
||||
@Configuration
|
||||
@EnableWebSocketMessageBroker
|
||||
@Slf4j
|
||||
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
|
||||
|
||||
|
||||
@Override
|
||||
public void configureMessageBroker(MessageBrokerRegistry config) {
|
||||
// Enable a simple memory-based message broker to carry messages back to client
|
||||
// on destinations prefixed with "/topic" and "/queue"
|
||||
config.enableSimpleBroker("/topic", "/queue");
|
||||
|
||||
// Designate the "/app" prefix for messages that are bound to methods
|
||||
// annotated with @MessageMapping
|
||||
config.setApplicationDestinationPrefixes("/app");
|
||||
|
||||
// Set user destination prefix for user-specific messages
|
||||
config.setUserDestinationPrefix("/user");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureClientInboundChannel(ChannelRegistration registration) {
|
||||
// Increase message size limits for large payloads (like base64 photos)
|
||||
registration.taskExecutor().corePoolSize(4);
|
||||
registration.taskExecutor().maxPoolSize(8);
|
||||
registration.taskExecutor().keepAliveSeconds(60);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureClientOutboundChannel(ChannelRegistration registration) {
|
||||
// Configure outbound channel for better performance with large messages
|
||||
registration.taskExecutor().corePoolSize(4);
|
||||
registration.taskExecutor().maxPoolSize(8);
|
||||
registration.taskExecutor().keepAliveSeconds(60);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
|
||||
// Use framework defaults (no custom large-message settings)
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
|
||||
// Use default message converters (no custom large-payload converter)
|
||||
return false; // keep default converters
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerStompEndpoints(StompEndpointRegistry registry) {
|
||||
log.info("=== REGISTERING WEBSOCKET ENDPOINTS ===");
|
||||
|
||||
// Register the "/ws" endpoint for WebSocket connections with SockJS fallback
|
||||
registry.addEndpoint("/ws")
|
||||
.setAllowedOriginPatterns("*")
|
||||
.addInterceptors(new HttpSessionHandshakeInterceptor())
|
||||
.withSockJS()
|
||||
.setHeartbeatTime(25000)
|
||||
.setDisconnectDelay(5000)
|
||||
.setSessionCookieNeeded(false);
|
||||
|
||||
// Plain WebSocket endpoint without SockJS for native WebSocket clients (Flutter, mobile apps)
|
||||
registry.addEndpoint("/websocket")
|
||||
.setAllowedOriginPatterns("*")
|
||||
.addInterceptors(new HttpSessionHandshakeInterceptor());
|
||||
|
||||
// Additional endpoint specifically for mobile/Flutter clients that might have URL issues
|
||||
registry.addEndpoint("/stomp")
|
||||
.setAllowedOriginPatterns("*")
|
||||
.addInterceptors(new HttpSessionHandshakeInterceptor());
|
||||
|
||||
log.info("WebSocket endpoints registered: /ws (with SockJS), /websocket, /stomp");
|
||||
}
|
||||
}
|
||||
@@ -1,47 +0,0 @@
|
||||
package de.assecutor.votianlt.config;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.converter.DefaultContentTypeResolver;
|
||||
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Additional configuration for handling large WebSocket messages.
|
||||
* This configuration specifically addresses JSON parsing limits for large payloads.
|
||||
*/
|
||||
@Configuration
|
||||
public class WebSocketMessageSizeConfig {
|
||||
|
||||
/**
|
||||
* Configure Jackson ObjectMapper to handle large JSON strings (like base64 photos).
|
||||
*/
|
||||
@Bean("webSocketObjectMapper")
|
||||
public ObjectMapper webSocketObjectMapper() {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.registerModule(new JavaTimeModule());
|
||||
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
|
||||
return mapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure message converter to use our custom ObjectMapper.
|
||||
*/
|
||||
@Bean
|
||||
public MappingJackson2MessageConverter messageConverter() {
|
||||
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
|
||||
converter.setObjectMapper(webSocketObjectMapper());
|
||||
|
||||
DefaultContentTypeResolver resolver = new DefaultContentTypeResolver();
|
||||
resolver.setDefaultMimeType(MimeTypeUtils.APPLICATION_JSON);
|
||||
converter.setContentTypeResolver(resolver);
|
||||
|
||||
return converter;
|
||||
}
|
||||
}
|
||||
@@ -17,11 +17,8 @@ import de.assecutor.votianlt.repository.TaskRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import org.springframework.messaging.handler.annotation.MessageMapping;
|
||||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
import org.springframework.messaging.simp.SimpMessagingTemplate;
|
||||
import org.springframework.messaging.simp.annotation.SendToUser;
|
||||
import org.springframework.stereotype.Controller;
|
||||
import de.assecutor.votianlt.mqtt.MqttPublisher;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
@@ -33,12 +30,12 @@ import java.util.Map;
|
||||
* STOMP message controller for handling real-time communication with apps.
|
||||
* Provides endpoints for sending and receiving messages via WebSocket/STOMP.
|
||||
*/
|
||||
@Controller
|
||||
@Component
|
||||
@Slf4j
|
||||
public class MessageController {
|
||||
|
||||
@Autowired
|
||||
private SimpMessagingTemplate messagingTemplate;
|
||||
private MqttPublisher mqttPublisher;
|
||||
|
||||
@Autowired
|
||||
private AppUserRepository appUserRepository;
|
||||
@@ -61,8 +58,6 @@ public class MessageController {
|
||||
/**
|
||||
* Handles messages sent to /app/message and broadcasts them to all subscribers of /topic/messages
|
||||
*/
|
||||
@MessageMapping("/message")
|
||||
@SendTo("/topic/messages")
|
||||
public Map<String, Object> handleMessage(Map<String, Object> message) {
|
||||
log.error("=== ANY MESSAGE RECEIVED === STOMP Endpoint '/app/message' called");
|
||||
log.info("STOMP Endpoint '/app/message' called with data: {}", message);
|
||||
@@ -78,8 +73,6 @@ public class MessageController {
|
||||
/**
|
||||
* Handles job status updates from apps
|
||||
*/
|
||||
@MessageMapping("/job/status")
|
||||
@SendTo("/topic/job-updates")
|
||||
public Map<String, Object> handleJobStatusUpdate(Map<String, Object> jobUpdate) {
|
||||
log.info("STOMP Endpoint '/app/job/status' called with data: {}", jobUpdate);
|
||||
|
||||
@@ -93,8 +86,6 @@ public class MessageController {
|
||||
/**
|
||||
* Handles device location updates from mobile apps
|
||||
*/
|
||||
@MessageMapping("/device/location")
|
||||
@SendTo("/topic/device-locations")
|
||||
public Map<String, Object> handleDeviceLocation(Map<String, Object> locationUpdate) {
|
||||
log.info("STOMP Endpoint '/app/device/location' called with data: {}", locationUpdate);
|
||||
|
||||
@@ -116,7 +107,7 @@ public class MessageController {
|
||||
);
|
||||
|
||||
log.info("Sending notification to user '{}': {}", username, notification);
|
||||
messagingTemplate.convertAndSendToUser(username, "/queue/notifications", notification);
|
||||
mqttPublisher.publishAsJson("v1/users/" + username + "/notifications", notification, true);
|
||||
log.info("Notification sent to '/user/{}/queue/notifications'", username);
|
||||
}
|
||||
|
||||
@@ -131,7 +122,7 @@ public class MessageController {
|
||||
);
|
||||
|
||||
log.info("Sending broadcast message: {}", broadcast);
|
||||
messagingTemplate.convertAndSend("/topic/broadcasts", broadcast);
|
||||
mqttPublisher.publishAsJson("v1/broadcasts", broadcast);
|
||||
log.info("Broadcast message sent to '/topic/broadcasts'");
|
||||
}
|
||||
|
||||
@@ -140,8 +131,6 @@ public class MessageController {
|
||||
* Client sends to /app/auth/login with payload { email, password }.
|
||||
* The response is sent back to the requesting user on /user/queue/auth
|
||||
*/
|
||||
@MessageMapping("/auth/login")
|
||||
@SendToUser("/queue/auth")
|
||||
public AppLoginResponse handleAppLogin(AppLoginRequest request) {
|
||||
log.info("STOMP Endpoint '/app/auth/login' called with email: {}",
|
||||
request != null ? request.getEmail() : "null");
|
||||
@@ -181,8 +170,6 @@ public class MessageController {
|
||||
* Client sends to /app/jobs/assigned with payload { appUserId }.
|
||||
* The response is sent back to the requesting user on /user/queue/jobs
|
||||
*/
|
||||
@MessageMapping("/jobs/assigned")
|
||||
@SendToUser("/queue/jobs")
|
||||
public List<JobWithRelatedDataDTO> handleGetAssignedJobs(Map<String, Object> request) {
|
||||
log.info("STOMP Endpoint '/app/jobs/assigned' called with data: {}", request);
|
||||
log.debug("Starting to process jobs request for STOMP endpoint");
|
||||
@@ -245,8 +232,6 @@ public class MessageController {
|
||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
||||
* This endpoint accepts any task type (fallback for GENERIC or unknown types).
|
||||
*/
|
||||
@MessageMapping("/task/completed")
|
||||
@SendTo("/topic/task-updates")
|
||||
public Map<String, Object> handleTaskCompleted(Map<String, Object> payload) {
|
||||
log.info("STOMP Endpoint '/app/task/completed' called with data: {}", payload);
|
||||
return processTaskCompletion(payload, null); // null means accept any task type
|
||||
@@ -257,8 +242,6 @@ public class MessageController {
|
||||
* Client sends to /app/task/confirm with payload { taskId, completedBy?, note? }.
|
||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
||||
*/
|
||||
@MessageMapping("/task/confirm")
|
||||
@SendTo("/topic/task-updates")
|
||||
public Map<String, Object> handleTaskConfirmation(Map<String, Object> payload) {
|
||||
log.info("STOMP Endpoint '/app/task/confirm' called with data: {}", payload);
|
||||
return processTaskCompletion(payload, "CONFIRMATION");
|
||||
@@ -270,8 +253,6 @@ public class MessageController {
|
||||
* The extraData contains: { photos: base64List, count: base64List.length }
|
||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
||||
*/
|
||||
@MessageMapping("/task/photo/completed")
|
||||
@SendTo("/topic/task-updates")
|
||||
public Map<String, Object> handlePhotoTaskCompleted(Map<String, Object> payload) {
|
||||
log.info("STOMP Endpoint '/app/task/photo/completed' called");
|
||||
return processPhotoTaskCompletion(payload);
|
||||
@@ -282,8 +263,6 @@ public class MessageController {
|
||||
* Client sends to /app/task/signature/completed with payload { taskId, completedBy?, note? }.
|
||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
||||
*/
|
||||
@MessageMapping("/task/signature/completed")
|
||||
@SendTo("/topic/task-updates")
|
||||
public Map<String, Object> handleSignatureTaskCompleted(Map<String, Object> payload) {
|
||||
log.info("STOMP Endpoint '/app/task/signature/completed' called with data: {}", payload);
|
||||
return processTaskCompletion(payload, "SIGNATURE");
|
||||
@@ -294,8 +273,6 @@ public class MessageController {
|
||||
* Client sends to /app/task/barcode/completed with payload { taskId, completedBy?, note? }.
|
||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
||||
*/
|
||||
@MessageMapping("/task/barcode/completed")
|
||||
@SendTo("/topic/task-updates")
|
||||
public Map<String, Object> handleBarcodeTaskCompleted(Map<String, Object> payload) {
|
||||
log.info("STOMP Endpoint '/app/task/barcode/completed' called with data: {}", payload);
|
||||
return processTaskCompletion(payload, "BARCODE");
|
||||
@@ -306,8 +283,6 @@ public class MessageController {
|
||||
* Client sends to /app/task/todolist/completed with payload { taskId, completedBy?, note? }.
|
||||
* Broadcasts to /topic/task-updates and /topic/tasks/{taskId}.
|
||||
*/
|
||||
@MessageMapping("/task/todolist/completed")
|
||||
@SendTo("/topic/task-updates")
|
||||
public Map<String, Object> handleTodolistTaskCompleted(Map<String, Object> payload) {
|
||||
log.info("STOMP Endpoint '/app/task/todolist/completed' called with data: {}", payload);
|
||||
return processTaskCompletion(payload, "TODOLIST");
|
||||
@@ -395,8 +370,8 @@ public class MessageController {
|
||||
event.put("event", "taskCompleted");
|
||||
event.put("taskType", task.getTaskType());
|
||||
|
||||
// Send specific task topic
|
||||
messagingTemplate.convertAndSend("/topic/tasks/" + task.getIdAsString(), event);
|
||||
// Publish to MQTT task topic
|
||||
mqttPublisher.publishAsJson("v1/tasks/" + task.getIdAsString(), event);
|
||||
|
||||
response.put("success", true);
|
||||
response.putAll(event);
|
||||
@@ -469,8 +444,8 @@ public class MessageController {
|
||||
event.put("event", "taskCompleted");
|
||||
event.put("taskType", task.getTaskType());
|
||||
|
||||
// Send specific task topic
|
||||
messagingTemplate.convertAndSend("/topic/tasks/" + task.getIdAsString(), event);
|
||||
// Publish to MQTT task topic
|
||||
mqttPublisher.publishAsJson("v1/tasks/" + task.getIdAsString(), event);
|
||||
|
||||
response.put("success", true);
|
||||
response.putAll(event);
|
||||
|
||||
@@ -1,258 +0,0 @@
|
||||
package de.assecutor.votianlt.controller;
|
||||
|
||||
import de.assecutor.votianlt.model.Photo;
|
||||
import de.assecutor.votianlt.model.task.BaseTask;
|
||||
import de.assecutor.votianlt.repository.PhotoRepository;
|
||||
import de.assecutor.votianlt.repository.TaskRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.bson.types.ObjectId;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.messaging.simp.SimpMessagingTemplate;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.Base64;
|
||||
|
||||
/**
|
||||
* REST endpoint for uploading photos for PHOTO tasks via HTTP POST instead of STOMP payload.
|
||||
*
|
||||
* Provides two content types on the same path:
|
||||
* - multipart/form-data: files[] (one or many images), optional completedBy, note
|
||||
* - application/json: { photos: [base64], completedBy?, note? }
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/api")
|
||||
@CrossOrigin(origins = "*")
|
||||
@Slf4j
|
||||
public class PhotoUploadController {
|
||||
|
||||
@Autowired
|
||||
private TaskRepository taskRepository;
|
||||
|
||||
@Autowired
|
||||
private PhotoRepository photoRepository;
|
||||
|
||||
@Autowired
|
||||
private SimpMessagingTemplate messagingTemplate;
|
||||
|
||||
@PostMapping(path = "/tasks/{taskId}/photos", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
|
||||
public ResponseEntity<Map<String, Object>> uploadPhotosMultipart(
|
||||
@PathVariable("taskId") String taskId,
|
||||
@RequestParam(value = "files") List<MultipartFile> files,
|
||||
@RequestParam(value = "completedBy", required = false) String completedBy,
|
||||
@RequestParam(value = "note", required = false) String note
|
||||
) {
|
||||
Map<String, Object> response = initResponse("photoUploadAck");
|
||||
if (!StringUtils.hasText(taskId)) {
|
||||
return badRequest(response, "taskId ist erforderlich");
|
||||
}
|
||||
if (files == null || files.isEmpty()) {
|
||||
return badRequest(response, "Mindestens eine Bilddatei (files) ist erforderlich");
|
||||
}
|
||||
try {
|
||||
ObjectId taskObjectId = new ObjectId(taskId);
|
||||
Optional<BaseTask> opt = taskRepository.findById(taskObjectId);
|
||||
if (opt.isEmpty()) {
|
||||
return notFound(response, "Task nicht gefunden");
|
||||
}
|
||||
BaseTask task = opt.get();
|
||||
if (!"PHOTO".equals(task.getTaskType())) {
|
||||
return badRequest(response, "Task-Typ stimmt nicht. Erwartet: PHOTO, Gefunden: " + task.getTaskType());
|
||||
}
|
||||
|
||||
// Convert files to base64 strings to keep storage compatible with existing Photo entity
|
||||
List<String> base64Photos = new ArrayList<>();
|
||||
for (MultipartFile file : files) {
|
||||
if (file == null || file.isEmpty()) continue;
|
||||
String base64 = Base64.getEncoder().encodeToString(file.getBytes());
|
||||
base64Photos.add(base64);
|
||||
}
|
||||
if (base64Photos.isEmpty()) {
|
||||
return badRequest(response, "Die übermittelten Dateien sind leer");
|
||||
}
|
||||
|
||||
Photo photo = new Photo(task.getJobId(), task.getId(), base64Photos, completedBy);
|
||||
photoRepository.save(photo);
|
||||
|
||||
// Build success response
|
||||
response.put("success", true);
|
||||
response.put("photoId", photo.getIdAsString());
|
||||
response.put("photosCount", base64Photos.size());
|
||||
response.put("taskId", task.getIdAsString());
|
||||
response.put("jobId", task.getJobIdAsString());
|
||||
|
||||
// Optionally broadcast a small event to task topic
|
||||
Map<String, Object> event = new HashMap<>();
|
||||
event.put("event", "photoUploaded");
|
||||
event.put("taskId", task.getIdAsString());
|
||||
event.put("jobId", task.getJobIdAsString());
|
||||
event.put("photosCount", base64Photos.size());
|
||||
event.put("timestamp", now());
|
||||
messagingTemplate.convertAndSend("/topic/tasks/" + task.getIdAsString(), event);
|
||||
|
||||
log.info("Photo upload (multipart) successful: taskId={}, jobId={}, photoId={}, count={}",
|
||||
taskId, task.getJobIdAsString(), photo.getIdAsString(), base64Photos.size());
|
||||
return ResponseEntity.ok(response);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return badRequest(response, "Ungültige taskId");
|
||||
} catch (IOException e) {
|
||||
log.error("Fehler beim Lesen der Dateien: {}", e.getMessage(), e);
|
||||
return serverError(response, "Fehler beim Lesen der Dateien: " + e.getMessage());
|
||||
} catch (Exception e) {
|
||||
log.error("Fehler beim Speichern der Fotos: {}", e.getMessage(), e);
|
||||
return serverError(response, "Fehler beim Speichern der Fotos");
|
||||
}
|
||||
}
|
||||
|
||||
@PostMapping(path = "/tasks/{taskId}/photos", consumes = MediaType.APPLICATION_JSON_VALUE)
|
||||
public ResponseEntity<Map<String, Object>> uploadPhotosJson(
|
||||
@PathVariable("taskId") String taskId,
|
||||
@RequestBody Map<String, Object> body
|
||||
) {
|
||||
Map<String, Object> response = initResponse("photoUploadAck");
|
||||
if (!StringUtils.hasText(taskId)) {
|
||||
return badRequest(response, "taskId ist erforderlich");
|
||||
}
|
||||
try {
|
||||
ObjectId taskObjectId = new ObjectId(taskId);
|
||||
Optional<BaseTask> opt = taskRepository.findById(taskObjectId);
|
||||
if (opt.isEmpty()) {
|
||||
return notFound(response, "Task nicht gefunden");
|
||||
}
|
||||
BaseTask task = opt.get();
|
||||
if (!"PHOTO".equals(task.getTaskType())) {
|
||||
return badRequest(response, "Task-Typ stimmt nicht. Erwartet: PHOTO, Gefunden: " + task.getTaskType());
|
||||
}
|
||||
|
||||
String completedBy = body.get("completedBy") != null ? body.get("completedBy").toString() : null;
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> photos = body.get("photos") instanceof List ? (List<String>) body.get("photos") : null;
|
||||
if (photos == null || photos.isEmpty()) {
|
||||
return badRequest(response, "Feld 'photos' (Liste von Base64-Strings) ist erforderlich");
|
||||
}
|
||||
|
||||
Photo photo = new Photo(task.getJobId(), task.getId(), photos, completedBy);
|
||||
photoRepository.save(photo);
|
||||
|
||||
response.put("success", true);
|
||||
response.put("photoId", photo.getIdAsString());
|
||||
response.put("photosCount", photo.getCount());
|
||||
response.put("taskId", task.getIdAsString());
|
||||
response.put("jobId", task.getJobIdAsString());
|
||||
|
||||
Map<String, Object> event = new HashMap<>();
|
||||
event.put("event", "photoUploaded");
|
||||
event.put("taskId", task.getIdAsString());
|
||||
event.put("jobId", task.getJobIdAsString());
|
||||
event.put("photosCount", photo.getCount());
|
||||
event.put("timestamp", now());
|
||||
messagingTemplate.convertAndSend("/topic/tasks/" + task.getIdAsString(), event);
|
||||
|
||||
log.info("Photo upload (json) successful: taskId={}, jobId={}, photoId={}, count={}",
|
||||
taskId, task.getJobIdAsString(), photo.getIdAsString(), photo.getCount());
|
||||
return ResponseEntity.ok(response);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return badRequest(response, "Ungültige taskId");
|
||||
} catch (Exception e) {
|
||||
log.error("Fehler beim Speichern der Fotos: {}", e.getMessage(), e);
|
||||
return serverError(response, "Fehler beim Speichern der Fotos");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* New simple JSON endpoint: accept a single base64 photo with taskId in the body.
|
||||
* If there are multiple photos, call this endpoint multiple times.
|
||||
* Body: { "taskId": "<ObjectId>", "photo": "<base64>", "completedBy"?: "...", "note"?: "..." }
|
||||
*/
|
||||
@PostMapping(path = "/photos", consumes = MediaType.APPLICATION_JSON_VALUE)
|
||||
public ResponseEntity<Map<String, Object>> uploadSinglePhotoJson(
|
||||
@RequestBody Map<String, Object> body
|
||||
) {
|
||||
Map<String, Object> response = initResponse("photoUploadAck");
|
||||
String taskId = body != null && body.get("taskId") != null ? body.get("taskId").toString() : null;
|
||||
String completedBy = body != null && body.get("completedBy") != null ? body.get("completedBy").toString() : null;
|
||||
String base64Photo = body != null && body.get("photo") != null ? body.get("photo").toString() : null;
|
||||
|
||||
if (!StringUtils.hasText(taskId)) {
|
||||
return badRequest(response, "taskId ist erforderlich");
|
||||
}
|
||||
if (!StringUtils.hasText(base64Photo)) {
|
||||
return badRequest(response, "Feld 'photo' (Base64-String) ist erforderlich");
|
||||
}
|
||||
try {
|
||||
ObjectId taskObjectId = new ObjectId(taskId);
|
||||
Optional<BaseTask> opt = taskRepository.findById(taskObjectId);
|
||||
if (opt.isEmpty()) {
|
||||
return notFound(response, "Task nicht gefunden");
|
||||
}
|
||||
BaseTask task = opt.get();
|
||||
if (!"PHOTO".equals(task.getTaskType())) {
|
||||
return badRequest(response, "Task-Typ stimmt nicht. Erwartet: PHOTO, Gefunden: " + task.getTaskType());
|
||||
}
|
||||
|
||||
Photo photo = new Photo(task.getJobId(), task.getId(), java.util.List.of(base64Photo), completedBy);
|
||||
photoRepository.save(photo);
|
||||
|
||||
response.put("success", true);
|
||||
response.put("photoId", photo.getIdAsString());
|
||||
response.put("photosCount", 1);
|
||||
response.put("taskId", task.getIdAsString());
|
||||
response.put("jobId", task.getJobIdAsString());
|
||||
|
||||
Map<String, Object> event = new HashMap<>();
|
||||
event.put("event", "photoUploaded");
|
||||
event.put("taskId", task.getIdAsString());
|
||||
event.put("jobId", task.getJobIdAsString());
|
||||
event.put("photosCount", 1);
|
||||
event.put("timestamp", now());
|
||||
messagingTemplate.convertAndSend("/topic/tasks/" + task.getIdAsString(), event);
|
||||
|
||||
log.info("Photo upload (single json) successful: taskId={}, jobId={}, photoId={}",
|
||||
taskId, task.getJobIdAsString(), photo.getIdAsString());
|
||||
return ResponseEntity.ok(response);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return badRequest(response, "Ungültige taskId");
|
||||
} catch (Exception e) {
|
||||
log.error("Fehler beim Speichern des Fotos: {}", e.getMessage(), e);
|
||||
return serverError(response, "Fehler beim Speichern des Fotos");
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, Object> initResponse(String type) {
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("timestamp", now());
|
||||
map.put("type", type);
|
||||
return map;
|
||||
}
|
||||
|
||||
private String now() {
|
||||
return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
|
||||
}
|
||||
|
||||
private ResponseEntity<Map<String, Object>> badRequest(Map<String, Object> response, String msg) {
|
||||
response.put("success", false);
|
||||
response.put("message", msg);
|
||||
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(response);
|
||||
|
||||
}
|
||||
|
||||
private ResponseEntity<Map<String, Object>> notFound(Map<String, Object> response, String msg) {
|
||||
response.put("success", false);
|
||||
response.put("message", msg);
|
||||
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(response);
|
||||
}
|
||||
|
||||
private ResponseEntity<Map<String, Object>> serverError(Map<String, Object> response, String msg) {
|
||||
response.put("success", false);
|
||||
response.put("message", msg);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package de.assecutor.votianlt.mqtt;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Kept for compatibility: The actual MQTT v5 lifecycle is managed by MqttV5ClientManager.
|
||||
* This runner only logs application readiness.
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class MqttClientRunner {
|
||||
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void onApplicationReady() {
|
||||
log.info("Application ready. MQTT v5 client lifecycle managed by MqttV5ClientManager.");
|
||||
}
|
||||
}
|
||||
47
src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java
Normal file
47
src/main/java/de/assecutor/votianlt/mqtt/MqttPublisher.java
Normal file
@@ -0,0 +1,47 @@
|
||||
package de.assecutor.votianlt.mqtt;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
|
||||
/**
|
||||
* Simple MQTT publishing helper to send JSON payloads.
|
||||
*
|
||||
* Note: In environments where Spring Integration MQTT is unavailable (e.g., offline CI),
|
||||
* this implementation degrades to a no-op publisher that logs the intended message.
|
||||
*/
|
||||
public interface MqttPublisher {
|
||||
void publishAsJson(String topic, Object payload);
|
||||
void publishAsJson(String topic, Object payload, boolean retained);
|
||||
}
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
class MqttPublisherImpl implements MqttPublisher {
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
private final MqttV5ClientManager clientManager;
|
||||
|
||||
public MqttPublisherImpl(@Lazy MqttV5ClientManager clientManager) {
|
||||
this.clientManager = clientManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishAsJson(String topic, Object payload) {
|
||||
publishAsJson(topic, payload, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishAsJson(String topic, Object payload, boolean retained) {
|
||||
try {
|
||||
String json = (payload instanceof String s) ? s : objectMapper.writeValueAsString(payload);
|
||||
byte[] bytes = json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
|
||||
// Default QoS 2
|
||||
clientManager.publish(topic, bytes, 2, retained);
|
||||
log.debug("[MQTT v5] published topic={} retained={} bytes={}", topic, retained, bytes.length);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to serialize/publish MQTT message for topic {}: {}", topic, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,206 @@
|
||||
package de.assecutor.votianlt.mqtt;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
|
||||
import com.hivemq.client.mqtt.datatypes.MqttQos;
|
||||
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
|
||||
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
|
||||
import de.assecutor.votianlt.config.MqttProperties;
|
||||
import de.assecutor.votianlt.controller.MessageController;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Manages a single MQTT v5 client connection with Spring lifecycle using HiveMQ MQTT Client.
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class MqttV5ClientManager implements SmartLifecycle {
|
||||
|
||||
private final MqttProperties props;
|
||||
private final MessageController messageController;
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
private volatile boolean running = false;
|
||||
private Mqtt5AsyncClient client;
|
||||
|
||||
public MqttV5ClientManager(MqttProperties props, @Lazy MessageController messageController) {
|
||||
this.props = props;
|
||||
this.messageController = messageController;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (!props.isEnabled()) {
|
||||
log.warn("MQTT is disabled via app.mqtt.enabled=false");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
String clientId = buildClientId(props.getClientId());
|
||||
URI uri = URI.create(props.getBrokerUri());
|
||||
String host = uri.getHost();
|
||||
int port = uri.getPort() > 0 ? uri.getPort() : ("ssl".equalsIgnoreCase(uri.getScheme()) || "mqtts".equalsIgnoreCase(uri.getScheme()) ? 8883 : 1883);
|
||||
|
||||
var builder = Mqtt5Client.builder()
|
||||
.identifier(clientId)
|
||||
.serverHost(host)
|
||||
.serverPort(port);
|
||||
if ("ssl".equalsIgnoreCase(uri.getScheme()) || "mqtts".equalsIgnoreCase(uri.getScheme()) || "tls".equalsIgnoreCase(uri.getScheme())) {
|
||||
builder = builder.sslWithDefaultConfig();
|
||||
}
|
||||
if (props.isAutomaticReconnect()) {
|
||||
builder = builder.automaticReconnectWithDefaultConfig();
|
||||
}
|
||||
client = builder.buildAsync();
|
||||
|
||||
var connect = client.connectWith()
|
||||
.cleanStart(props.isCleanStart())
|
||||
.keepAlive(props.getKeepAlive())
|
||||
.sessionExpiryInterval(props.getSessionExpiryInterval());
|
||||
|
||||
log.info("[MQTT] Connecting to {} with clientId={} ...", props.getBrokerUri(), clientId);
|
||||
connect.send().join();
|
||||
log.info("[MQTT] Connected");
|
||||
|
||||
// Handle all incoming publishes
|
||||
client.publishes(MqttGlobalPublishFilter.ALL, publish -> {
|
||||
String topic = publish.getTopic().toString();
|
||||
byte[] bytes;
|
||||
try {
|
||||
ByteBuffer buf = publish.getPayload().orElse(null);
|
||||
bytes = buf != null ? toByteArray(buf) : new byte[0];
|
||||
} catch (Throwable t) {
|
||||
bytes = new byte[0];
|
||||
}
|
||||
handleInbound(topic, bytes);
|
||||
});
|
||||
|
||||
// Subscribe to topics with QoS
|
||||
String[] topics = new String[]{
|
||||
"v1/app/+/task/photo/completed",
|
||||
"v1/app/+/task/confirm",
|
||||
"v1/app/+/task/completed",
|
||||
"v1/app/+/job/status",
|
||||
"v1/app/+/device/location",
|
||||
"v1/app/+/jobs/assigned",
|
||||
"v1/app/+/auth/login"
|
||||
};
|
||||
MqttQos qos = mapQos(props.getDefaultQos());
|
||||
for (String topic : topics) {
|
||||
client.subscribeWith().topicFilter(topic).qos(qos).send().join();
|
||||
}
|
||||
running = true;
|
||||
log.info("[MQTT] Subscribed to {} topics (QoS={}), awaiting messages ...", topics.length, qos);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to start HiveMQ MQTT client: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private static byte[] toByteArray(ByteBuffer buffer) {
|
||||
byte[] bytes = new byte[buffer.remaining()];
|
||||
buffer.get(bytes);
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private String buildClientId(String base) {
|
||||
String b = (base == null || base.isBlank()) ? "server" : base;
|
||||
if (!b.contains("${random.uuid}")) {
|
||||
return b + "-" + UUID.randomUUID();
|
||||
}
|
||||
return b.replace("${random.uuid}", UUID.randomUUID().toString());
|
||||
}
|
||||
|
||||
private void handleInbound(String topic, byte[] payload) {
|
||||
String json = new String(payload, StandardCharsets.UTF_8);
|
||||
try {
|
||||
Map<String, Object> map = objectMapper.readValue(json, new TypeReference<Map<String, Object>>(){});
|
||||
routeInbound(topic, map);
|
||||
} catch (Exception ex) {
|
||||
log.error("Failed to parse inbound MQTT JSON on {}: {}", topic, ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void routeInbound(String topic, Map<String, Object> payload) {
|
||||
try {
|
||||
if (topic.matches("v1/app/.+/task/photo/completed")) {
|
||||
messageController.handlePhotoTaskCompleted(payload);
|
||||
} else if (topic.matches("v1/app/.+/task/confirm")) {
|
||||
messageController.handleTaskConfirmation(payload);
|
||||
} else if (topic.matches("v1/app/.+/task/completed")) {
|
||||
messageController.handleTaskCompleted(payload);
|
||||
} else if (topic.matches("v1/app/.+/job/status")) {
|
||||
messageController.handleJobStatusUpdate(payload);
|
||||
} else if (topic.matches("v1/app/.+/device/location")) {
|
||||
messageController.handleDeviceLocation(payload);
|
||||
} else if (topic.matches("v1/app/.+/jobs/assigned")) {
|
||||
messageController.handleGetAssignedJobs(payload);
|
||||
} else if (topic.matches("v1/app/.+/auth/login")) {
|
||||
var om = new ObjectMapper();
|
||||
de.assecutor.votianlt.dto.AppLoginRequest req = om.convertValue(payload, de.assecutor.votianlt.dto.AppLoginRequest.class);
|
||||
messageController.handleAppLogin(req);
|
||||
} else {
|
||||
log.debug("No route for topic {}", topic);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error routing inbound MQTT message on {}: {}", topic, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
try {
|
||||
if (client != null) {
|
||||
client.disconnect().join();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("Error during MQTT client shutdown: {}", e.getMessage());
|
||||
} finally {
|
||||
running = false;
|
||||
client = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
private MqttQos mapQos(int q) {
|
||||
return switch (q) {
|
||||
case 0 -> MqttQos.AT_MOST_ONCE;
|
||||
case 1 -> MqttQos.AT_LEAST_ONCE;
|
||||
default -> MqttQos.EXACTLY_ONCE;
|
||||
};
|
||||
}
|
||||
|
||||
public void publish(String topic, byte[] payload, int qos, boolean retained) {
|
||||
try {
|
||||
if (client == null) {
|
||||
log.warn("[MQTT] Not connected, dropping publish topic={}", topic);
|
||||
return;
|
||||
}
|
||||
client.publishWith()
|
||||
.topic(topic)
|
||||
.payload(payload)
|
||||
.qos(mapQos(qos))
|
||||
.retain(retained)
|
||||
.send()
|
||||
.whenComplete((ack, ex) -> {
|
||||
if (ex != null) {
|
||||
log.error("Failed to publish to {}: {}", topic, ex.getMessage(), ex);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to publish to {}: {}", topic, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -41,23 +41,14 @@ public class SecurityConfig extends VaadinWebSecurity {
|
||||
new AntPathRequestMatcher("/frontend/**"),
|
||||
new AntPathRequestMatcher("/webjars/**"),
|
||||
new AntPathRequestMatcher("/h2-console/**"),
|
||||
new AntPathRequestMatcher("/frontend-es5/**", "/frontend-es6/**"),
|
||||
// WebSocket und STOMP Endpunkte
|
||||
new AntPathRequestMatcher("/ws/**"),
|
||||
new AntPathRequestMatcher("/websocket/**"),
|
||||
new AntPathRequestMatcher("/stomp/**"),
|
||||
new AntPathRequestMatcher("/app/**"),
|
||||
new AntPathRequestMatcher("/topic/**"),
|
||||
new AntPathRequestMatcher("/queue/**")
|
||||
new AntPathRequestMatcher("/frontend-es5/**", "/frontend-es6/**")
|
||||
).permitAll()
|
||||
);
|
||||
|
||||
// CSRF für WebSocket-Endpunkte deaktivieren
|
||||
// Standard-CSRF-Konfiguration (keine speziellen WebSocket/STOMP-Ausnahmen mehr notwendig)
|
||||
http.csrf(csrf -> csrf
|
||||
.ignoringRequestMatchers(
|
||||
new AntPathRequestMatcher("/ws/**"),
|
||||
new AntPathRequestMatcher("/websocket/**"),
|
||||
new AntPathRequestMatcher("/stomp/**")
|
||||
new AntPathRequestMatcher("/h2-console/**")
|
||||
)
|
||||
);
|
||||
|
||||
|
||||
@@ -1,86 +1,2 @@
|
||||
package de.assecutor.votianlt.zeroconf;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.event.ContextClosedEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetAddress;
|
||||
|
||||
/**
|
||||
* Publishes the STOMP WebSocket endpoint via Zeroconf (mDNS/Bonjour) using reflection.
|
||||
* If JmDNS is present on the classpath, it will register the service _stomp._tcp.local.
|
||||
*/
|
||||
@Component
|
||||
public class ZeroconfPublisher {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ZeroconfPublisher.class);
|
||||
|
||||
@Value("${server.port:8080}")
|
||||
private int serverPort;
|
||||
|
||||
// Expose stomp endpoints paths via TXT records
|
||||
@Value("${app.stomp.wsPath:/ws}")
|
||||
private String wsPath;
|
||||
|
||||
@Value("${app.stomp.websocketPath:/websocket}")
|
||||
private String websocketPath;
|
||||
|
||||
@Value("${app.zeroconf.enabled:true}")
|
||||
private boolean enabled;
|
||||
|
||||
@Value("${app.zeroconf.serviceName:votianlt-stomp}")
|
||||
private String serviceName;
|
||||
|
||||
// Controls whether to log a notice if JmDNS is not available
|
||||
@Value("${app.zeroconf.warnWhenMissing:false}")
|
||||
private boolean warnWhenMissing;
|
||||
|
||||
private Object jmdns; // javax.jmdns.JmDNS instance if available
|
||||
|
||||
@EventListener(org.springframework.boot.context.event.ApplicationReadyEvent.class)
|
||||
public void onAppReady() {
|
||||
if (!enabled) return;
|
||||
try {
|
||||
Class<?> jmDNSClass = Class.forName("javax.jmdns.JmDNS");
|
||||
Class<?> serviceInfoClass = Class.forName("javax.jmdns.ServiceInfo");
|
||||
|
||||
InetAddress addr = InetAddress.getLocalHost();
|
||||
Method createMethod = jmDNSClass.getMethod("create", InetAddress.class);
|
||||
jmdns = createMethod.invoke(null, addr);
|
||||
|
||||
String type = "_stomp._tcp.local.";
|
||||
String text = "path=" + wsPath + ",websocket=" + websocketPath + ",protocol=stomp";
|
||||
|
||||
Method createServiceInfo = serviceInfoClass.getMethod("create", String.class, String.class, int.class, String.class);
|
||||
Object serviceInfo = createServiceInfo.invoke(null, type, serviceName, serverPort, text);
|
||||
|
||||
Method registerService = jmDNSClass.getMethod("registerService", serviceInfoClass);
|
||||
registerService.invoke(jmdns, serviceInfo);
|
||||
|
||||
logger.info("STOMP-Service veröffentlicht: {} name={} port={}", type, serviceName, serverPort);
|
||||
} catch (ClassNotFoundException e) {
|
||||
if (warnWhenMissing) {
|
||||
logger.warn("Hinweis: JmDNS ist nicht vorhanden – Zeroconf ist deaktiviert.");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Registrierung fehlgeschlagen: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@EventListener(ContextClosedEvent.class)
|
||||
public void onShutdown() {
|
||||
if (jmdns != null) {
|
||||
try {
|
||||
Method unregisterAll = jmdns.getClass().getMethod("unregisterAllServices");
|
||||
unregisterAll.invoke(jmdns);
|
||||
Method close = jmdns.getClass().getMethod("close");
|
||||
close.invoke(jmdns);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Zeroconf removed from project.
|
||||
// This file intentionally left without any classes to eliminate any zeroconf-related beans or code.
|
||||
|
||||
@@ -28,16 +28,8 @@ mail.smtp.password=SV1705CA!noreply
|
||||
mail.smtp.host=smtp.ionos.de
|
||||
mail.smtp.port=587
|
||||
|
||||
# WebSocket and STOMP Configuration
|
||||
# WebSocket message size limits (in bytes) - Increased for large photo payloads
|
||||
# Enable STOMP over WebSocket
|
||||
spring.websocket.stomp.enabled=true
|
||||
# STOMP heartbeat settings (in milliseconds)
|
||||
spring.websocket.stomp.heartbeat.outgoing=10000
|
||||
spring.websocket.stomp.heartbeat.incoming=10000
|
||||
# HTTP request size limits for large payloads
|
||||
server.max-http-request-header-size=8MB
|
||||
# Spring messaging size limits for STOMP
|
||||
# Tomcat connector limits
|
||||
server.tomcat.max-http-form-post-size=64MB
|
||||
server.tomcat.max-save-post-size=64MB
|
||||
@@ -50,5 +42,20 @@ spring.servlet.multipart.max-request-size=64MB
|
||||
# Jackson message converter limits
|
||||
spring.jackson.default-property-inclusion=non_null
|
||||
|
||||
# MQTT v5 settings
|
||||
app.mqtt.enabled=true
|
||||
app.mqtt.broker-uri=tcp://192.168.180.26:1883
|
||||
# The server MQTT clientId; a random UUID suffix will be inserted where ${random.uuid} appears
|
||||
app.mqtt.client-id=server-${random.uuid}
|
||||
# v5 session and keepalive
|
||||
app.mqtt.clean-start=false
|
||||
app.mqtt.session-expiry-interval=86400
|
||||
app.mqtt.keep-alive=30
|
||||
app.mqtt.max-inflight=50
|
||||
app.mqtt.automatic-reconnect=true
|
||||
# Defaults for publishing
|
||||
app.mqtt.default-qos=2
|
||||
app.mqtt.default-retained=false
|
||||
|
||||
# 2FA Configuration
|
||||
app.security.two-factor.enabled=false
|
||||
Reference in New Issue
Block a user