Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 68 additions & 8 deletions internal/context/submission/domain/run/event_create_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package run
import (
"context"
"encoding/json"
"errors"
"fmt"
"gorm.io/gorm"
"strings"
"time"

"github.com/Bio-OS/bioos/internal/context/submission/domain/submission"
"github.com/Bio-OS/bioos/internal/context/workspace/infrastructure/eventbus"
Expand Down Expand Up @@ -48,6 +51,16 @@ func (e *EventHandlerCreateRuns) Handle(ctx context.Context, event *submission.E
}

for _, run := range runList {
// try to call caching
hitCache, err := e.CallCaching(ctx, run, event)
if err != nil {
return err
}
// hit cache: skip publish submitRun event
if hitCache {
continue
}

// public submit run
if err = e.Publish(ctx, run, event); err != nil {
return err
Expand All @@ -57,6 +70,51 @@ func (e *EventHandlerCreateRuns) Handle(ctx context.Context, event *submission.E
return nil
}

func (e *EventHandlerCreateRuns) CallCaching(ctx context.Context, run *Run, event *submission.EventCreateRuns) (bool, error) {
// check user open cache
readFromCache := event.RunConfig.WorkflowEngineParameters["read_from_cache"].(bool)
if !readFromCache {
return false, nil
}

// find the same input run from history succeeded
samRun, err := e.runRepo.FindSameRun(ctx, run)
if err != nil {
// if not found, directly return false
if !errors.Is(err, gorm.ErrRecordNotFound) {
return false, err
}
return false, nil
}
// copy params from history run
e.copyOutput(samRun, run)
// save
if err := e.runRepo.Save(ctx, run); err != nil {
return false, apperrors.NewInternalError(err)
}
applog.Infof("callCaching success, from %s to %S", samRun.ID, run.ID)

// publish sync submission event
eventSyncSubmission := submission.NewSyncSubmissionEvent(event.SubmissionID)
if err := e.eventBus.Publish(ctx, eventSyncSubmission); err != nil {
return false, apperrors.NewInternalError(err)
}
return true, nil
}

func (e *EventHandlerCreateRuns) copyOutput(from, to *Run) {
now := time.Now()
// set this run hit cache
to.Cache = true
// copy other params
to.Outputs = from.Outputs
to.EngineRunID = from.EngineRunID
to.Status = from.Status
to.Log = from.Log
to.Message = from.Message
to.FinishTime = &now
}

func (e *EventHandlerCreateRuns) genRunList(ctx context.Context, dataList *dataList, event *submission.EventCreateRuns) ([]*Run, error) {
res := make([]*Run, 0)
// only contains ws data model implement submissionType is filePath
Expand All @@ -74,10 +132,11 @@ func (e *EventHandlerCreateRuns) genRunList(ctx context.Context, dataList *dataL
return nil, err
}
tempRun, err := e.runFactory.CreateWithRunParam(CreateRunParam{
SubmissionID: event.SubmissionID,
Name: name,
Inputs: inputStr,
Status: consts.RunPending,
SubmissionID: event.SubmissionID,
Name: name,
Inputs: inputStr,
Status: consts.RunPending,
WorkflowVersionID: event.RunConfig.WorkflowVersionID,
})
if err != nil {
return nil, err
Expand All @@ -94,10 +153,11 @@ func (e *EventHandlerCreateRuns) genRunList(ctx context.Context, dataList *dataL
return nil, err
}
tempRun, err := e.runFactory.CreateWithRunParam(CreateRunParam{
SubmissionID: event.SubmissionID,
Name: rowID,
Inputs: inputStr,
Status: consts.RunPending,
SubmissionID: event.SubmissionID,
Name: rowID,
Inputs: inputStr,
Status: consts.RunPending,
WorkflowVersionID: event.RunConfig.WorkflowVersionID,
})
if err != nil {
return nil, err
Expand Down
46 changes: 24 additions & 22 deletions internal/context/submission/domain/run/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ import (
)

type CreateRunParam struct {
ID string
Name string
SubmissionID string
Inputs map[string]interface{} `gorm:"serializer:json"`
Outputs *map[string]interface{} `gorm:"serializer:json"`
EngineRunID string
Status string
Log *string
Message *string
StartTime time.Time
FinishTime *time.Time
ID string
Name string
SubmissionID string
WorkflowVersionID string
Inputs map[string]interface{} `gorm:"serializer:json"`
Outputs *map[string]interface{} `gorm:"serializer:json"`
EngineRunID string
Status string
Log *string
Message *string
StartTime time.Time
FinishTime *time.Time
}

type CreateTaskParam struct {
Expand Down Expand Up @@ -60,17 +61,18 @@ func (fac *Factory) CreateWithRunParam(param CreateRunParam) (*Run, error) {
}

return &Run{
ID: param.ID,
Name: param.Name,
SubmissionID: param.SubmissionID,
Inputs: param.Inputs,
Outputs: param.Outputs,
EngineRunID: param.EngineRunID,
Status: param.Status,
Log: param.Log,
Message: param.Message,
StartTime: param.StartTime,
FinishTime: param.FinishTime,
ID: param.ID,
Name: param.Name,
SubmissionID: param.SubmissionID,
WorkflowVersionID: param.WorkflowVersionID,
Inputs: param.Inputs,
Outputs: param.Outputs,
EngineRunID: param.EngineRunID,
Status: param.Status,
Log: param.Log,
Message: param.Message,
StartTime: param.StartTime,
FinishTime: param.FinishTime,
}, nil
}

Expand Down
26 changes: 14 additions & 12 deletions internal/context/submission/domain/run/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ import (

// Run ...
type Run struct {
ID string
Name string
SubmissionID string
Inputs map[string]interface{} `gorm:"serializer:json"`
Outputs *map[string]interface{} `gorm:"serializer:json"`
EngineRunID string
Status string
Log *string
Message *string
StartTime time.Time
FinishTime *time.Time
Tasks []*Task
ID string
Name string
SubmissionID string
WorkflowVersionID string
Cache bool
Inputs map[string]interface{} `gorm:"serializer:json"`
Outputs *map[string]interface{} `gorm:"serializer:json"`
EngineRunID string
Status string
Log *string
Message *string
StartTime time.Time
FinishTime *time.Time
Tasks []*Task
}

// Task ...
Expand Down
1 change: 1 addition & 0 deletions internal/context/submission/domain/run/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ type Repository interface {
Save(ctx context.Context, r *Run) error
Get(ctx context.Context, id string) (*Run, error)
Delete(ctx context.Context, r *Run) error
FindSameRun(ctx context.Context, w *Run) (*Run, error)
}
1 change: 1 addition & 0 deletions internal/context/submission/domain/submission/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type RunConfig struct {
MainWorkflowFilePath string
WorkflowEngineParameters map[string]interface{}
Version string
WorkflowVersionID string
}

func NewEventCreateRuns(workspaceID, submissionID, submissionType string, inputs, outputs map[string]interface{}, dataModelID *string, DataModelRowIDs []string, runConfig *RunConfig) *EventCreateRuns {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (h *CreateHandler) genCreateRunEvent(ctx context.Context, event *CreateEven
WorkflowContents: files,
WorkflowEngineParameters: workflowEngineParameters,
Version: getWorkflowVersionResp.Version.LanguageVersion,
WorkflowVersionID: getWorkflowVersionResp.Version.Id,
}), nil

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,19 @@ func StatusCountPOToStatusCountDTO(count *StatusCount) *query.StatusCount {

func RunPOToRunDO(runPO *Run) *run.Run {
return &run.Run{
ID: runPO.ID,
Name: runPO.Name,
SubmissionID: runPO.SubmissionID,
Inputs: runPO.Inputs,
Outputs: runPO.Outputs,
EngineRunID: runPO.EngineRunID,
Status: runPO.Status,
Log: runPO.Log,
Message: runPO.Message,
StartTime: runPO.StartTime,
FinishTime: runPO.FinishTime,
ID: runPO.ID,
Name: runPO.Name,
SubmissionID: runPO.SubmissionID,
WorkflowVersionID: runPO.WorkflowVersionID,
Cache: runPO.Cache,
Inputs: runPO.Inputs,
Outputs: runPO.Outputs,
EngineRunID: runPO.EngineRunID,
Status: runPO.Status,
Log: runPO.Log,
Message: runPO.Message,
StartTime: runPO.StartTime,
FinishTime: runPO.FinishTime,
}
}

Expand All @@ -104,16 +106,18 @@ func RunDOToTaskPOList(runDO *run.Run) []*Task {

func RunDOToRunPO(runDO *run.Run) *Run {
return &Run{
ID: runDO.ID,
Name: runDO.Name,
SubmissionID: runDO.SubmissionID,
Inputs: runDO.Inputs,
Outputs: runDO.Outputs,
EngineRunID: runDO.EngineRunID,
Status: runDO.Status,
Log: runDO.Log,
Message: runDO.Message,
StartTime: runDO.StartTime,
FinishTime: runDO.FinishTime,
ID: runDO.ID,
Name: runDO.Name,
SubmissionID: runDO.SubmissionID,
WorkflowVersionID: runDO.WorkflowVersionID,
Cache: runDO.Cache,
Inputs: runDO.Inputs,
Outputs: runDO.Outputs,
EngineRunID: runDO.EngineRunID,
Status: runDO.Status,
Log: runDO.Log,
Message: runDO.Message,
StartTime: runDO.StartTime,
FinishTime: runDO.FinishTime,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ import (

// Run ...
type Run struct {
ID string
Name string `gorm:"type:varchar(200);not null;uniqueIndex:sub_run"`
SubmissionID string `gorm:"type:varchar(32);not null;uniqueIndex:sub_run"`
Inputs map[string]interface{} `gorm:"serializer:json"`
Outputs *map[string]interface{} `gorm:"serializer:json"`
EngineRunID string `gorm:"type:varchar(128);not null"`
Status string `gorm:"type:varchar(32);not null"`
Log *string `gorm:"type:longtext"`
Message *string `gorm:"type:longtext"`
StartTime time.Time
FinishTime *time.Time
ID string
Name string `gorm:"type:varchar(200);not null;uniqueIndex:sub_run"`
SubmissionID string `gorm:"type:varchar(32);not null;uniqueIndex:sub_run"`
WorkflowVersionID string `gorm:"type:varchar(32)"`
Cache bool `gorm:"type:bool"`
Inputs map[string]interface{} `gorm:"serializer:json"`
Outputs *map[string]interface{} `gorm:"serializer:json"`
EngineRunID string `gorm:"type:varchar(128);not null"`
Status string `gorm:"type:varchar(32);not null"`
Log *string `gorm:"type:longtext"`
Message *string `gorm:"type:longtext"`
StartTime time.Time
FinishTime *time.Time
}

func (r *Run) TableName() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package sql

import (
"context"
"encoding/json"
"errors"
"github.com/Bio-OS/bioos/pkg/consts"

"gorm.io/gorm"
"gorm.io/gorm/clause"
Expand Down Expand Up @@ -82,3 +84,23 @@ func (r *runRepository) Delete(ctx context.Context, w *run.Run) error {
return nil
})
}

func (r *runRepository) FindSameRun(ctx context.Context, w *run.Run) (*run.Run, error) {
runPO := RunDOToRunPO(w)

inputs, err := json.Marshal(runPO.Inputs)
if err != nil {
return nil, err
}

var sameRunPo Run
err = r.db.WithContext(ctx).
Where("workflow_version_id = ?", runPO.WorkflowVersionID).
Where("inputs = ?", string(inputs)).
Where("status = ?", consts.RunSucceeded).
First(&sameRunPo).Error
if err != nil {
return nil, err
}
return RunPOToRunDO(&sameRunPo), nil
}