forked from volpe-framework/volpe-container-py
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
152 lines (129 loc) · 5.3 KB
/
main.py
File metadata and controls
152 lines (129 loc) · 5.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import struct
import threading
import concurrent.futures
from array import array
from typing import override
import grpc
import tsplib95 as tsplib
from deap import base, creator, tools, algorithms
import random
# Protobuf imports
import volpe_py as volpe
# --- TSP Setup ---
# The problem instance is loaded once at import time; the path is relative,
# so it is presumably resolved against the current working directory.
problem = tsplib.load_problem('gil262.tsp')
# Number of genes per individual. It sizes both the permutation generator
# below and the binary wire format (STRUCT_FORMAT).
# NOTE(review): presumably equal to the node count of gil262 -- confirm it
# stays in sync if the .tsp file is ever swapped.
NDIM = 262
# Population size; also passed as `mu` to eaMuPlusLambda.
BASE_POPULATION_SIZE = 100
# Number of offspring per generation (`lambda_` of eaMuPlusLambda).
LAMBDA_SIZE = BASE_POPULATION_SIZE*7
# Crossover and mutation probabilities (`cxpb` / `mutpb` of eaMuPlusLambda).
CXPROB = 0.5
MUTATION_RATE = 0.2
# --- DEAP Configuration ---
# Single-objective fitness with weight -1.0 (DEAP treats a negative weight
# as "minimize this objective").
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
# An individual is an array.array of C ints ('i') carrying a FitnessMin.
creator.create("Individual", array, fitness=creator.FitnessMin, typecode='i')
toolbox = base.Toolbox()
# 'indices' yields a random permutation of 0..NDIM-1.
toolbox.register('indices', random.sample, range(NDIM), NDIM)
# 'individual' wraps one such permutation in creator.Individual;
# 'population' repeats that n times into a plain list.
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.indices)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
def evaluate_tsp(individual):
    """Return the fitness of ``individual`` as a one-element tuple.

    Each gene is shifted by +1 before the tour is handed to
    ``problem.trace_tours`` (presumably because node ids in the .tsp file
    are 1-based while genes are 0-based -- confirm). The first entry of the
    result is wrapped in a tuple, the shape DEAP expects for fitness values.
    """
    tour = [node + 1 for node in individual]
    tour_weights = problem.trace_tours([tour])
    return (tour_weights[0],)
# Genetic operators used by algorithms.eaMuPlusLambda via the toolbox.
# Fitness function: tour weight of the individual (see evaluate_tsp).
toolbox.register("evaluate", evaluate_tsp)
# Crossover: DEAP's ordered crossover (intended for permutation genomes).
toolbox.register("mate", tools.cxOrdered)
# Mutation: shuffle indexes, with indpb=0.05 as the per-gene probability.
toolbox.register("mutate", tools.mutShuffleIndexes, indpb=0.05)
# Selection: tournament selection with 3 contenders per tournament.
toolbox.register("select", tools.selTournament, tournsize=3)
# --- Serialization Utilities ---
# Wire format of one individual: NDIM signed 4-byte integers ('i') in
# big-endian byte order ('>'), i.e. exactly NDIM * 4 bytes.
STRUCT_FORMAT = f">{NDIM}i"
def ind_to_bytes(individual: array) -> bytes:
    """Serialize an individual into its fixed-width binary form.

    The genes are packed according to ``STRUCT_FORMAT``; ``struct.pack``
    raises ``struct.error`` if the number of genes does not match it.
    """
    genes = tuple(individual)
    return struct.pack(STRUCT_FORMAT, *genes)
def bytes_to_ind(data: bytes):
    """Rebuild a DEAP ``Individual`` from its ``STRUCT_FORMAT`` encoding.

    ``struct.unpack`` raises ``struct.error`` when ``data`` does not have
    exactly the size the format requires. The returned individual has no
    fitness assigned yet.
    """
    genes = struct.unpack(STRUCT_FORMAT, data)
    individual = creator.Individual(genes)
    return individual
