Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions engine/api/workflow/execute_node_job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,10 @@ func PrepareSpawnInfos(infos []sdk.SpawnInfo) []sdk.SpawnInfo {
prepared := []sdk.SpawnInfo{}
for _, info := range infos {
prepared = append(prepared, sdk.SpawnInfo{
APITime: now,
RemoteTime: info.RemoteTime,
Message: info.Message,
APITime: now,
RemoteTime: info.RemoteTime,
Message: info.Message,
UserMessage: info.Message.DefaultUserMessage(),
})
}
return prepared
Expand Down
15 changes: 9 additions & 6 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,16 +529,19 @@ jobLoop:
}
msg.Args = []interface{}{sdk.Cause(e).Error()}
wjob.SpawnInfos = append(wjob.SpawnInfos, sdk.SpawnInfo{
APITime: time.Now(),
Message: msg,
RemoteTime: time.Now(),
APITime: time.Now(),
Message: msg,
RemoteTime: time.Now(),
UserMessage: msg.DefaultUserMessage(),
})
}
} else {
sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobInQueue.ID}
wjob.SpawnInfos = []sdk.SpawnInfo{{
APITime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobInQueue.ID},
RemoteTime: time.Now(),
APITime: time.Now(),
Message: sp,
RemoteTime: time.Now(),
UserMessage: sp.DefaultUserMessage(),
}}
}

Expand Down
9 changes: 5 additions & 4 deletions engine/api/workflow/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ func checkCondition(ctx context.Context, wr *sdk.WorkflowRun, conditions sdk.Wor
func AddWorkflowRunInfo(run *sdk.WorkflowRun, infos ...sdk.SpawnMsg) {
for _, i := range infos {
run.Infos = append(run.Infos, sdk.WorkflowRunInfo{
APITime: time.Now(),
Message: i,
Type: i.Type,
SubNumber: run.LastSubNumber,
APITime: time.Now(),
Message: i,
Type: i.Type,
SubNumber: run.LastSubNumber,
UserMessage: i.DefaultUserMessage(),
})
}
}
Expand Down
20 changes: 10 additions & 10 deletions engine/api/workflow/run_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,14 +589,14 @@ queueRun:
t.FailNow()
}

sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoHatcheryStarts.ID}
//AddSpawnInfosNodeJobRun
err := workflow.AddSpawnInfosNodeJobRun(db, j.WorkflowNodeRunID, j.ID, []sdk.SpawnInfo{
{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{
ID: sdk.MsgSpawnInfoHatcheryStarts.ID,
},
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
},
})
assert.NoError(t, err)
Expand All @@ -605,15 +605,15 @@ queueRun:
t.FailNow()
}

sp = sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID}
//TakeNodeJobRun
takenJobID := j.ID
takenJob, _, _ := workflow.TakeNodeJobRun(context.TODO(), db, cache, *proj, takenJobID, "model", "worker", "1", []sdk.SpawnInfo{
{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{
ID: sdk.MsgSpawnInfoJobTaken.ID,
},
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
},
}, "hatchery_name")

Expand Down
18 changes: 12 additions & 6 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,18 @@ func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store,
defer tx.Rollback() // nolint

//Prepare spawn infos
m1 := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID, Args: []interface{}{fmt.Sprintf("%d", id), wk.Name}}
m2 := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTakenWorkerVersion.ID, Args: []interface{}{wk.Name, wk.Version, wk.OS, wk.Arch}}
infos := []sdk.SpawnInfo{
{
RemoteTime: getRemoteTime(ctx),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID, Args: []interface{}{fmt.Sprintf("%d", id), wk.Name}},
RemoteTime: getRemoteTime(ctx),
Message: m1,
UserMessage: m1.DefaultUserMessage(),
},
{
RemoteTime: getRemoteTime(ctx),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTakenWorkerVersion.ID, Args: []interface{}{wk.Name, wk.Version, wk.OS, wk.Arch}},
RemoteTime: getRemoteTime(ctx),
Message: m2,
UserMessage: m2.DefaultUserMessage(),
},
}

Expand Down Expand Up @@ -444,9 +448,11 @@ func postJobResult(ctx context.Context, dbFunc func(context.Context) *gorp.DbMap
observability.Tag(observability.TagWorkflowNodeRun, job.WorkflowNodeRunID),
observability.Tag(observability.TagJob, job.Job.Action.Name))

msg := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerEnd.ID, Args: []interface{}{wr.Name, res.Duration}}
infos := []sdk.SpawnInfo{{
RemoteTime: res.RemoteTime,
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerEnd.ID, Args: []interface{}{wr.Name, res.Duration}},
RemoteTime: res.RemoteTime,
Message: msg,
UserMessage: msg.DefaultUserMessage(),
}}

if err := workflow.AddSpawnInfosNodeJobRun(tx, job.WorkflowNodeRunID, job.ID, workflow.PrepareSpawnInfos(infos)); err != nil {
Expand Down
21 changes: 13 additions & 8 deletions engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,10 @@ func stopWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache
spwnMsg := sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{ident.GetUsername()}, Type: sdk.MsgWorkflowNodeStop.Type}

stopInfos := sdk.SpawnInfo{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: spwnMsg,
APITime: time.Now(),
RemoteTime: time.Now(),
Message: spwnMsg,
UserMessage: spwnMsg.DefaultUserMessage(),
}

