From 01db81633262638ecc3df18fb76bfc58ea657d38 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=A2=81=E6=99=93=E7=94=9F?=
Date: Sun, 26 Nov 2023 23:54:40 +0800
Subject: [PATCH] feat: implement cache

---
 .../domain/run/event_create_handler.go        | 76 +++++++++++++++++--
 .../context/submission/domain/run/factory.go  | 46 +++++------
 .../context/submission/domain/run/object.go   | 26 ++++---
 .../submission/domain/run/repository.go       |  1 +
 .../submission/domain/submission/event.go     |  1 +
 .../domain/submission/event_create_handler.go |  1 +
 .../persistence/run/sql/mapper.go             | 48 ++++++------
 .../infrastructure/persistence/run/sql/po.go  | 24 +++---
 .../persistence/run/sql/repository.go         | 22 ++++++
 9 files changed, 170 insertions(+), 75 deletions(-)

diff --git a/internal/context/submission/domain/run/event_create_handler.go b/internal/context/submission/domain/run/event_create_handler.go
index 2519eb9..9690083 100644
--- a/internal/context/submission/domain/run/event_create_handler.go
+++ b/internal/context/submission/domain/run/event_create_handler.go
@@ -3,8 +3,11 @@ package run
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
+	"gorm.io/gorm"
 	"strings"
+	"time"
 
 	"github.com/Bio-OS/bioos/internal/context/submission/domain/submission"
 	"github.com/Bio-OS/bioos/internal/context/workspace/infrastructure/eventbus"
