@@ -29,6 +29,7 @@ const (
2929 TaskExecutionFailedMsg = "Task execution failed"
3030 WorkerTimeoutMsg = "Worker timeout"
3131 WorkerDisconnectedMsg = "Worker disconnected"
32+ ServiceUnavailableMsg = "服务不可用"
3233)
3334
3435//go:embed index.html
@@ -189,14 +190,38 @@ func (s *Scheduler) cleanupWorkerTasks(worker *Worker, errorMsg string) {
189190 s .mu .Unlock ()
190191}
191192
193+ // selectWorker 选择可用的Worker来执行指定方法
194+ func (s * Scheduler ) selectWorker (method string ) * Worker {
195+ var selectedWorker * Worker
196+ minCount := math .MaxInt
197+
198+ s .workers .Range (func (key , value interface {}) bool {
199+ worker := value .(* Worker )
200+ lastPingNano := atomic .LoadInt64 (& worker .LastPing )
201+ lastPing := time .Unix (0 , lastPingNano )
202+ if time .Since (lastPing ) > 60 * time .Second {
203+ return true
204+ }
205+
206+ for _ , workerMethod := range worker .Methods {
207+ if workerMethod .Name == method {
208+ workerCount := atomic .LoadInt64 (& worker .Count )
209+ if workerCount < int64 (minCount ) {
210+ selectedWorker = worker
211+ minCount = int (workerCount )
212+ }
213+ break
214+ }
215+ }
216+ return true
217+ })
218+
219+ return selectedWorker
220+ }
221+
192222func (s * Scheduler ) handleWorkerMessages (worker * Worker , conn * websocket.Conn ) {
193223 for {
194- var msg struct {
195- Type string `json:"type"`
196- TaskID string `json:"taskId"`
197- Result json.RawMessage `json:"result"`
198- Error string `json:"error"`
199- }
224+ var msg TaskResultMessage
200225 if err := conn .ReadJSON (& msg ); err != nil {
201226 s .cleanupWorkerTasks (worker , WorkerDisconnectedMsg )
202227 log .Printf ("Worker %s disconnected: %v" , worker .ID , err )
@@ -207,17 +232,12 @@ func (s *Scheduler) handleWorkerMessages(worker *Worker, conn *websocket.Conn) {
207232 case "pong" :
208233 atomic .StoreInt64 (& worker .LastPing , time .Now ().UnixNano ())
209234 case "result" :
210- s .processTaskResult (msg )
235+ s .processTaskResult (& msg )
211236 }
212237 }
213238}
214239
215- func (s * Scheduler ) processTaskResult (msg struct {
216- Type string `json:"type"`
217- TaskID string `json:"taskId"`
218- Result json.RawMessage `json:"result"`
219- Error string `json:"error"`
220- }) {
240+ func (s * Scheduler ) processTaskResult (msg * TaskResultMessage ) {
221241 s .mu .Lock ()
222242 defer s .mu .Unlock ()
223243
@@ -273,33 +293,11 @@ func (s *Scheduler) handleEncryptedExecute(w http.ResponseWriter, r *http.Reques
273293 s .encryptedTasks [encryptedTask .ID ] = encryptedTask
274294
275295 // 选择可用的Worker
276- var selectedWorker * Worker
277- minCount := math .MaxInt
278-
279- s .workers .Range (func (key , value interface {}) bool {
280- worker := value .(* Worker )
281- lastPingNano := atomic .LoadInt64 (& worker .LastPing )
282- lastPing := time .Unix (0 , lastPingNano )
283- if time .Since (lastPing ) > 60 * time .Second {
284- return true
285- }
286-
287- for _ , method := range worker .Methods {
288- if method .Name == req .Method {
289- workerCount := atomic .LoadInt64 (& worker .Count )
290- if workerCount < int64 (minCount ) {
291- selectedWorker = worker
292- minCount = int (workerCount )
293- }
294- break
295- }
296- }
297- return true
298- })
296+ selectedWorker := s .selectWorker (req .Method )
299297
300298 if selectedWorker == nil {
301299 encryptedTask .Status = TaskStatusError
302- encryptedTask .Result = "服务不可用"
300+ encryptedTask .Result = ServiceUnavailableMsg
303301 s .mu .Unlock ()
304302 } else {
305303 atomic .AddInt64 (& selectedWorker .Count , 1 )
@@ -363,36 +361,11 @@ func (s *Scheduler) handleExecute(w http.ResponseWriter, r *http.Request) {
363361 s .tasks .Store (task .ID , task )
364362
365363 // 在同一个锁内完成worker选择和状态更新,避免竞态条件
366- var selectedWorker * Worker
367- minCount := math .MaxInt
368-
369- // 查找可用Worker并验证其连接状态
370- s .workers .Range (func (key , value interface {}) bool {
371- worker := value .(* Worker )
372- // 使用原子操作检查worker连接是否仍然有效
373- lastPingNano := atomic .LoadInt64 (& worker .LastPing )
374- lastPing := time .Unix (0 , lastPingNano )
375- if time .Since (lastPing ) > 60 * time .Second {
376- return true // 跳过可能已断线的worker
377- }
378-
379- for _ , method := range worker .Methods {
380- if method .Name == req .Method {
381- // 使用原子操作读取计数
382- workerCount := atomic .LoadInt64 (& worker .Count )
383- if workerCount < int64 (minCount ) {
384- selectedWorker = worker
385- minCount = int (workerCount )
386- }
387- break // 找到匹配方法就跳出内层循环
388- }
389- }
390- return true
391- })
364+ selectedWorker := s .selectWorker (req .Method )
392365
393366 if selectedWorker == nil {
394367 task .Status = TaskStatusError
395- task .Result = "服务不可用"
368+ task .Result = ServiceUnavailableMsg
396369 s .mu .Unlock ()
397370 } else {
398371 // 使用原子操作增加计数
0 commit comments