workflow.AddWorkflowRunInfo(run, spwnMsg)
Expand Down Expand Up @@ -685,10 +686,12 @@ func (api *API) stopWorkflowNodeRunHandler() service.Handler {
return sdk.WrapError(err, "unable to load workflow node run with id %d for workflow %s and run with number %d", workflowNodeRunID, workflowName, workflowRun.Number)
}

sp := sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{getAPIConsumer(ctx).GetUsername()}}
report, err := workflow.StopWorkflowNodeRun(ctx, api.mustDB, api.Cache, *p, *workflowRun, *workflowNodeRun, sdk.SpawnInfo{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{getAPIConsumer(ctx).GetUsername()}},
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
})
if err != nil {
return sdk.WrapError(err, "unable to stop workflow node run")
Expand Down Expand Up @@ -1238,8 +1241,10 @@ func (api *API) getWorkflowNodeRunJobSpawnInfosHandler() service.Handler {

l := r.Header.Get("Accept-Language")
for ki, info := range spawnInfos {
m := sdk.NewMessage(sdk.Messages[info.Message.ID], info.Message.Args...)
spawnInfos[ki].UserMessage = m.String(l)
if _, ok := sdk.Messages[info.Message.ID]; ok {
m := sdk.NewMessage(sdk.Messages[info.Message.ID], info.Message.Args...)
spawnInfos[ki].UserMessage = m.String(l)
}
}
return service.WriteJSON(w, spawnInfos, http.StatusOK)
}
Expand Down
12 changes: 8 additions & 4 deletions engine/worker/internal/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ func processBookedWJob(ctx context.Context, w *CurrentWorker, wjobs chan<- sdk.W
for _, r := range errRequirements {
details += fmt.Sprintf(" %s(%s)", r.Value, r.Type)
}
sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}}
infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}},
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, wjob.ID, infos); err != nil {
return sdk.WrapError(err, "Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID)
Expand All @@ -166,9 +168,11 @@ func processBookedWJob(ctx context.Context, w *CurrentWorker, wjobs chan<- sdk.W
if !pluginsOK {
var details = errPlugins.Error()

sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}}
infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}},
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, wjob.ID, infos); err != nil {
return sdk.WrapError(err, "Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID)
Expand Down
6 changes: 4 additions & 2 deletions engine/worker/internal/take.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er

// Send the reason as a spawninfo
if res.Status != sdk.StatusSuccess && res.Reason != "" {
sp := sdk.SpawnMsg{ID: sdk.MsgWorkflowError.ID, Args: []interface{}{res.Reason}}
infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgWorkflowError.ID, Args: []interface{}{res.Reason}},
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, job.ID, infos); err != nil {
log.Error(ctx, "processJob> Unable to send spawn info: %v", err)
Expand Down
9 changes: 9 additions & 0 deletions sdk/build.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sdk

import (
"fmt"
"time"
)

Expand All @@ -20,6 +21,14 @@ type SpawnMsg struct {
Type string `json:"type" db:"-"`
}

func (s SpawnMsg) DefaultUserMessage() string {
if _, ok := Messages[s.ID]; ok {
m := Messages[s.ID]
return fmt.Sprintf(m.Format[EN], s.Args...)
}
return ""
}

// ExecutedJob represents a running job
type ExecutedJob struct {
Job
Expand Down
2 changes: 1 addition & 1 deletion sdk/hatchery/hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func SendSpawnInfo(ctx context.Context, h Interface, jobID int64, spawnMsg sdk.S
if h.CDSClient() == nil || jobID == 0 {
return
}
infos := []sdk.SpawnInfo{{RemoteTime: time.Now(), Message: spawnMsg}}
infos := []sdk.SpawnInfo{{RemoteTime: time.Now(), Message: spawnMsg, UserMessage: spawnMsg.DefaultUserMessage()}}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := h.CDSClient().QueueJobSendSpawnInfo(ctx, jobID, infos); err != nil {
Expand Down
12 changes: 8 additions & 4 deletions sdk/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ type WorkflowRunNumber struct {
// Translate translates messages in WorkflowNodeRun
func (r *WorkflowRun) Translate(lang string) {
for ki, info := range r.Infos {
m := NewMessage(Messages[info.Message.ID], info.Message.Args...)
r.Infos[ki].UserMessage = m.String(lang)
if _, ok := Messages[info.Message.ID]; ok {
m := NewMessage(Messages[info.Message.ID], info.Message.Args...)
r.Infos[ki].UserMessage = m.String(lang)
}
}
}

Expand Down Expand Up @@ -419,8 +421,10 @@ type WorkflowNodeJobRunInfo struct {
// Translate translates messages in WorkflowNodeJobRun
func (wnjr *WorkflowNodeJobRun) Translate(lang string) {
for ki, info := range wnjr.SpawnInfos {
m := NewMessage(Messages[info.Message.ID], info.Message.Args...)
wnjr.SpawnInfos[ki].UserMessage = m.String(lang)
if _, ok := Messages[info.Message.ID]; ok {
m := NewMessage(Messages[info.Message.ID], info.Message.Args...)
wnjr.SpawnInfos[ki].UserMessage = m.String(lang)
}
}
}

Expand Down