diff --git a/go/src/pythia/backend/queue.go b/go/src/pythia/backend/queue.go index 9214834..592e8c3 100644 --- a/go/src/pythia/backend/queue.go +++ b/go/src/pythia/backend/queue.go @@ -17,12 +17,14 @@ package backend import ( "container/list" + "encoding/json" "flag" "fmt" "log" "pythia" "strings" "sync" + "time" ) func init() { @@ -41,7 +43,7 @@ type queueClient struct { Id int // The response channel. - Response chan<- pythia.Message + Response chan<- pythia.Message `json:"-"` // The number of parallel jobs this pool can handle. Capacity int @@ -67,7 +69,7 @@ type queueJob struct { // The client having submitted this job. Origin *queueClient - // Element of the queue.waiting list pointing to this job, or nil if the job + // Element of the queue.Waiting list pointing to this job, or nil if the job // is currently running. WaitingElement *list.Element @@ -96,6 +98,17 @@ const ( quitMsg pythia.MsgType = "-quit" ) +// A queueStatus is an internal structure required to marshal the state of the Queue +// in a semantically right JSON. +type QueueStatus struct { + Capacity int `json:"capacity"` + Available int `json:"available"` + Clients []*queueClient `json:"clients, omitempty"` + Jobs []*queueJob `json:"jobs, omitempty"` + Waiting *list.List `json:"waiting"` + CreationDate time.Time `json:"creation_date"` +} + // The Queue is the central component of Pythia. // It receives jobs (tasks with inputs) from front-ends and dispatches them // to the sandboxes. @@ -116,13 +129,16 @@ type Queue struct { wg sync.WaitGroup // Active connections - clients map[int]*queueClient + Clients map[int]*queueClient // Jobs to be processed/currently processing - jobs map[string]*queueJob + Jobs map[string]*queueJob // List of jobs (*queueJob) waiting to be assigned. - waiting *list.List + Waiting *list.List + + // Get the Queue creation datetime + CreationDate time.Time } // NewQueue returns a new queue with default parameters. @@ -130,6 +146,7 @@ func NewQueue() *Queue { queue := new(Queue) queue.Capacity = 500 queue.quit = make(chan bool, 1) + queue.CreationDate = time.Now() return queue } @@ -149,6 +166,7 @@ func (queue *Queue) Run() { closing := false master := make(chan queueMessage) queue.master = master + go func() { <-queue.quit closing = true @@ -192,21 +210,21 @@ func (queue *Queue) Shutdown() { // Main goroutine responsible for scheduling the jobs. func (queue *Queue) main(master <-chan queueMessage) { defer queue.wg.Done() - queue.clients = make(map[int]*queueClient) - queue.jobs = make(map[string]*queueJob) - queue.waiting = list.New() + queue.Clients = make(map[int]*queueClient) + queue.Jobs = make(map[string]*queueJob) + queue.Waiting = list.New() for qm := range master { switch qm.Msg.Message { case connectMsg: log.Print("Client ", qm.Client.Id, ": connected.") - queue.clients[qm.Client.Id] = qm.Client + queue.Clients[qm.Client.Id] = qm.Client case pythia.RegisterPoolMsg: log.Print("Client ", qm.Client.Id, ": pool capacity ", qm.Msg.Capacity) qm.Client.Capacity = qm.Msg.Capacity case pythia.LaunchMsg: id := qm.Msg.Id - if _, ok := queue.jobs[id]; ok { + if _, ok := queue.Jobs[id]; ok { log.Print("Job ", id, ": already launched, rejecting.") qm.Client.Response <- pythia.Message{ Message: pythia.DoneMsg, @@ -214,7 +232,7 @@ func (queue *Queue) main(master <-chan queueMessage) { Status: pythia.Fatal, Output: "Job already launched", } - } else if queue.waiting.Len() >= queue.Capacity { + } else if queue.Waiting.Len() >= queue.Capacity { log.Print("Job ", id, ": queue full, rejecting.") qm.Client.Response <- pythia.Message{ Message: pythia.DoneMsg, @@ -229,14 +247,14 @@ func (queue *Queue) main(master <-chan queueMessage) { Origin: qm.Client, } qm.Client.Submitted[id] = job - queue.jobs[id] = job - job.WaitingElement = queue.waiting.PushBack(job) + queue.Jobs[id] = job + job.WaitingElement = queue.Waiting.PushBack(job) log.Print("Job ", id, ": queued.") } case pythia.DoneMsg: id := qm.Msg.Id log.Print("Job ", id, ": done.") - job := queue.jobs[id] + job := queue.Jobs[id] if job == nil { log.Println("Ignoring message for unknown job", qm.Msg) break @@ -246,7 +264,7 @@ func (queue *Queue) main(master <-chan queueMessage) { log.Println("Ignoring message from wrong source", qm.Msg) break } - delete(queue.jobs, id) + delete(queue.Jobs, id) delete(pool.Running, id) if job.Origin != nil { // job.Origin is nil if the submitting client has disconnected @@ -257,22 +275,22 @@ func (queue *Queue) main(master <-chan queueMessage) { case closedMsg: log.Print("Client ", qm.Client.Id, ": disconnected.") close(qm.Client.Response) - delete(queue.clients, qm.Client.Id) + delete(queue.Clients, qm.Client.Id) for _, job := range qm.Client.Running { if job.Origin == nil { // Submitter disconnected, we can discard the job. - delete(queue.jobs, job.Id) + delete(queue.Jobs, job.Id) } else { // Otherwise, reschedule it. job.Pool = nil - job.WaitingElement = queue.waiting.PushFront(job) + job.WaitingElement = queue.Waiting.PushFront(job) } } for _, job := range qm.Client.Submitted { if job.WaitingElement != nil { // Job is in waiting queue, discard it. - queue.waiting.Remove(job.WaitingElement) - delete(queue.jobs, job.Id) + queue.Waiting.Remove(job.WaitingElement) + delete(queue.Jobs, job.Id) } else if job.Pool != nil { // Job is running, abort it. job.Origin = nil @@ -280,12 +298,28 @@ func (queue *Queue) main(master <-chan queueMessage) { Message: pythia.AbortMsg, Id: job.Id, } - // Keep job in queue.jobs to handle abort result + // Keep job in queue.Jobs to handle abort result } } case quitMsg: log.Println("Quitting.") goto quit + + case pythia.StatusMsg: + status := fillQueueStatus(queue) + id := qm.Msg.Id + serializedStatus, err := json.Marshal(status) + if err != nil { + log.Fatal("Queue is in an invalid state") + log.Fatal(err) + } + qm.Client.Response <- pythia.Message{ + Message: pythia.DoneMsg, + Id: id, + Status: pythia.Success, + Output: string(serializedStatus), + } + log.Println("Client ", qm.Client.Id, " : Status sent") default: log.Fatal("Invalid internal message", qm.Msg) } @@ -295,19 +329,19 @@ func (queue *Queue) main(master <-chan queueMessage) { } quit: - if len(queue.clients) == 0 { + if len(queue.Clients) == 0 { return } - for _, client := range queue.clients { + for _, client := range queue.Clients { close(client.Response) } - // Wait for all clients to quit. We flush messages from the master channel + // Wait for all Clients to quit. We flush messages from the master channel // to ensure no connection handler is in a deadlock. for qm := range master { switch qm.Msg.Message { case closedMsg: - delete(queue.clients, qm.Client.Id) - if len(queue.clients) == 0 { + delete(queue.Clients, qm.Client.Id) + if len(queue.Clients) == 0 { return } default: @@ -320,17 +354,17 @@ quit: // This function shall be called from the main goroutine, as it manipulates // the queue data structures. func (queue *Queue) schedule() { - if queue.waiting.Len() == 0 { + if queue.Waiting.Len() == 0 { return } - for _, client := range queue.clients { + for _, client := range queue.Clients { for len(client.Running) < client.Capacity { - job := queue.waiting.Remove(queue.waiting.Front()).(*queueJob) + job := queue.Waiting.Remove(queue.Waiting.Front()).(*queueJob) job.WaitingElement = nil job.Pool = client client.Running[job.Id] = job client.Response <- job.Msg - if queue.waiting.Len() == 0 { + if queue.Waiting.Len() == 0 { return } } @@ -361,6 +395,8 @@ func (queue *Queue) handle(conn *pythia.Conn, client *queueClient, response chan queue.master <- queueMessage{msg, client} case pythia.DoneMsg: queue.master <- queueMessage{msg, client} + case pythia.StatusMsg: + queue.master <- queueMessage{msg, client} default: log.Println("Ignoring message", msg) } @@ -374,10 +410,40 @@ func (queue *Queue) handle(conn *pythia.Conn, client *queueClient, response chan case pythia.DoneMsg: msg.Id = msg.Id[strings.Index(msg.Id, ":")+1:] conn.Send(msg) + case pythia.StatusMsg: + conn.Send(msg) default: log.Fatal("Invalid internal message", msg) } } } +func convertClientsToSlice(clients map[int]*queueClient) (clientsSlice []*queueClient) { + clientsSlice = make([]*queueClient, 0) + for _, element := range clients { + clientsSlice = append(clientsSlice, element) + } + return clientsSlice +} + +func convertJobsToSlice(jobs map[string]*queueJob) (jobsSlice []*queueJob) { + jobsSlice = make([]*queueJob, 0) + for _, element := range jobs { + jobsSlice = append(jobsSlice, element) + } + return jobsSlice +} + +// Return a QueueStatus struct filled with information coming from the Queue +func fillQueueStatus(queue *Queue) (status QueueStatus) { + status.Capacity = queue.Capacity + status.Available = queue.Capacity - len(queue.Jobs) - queue.Waiting.Len() + status.Clients = convertClientsToSlice(queue.Clients) + status.Jobs = convertJobsToSlice(queue.Jobs) + status.Waiting = queue.Waiting + status.CreationDate = queue.CreationDate + + return status +} + // vim:set ts=4 sw=4 noet: diff --git a/go/src/pythia/backend/queue_test.go b/go/src/pythia/backend/queue_test.go index bb51466..eb010f2 100644 --- a/go/src/pythia/backend/queue_test.go +++ b/go/src/pythia/backend/queue_test.go @@ -16,7 +16,10 @@ package backend import ( + "encoding/json" "pythia" + "reflect" + "strconv" "testing" "testutils" "testutils/pytest" @@ -114,4 +117,46 @@ func TestQueueSimple(t *testing.T) { f.TearDown() } +func TestQueueStatus(t *testing.T) { + f := SetupQueueFixture(t, 500, 2) + frontend := f.Clients[0] + + frontend.Send(pythia.Message{ + Message: pythia.StatusMsg, + Id: "test", + }) + + // Removing Clients array from Message as this part will differ due to client list + // referencing the test client as a client but said client is no longer connected + // when the status is emitted + status := fillQueueStatus(f.Queue) + status.Clients = make([]*queueClient, 0) + + msg := <-frontend.Conn.Receive() + if msg.Message != pythia.DoneMsg || msg.Id != "test" || msg.Status != pythia.Success { + t.Fatal("Message content mismatching") + } + + var expected QueueStatus + expected.Clients = make([]*queueClient, 0) + json.Unmarshal([]byte(msg.Output), &expected) + + // The content of the Waiting list is not compared because it's not efficient + // and it's not really interesting because the list is supposed to be empty + if expected.Capacity != status.Capacity || + expected.Available != status.Available || + !reflect.DeepEqual(expected.Jobs, status.Jobs) || + !reflect.DeepEqual(expected.Waiting.Len(), status.Waiting.Len()) || + !expected.CreationDate.Equal(status.CreationDate) { + + t.Error("Capacity : " + strconv.FormatBool(expected.Capacity != status.Capacity)) + t.Error("Available : " + strconv.FormatBool(expected.Available != status.Available)) + t.Error("Jobs : " + strconv.FormatBool(!reflect.DeepEqual(expected.Jobs, status.Jobs))) + t.Error("Waiting : " + strconv.FormatBool(!reflect.DeepEqual(expected.Waiting, status.Waiting))) + t.Error("CreationDate : " + strconv.FormatBool(!expected.CreationDate.Equal(status.CreationDate))) + } + + f.TearDown() +} + // vim:set sw=4 ts=4 noet: diff --git a/go/src/pythia/frontend/server.go b/go/src/pythia/frontend/server.go index fc0d923..046e171 100644 --- a/go/src/pythia/frontend/server.go +++ b/go/src/pythia/frontend/server.go @@ -83,6 +83,7 @@ func (server *Server) Run() { }() // Start the web server http.HandleFunc("/execute", handler) + http.HandleFunc("/status", statusHandler) log.Println("Server listening on", server.Port) if err := http.ListenAndServe(fmt.Sprint(":", server.Port), nil); err != nil { log.Fatal(err) @@ -140,4 +141,28 @@ func handler(rw http.ResponseWriter, req *http.Request) { rw.WriteHeader(http.StatusInternalServerError) } +// Handle for /status route to get the status of the Queue +func statusHandler(rw http.ResponseWriter, req *http.Request) { + log.Println("Client connected: ", req.URL) + if req.Method != "GET" { + rw.WriteHeader(http.StatusMethodNotAllowed) + return + } + // Connection to the pool + conn := pythia.DialRetry(pythia.QueueAddr) + defer conn.Close() + conn.Send(pythia.Message{ + Message: pythia.StatusMsg, + }) + if msg, ok := <-conn.Receive(); ok { + switch msg.Status { + case "success": + rw.Header().Set("Content-Type", "application/json") + fmt.Fprintf(rw, msg.Output) + } + return + } + rw.WriteHeader(http.StatusInternalServerError) +} + // vim:set sw=4 ts=4 noet: diff --git a/go/src/pythia/structs.go b/go/src/pythia/structs.go index 0964cb8..dc41d0e 100644 --- a/go/src/pythia/structs.go +++ b/go/src/pythia/structs.go @@ -90,6 +90,9 @@ const ( // (or another status if the job has ended meanwhile). // Frontend->Queue, Queue->Pool AbortMsg MsgType = "abort" + + // Request status of the Queue + StatusMsg MsgType = "status" ) // A Message is the basic entity that is sent between components. Messages are