Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions backend/app/physics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
"""
Core physics module for dive calculations.
Based on algorithms from Subsurface (https://github.com/subsurface/subsurface).

This module implements:
1. Real Gas Law (Compressibility/Z-Factor) using Virial expansion.
2. Gas Laws (Dalton's Law, partial pressures).
3. Isobaric Counterdiffusion (ICD) checks.
4. Unit conversions and standard constants.
"""

from typing import Tuple, Optional, Dict, List
from pydantic import BaseModel

# -----------------------------------------------------------------------------
# Constants & Enums
# -----------------------------------------------------------------------------

# Subsurface uses 1.01325 bar as standard surface pressure for volume conversions
SURFACE_PRESSURE_BAR = 1.01325

class GasMix(BaseModel):
"""
Represents a breathing gas mixture.
Percentages should be 0-100 (e.g., 21 for Air).
"""
o2: float
he: float = 0.0

@property
def n2(self) -> float:
return 100.0 - self.o2 - self.he

@property
def is_air(self) -> bool:
return abs(self.o2 - 21.0) < 0.1 and self.he < 0.1

@property
def is_trimix(self) -> bool:
return self.he > 0.0

# -----------------------------------------------------------------------------
# Compressibility (Z-Factor) - Real Gas Law
# -----------------------------------------------------------------------------

def calculate_z_factor(pressure_bar: float, gas: GasMix) -> float:
"""
Calculates the gas compressibility factor (Z) using the Virial equation.
Based on Subsurface implementation (core/compressibility.c / core/gas.c).

Z = PV / nRT
Z = 1 + B/V + C/V^2 ... approximated via pressure series for diving ranges.

Args:
pressure_bar: Pressure in bar.
gas: GasMix object.

Returns:
float: The compressibility factor Z.
"""
# Clamp pressure to 0-500 bar range as per Subsurface to avoid polynomial explosion
p = max(0.0, min(pressure_bar, 500.0))

# Coefficients from Subsurface (3rd order virial expansion)
# These are empirical coefficients for O2, N2, He
O2_COEFFS = [-7.18092073703e-4, 2.81852572808e-6, -1.50290620492e-9]
N2_COEFFS = [-2.19260353292e-4, 2.92844845532e-6, -2.07613482075e-9]
HE_COEFFS = [4.87320026468e-4, -8.83632921053e-8, 5.33304543646e-11]

def virial(coeffs: List[float], x: float) -> float:
return x * coeffs[0] + (x ** 2) * coeffs[1] + (x ** 3) * coeffs[2]

# Subsurface calculation uses weighted averages of the virial terms
# Note: Subsurface code uses permille (0-1000), we convert to that scale for the formula match
o2_permille = gas.o2 * 10.0
he_permille = gas.he * 10.0
n2_permille = gas.n2 * 10.0

# The formula essentially calculates Z-1 (z_minus_1) scaled by 1000
z_m1 = (virial(O2_COEFFS, p) * o2_permille +
virial(HE_COEFFS, p) * he_permille +
virial(N2_COEFFS, p) * n2_permille)

# Convert back: Z = 1 + (Weighted_Virial_Sum / 1000)
return z_m1 * 0.001 + 1.0

def calculate_real_volume(tank_water_volume_liters: float, pressure_bar: float, gas: GasMix) -> float:
"""
Calculates the actual volume of gas (at surface pressure) in a tank, accounting for compressibility.

Real Volume = (Tank_Water_Vol * Pressure) / Z
(Note: Adjusted for surface pressure reference)

Args:
tank_water_volume_liters: Wet volume of the tank (e.g., 12L).
pressure_bar: Current pressure in the tank.
gas: The gas mixture.

Returns:
float: Equivalent surface volume in liters.
"""
if pressure_bar <= 0:
return 0.0

z = calculate_z_factor(pressure_bar, gas)

# Standard formula: V_surface = (P_tank * V_tank) / (P_surface * Z)
return (pressure_bar * tank_water_volume_liters) / (SURFACE_PRESSURE_BAR * z)

def calculate_pressure_from_volume(surface_volume_liters: float, tank_water_volume_liters: float, gas: GasMix) -> float:
"""
Inverse of calculate_real_volume. Iteratively solves for pressure given a gas volume.
Useful for "How much pressure do I need for X liters of gas?"
"""
# Initial guess using Ideal Gas Law: P = (V_surf * P_surf) / V_tank
p_guess = (surface_volume_liters * SURFACE_PRESSURE_BAR) / tank_water_volume_liters

