Se ha implementado una solución completa para recibir, almacenar y exponer los mensajes MQTT del topic GREENHOUSE siguiendo las mejores prácticas de Spring Boot 2025 con separación de responsabilidades.
EMQX Broker (WSS)
↓
MqttConfig (Spring Integration)
↓
GreenhouseDataListener
↓
MqttMessageProcessor
├─→ Redis Cache (últimos 1000 mensajes)
├─→ TimescaleDB (histórico permanente)
└─→ Spring Event (GreenhouseMessageEvent)
↓
WebSocket Handler → Clientes WebSocket
GreenhouseMessageDto.kt- Mensaje GREENHOUSE con SENSOR_XX y SETPOINT_XXGreenhouseStatisticsDto.kt- Estadísticas (min/max/avg/count)GreenhouseSummaryDto.kt- Resumen de todos los sensoresGreenhouseExtensions.kt- Extension functions para conversión
GreenhouseCacheService.kt- Gestión de caché Redis (Sorted Set)GreenhouseDataService.kt- Lógica de negocio (coordina Redis + TimescaleDB)
MqttMessageProcessor.kt- Actualizado para:- Cachear en Redis
- Guardar en TimescaleDB
- Publicar eventos Spring
GreenhouseController.kt- Endpoints REST para consultar mensajes
WebSocketConfig.kt- Configuración STOMPGreenhouseWebSocketHandler.kt- Broadcast en tiempo real
application.yaml- Broker actualizado a WSSMqttConfig.kt- Compatible con WebSocket (sin cambios necesarios)
URL: ${MQTT_BROKER_URL} # e.g., wss://your-mqtt-broker.example.com:443/mqtt
Username: ${MQTT_USERNAME}
Password: ${MQTT_PASSWORD}
Topic: GREENHOUSE
QoS: 0IMPORTANTE: Configure las credenciales MQTT mediante variables de entorno (por ejemplo, en su archivo .env o variables de entorno del sistema). Consulte el archivo .env.example para más detalles.
{
"SENSOR_01": 1.23,
"SENSOR_02": 2.23,
"SETPOINT_01": 0.1,
"SETPOINT_02": 0.2,
"SETPOINT_03": 0.3
}GET /api/greenhouse/messages/recent?limit=100Respuesta:
[
{
"timestamp": "2025-11-09T18:16:24Z",
"sensor01": 1.23,
"sensor02": 2.23,
"setpoint01": 0.1,
"setpoint02": 0.2,
"setpoint03": 0.3,
"greenhouseId": "001",
"rawPayload": "{...}"
}
]GET /api/greenhouse/messages/range?from=2025-11-09T10:00:00Z&to=2025-11-09T11:00:00ZGET /api/greenhouse/messages/latestGET /api/greenhouse/statistics/SENSOR_01?period=1hParámetros de periodo:
1h- Última hora24h- Últimas 24 horas7d- Últimos 7 días30d- Últimos 30 días
Respuesta:
{
"sensorId": "SENSOR_01",
"sensorType": "SENSOR",
"min": 0.5,
"max": 2.5,
"avg": 1.5,
"count": 150,
"lastValue": 1.23,
"lastTimestamp": "2025-11-09T18:16:24Z",
"periodStart": "2025-11-09T17:16:24Z",
"periodEnd": "2025-11-09T18:16:24Z"
}GET /api/greenhouse/statistics/summary?period=1hRespuesta:
{
"timestamp": "2025-11-09T18:16:24Z",
"totalMessages": 500,
"sensors": {
"SENSOR_01": {
"current": 1.23,
"min": 0.5,
"max": 2.5,
"avg": 1.5,
"count": 500
},
"SENSOR_02": { ... }
},
"setpoints": {
"SETPOINT_01": { ... },
"SETPOINT_02": { ... },
"SETPOINT_03": { ... }
},
"periodStart": "2025-11-09T17:16:24Z",
"periodEnd": "2025-11-09T18:16:24Z"
}GET /api/greenhouse/cache/infoRespuesta:
{
"totalMessages": 1000,
"ttlSeconds": 86400,
"hasOldestMessage": true,
"hasLatestMessage": true,
"maxCapacity": 1000
}GET /api/greenhouse/healthEndpoint WebSocket:
ws://localhost:8080/ws/greenhouse
Con SockJS (compatibilidad navegadores):
http://localhost:8080/ws/greenhouse
/topic/greenhouse/messages- Mensajes nuevos en tiempo real/topic/greenhouse/statistics- Actualizaciones de estadísticas
// Usando SockJS + STOMP
const socket = new SockJS('http://localhost:8080/ws/greenhouse');
const stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
console.log('Conectado al WebSocket');
// Suscribirse a mensajes nuevos
stompClient.subscribe('/topic/greenhouse/messages', function(message) {
const data = JSON.parse(message.body);
console.log('Nuevo mensaje GREENHOUSE:', data);
// Actualizar UI con los nuevos datos
updateSensorDisplay(data);
});
});
function updateSensorDisplay(data) {
document.getElementById('sensor01').textContent = data.sensor01;
document.getElementById('sensor02').textContent = data.sensor02;
document.getElementById('setpoint01').textContent = data.setpoint01;
// ... etc
}const ws = new WebSocket('ws://localhost:8080/ws/greenhouse-native');
ws.onopen = function() {
console.log('WebSocket conectado');
};
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log('Mensaje recibido:', data);
};- Estructura: Sorted Set con timestamp como score
- Capacidad: Últimos 1000 mensajes
- TTL: 24 horas
- Key:
greenhouse:messages
- Tabla:
sensor_readings - Esquema:
public - Retention: Permanente (o según política TimescaleDB)
- Campos: time, greenhouseId, sensorId, sensorType, value, unit
# Compilar
./gradlew build
# Ejecutar
./gradlew bootRun
# O con JAR
java -jar build/libs/invernaderos-0.0.1-SNAPSHOT.jarRevisa los logs al iniciar:
📥 Mensaje MQTT recibido:
Topic: GREENHOUSE
QoS: 0
Payload: {"SENSOR_01":1.23,...}
# Obtener últimos 10 mensajes
curl http://localhost:8080/api/greenhouse/messages/recent?limit=10
# Obtener último mensaje
curl http://localhost:8080/api/greenhouse/messages/latest
# Estadísticas de SENSOR_01 en última hora
curl "http://localhost:8080/api/greenhouse/statistics/SENSOR_01?period=1h"
# Resumen completo
curl "http://localhost:8080/api/greenhouse/statistics/summary?period=24h"Puedes usar una página HTML simple:
<!DOCTYPE html>
<html>
<head>
<title>Greenhouse Monitor</title>
<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script>
</head>
<body>
<h1>Greenhouse Real-Time Monitor</h1>
<div id="data"></div>
<script>
const socket = new SockJS('http://localhost:8080/ws/greenhouse');
const stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/greenhouse/messages', function(message) {
const data = JSON.parse(message.body);
document.getElementById('data').innerHTML =
'<pre>' + JSON.stringify(data, null, 2) + '</pre>';
});
});
</script>
</body>
</html>- Mensaje llega al broker EMQX (topic: GREENHOUSE)
- Spring Integration MQTT recibe el mensaje
- GreenhouseDataListener captura el mensaje
- MqttMessageProcessor procesa:
- Convierte JSON a
GreenhouseMessageDto - Cachea en Redis (últimos 1000)
- Guarda en TimescaleDB (permanente)
- Publica
GreenhouseMessageEvent
- Convierte JSON a
- GreenhouseWebSocketHandler escucha el evento
- Broadcast WebSocket a todos los clientes suscritos
- Clientes reciben datos en tiempo real
IMPORTANTE: NO incluyas las credenciales reales aquí. Usa un gestor de secretos como:
- AWS Secrets Manager
- HashiCorp Vault
- Azure Key Vault
- Kubernetes Secrets
# MQTT
MQTT_BROKER_URL=wss://your-mqtt-broker.example.com:443/mqtt
MQTT_USERNAME=your_mqtt_username
MQTT_PASSWORD=your_secure_mqtt_password
# Redis
REDIS_HOST=your-redis-host
REDIS_PORT=6379
REDIS_PASSWORD=your_secure_redis_password
# TimescaleDB
TIMESCALE_PASSWORD=your_secure_timescale_password
# Metadata DB
METADATA_PASSWORD=your_secure_metadata_passwordIMPORTANTE:
- Reemplace todos los valores de ejemplo con credenciales seguras
- Nunca exponga credenciales reales en la documentación o código fuente
- Use gestores de secretos en producción (AWS Secrets Manager, Azure Key Vault, etc.)
Editar GreenhouseController.kt:
@CrossOrigin(origins = ["https://tu-dominio.com"])Y WebSocketConfig.kt:
registry.addEndpoint("/ws/greenhouse")
.setAllowedOrigins("https://tu-dominio.com")
.withSockJS()logging:
level:
com.apptolast.invernaderos: DEBUG
org.springframework.integration.mqtt: INFO- Total mensajes en cache:
/api/greenhouse/cache/info - Health check:
/api/greenhouse/health - Actuator:
/actuator/health
✅ Conexión al broker EMQX via WebSocket (WSS) ✅ Procesamiento de mensajes GREENHOUSE con formato exacto ✅ Caché Redis con los últimos 1000 mensajes ✅ Persistencia permanente en TimescaleDB ✅ REST API completa con 7 endpoints ✅ WebSocket en tiempo real (STOMP) ✅ Estadísticas agregadas (min/max/avg) ✅ Filtrado por rango de tiempo ✅ Separación de responsabilidades (Controller → Service → Repository/Cache) ✅ Event-driven architecture (Spring Events) ✅ Logging estructurado (SLF4J) ✅ Extension functions para conversión de DTOs ✅ Validación de parámetros ✅ Manejo de errores robusto
- Verificar logs: buscar "📥 Mensaje MQTT recibido"
- Verificar conexión al broker: logs de Spring Integration MQTT
- Verificar credenciales en
application.yaml
- Verificar puerto (default: 8080)
- Verificar CORS si accedes desde otro dominio
- Probar endpoint:
http://localhost:8080/ws/greenhouse
- Verificar conexión Redis: logs de Lettuce
- Verificar que
GreenhouseCacheServicese está inyectando - Usar endpoint:
/api/greenhouse/cache/info
- Crear índices en
timeysensorId - Usar políticas de retention de TimescaleDB
- Considerar compresión de datos antiguos
Implementado: 2025-11-09 Versión: Spring Boot 3.5.7 + Kotlin 1.9.25 Arquitectura: Event-Driven + REST + WebSocket + Redis Cache + TimescaleDB