-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecution_plan.py
More file actions
57 lines (46 loc) · 1.67 KB
/
execution_plan.py
File metadata and controls
57 lines (46 loc) · 1.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
"""
ExecutionPlan - Immutable DTO for compiled execution plan
NORP Compliance:
- NORP-003: Immutable DTO (frozen dataclass)
- NORP-005: Deterministic execution order + parallel groups
License: MIT
Copyright: 2026 NeuraScope CONVERWAY
"""
from dataclasses import dataclass
from typing import List, Dict, Any
@dataclass(frozen=True)
class ExecutionPlan:
    """
    Immutable execution plan (NORP-003 + NORP-005 compliant).

    The dataclass is frozen, so attributes cannot be rebound after
    construction. Note that the list/dict values themselves are ordinary
    mutable containers; freezing does not deep-freeze their contents.

    Attributes:
        nodes: One dict per node in the plan.
        execution_order: Ordered list of strings describing execution order
            (presumably node identifiers - confirm against the compiler).
        parallel_groups: One dict per dependency level. The methods below
            read the optional keys ``'nodes'`` (list) and ``'parallel'``
            (flag) from each group; both are treated as absent-safe.
        estimated_duration_ms: Estimated duration of the plan, in ms.
    """

    nodes: List[Dict[str, Any]]
    execution_order: List[str]
    parallel_groups: List[Dict[str, Any]]
    estimated_duration_ms: int

    def _has_level(self, level: int) -> bool:
        """Return True when ``level`` is a valid index into parallel_groups."""
        # Negative values are rejected explicitly: Python's negative indexing
        # would otherwise silently address groups from the end (or raise
        # IndexError), which is never what a "dependency level" means.
        return 0 <= level < len(self.parallel_groups)

    def get_level(self, level: int) -> List[str]:
        """
        Get nodes at a specific dependency level.

        Args:
            level: Zero-based index into ``parallel_groups``.

        Returns:
            The value stored under the group's ``'nodes'`` key, or an empty
            list when the level is out of range or the group has no
            ``'nodes'`` entry.
        """
        if not self._has_level(level):
            return []
        return self.parallel_groups[level].get('nodes', [])

    def is_parallelizable(self, level: int) -> bool:
        """
        Check if a level can be parallelized.

        Args:
            level: Zero-based index into ``parallel_groups``.

        Returns:
            True only when the level exists, its group's ``'parallel'`` flag
            is truthy, and the group holds more than one node.
        """
        if not self._has_level(level):
            return False
        group = self.parallel_groups[level]
        # bool() keeps the declared return type even when the stored flag is
        # a falsy non-bool such as None or 0.
        return bool(group.get('parallel', False)) and len(group.get('nodes', [])) > 1

    def get_levels_count(self) -> int:
        """Get total number of levels in the DAG (length of parallel_groups)."""
        return len(self.parallel_groups)

    def get_stats(self) -> Dict[str, int]:
        """
        Get execution plan statistics.

        Returns:
            A dict with the keys ``'total_nodes'``, ``'levels'``,
            ``'parallelizable_nodes'`` and ``'estimated_duration_ms'``.
            ``'parallelizable_nodes'`` is the number of nodes belonging to
            groups whose ``'parallel'`` flag is truthy.
        """
        # Use .get() for 'nodes' (as the other accessors do) so a group
        # without that key counts as empty instead of raising KeyError.
        parallelizable_count = sum(
            len(group.get('nodes', []))
            for group in self.parallel_groups
            if group.get('parallel', False)
        )
        return {
            'total_nodes': len(self.nodes),
            'levels': self.get_levels_count(),
            'parallelizable_nodes': parallelizable_count,
            'estimated_duration_ms': self.estimated_duration_ms,
        }