# Newton-Raphson or simple iteration to converge
for _ in range(5):
z = calculate_z_factor(p_guess, gas)
p_new = (surface_volume_liters * SURFACE_PRESSURE_BAR * z) / tank_water_volume_liters
if abs(p_new - p_guess) < 0.1:
return p_new
p_guess = p_new

return p_guess

# -----------------------------------------------------------------------------
# Partial Pressures & Depths
# -----------------------------------------------------------------------------

def depth_to_bar(depth_meters: float, surface_pressure: float = 1.013) -> float:
"""Calculates absolute ambient pressure at depth (ATA/bar)."""
# Simple approx: depth/10 + surface.
# For higher precision, density of water (salt/fresh) could be a parameter.
return (depth_meters / 10.0) + surface_pressure

def calculate_mod(gas: GasMix, pp_o2_max: float) -> float:
"""
Calculates Maximum Operating Depth (MOD) in meters.
MOD = (ppO2_max / fO2 - surface_pressure) * 10
"""
if gas.o2 <= 0:
return 0.0

f_o2 = gas.o2 / 100.0
max_ata = pp_o2_max / f_o2
return max(0.0, (max_ata - 1.0) * 10.0)

def calculate_end(depth_meters: float, gas: GasMix) -> float:
"""
Calculates Equivalent Narcotic Depth (END).
Assumes O2 is narcotic (standard recreational/tech view).
Formula: END = (Depth + 10) * (1 - fHe) - 10
"""
f_he = gas.he / 100.0
return (depth_meters + 10.0) * (1.0 - f_he) - 10.0

def calculate_ead(depth_meters: float, gas: GasMix) -> float:
"""
Calculates Equivalent Air Depth (EAD) for Nitrox.
EAD = (Depth + 10) * (fN2 / 0.79) - 10
"""
f_n2 = gas.n2 / 100.0
return (depth_meters + 10.0) * (f_n2 / 0.79) - 10.0

# -----------------------------------------------------------------------------
# Safety Checks
# -----------------------------------------------------------------------------

class ICDResult(BaseModel):
warning: bool
delta_n2: float
delta_he: float
message: Optional[str] = None

def check_isobaric_counterdiffusion(current_gas: GasMix, next_gas: GasMix) -> ICDResult:
"""
Checks for Isobaric Counterdiffusion (ICD) risks when switching gases.
Implements the 'Rule of Fifths': Nitrogen increase should not exceed 1/5th of Helium decrease.

Based on Subsurface: core/gas.cpp -> isobaric_counterdiffusion()

Args:
current_gas: The gas being breathed currently.
next_gas: The gas being switched to.

Returns:
ICDResult: Warning flag and delta values.
"""
# Delta N2 (increase is positive)
d_n2 = next_gas.n2 - current_gas.n2

# Delta He (decrease is negative)
d_he = next_gas.he - current_gas.he

# Logic:
# 1. Current gas must have Helium (>0)
# 2. Switching results in N2 increase (>0)
# 3. Switching results in He decrease (<0)
# 4. Rule check: 5 * delta_N2 > -delta_He (Magnitude of N2 rise is large relative to He drop)

warning = False
message = None

if current_gas.he > 0 and d_n2 > 0 and d_he < 0:
# Subsurface logic: 5 * results->dN2 > -results->dHe
if 5 * d_n2 > -d_he:
warning = True
message = (
f"ICD Warning: Isobaric Counterdiffusion risk detected. "
f"Nitrogen increase ({d_n2:.1f}%) is more than 1/5th of Helium decrease ({abs(d_he):.1f}%)."
)

return ICDResult(
warning=warning,
delta_n2=d_n2,
delta_he=d_he,
message=message
)
47 changes: 47 additions & 0 deletions backend/app/routers/dives/dives_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .dives_validation import raise_validation_error
from .dives_logging import log_dive_operation, log_error
from .dives_utils import has_deco_profile, generate_dive_name, get_or_create_deco_tag, find_dive_site_by_import_id
from app.physics import GasMix, calculate_real_volume