# --- Servicer Implementation ---
class VolpeGreeterServicer(volpe.VolpeContainerServicer):
    """gRPC servicer that evolves a TSP population with DEAP.

    The server dispatches RPCs on a thread pool, so every read or write of
    ``self.popln`` happens while holding ``self.poplock``.

    NOTE(review): this class previously referenced an alias ``pbc`` that is
    never imported in this file, which raises ``NameError`` as soon as the
    class body is executed. The ``Population`` / ``Individual`` messages are
    now taken from ``volpe``, on the assumption that ``volpe_py`` re-exports
    them like the other messages used here -- confirm against the package.
    """

    def __init__(self):
        super().__init__()
        print("INITIALIZED TSP")
        # Guards self.popln against concurrent RPC handlers.
        self.poplock = threading.Lock()
        self.popln = self._fresh_population()
        # Best (lowest) fitness seen after the most recent evolution run.
        self.last_best = float('inf')

    @staticmethod
    def _fresh_population():
        """Return a new random population with every individual evaluated."""
        population = toolbox.population(n=BASE_POPULATION_SIZE)
        for ind in population:
            ind.fitness.values = toolbox.evaluate(ind)
        return population

    @staticmethod
    def _to_population(individuals):
        """Serialize DEAP individuals into a ``Population`` message."""
        return volpe.Population(members=[
            volpe.Individual(
                genotype=ind_to_bytes(ind),
                fitness=ind.fitness.values[0],
            )
            for ind in individuals
        ])

    @override
    def InitFromSeed(self, request, context):
        """Replace the population with a fresh, evaluated random one.

        NOTE(review): ``request`` is not read, so any seed it carries is
        ignored -- confirm whether the RNG should be seeded from it.
        """
        with self.poplock:
            self.popln = self._fresh_population()
        return volpe.Reply(success=True)

    @override
    def InitFromSeedPopulation(self, request: volpe.Population, context):
        """Overwrite randomly chosen population slots with the sent members.

        Each member's genotype is decoded with ``bytes_to_ind`` and its
        transmitted fitness is reused as-is (no re-evaluation). If the
        request carries more members than the population holds, only the
        first ``len(self.popln)`` members are used.
        """
        with self.poplock:
            # random.sample raises ValueError when asked for more items than
            # the range contains, so cap the number of replaced slots.
            slot_count = min(len(request.members), len(self.popln))
            indices = random.sample(range(len(self.popln)), slot_count)
            for memb, idx in zip(request.members, indices):
                new_ind = bytes_to_ind(memb.genotype)
                new_ind.fitness.values = (memb.fitness,)
                self.popln[idx] = new_ind
        return volpe.Reply(success=True)

    @override
    def GetBestPopulation(self, request: volpe.PopulationSize, context):
        """Return the ``request.size`` best individuals as a ``Population``."""
        with self.poplock:
            return self._to_population(tools.selBest(self.popln, request.size))

    @override
    def GetRandom(self, request: volpe.PopulationSize, context):
        """Return ``request.size`` randomly selected individuals.

        Selection is delegated to ``tools.selRandom``.
        """
        with self.poplock:
            return self._to_population(tools.selRandom(self.popln, request.size))

    @override
    def GetResults(self, request: volpe.PopulationSize, context: grpc.ServicerContext):
        """Return the ``request.size`` best individuals in readable form.

        Each result carries the route as the string form of a plain list of
        genes, together with its fitness. An empty population yields an
        empty result.
        """
        with self.poplock:
            if not self.popln:
                return volpe.ResultPopulation(members=[])
            top_n = tools.selBest(self.popln, k=request.size)
            results = [
                volpe.ResultIndividual(
                    # list() strips the array/Individual wrapper so the
                    # string is a clean "[a, b, ...]" route.
                    representation=str(list(mem)),
                    fitness=mem.fitness.values[0],
                )
                for mem in top_n
            ]
        return volpe.ResultPopulation(members=results)

    @override
    def RunForGenerations(self, request: volpe.PopulationSize, context):
        """Evolve the population for ``request.size`` generations.

        Runs DEAP's (mu + lambda) algorithm with the module-level
        parameters and records the best fitness in ``self.last_best``. The
        lock is held for the whole run, so other RPCs wait until it ends.
        """
        with self.poplock:
            self.popln, _logbook = algorithms.eaMuPlusLambda(
                population=self.popln,
                toolbox=toolbox,
                mu=BASE_POPULATION_SIZE,
                lambda_=LAMBDA_SIZE,
                cxpb=CXPROB,
                mutpb=MUTATION_RATE,
                ngen=request.size,
                verbose=False,
            )
            self.last_best = tools.selBest(self.popln, 1)[0].fitness.values[0]
        return volpe.Reply(success=True)
if __name__ == '__main__':
    # RPC handlers run on a pool of ten worker threads.
    worker_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool)
    servicer = VolpeGreeterServicer()
    volpe.add_VolpeContainerServicer_to_server(servicer, server)
    # Plaintext (no TLS) listener on all interfaces, port 8081.
    server.add_insecure_port("0.0.0.0:8081")
    server.start()
    # Block the main thread until the server is shut down.
    server.wait_for_termination()