-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsearch.py
More file actions
110 lines (90 loc) · 4.48 KB
/
search.py
File metadata and controls
110 lines (90 loc) · 4.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import numpy as np
import importlib
import json
import itertools
import os
from dsl.common.evaluation import calculate_similarity
class BeamSearchSolver:
def __init__(self, specialist_name: str, beam_width: int = 5):
self.specialist_name = specialist_name
self.beam_width = beam_width
self.tools = self._load_tools()
def _load_tools(self):
"""Load tool definitions for this specialist from its context.json.
Uses a path anchored to this package directory to avoid dependence on
current working directory.
"""
base_dir = os.path.dirname(__file__)
context_path = os.path.join(base_dir, 'dsl', self.specialist_name, 'context.json')
try:
with open(context_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
return []
def _generate_arguments(self, grid: np.ndarray, tool_params: list) -> list:
"""Generates possible argument combinations for a given tool."""
arg_possibilities = []
for param in tool_params:
param_name = param['name']
# Simple rule-based parameter generation
if param_name == 'grid':
continue # The grid is always the current state
elif param_name == 'direction':
arg_possibilities.append(['up', 'down', 'left', 'right'])
elif param_name == 'angle':
arg_possibilities.append([90, 180, 270])
elif param_name == 'old_color':
# Use colors present in the grid
arg_possibilities.append(list(np.unique(grid[grid > 0])))
elif param_name == 'new_color':
# Try changing to any of the 9 possible colors
arg_possibilities.append(list(range(1, 10)))
# Return all combinations of the generated arguments
if not arg_possibilities:
return [[]] # For functions with no params other than grid
return list(itertools.product(*arg_possibilities))
def solve(self, input_grid: np.ndarray, target_grid: np.ndarray, max_depth: int = 3, heuristic_plan: list = None):
initial_state = (input_grid, [], calculate_similarity(input_grid, target_grid))
beam = [initial_state]
prioritized_tools = [t for t in self.tools if t['function_name'] in heuristic_plan] if heuristic_plan else self.tools
for depth in range(max_depth):
candidates = []
for current_grid, program, score in beam:
if score == 1.0: return program
for tool in prioritized_tools:
func_name = tool['function_name']
module_name = tool['module']
# Generate all possible argument sets for this tool
arg_sets = self._generate_arguments(current_grid, tool['parameters'])
for args in arg_sets:
try:
module = importlib.import_module(module_name)
func = getattr(module, func_name)
new_grid = func(current_grid, *args)
new_program = program + [{'tool': func_name, 'params': list(args)}]
new_score = calculate_similarity(new_grid, target_grid)
candidates.append((new_grid, new_program, new_score))
except Exception:
continue
if not candidates: break
candidates.sort(key=lambda x: x[2], reverse=True)
beam = candidates[:self.beam_width]
if beam and beam[0][2] == 1.0:
return beam[0][1] # Return the program
return None
# --- Example usage ---
if __name__ == '__main__':
# Test with a puzzle that requires two steps: move right, then move down.
solver = BeamSearchSolver('geometry', beam_width=10)
start = np.array([[0,0,0],[0,2,0],[0,0,0]])
end = np.array([[0,0,0],[0,0,0],[0,0,2]])
print("Solving puzzle...")
# No heuristic provided, so it will search blindly using all geometry tools
solution_program = solver.solve(start, end, max_depth=2)
if solution_program:
print("✅ Solution Found!")
print("Program:")
for step in solution_program:
print(f" - {step['tool']}({', '.join(map(str, step['params']))})")
else:
print("❌ No solution found.")