From 7922832313096080037be96c4145e1830bd6176d Mon Sep 17 00:00:00 2001 From: cedric Date: Sat, 9 Aug 2025 10:39:10 +0200 Subject: [PATCH] feat: add Shelly push mode to fix Marstek Venus v153 communication issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds push mode functionality to work around communication interruptions introduced in Marstek Venus firmware v153. Instead of waiting for periodic JSON-RPC requests from the battery (which randomly stop after 10s-2min), the emulator can now proactively push power data to the battery. Related to issue #180 - intermittent communication failures on Venus v153. Features: - SHELLY_PUSH_MODE: enable proactive data transmission - SHELLY_PUSH_TARGET_IP/PORT: target battery address - SHELLY_PUSH_INTERVAL: configurable transmission rate - Maintains backward compatibility with request/response mode - Works with all Shelly device types (Pro3EM, EMG3, ProEM50) 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- CLAUDE.md | 63 ++++++++++++++++++++++++++++++++++++++++++++++ config.ini.example | 10 ++++++++ main.py | 61 ++++++++++++++++++++++++++++++++++++++++---- shelly/shelly.py | 62 ++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 190 insertions(+), 6 deletions(-) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..d1b87e9 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,63 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Commands + +### Development +- **Run application**: `pipenv run python main.py` +- **Install dependencies**: `pipenv install` +- **Format code**: `pipenv run format` (uses black) +- **Run linting**: `pipenv run flake8` +- **Run tests**: `pipenv run pytest` +- **Run single test**: `pipenv run pytest path/to/test_file.py` + +### Docker +- **Build and run**: `docker-compose up -d` +- **View logs**: `docker-compose logs -f` + +## Architecture + +This is a Python-based smart meter emulator for Marstek storage systems (B2500, Jupiter, Venus). The system emulates multiple device types to bridge between real smart meters and Marstek energy storage systems. + +### Core Components + +1. **Device Emulators** (`ct001/`, `shelly/`): + - `CT001`: UDP/TCP protocol emulator for CT001 devices + - `Shelly`: UDP protocol emulator for various Shelly device types (Pro 3EM, EM Gen3, Pro EM50) + +2. **Powermeter Integrations** (`powermeter/`): + - Pluggable architecture with `base.Powermeter` abstract class + - Implementations for: Shelly, Tasmota, Home Assistant, MQTT, Modbus, ESPHome, etc. + - Each powermeter returns power values as list of watts (3-phase support) + +3. **Configuration System** (`config/`): + - INI-based configuration with multiple powermeter sections + - Client filtering via NETMASK settings for multi-device deployments + - Global and per-powermeter throttling support + +4. **Main Application** (`main.py`): + - Thread pool executor for running multiple device emulators concurrently + - Configuration loading and device orchestration + - Command-line argument parsing with config file fallbacks + +### Key Patterns + +- **Plugin Architecture**: Powermeters are loaded dynamically based on configuration sections +- **Client Filtering**: Multiple powermeters can serve different storage system IP ranges +- **Threaded Servers**: UDP discovery and TCP data streams run in separate threads +- **Power Value Convention**: Positive = grid import, Negative = grid export +- **Three-Phase Support**: All components handle 1-3 phase configurations + +### Configuration Structure + +- `config.ini` defines device types, powermeters, and network settings +- Multiple sections with same prefix (e.g., `[SHELLY_1]`, `[SHELLY_2]`) supported +- NETMASK filtering allows different powermeters for different client IPs +- Throttling prevents control instability in storage systems + +### Testing + +- Tests use `_test.py` suffix and are located alongside source files +- Powermeter implementations include unit tests for HTTP/API interactions +- Main test directory contains integration and compatibility tests \ No newline at end of file diff --git a/config.ini.example b/config.ini.example index 561835b..75f0a05 100644 --- a/config.ini.example +++ b/config.ini.example @@ -17,6 +17,16 @@ POLL_INTERVAL = 1 # Can be overridden per powermeter section THROTTLE_INTERVAL = 0 +# Shelly Push Mode Configuration (optional) +# Enable push mode to send data proactively to a target IP instead of waiting for requests +SHELLY_PUSH_MODE = False +# Target IP address for push mode (required if push mode is enabled) +SHELLY_PUSH_TARGET_IP = 192.168.2.8 +# Target port for push mode (required if push mode is enabled) +SHELLY_PUSH_TARGET_PORT = 22222 +# Interval between push messages in seconds +SHELLY_PUSH_INTERVAL = 1.0 + #[SHELLY] #TYPE = 1PM #PLUS1PM #EM #3EM #3EMPRO #IP = 192.168.1.100 diff --git a/main.py b/main.py index ad6776e..549ad05 100644 --- a/main.py +++ b/main.py @@ -55,6 +55,10 @@ def run_device( args: argparse.Namespace, powermeters: List[Tuple[Powermeter, ClientFilter]], device_id: Optional[str] = None, + push_mode: bool = False, + push_target_ip: Optional[str] = None, + push_target_port: Optional[int] = None, + push_interval: float = 1.0, ): logger.debug(f"Starting device: {device_type}") @@ -111,22 +115,62 @@ def update_readings(addr): elif device_type == "shellypro3em_old": logger.debug(f"Shelly Pro 3EM Settings:") logger.debug(f"Device ID: {device_id}") - device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=1010) + if push_mode: + logger.debug(f"Push mode enabled: {push_target_ip}:{push_target_port} every {push_interval}s") + device = Shelly( + powermeters=powermeters, + device_id=device_id, + udp_port=1010, + push_mode=push_mode, + push_target_ip=push_target_ip, + push_target_port=push_target_port, + push_interval=push_interval + ) elif device_type == "shellypro3em_new": logger.debug(f"Shelly Pro 3EM Settings:") logger.debug(f"Device ID: {device_id}") - device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=2220) + if push_mode: + logger.debug(f"Push mode enabled: {push_target_ip}:{push_target_port} every {push_interval}s") + device = Shelly( + powermeters=powermeters, + device_id=device_id, + udp_port=2220, + push_mode=push_mode, + push_target_ip=push_target_ip, + push_target_port=push_target_port, + push_interval=push_interval + ) elif device_type == "shellyemg3": logger.debug(f"Shelly EM Gen3 Settings:") logger.debug(f"Device ID: {device_id}") - device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=2222) + if push_mode: + logger.debug(f"Push mode enabled: {push_target_ip}:{push_target_port} every {push_interval}s") + device = Shelly( + powermeters=powermeters, + device_id=device_id, + udp_port=2222, + push_mode=push_mode, + push_target_ip=push_target_ip, + push_target_port=push_target_port, + push_interval=push_interval + ) elif device_type == "shellyproem50": logger.debug(f"Shelly Pro EM 50 Settings:") logger.debug(f"Device ID: {device_id}") - device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=2223) + if push_mode: + logger.debug(f"Push mode enabled: {push_target_ip}:{push_target_port} every {push_interval}s") + device = Shelly( + powermeters=powermeters, + device_id=device_id, + udp_port=2223, + push_mode=push_mode, + push_target_ip=push_target_ip, + push_target_port=push_target_port, + push_interval=push_interval + ) else: raise ValueError(f"Unsupported device type: {device_type}") @@ -198,6 +242,12 @@ def main(): if args.skip_powermeter_test is not None else cfg.getboolean("GENERAL", "SKIP_POWERMETER_TEST", fallback=False) ) + + # Load push mode configuration + push_mode = cfg.getboolean("GENERAL", "SHELLY_PUSH_MODE", fallback=False) + push_target_ip = cfg.get("GENERAL", "SHELLY_PUSH_TARGET_IP", fallback=None) + push_target_port = cfg.getint("GENERAL", "SHELLY_PUSH_TARGET_PORT", fallback=None) + push_interval = cfg.getfloat("GENERAL", "SHELLY_PUSH_INTERVAL", fallback=1.0) device_ids = args.device_ids if args.device_ids is not None else [] # Fill missing device IDs with default format @@ -246,7 +296,8 @@ def main(): for device_type, device_id in zip(device_types, device_ids): futures.append( executor.submit( - run_device, device_type, cfg, args, powermeters, device_id + run_device, device_type, cfg, args, powermeters, device_id, + push_mode, push_target_ip, push_target_port, push_interval ) ) # end for diff --git a/shelly/shelly.py b/shelly/shelly.py index 5fded9a..2fdf2a7 100644 --- a/shelly/shelly.py +++ b/shelly/shelly.py @@ -1,8 +1,9 @@ import socket import threading import json +import time from concurrent.futures import ThreadPoolExecutor -from typing import List, Tuple +from typing import List, Tuple, Optional from config import ClientFilter from powermeter import Powermeter from config.logger import logger @@ -14,15 +15,26 @@ def __init__( powermeters: List[Tuple[Powermeter, ClientFilter]], udp_port: int, device_id, + push_mode: bool = False, + push_target_ip: Optional[str] = None, + push_target_port: Optional[int] = None, + push_interval: float = 1.0, ): self._udp_port = udp_port self._device_id = device_id self._powermeters = powermeters self._udp_thread = None + self._push_thread = None self._stop = False self._value_mutex = threading.Lock() self._executor = ThreadPoolExecutor(max_workers=5) self._send_lock = threading.Lock() + + # Push mode configuration + self._push_mode = push_mode + self._push_target_ip = push_target_ip + self._push_target_port = push_target_port + self._push_interval = push_interval def _calculate_derived_values(self, power): decimal_point_enforcer = 0.001 @@ -116,6 +128,42 @@ def _handle_request(self, sock, data, addr): except Exception as e: logger.error(f"Error processing message: {e}") + def _push_data_loop(self): + """Send data proactively to target IP in push mode""" + if not self._push_mode or not self._push_target_ip or not self._push_target_port: + return + + logger.info(f"Starting push mode to {self._push_target_ip}:{self._push_target_port} every {self._push_interval}s") + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + target_addr = (self._push_target_ip, self._push_target_port) + + try: + while not self._stop: + try: + # Get the first available powermeter (for push mode, we don't filter by client IP) + if self._powermeters: + powermeter = self._powermeters[0][0] # Take first powermeter + powers = powermeter.get_powermeter_watts() + + # Create a fake request ID for push mode + request_id = int(time.time()) % 1000000 + + # Default to EM.GetStatus format for push mode + response = self._create_em_response(request_id, powers) + response_json = json.dumps(response, separators=(",", ":")) + + logger.debug(f"Pushing data to {target_addr}: {response_json}") + sock.sendto(response_json.encode(), target_addr) + + except Exception as e: + logger.error(f"Error in push mode: {e}") + + time.sleep(self._push_interval) + + finally: + sock.close() + def udp_server(self): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(("", self._udp_port)) @@ -133,16 +181,28 @@ def start(self): if self._udp_thread: return self._stop = False + + # Always start UDP server for backward compatibility self._udp_thread = threading.Thread(target=self.udp_server) self._udp_thread.start() + + # Start push thread if push mode is enabled + if self._push_mode: + self._push_thread = threading.Thread(target=self._push_data_loop) + self._push_thread.start() def join(self): if self._udp_thread: self._udp_thread.join() + if self._push_thread: + self._push_thread.join() def stop(self): self._stop = True if self._udp_thread: self._udp_thread.join() self._udp_thread = None + if self._push_thread: + self._push_thread.join() + self._push_thread = None self._executor.shutdown(wait=True)