Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Commands

### Development
- **Run application**: `pipenv run python main.py`
- **Install dependencies**: `pipenv install`
- **Format code**: `pipenv run format` (uses black)
- **Run linting**: `pipenv run flake8`
- **Run tests**: `pipenv run pytest`
- **Run single test**: `pipenv run pytest path/to/test_file.py`

### Docker
- **Build and run**: `docker-compose up -d`
- **View logs**: `docker-compose logs -f`

## Architecture

This is a Python-based smart meter emulator for Marstek storage systems (B2500, Jupiter, Venus). The system emulates multiple device types to bridge between real smart meters and Marstek energy storage systems.

### Core Components

1. **Device Emulators** (`ct001/`, `shelly/`):
- `CT001`: UDP/TCP protocol emulator for CT001 devices
- `Shelly`: UDP protocol emulator for various Shelly device types (Pro 3EM, EM Gen3, Pro EM50)

2. **Powermeter Integrations** (`powermeter/`):
- Pluggable architecture with `base.Powermeter` abstract class
- Implementations for: Shelly, Tasmota, Home Assistant, MQTT, Modbus, ESPHome, etc.
- Each powermeter returns power values as list of watts (3-phase support)

3. **Configuration System** (`config/`):
- INI-based configuration with multiple powermeter sections
- Client filtering via NETMASK settings for multi-device deployments
- Global and per-powermeter throttling support

4. **Main Application** (`main.py`):
- Thread pool executor for running multiple device emulators concurrently
- Configuration loading and device orchestration
- Command-line argument parsing with config file fallbacks

### Key Patterns

- **Plugin Architecture**: Powermeters are loaded dynamically based on configuration sections
- **Client Filtering**: Multiple powermeters can serve different storage system IP ranges
- **Threaded Servers**: UDP discovery and TCP data streams run in separate threads
- **Power Value Convention**: Positive = grid import, Negative = grid export
- **Three-Phase Support**: All components handle 1-3 phase configurations

### Configuration Structure

- `config.ini` defines device types, powermeters, and network settings
- Multiple sections with same prefix (e.g., `[SHELLY_1]`, `[SHELLY_2]`) supported
- NETMASK filtering allows different powermeters for different client IPs
- Throttling prevents control instability in storage systems

### Testing

- Tests use `_test.py` suffix and are located alongside source files
- Powermeter implementations include unit tests for HTTP/API interactions
- Main test directory contains integration and compatibility tests
10 changes: 10 additions & 0 deletions config.ini.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ POLL_INTERVAL = 1
# Can be overridden per powermeter section
THROTTLE_INTERVAL = 0

# Shelly Push Mode Configuration (optional)
# Enable push mode to send data proactively to a target IP instead of waiting for requests
SHELLY_PUSH_MODE = False
# Target IP address for push mode (required if push mode is enabled)
SHELLY_PUSH_TARGET_IP = 192.168.2.8
# Target port for push mode (required if push mode is enabled)
SHELLY_PUSH_TARGET_PORT = 22222
# Interval between push messages in seconds
SHELLY_PUSH_INTERVAL = 1.0

#[SHELLY]
#TYPE = 1PM #PLUS1PM #EM #3EM #3EMPRO
#IP = 192.168.1.100
Expand Down
61 changes: 56 additions & 5 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def run_device(
args: argparse.Namespace,
powermeters: List[Tuple[Powermeter, ClientFilter]],
device_id: Optional[str] = None,
push_mode: bool = False,
push_target_ip: Optional[str] = None,
push_target_port: Optional[int] = None,
push_interval: float = 1.0,
):
logger.debug(f"Starting device: {device_type}")

Expand Down Expand Up @@ -111,22 +115,62 @@ def update_readings(addr):
elif device_type == "shellypro3em_old":
logger.debug(f"Shelly Pro 3EM Settings:")
logger.debug(f"Device ID: {device_id}")
device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=1010)
if push_mode:
logger.debug(f"Push mode enabled: {push_target_ip}:{push_target_port} every {push_interval}s")
device = Shelly(
powermeters=powermeters,
device_id=device_id,
udp_port=1010,
push_mode=push_mode,
push_target_ip=push_target_ip,
push_target_port=push_target_port,
push_interval=push_interval
)

elif device_type == "shellypro3em_new":
logger.debug(f"Shelly Pro 3EM Settings:")
logger.debug(f"Device ID: {device_id}")
device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=2220)
if push_mode:
logger.debug(f"Push mode enabled: {push_target_ip}:{push_target_port} every {push_interval}s")
device = Shelly(
powermeters=powermeters,
device_id=device_id,
udp_port=2220,
push_mode=push_mode,
push_target_ip=push_target_ip,
push_target_port=push_target_port,
push_interval=push_interval
)

elif device_type == "shellyemg3":
logger.debug(f"Shelly EM Gen3 Settings:")
logger.debug(f"Device ID: {device_id}")
device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=2222)
if push_mode:
logger.debug(f"Push mode enabled: {push_target_ip}:{push_target_port} every {push_interval}s")
device = Shelly(
powermeters=powermeters,
device_id=device_id,
udp_port=2222,
push_mode=push_mode,
push_target_ip=push_target_ip,
push_target_port=push_target_port,
push_interval=push_interval
)

