refactor: Projektstruktur in app/ und backend/ aufgeteilt
This commit is contained in:
114
app/lib/services/message_handler.dart
Normal file
114
app/lib/services/message_handler.dart
Normal file
@@ -0,0 +1,114 @@
|
||||
import 'package:flutter/foundation.dart';
|
||||
|
||||
import '../models/message_envelope.dart';
|
||||
|
||||
/// Result of unwrapping a message envelope
|
||||
class UnwrapResult {
|
||||
/// The unwrapped payload
|
||||
final dynamic payload;
|
||||
|
||||
/// The message ID (null if not an envelope)
|
||||
final String? messageId;
|
||||
|
||||
/// Whether this message requires acknowledgment
|
||||
final bool requiresAck;
|
||||
|
||||
UnwrapResult({
|
||||
required this.payload,
|
||||
this.messageId,
|
||||
this.requiresAck = false,
|
||||
});
|
||||
}
|
||||
|
||||
/// Handles message envelope unwrapping and deduplication.
|
||||
///
|
||||
/// This class is extracted from WebSocketService for testability.
|
||||
/// It manages:
|
||||
/// - Detecting and unwrapping MessageEnvelope structures
|
||||
/// - Deduplicating messages by messageId
|
||||
/// - Triggering ACK callbacks when required
|
||||
class MessageHandler {
|
||||
final Set<String> _processedMessageIds = {};
|
||||
|
||||
/// Maximum number of message IDs to track for deduplication
|
||||
final int maxProcessedIds;
|
||||
|
||||
/// Callback invoked when an ACK should be sent
|
||||
final void Function(String messageId)? onAckRequired;
|
||||
|
||||
MessageHandler({
|
||||
this.maxProcessedIds = 100,
|
||||
this.onAckRequired,
|
||||
});
|
||||
|
||||
/// Check if data is a valid MessageEnvelope structure.
|
||||
///
|
||||
/// A valid envelope must contain:
|
||||
/// - messageId
|
||||
/// - timestamp
|
||||
/// - topic
|
||||
/// - payload
|
||||
bool isEnvelopeMessage(dynamic data) {
|
||||
if (data is! Map<String, dynamic>) return false;
|
||||
return data.containsKey('messageId') &&
|
||||
data.containsKey('timestamp') &&
|
||||
data.containsKey('topic') &&
|
||||
data.containsKey('payload');
|
||||
}
|
||||
|
||||
/// Unwrap a message envelope and handle deduplication.
|
||||
///
|
||||
/// Returns null if the message was already processed (duplicate).
|
||||
/// For duplicates, still triggers onAckRequired if the original required ACK.
|
||||
///
|
||||
/// Returns [UnwrapResult] with payload and ACK info for new messages.
|
||||
/// If data is not an envelope, returns it as-is with requiresAck=false.
|
||||
UnwrapResult? unwrapEnvelope(dynamic data) {
|
||||
if (!isEnvelopeMessage(data)) {
|
||||
// Not an envelope, return data as-is (no ACK needed)
|
||||
return UnwrapResult(
|
||||
payload: data,
|
||||
messageId: null,
|
||||
requiresAck: false,
|
||||
);
|
||||
}
|
||||
|
||||
final envelope = MessageEnvelope.fromJson(data as Map<String, dynamic>);
|
||||
|
||||
// Check for duplicate
|
||||
if (_processedMessageIds.contains(envelope.messageId)) {
|
||||
// Still send ACK for duplicate messages
|
||||
if (envelope.requiresAck && onAckRequired != null) {
|
||||
onAckRequired!(envelope.messageId);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// Track this message as processed
|
||||
_processedMessageIds.add(envelope.messageId);
|
||||
|
||||
// Limit set size to prevent memory growth (FIFO eviction)
|
||||
if (_processedMessageIds.length > maxProcessedIds) {
|
||||
_processedMessageIds.remove(_processedMessageIds.first);
|
||||
}
|
||||
|
||||
return UnwrapResult(
|
||||
payload: envelope.payload,
|
||||
messageId: envelope.messageId,
|
||||
requiresAck: envelope.requiresAck,
|
||||
);
|
||||
}
|
||||
|
||||
/// Check if a message ID was already processed
|
||||
bool wasProcessed(String messageId) =>
|
||||
_processedMessageIds.contains(messageId);
|
||||
|
||||
/// Get the count of tracked message IDs
|
||||
int get processedCount => _processedMessageIds.length;
|
||||
|
||||
/// Clear all processed message IDs.
|
||||
///
|
||||
/// Primarily for testing purposes.
|
||||
@visibleForTesting
|
||||
void clearProcessedIds() => _processedMessageIds.clear();
|
||||
}
|
||||
Reference in New Issue
Block a user