@@ -48,6 +51,16 @@ func (e *EventHandlerCreateRuns) Handle(ctx context.Context, event *submission.E
 	}
 
 	for _, run := range runList {
+		// try to reuse the result of an identical succeeded run first
+		hitCache, err := e.CallCaching(ctx, run, event)
+		if err != nil {
+			return err
+		}
+		// cache hit: the run is already saved, so skip publishing the submitRun event
+		if hitCache {
+			continue
+		}
+
 		// public submit run
 		if err = e.Publish(ctx, run, event); err != nil {
 			return err
@@ -57,6 +70,51 @@ func (e *EventHandlerCreateRuns) Handle(ctx context.Context, event *submission.E
 	return nil
 }
 
+// CallCaching reuses the result of a succeeded run with the same inputs when "read_from_cache" is enabled; it reports whether the cache was hit.
+func (e *EventHandlerCreateRuns) CallCaching(ctx context.Context, run *Run, event *submission.EventCreateRuns) (bool, error) {
+	// caching is opt-in: a missing or non-bool "read_from_cache" means off (comma-ok avoids a panic)
+	readFromCache, _ := event.RunConfig.WorkflowEngineParameters["read_from_cache"].(bool)
+	if !readFromCache {
+		return false, nil
+	}
+
+	// look for a succeeded historical run with the same inputs
+	sameRun, err := e.runRepo.FindSameRun(ctx, run)
+	if err != nil {
+		// "record not found" is a cache miss, not a failure
+		if !errors.Is(err, gorm.ErrRecordNotFound) {
+			return false, err
+		}
+		return false, nil
+	}
+	// copy the historical result onto the new run, then persist it
+	e.copyOutput(sameRun, run)
+	if err := e.runRepo.Save(ctx, run); err != nil {
+		return false, apperrors.NewInternalError(err)
+	}
+	applog.Infof("callCaching success, from %s to %s", sameRun.ID, run.ID)
+
+	// publish sync submission event
+	eventSyncSubmission := submission.NewSyncSubmissionEvent(event.SubmissionID)
+	if err := e.eventBus.Publish(ctx, eventSyncSubmission); err != nil {
+		return false, apperrors.NewInternalError(err)
+	}
+	return true, nil
+}
+
+// copyOutput marks to as a cache hit and copies the result fields of from onto it; FinishTime is set to the current time.
+func (e *EventHandlerCreateRuns) copyOutput(from, to *Run) {
+	now := time.Now()
+	// flag the run as served from cache
+	to.Cache = true
+	to.Outputs = from.Outputs
+	to.EngineRunID = from.EngineRunID
+	to.Status = from.Status
+	to.Log = from.Log
+	to.Message = from.Message
+	to.FinishTime = &now
+}
+
 func (e *EventHandlerCreateRuns) genRunList(ctx context.Context, dataList *dataList, event *submission.EventCreateRuns) ([]*Run, error) {
 	res := make([]*Run, 0)
 	// only contains ws data model implement submissionType is filePath
@@ -74,10 +132,11 @@ func (e *EventHandlerCreateRuns) genRunList(ctx context.Context, dataList *dataL
 			return nil, err
 		}
 		tempRun, err := e.runFactory.CreateWithRunParam(CreateRunParam{
-			SubmissionID: event.SubmissionID,
-			Name:         name,
-			Inputs:       inputStr,
-			Status:       consts.RunPending,
+			SubmissionID:      event.SubmissionID,
+			Name:              name,
+			Inputs:            inputStr,
+			Status:            consts.RunPending,
+			WorkflowVersionID: event.RunConfig.WorkflowVersionID,
 		})
 		if err != nil {
 			return nil, err
@@ -94,10 +153,11 @@ func (e *EventHandlerCreateRuns) genRunList(ctx context.Context, dataList *dataL
 				return nil, err
 			}
 			tempRun, err := e.runFactory.CreateWithRunParam(CreateRunParam{
-				SubmissionID: event.SubmissionID,
-				Name:         rowID,
-				Inputs:       inputStr,
-				Status:       consts.RunPending,
+				SubmissionID:      event.SubmissionID,
+				Name:              rowID,
+				Inputs:            inputStr,
+				Status:            consts.RunPending,
+				WorkflowVersionID: event.RunConfig.WorkflowVersionID,
 			})
 			if err != nil {
 				return nil, err
diff --git a/internal/context/submission/domain/run/factory.go b/internal/context/submission/domain/run/factory.go
index d315f26..625d9e5 100644
--- a/internal/context/submission/domain/run/factory.go
+++ b/internal/context/submission/domain/run/factory.go
@@ -8,17 +8,18 @@ import (
 )
 
 type CreateRunParam struct {
-	ID           string
-	Name         string
-	SubmissionID string
-	Inputs       map[string]interface{}  `gorm:"serializer:json"`
-	Outputs      *map[string]interface{} `gorm:"serializer:json"`
-	EngineRunID  string
-	Status       string
-	Log          *string
-	Message      *string
-	StartTime    time.Time
-	FinishTime   *time.Time
+	ID                string
+	Name              string
+	SubmissionID      string
+	WorkflowVersionID string
+	Inputs            map[string]interface{}  `gorm:"serializer:json"`
+	Outputs           *map[string]interface{} `gorm:"serializer:json"`
+	EngineRunID       string
+	Status            string
+	Log               *string
+	Message           *string
+	StartTime         time.Time
+	FinishTime        *time.Time
 }
 
 type CreateTaskParam struct {
@@ -60,17 +61,18 @@ func (fac *Factory) CreateWithRunParam(param CreateRunParam) (*Run, error) {
 	}
 
 	return &Run{
-		ID:           param.ID,
-		Name:         param.Name,
-		SubmissionID: param.SubmissionID,
-		Inputs:       param.Inputs,
-		Outputs:      param.Outputs,
-		EngineRunID:  param.EngineRunID,
-		Status:       param.Status,
-		Log:          param.Log,
-		Message:      param.Message,
-		StartTime:    param.StartTime,
-		FinishTime:   param.FinishTime,
+		ID:                param.ID,
+		Name:              param.Name,
+		SubmissionID:      param.SubmissionID,
+		WorkflowVersionID: param.WorkflowVersionID,
+		Inputs:            param.Inputs,
+		Outputs:           param.Outputs,
+		EngineRunID:       param.EngineRunID,
+		Status:            param.Status,
+		Log:               param.Log,
+		Message:           param.Message,
+		StartTime:         param.StartTime,
+		FinishTime:        param.FinishTime,
 	}, nil
 }
 
diff --git a/internal/context/submission/domain/run/object.go b/internal/context/submission/domain/run/object.go
index b77d82d..f3d7e4d 100644
--- a/internal/context/submission/domain/run/object.go
+++ b/internal/context/submission/domain/run/object.go
@@ -10,18 +10,20 @@ import (
 
 // Run ...
 type Run struct {
-	ID           string
-	Name         string
-	SubmissionID string
-	Inputs       map[string]interface{}  `gorm:"serializer:json"`
-	Outputs      *map[string]interface{} `gorm:"serializer:json"`
-	EngineRunID  string
-	Status       string
-	Log          *string
-	Message      *string
-	StartTime    time.Time
-	FinishTime   *time.Time
-	Tasks        []*Task
+	ID                string
+	Name              string
+	SubmissionID      string
+	WorkflowVersionID string
+	Cache             bool
+	Inputs            map[string]interface{}  `gorm:"serializer:json"`
+	Outputs           *map[string]interface{} `gorm:"serializer:json"`
+	EngineRunID       string
+	Status            string
+	Log               *string
+	Message           *string
+	StartTime         time.Time
+	FinishTime        *time.Time
+	Tasks             []*Task
 }
 
 // Task ...
diff --git a/internal/context/submission/domain/run/repository.go b/internal/context/submission/domain/run/repository.go
index da55d58..72d1be6 100644
--- a/internal/context/submission/domain/run/repository.go
+++ b/internal/context/submission/domain/run/repository.go
@@ -7,4 +7,5 @@ type Repository interface {
 	Save(ctx context.Context, r *Run) error
 	Get(ctx context.Context, id string) (*Run, error)
 	Delete(ctx context.Context, r *Run) error
+	FindSameRun(ctx context.Context, w *Run) (*Run, error)
 }
diff --git a/internal/context/submission/domain/submission/event.go b/internal/context/submission/domain/submission/event.go
index 0a7d9bf..3351f77 100644
--- a/internal/context/submission/domain/submission/event.go
+++ b/internal/context/submission/domain/submission/event.go
@@ -161,6 +161,7 @@ type RunConfig struct {
 	MainWorkflowFilePath     string
 	WorkflowEngineParameters map[string]interface{}
 	Version                  string
+	WorkflowVersionID        string
 }
 
 func NewEventCreateRuns(workspaceID, submissionID, submissionType string, inputs, outputs map[string]interface{}, dataModelID *string, DataModelRowIDs []string, runConfig *RunConfig) *EventCreateRuns {
diff --git a/internal/context/submission/domain/submission/event_create_handler.go b/internal/context/submission/domain/submission/event_create_handler.go
index 3e8c589..3386a0a 100644
--- a/internal/context/submission/domain/submission/event_create_handler.go
+++ b/internal/context/submission/domain/submission/event_create_handler.go
@@ -71,6 +71,7 @@ func (h *CreateHandler) genCreateRunEvent(ctx context.Context, event *CreateEven
 		WorkflowContents:         files,
 		WorkflowEngineParameters: workflowEngineParameters,
 		Version:                  getWorkflowVersionResp.Version.LanguageVersion,
+		WorkflowVersionID:        getWorkflowVersionResp.Version.Id,
 	}), nil
 }
 
diff --git a/internal/context/submission/infrastructure/persistence/run/sql/mapper.go b/internal/context/submission/infrastructure/persistence/run/sql/mapper.go
index f336fa6..6523bec 100644
--- a/internal/context/submission/infrastructure/persistence/run/sql/mapper.go
+++ b/internal/context/submission/infrastructure/persistence/run/sql/mapper.go
@@ -71,17 +71,19 @@ func StatusCountPOToStatusCountDTO(count *StatusCount) *query.StatusCount {
 
 func RunPOToRunDO(runPO *Run) *run.Run {
 	return &run.Run{
-		ID:           runPO.ID,
-		Name:         runPO.Name,
-		SubmissionID: runPO.SubmissionID,
-		Inputs:       runPO.Inputs,
-		Outputs:      runPO.Outputs,
-		EngineRunID:  runPO.EngineRunID,
-		Status:       runPO.Status,
-		Log:          runPO.Log,
-		Message:      runPO.Message,
-		StartTime:    runPO.StartTime,
-		FinishTime:   runPO.FinishTime,
+		ID:                runPO.ID,
+		Name:              runPO.Name,
+		SubmissionID:      runPO.SubmissionID,
+		WorkflowVersionID: runPO.WorkflowVersionID,
+		Cache:             runPO.Cache,
+		Inputs:            runPO.Inputs,
+		Outputs:           runPO.Outputs,
+		EngineRunID:       runPO.EngineRunID,
+		Status:            runPO.Status,
+		Log:               runPO.Log,
+		Message:           runPO.Message,
+		StartTime:         runPO.StartTime,
+		FinishTime:        runPO.FinishTime,
 	}
 }
 
@@ -104,16 +106,18 @@ func RunDOToTaskPOList(runDO *run.Run) []*Task {
 
 func RunDOToRunPO(runDO *run.Run) *Run {
 	return &Run{
-		ID:           runDO.ID,
-		Name:         runDO.Name,
-		SubmissionID: runDO.SubmissionID,
-		Inputs:       runDO.Inputs,
-		Outputs:      runDO.Outputs,
-		EngineRunID:  runDO.EngineRunID,
-		Status:       runDO.Status,
-		Log:          runDO.Log,
-		Message:      runDO.Message,
-		StartTime:    runDO.StartTime,
-		FinishTime:   runDO.FinishTime,
+		ID:                runDO.ID,
+		Name:              runDO.Name,
+		SubmissionID:      runDO.SubmissionID,
+		WorkflowVersionID: runDO.WorkflowVersionID,
+		Cache:             runDO.Cache,
+		Inputs:            runDO.Inputs,
+		Outputs:           runDO.Outputs,
+		EngineRunID:       runDO.EngineRunID,
+		Status:            runDO.Status,
+		Log:               runDO.Log,
+		Message:           runDO.Message,
+		StartTime:         runDO.StartTime,
+		FinishTime:        runDO.FinishTime,
 	}
 }
diff --git a/internal/context/submission/infrastructure/persistence/run/sql/po.go b/internal/context/submission/infrastructure/persistence/run/sql/po.go
index ede2703..b71642b 100644
--- a/internal/context/submission/infrastructure/persistence/run/sql/po.go
+++ b/internal/context/submission/infrastructure/persistence/run/sql/po.go
@@ -6,17 +6,19 @@ import (
 
 // Run ...
 type Run struct {
-	ID           string
-	Name         string                  `gorm:"type:varchar(200);not null;uniqueIndex:sub_run"`
-	SubmissionID string                  `gorm:"type:varchar(32);not null;uniqueIndex:sub_run"`
-	Inputs       map[string]interface{}  `gorm:"serializer:json"`
-	Outputs      *map[string]interface{} `gorm:"serializer:json"`
-	EngineRunID  string                  `gorm:"type:varchar(128);not null"`
-	Status       string                  `gorm:"type:varchar(32);not null"`
-	Log          *string                 `gorm:"type:longtext"`
-	Message      *string                 `gorm:"type:longtext"`
-	StartTime    time.Time
-	FinishTime   *time.Time
+	ID                string
+	Name              string                  `gorm:"type:varchar(200);not null;uniqueIndex:sub_run"`
+	SubmissionID      string                  `gorm:"type:varchar(32);not null;uniqueIndex:sub_run"`
+	WorkflowVersionID string                  `gorm:"type:varchar(32)"`
+	Cache             bool                    `gorm:"type:bool"`
+	Inputs            map[string]interface{}  `gorm:"serializer:json"`
+	Outputs           *map[string]interface{} `gorm:"serializer:json"`
+	EngineRunID       string                  `gorm:"type:varchar(128);not null"`
+	Status            string                  `gorm:"type:varchar(32);not null"`
+	Log               *string                 `gorm:"type:longtext"`
+	Message           *string                 `gorm:"type:longtext"`
+	StartTime         time.Time
+	FinishTime        *time.Time
 }
 
 func (r *Run) TableName() string {
diff --git a/internal/context/submission/infrastructure/persistence/run/sql/repository.go b/internal/context/submission/infrastructure/persistence/run/sql/repository.go
index ce786f9..0d552db 100644
--- a/internal/context/submission/infrastructure/persistence/run/sql/repository.go
+++ b/internal/context/submission/infrastructure/persistence/run/sql/repository.go
@@ -2,7 +2,9 @@ package sql
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
+	"github.com/Bio-OS/bioos/pkg/consts"
 
 	"gorm.io/gorm"
 	"gorm.io/gorm/clause"
@@ -82,3 +84,23 @@ func (r *runRepository) Delete(ctx context.Context, w *run.Run) error {
 		return nil
 	})
 }
+
+// FindSameRun looks up a succeeded run with the same workflow version and the
+// same JSON-encoded inputs as w; any query error is returned unchanged.
+func (r *runRepository) FindSameRun(ctx context.Context, w *run.Run) (*run.Run, error) {
+	po := RunDOToRunPO(w)
+	rawInputs, err := json.Marshal(po.Inputs)
+	if err != nil {
+		return nil, err
+	}
+
+	var matched Run
+	query := r.db.WithContext(ctx).
+		Where("workflow_version_id = ?", po.WorkflowVersionID).
+		Where("inputs = ?", string(rawInputs)).
+		Where("status = ?", consts.RunSucceeded)
+	if err = query.First(&matched).Error; err != nil {
+		return nil, err
+	}
+	return RunPOToRunDO(&matched), nil
+}