elif device_type == "shellyproem50":
logger.debug(f"Shelly Pro EM 50 Settings:")
logger.debug(f"Device ID: {device_id}")
device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=2223)
if push_mode:
logger.debug(f"Push mode enabled: {push_target_ip}:{push_target_port} every {push_interval}s")
device = Shelly(
powermeters=powermeters,
device_id=device_id,
udp_port=2223,
push_mode=push_mode,
push_target_ip=push_target_ip,
push_target_port=push_target_port,
push_interval=push_interval
)

else:
raise ValueError(f"Unsupported device type: {device_type}")
Expand Down Expand Up @@ -198,6 +242,12 @@ def main():
if args.skip_powermeter_test is not None
else cfg.getboolean("GENERAL", "SKIP_POWERMETER_TEST", fallback=False)
)

# Load push mode configuration
push_mode = cfg.getboolean("GENERAL", "SHELLY_PUSH_MODE", fallback=False)
push_target_ip = cfg.get("GENERAL", "SHELLY_PUSH_TARGET_IP", fallback=None)
push_target_port = cfg.getint("GENERAL", "SHELLY_PUSH_TARGET_PORT", fallback=None)
push_interval = cfg.getfloat("GENERAL", "SHELLY_PUSH_INTERVAL", fallback=1.0)

device_ids = args.device_ids if args.device_ids is not None else []
# Fill missing device IDs with default format
Expand Down Expand Up @@ -246,7 +296,8 @@ def main():
for device_type, device_id in zip(device_types, device_ids):
futures.append(
executor.submit(
run_device, device_type, cfg, args, powermeters, device_id
run_device, device_type, cfg, args, powermeters, device_id,
push_mode, push_target_ip, push_target_port, push_interval
)
)
# end for
Expand Down
62 changes: 61 additions & 1 deletion shelly/shelly.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import socket
import threading
import json
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple
from typing import List, Tuple, Optional
from config import ClientFilter
from powermeter import Powermeter
from config.logger import logger
Expand All @@ -14,15 +15,26 @@ def __init__(
powermeters: List[Tuple[Powermeter, ClientFilter]],
udp_port: int,
device_id,
push_mode: bool = False,
push_target_ip: Optional[str] = None,
push_target_port: Optional[int] = None,
push_interval: float = 1.0,
):
self._udp_port = udp_port
self._device_id = device_id
self._powermeters = powermeters
self._udp_thread = None
self._push_thread = None
self._stop = False
self._value_mutex = threading.Lock()
self._executor = ThreadPoolExecutor(max_workers=5)
self._send_lock = threading.Lock()

# Push mode configuration
self._push_mode = push_mode
self._push_target_ip = push_target_ip
self._push_target_port = push_target_port
self._push_interval = push_interval

def _calculate_derived_values(self, power):
decimal_point_enforcer = 0.001
Expand Down Expand Up @@ -116,6 +128,42 @@ def _handle_request(self, sock, data, addr):
except Exception as e:
logger.error(f"Error processing message: {e}")

def _push_data_loop(self):
"""Send data proactively to target IP in push mode"""
if not self._push_mode or not self._push_target_ip or not self._push_target_port:
return

logger.info(f"Starting push mode to {self._push_target_ip}:{self._push_target_port} every {self._push_interval}s")

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
target_addr = (self._push_target_ip, self._push_target_port)

try:
while not self._stop:
try:
# Get the first available powermeter (for push mode, we don't filter by client IP)
if self._powermeters:
powermeter = self._powermeters[0][0] # Take first powermeter
powers = powermeter.get_powermeter_watts()

# Create a fake request ID for push mode
request_id = int(time.time()) % 1000000

# Default to EM.GetStatus format for push mode
response = self._create_em_response(request_id, powers)
response_json = json.dumps(response, separators=(",", ":"))

logger.debug(f"Pushing data to {target_addr}: {response_json}")
sock.sendto(response_json.encode(), target_addr)

except Exception as e:
logger.error(f"Error in push mode: {e}")

time.sleep(self._push_interval)

finally:
sock.close()

def udp_server(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("", self._udp_port))
Expand All @@ -133,16 +181,28 @@ def start(self):
if self._udp_thread:
return
self._stop = False

# Always start UDP server for backward compatibility
self._udp_thread = threading.Thread(target=self.udp_server)
self._udp_thread.start()

# Start push thread if push mode is enabled
if self._push_mode:
self._push_thread = threading.Thread(target=self._push_data_loop)
self._push_thread.start()

def join(self):
if self._udp_thread:
self._udp_thread.join()
if self._push_thread:
self._push_thread.join()

def stop(self):
self._stop = True
if self._udp_thread:
self._udp_thread.join()
self._udp_thread = None
if self._push_thread:
self._push_thread.join()
self._push_thread = None
self._executor.shutdown(wait=True)