# Helper function to convert old difficulty labels to new codes
Expand Down Expand Up @@ -643,6 +644,7 @@ def parse_cylinder(cylinder_elem):
cylinder_data['workpressure'] = cylinder_elem.get('workpressure')
cylinder_data['description'] = cylinder_elem.get('description')
cylinder_data['o2'] = cylinder_elem.get('o2')
cylinder_data['he'] = cylinder_elem.get('he')
cylinder_data['start'] = cylinder_elem.get('start')
cylinder_data['end'] = cylinder_elem.get('end')
cylinder_data['depth'] = cylinder_elem.get('depth')
Expand Down Expand Up @@ -820,6 +822,51 @@ def convert_to_divemap_format(dive_number, rating, visibility, sac, otu, cns, ta
if sac:
dive_info_parts.append(f"SAC: {sac}")

# Calculate Real SAC (Z-Factor) using Physics Engine
if parsed_duration and parsed_duration > 0 and cylinders:
# Use first cylinder with valid pressure drop
for cylinder in cylinders:
try:
# Extract volume (e.g. "15.0 l")
size_str = cylinder.get('size', '').replace(' l', '').strip()
vol = float(size_str) if size_str else 0

# Extract pressures (e.g. "200.0 bar")
start_str = cylinder.get('start', '').replace(' bar', '').strip()
end_str = cylinder.get('end', '').replace(' bar', '').strip()
start_p = float(start_str) if start_str else 0
end_p = float(end_str) if end_str else 0

# Extract gas mix
o2_str = cylinder.get('o2', '21%').replace('%', '').strip()
he_str = cylinder.get('he', '0%').replace('%', '').strip()
o2 = float(o2_str) if o2_str else 21.0
he = float(he_str) if he_str else 0.0

if vol > 0 and start_p > end_p:
# Get average depth from computer data if available
avg_depth = 0
if computer_data and computer_data.get('mean_depth'):
avg_depth = float(computer_data['mean_depth'].replace(' m', ''))

if avg_depth > 0:
gas_mix = GasMix(o2=o2, he=he)

# Calculate Real Volume used (Surface Equivalent)
vol_start = calculate_real_volume(vol, start_p, gas_mix)
vol_end = calculate_real_volume(vol, end_p, gas_mix)
gas_used_liters = vol_start - vol_end

# Calculate SAC (L/min/atm)
# ATA = Depth/10 + 1 (Approx for SAC)
ata = (avg_depth / 10.0) + 1.0
real_sac = gas_used_liters / parsed_duration / ata

dive_info_parts.append(f"Real SAC (Z-Factor): {real_sac:.1f} L/min")
break # Only calculate for primary cylinder
except (ValueError, AttributeError):
continue

if otu:
dive_info_parts.append(f"OTU: {otu}")

Expand Down
66 changes: 66 additions & 0 deletions backend/tests/test_physics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@

from app.physics import (
GasMix,
calculate_z_factor,
calculate_real_volume,
calculate_pressure_from_volume,
check_isobaric_counterdiffusion
)
import pytest

def test_z_factor_air_surface():
# Air at 1 bar should be close to ideal (Z=1)
air = GasMix(o2=21, he=0)
z = calculate_z_factor(1.0, air)
assert abs(z - 1.0) < 0.01

def test_z_factor_air_high_pressure():
# Air at 200 bar compresses LESS than ideal (Z > 1)
air = GasMix(o2=21, he=0)
z = calculate_z_factor(200.0, air)
assert z > 1.0
# Expected approx range 1.05 - 1.10 for 200 bar air
assert 1.0 < z < 1.15

def test_real_volume_air_200bar():
# 12L tank at 200 bar
# Ideal: 12 * 200 = 2400 L
# Real: Less than 2400 because Z > 1
air = GasMix(o2=21, he=0)
vol = calculate_real_volume(12, 200, air)

# Calculate expected roughly
# z ~ 1.07
# P_surf = 1.01325
# V = (200 * 12) / (1.01325 * 1.07) ~ 2213 L
assert vol < 2400
assert 2100 < vol < 2300

def test_icd_check_trimix_to_air():
# Switching from Trimix 21/35 (35% He) to Air (0% He, 79% N2)
# Current: 21% O2, 35% He, 44% N2
# Next: 21% O2, 0% He, 79% N2
# dHe = -35
# dN2 = +35
# Rule of Fifths: 5 * dN2 > -dHe ?
# 5 * 35 = 175. 175 > 35. Yes -> Warning.

tx = GasMix(o2=21, he=35)
air = GasMix(o2=21, he=0)

result = check_isobaric_counterdiffusion(tx, air)
assert result.warning is True
assert "Isobaric Counterdiffusion risk" in result.message

def test_icd_check_safe_switch():
# Switching from Tx 18/45 to Tx 50/25 (Deco gas)
# Current: 18% O2, 45% He, 37% N2
# Next: 50% O2, 25% He, 25% N2
# dHe = -20
# dN2 = -12 (N2 actually decreases too, so no ICD risk from N2 loading)

deep_gas = GasMix(o2=18, he=45)
deco_gas = GasMix(o2=50, he=25)

result = check_isobaric_counterdiffusion(deep_gas, deco_gas)
assert result.warning is False
Loading
Loading