diff --git a/README.md b/README.md index d6b981b2..4ba0327f 100644 --- a/README.md +++ b/README.md @@ -205,6 +205,25 @@ health_check: --- +## Config-Driven Job Attributes (For Static or Runtime generated metadata) + +Surface extra links or text on a job in the UI with minimal change in plugin code - by declaring `attributes` on a cluster. Each attribute is a template that Heimdall renders after the job runs and stores in the job's `job_attributes`, which the UI then displays. Check local.yaml for the example. + +Each entry is `label → { kind, value }`: + +- **`kind`** tells the UI how to render `value`: `link` (clickable hyperlink) or `text`. +- **`value`** is a Go [`text/template`](https://pkg.go.dev/text/template) rendered over four namespaces: + - `.Job` — the job object (e.g. `.Job.ID`) + - `.Command` / `.Cluster` — the matched command's and cluster's `context` maps + - `.Outputs` — runtime values published by the plugin during execution + +Static attributes (built only from `.Job`, `.Command`, `.Cluster`) need no plugin changes. For runtime-discovered values, a plugin publishes to the outputs channel with a single call - for example, + +```go +job.SetOutput("some_runtime_metadata", value) +``` +--- + ## 👥 Credits **Heimdall** was created at **Pattern, Inc** by Stan Babourine, with contributions from Will Graham, Gaurav Warale and Josh Diaz. diff --git a/assets/databases/heimdall/tables/jobs.sql b/assets/databases/heimdall/tables/jobs.sql index 67aa6021..e373a83e 100644 --- a/assets/databases/heimdall/tables/jobs.sql +++ b/assets/databases/heimdall/tables/jobs.sql @@ -21,6 +21,7 @@ create table if not exists jobs alter table jobs add column if not exists store_result_sync boolean not null default false; alter table jobs add column if not exists canceled_by varchar(64) null; +alter table jobs add column if not exists job_attributes jsonb not null default '{}'::jsonb; -- Originally had "cancelled_by" column and "cancelling" status, but we aren't british. Whoops. do $$ begin diff --git a/configs/local.yaml b/configs/local.yaml index e5042686..11c535df 100644 --- a/configs/local.yaml +++ b/configs/local.yaml @@ -39,10 +39,19 @@ commands: # supported clusters clusters: - name: localhost-0.0.1 + attributes: + Test Link Label: + kind: link + value: "{{ .Cluster.test_url }}/remaining_static_or_runtime_generated_path" + Test Text Label: + kind: text + value: "static_or_runtime_generated_value_via_templating" status: active version: 0.0.1 description: Just a localhost health_check: true + context: + test_url: http://localhost:8080 tags: - type:localhost - - data:local \ No newline at end of file + - data:local diff --git a/internal/pkg/heimdall/job.go b/internal/pkg/heimdall/job.go index 1aefee49..4eca5916 100644 --- a/internal/pkg/heimdall/job.go +++ b/internal/pkg/heimdall/job.go @@ -6,10 +6,12 @@ import ( _ "embed" "encoding/json" "fmt" + "io" "math/big" "net/http" "os" "strings" + "text/template" "time" "github.com/babourine/x/pkg/set" @@ -42,6 +44,7 @@ var ( ErrCallerNotAllowed = fmt.Errorf(`caller is not allowed to run this command`) runJobMethod = telemetry.NewMethod("runJob", "heimdall") cancelJobMethod = telemetry.NewMethod("db_connection", "cancel_job") + renderAttributesMethod = telemetry.NewMethod("renderAttributes", "heimdall") ) //go:embed queries/job/status_cancel_update.sql @@ -52,6 +55,13 @@ type commandOnCluster struct { cluster *cluster.Cluster } +type attributeTemplateData struct { + Job *job.Job + Outputs map[string]string + Command map[string]any + Cluster map[string]any +} + func (h *Heimdall) submitJob(ctx context.Context, j *job.Job) (any, error) { // set / add job properties @@ -174,6 +184,8 @@ func (h *Heimdall) runJob(ctx context.Context, j *job.Job, command *command.Comm return nil } + renderJobAttributes(j, command, cluster) + // Handle plugin execution result (only if not canceled) if jobErr != nil { j.Status = jobStatus.Failed @@ -195,6 +207,47 @@ func (h *Heimdall) runJob(ctx context.Context, j *job.Job, command *command.Comm } +func renderJobAttributes(j *job.Job, command *command.Command, cluster *cluster.Cluster) { + + if len(cluster.Attributes) == 0 { + return + } + + data := attributeTemplateData{Job: j, Outputs: j.Outputs()} + if data.Outputs == nil { + data.Outputs = map[string]string{} + } + if command.Context != nil { + data.Command = map[string]any(*command.Context) + } + if cluster.Context != nil { + data.Cluster = map[string]any(*cluster.Context) + } + + for label, attr := range cluster.Attributes { + + parsed, err := template.New(`attribute`).Parse(attr.Value) + if err != nil { + renderAttributesMethod.LogAndCountError(fmt.Errorf("attribute %q: invalid template: %w", label, err)) + continue + } + + var value strings.Builder + if err := parsed.Option(`missingkey=error`).Execute(&value, data); err != nil { + if structuralErr := parsed.Option(`missingkey=zero`).Execute(io.Discard, data); structuralErr != nil { + renderAttributesMethod.LogAndCountError(fmt.Errorf("attribute %q: %w", label, structuralErr)) + } + continue + } + + if j.JobAttributes == nil { + j.JobAttributes = make(map[string]job.Attribute) + } + j.JobAttributes[label] = job.Attribute{Kind: attr.Kind, Value: value.String()} + } + +} + func (h *Heimdall) storeResults(runtime *plugin.Runtime, j *job.Job) error { // do we have result to be written? if j.Result == nil { diff --git a/internal/pkg/heimdall/job_dal.go b/internal/pkg/heimdall/job_dal.go index 142be6ae..f52528c6 100644 --- a/internal/pkg/heimdall/job_dal.go +++ b/internal/pkg/heimdall/job_dal.go @@ -213,10 +213,10 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) { }, } - var jobContext string + var jobContext, jobAttributes string if err := row.Scan(&r.SystemID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync, - &r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy); err != nil { + &r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy, &jobAttributes); err != nil { if err == sql.ErrNoRows { return nil, ErrUnknownJobID } else { @@ -224,6 +224,11 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) { } } + if err := parseJobAttributes(r, jobAttributes); err != nil { + getJobMethod.LogAndCountError(err, "parse_job_attributes") + return nil, err + } + if err := jobParseContextAndTags(r, jobContext, sess); err != nil { getJobMethod.LogAndCountError(err, "job_parse_context_and_tags") return nil, err @@ -439,15 +444,20 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) for rows.Next() { - jobContext := `` + jobContext, jobAttributes := ``, `` r := &job.Job{} if err := rows.Scan(&r.SystemID, &r.ID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync, - &r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy); err != nil { + &r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy, &jobAttributes); err != nil { getJobsMethod.LogAndCountError(err, "scan") return nil, err } + if err := parseJobAttributes(r, jobAttributes); err != nil { + getJobsMethod.LogAndCountError(err, "parse_job_attributes") + return nil, err + } + if err := jobParseContextAndTags(r, jobContext, sess); err != nil { getJobsMethod.LogAndCountError(err, "job_parse_context_and_tags") return nil, err @@ -522,6 +532,16 @@ func (h *Heimdall) getJobStatus(ctx context.Context, j *jobRequest) (any, error) } +func parseJobAttributes(j *job.Job, jobAttributes string) error { + if jobAttributes == `` || jobAttributes == `{}` { + return nil + } + if err := json.Unmarshal([]byte(jobAttributes), &j.JobAttributes); err != nil { + j.JobAttributes = nil + } + return nil +} + func jobParseContextAndTags(j *job.Job, jobContext string, sess *database.Session) (err error) { // ...and add job context diff --git a/internal/pkg/heimdall/jobs_async.go b/internal/pkg/heimdall/jobs_async.go index 0f2e82ab..750ac9b3 100644 --- a/internal/pkg/heimdall/jobs_async.go +++ b/internal/pkg/heimdall/jobs_async.go @@ -3,6 +3,7 @@ package heimdall import ( "context" _ "embed" + "encoding/json" "fmt" "time" @@ -35,6 +36,17 @@ var queryJobStatusUpdate string //go:embed queries/job/active_delete.sql var queryActiveJobDelete string +func jobAttributesJSON(j *job.Job) string { + if len(j.JobAttributes) == 0 { + return `` + } + data, err := json.Marshal(j.JobAttributes) + if err != nil { + return `` + } + return string(data) +} + func (h *Heimdall) getAsyncJobs(limit int) ([]*job.Job, error) { // Track DB connection for async jobs retrieval @@ -118,7 +130,7 @@ func (h *Heimdall) runAsyncJob(ctx context.Context, j *job.Job) error { } defer sess.Close() - if _, err := sess.Exec(queryJobStatusUpdate, status.Running, ``, j.SystemID); err != nil { + if _, err := sess.Exec(queryJobStatusUpdate, status.Running, ``, jobAttributesJSON(j), j.SystemID); err != nil { return h.updateAsyncJobStatus(j, err) } @@ -154,7 +166,7 @@ func (h *Heimdall) updateAsyncJobStatus(j *job.Job, jobError error) error { } defer sess.Close() - if _, err := sess.Exec(queryJobStatusUpdate, j.Status, j.Error, j.SystemID); err != nil { + if _, err := sess.Exec(queryJobStatusUpdate, j.Status, j.Error, jobAttributesJSON(j), j.SystemID); err != nil { // TODO: implement proper logging fmt.Println(`job status update error:`, err) } diff --git a/internal/pkg/heimdall/queries/job/select.sql b/internal/pkg/heimdall/queries/job/select.sql index 3019fcdd..fc0643da 100644 --- a/internal/pkg/heimdall/queries/job/select.sql +++ b/internal/pkg/heimdall/queries/job/select.sql @@ -15,7 +15,8 @@ select cl.cluster_id, cl.cluster_name, j.store_result_sync, - j.canceled_by + j.canceled_by, + j.job_attributes from jobs j left join commands cm on cm.system_command_id = j.job_command_id diff --git a/internal/pkg/heimdall/queries/job/select_jobs.sql b/internal/pkg/heimdall/queries/job/select_jobs.sql index 74b26ac2..2a2c9e99 100644 --- a/internal/pkg/heimdall/queries/job/select_jobs.sql +++ b/internal/pkg/heimdall/queries/job/select_jobs.sql @@ -16,7 +16,8 @@ select cl.cluster_id, cl.cluster_name, j.store_result_sync, - j.canceled_by + j.canceled_by, + j.job_attributes from jobs j join job_statuses js on js.job_status_id = j.job_status_id diff --git a/internal/pkg/heimdall/queries/job/status_update.sql b/internal/pkg/heimdall/queries/job/status_update.sql index 89786365..d044abaa 100644 --- a/internal/pkg/heimdall/queries/job/status_update.sql +++ b/internal/pkg/heimdall/queries/job/status_update.sql @@ -2,6 +2,7 @@ update jobs set job_status_id = $1, job_error = left($2::text, 1024), + job_attributes = coalesce(nullif($3::text, '')::jsonb, job_attributes), updated_at = extract(epoch from now())::int where - system_job_id = $3; + system_job_id = $4; diff --git a/internal/pkg/object/command/sparkeks/sparkeks.go b/internal/pkg/object/command/sparkeks/sparkeks.go index 4e44ab03..e497ef50 100644 --- a/internal/pkg/object/command/sparkeks/sparkeks.go +++ b/internal/pkg/object/command/sparkeks/sparkeks.go @@ -43,6 +43,7 @@ const ( statusReportInterval = 30 * time.Second defaultNamespace = "default" applicationPrefix = "spark-sql-job" + sparkApplicationIDOutput = "spark_application_id" awsRegionEnvVar = "AWS_REGION" s3Prefix = "s3://" s3aPrefix = "s3a://" @@ -929,6 +930,10 @@ func (e *executionContext) monitorJobAndCollectLogs(ctx context.Context) error { } finalSparkApp = sparkApp + + if id := sparkApp.Status.SparkApplicationID; id != "" { + e.job.SetOutput(sparkApplicationIDOutput, id) + } state := sparkApp.Status.AppState.State if time.Since(lastReport) >= statusReportInterval || isTerminalState(state) { diff --git a/pkg/object/cluster/cluster.go b/pkg/object/cluster/cluster.go index a31fc647..43009aec 100644 --- a/pkg/object/cluster/cluster.go +++ b/pkg/object/cluster/cluster.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/patterninc/heimdall/pkg/object" + "github.com/patterninc/heimdall/pkg/object/job" "github.com/patterninc/heimdall/pkg/object/status" "github.com/patterninc/heimdall/pkg/rbac" ) @@ -14,10 +15,11 @@ var ( type Cluster struct { object.Object `yaml:",inline" json:",inline"` - Status status.Status `yaml:"status,omitempty" json:"status,omitempty"` - HealthCheck bool `yaml:"health_check,omitempty" json:"health_check,omitempty"` - RBACNames []string `yaml:"rbacs,omitempty" json:"rbacs,omitempty"` - RBACs []rbac.RBAC `yaml:"-" json:"-"` + Status status.Status `yaml:"status,omitempty" json:"status,omitempty"` + HealthCheck bool `yaml:"health_check,omitempty" json:"health_check,omitempty"` + RBACNames []string `yaml:"rbacs,omitempty" json:"rbacs,omitempty"` + RBACs []rbac.RBAC `yaml:"-" json:"-"` + Attributes map[string]job.Attribute `yaml:"attributes,omitempty" json:"attributes,omitempty"` } type Clusters map[string]*Cluster diff --git a/pkg/object/job/job.go b/pkg/object/job/job.go index 2ca17142..59185836 100644 --- a/pkg/object/job/job.go +++ b/pkg/object/job/job.go @@ -10,19 +10,33 @@ import ( ) type Job struct { - object.Object `yaml:",inline" json:",inline"` - Status status.Status `yaml:"status,omitempty" json:"status,omitempty"` - IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"` - StoreResultSync bool `yaml:"store_result_sync,omitempty" json:"store_result_sync,omitempty"` - Error string `yaml:"error,omitempty" json:"error,omitempty"` - CommandCriteria *set.Set[string] `yaml:"command_criteria,omitempty" json:"command_criteria,omitempty"` - ClusterCriteria *set.Set[string] `yaml:"cluster_criteria,omitempty" json:"cluster_criteria,omitempty"` - CommandID string `yaml:"command_id,omitempty" json:"command_id,omitempty"` - CommandName string `yaml:"command_name,omitempty" json:"command_name,omitempty"` - ClusterID string `yaml:"cluster_id,omitempty" json:"cluster_id,omitempty"` - ClusterName string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"` - CanceledBy string `yaml:"canceled_by,omitempty" json:"canceled_by,omitempty"` - Result *result.Result `yaml:"result,omitempty" json:"result,omitempty"` + object.Object `yaml:",inline" json:",inline"` + Status status.Status `yaml:"status,omitempty" json:"status,omitempty"` + IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"` + StoreResultSync bool `yaml:"store_result_sync,omitempty" json:"store_result_sync,omitempty"` + Error string `yaml:"error,omitempty" json:"error,omitempty"` + CommandCriteria *set.Set[string] `yaml:"command_criteria,omitempty" json:"command_criteria,omitempty"` + ClusterCriteria *set.Set[string] `yaml:"cluster_criteria,omitempty" json:"cluster_criteria,omitempty"` + CommandID string `yaml:"command_id,omitempty" json:"command_id,omitempty"` + CommandName string `yaml:"command_name,omitempty" json:"command_name,omitempty"` + ClusterID string `yaml:"cluster_id,omitempty" json:"cluster_id,omitempty"` + ClusterName string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"` + CanceledBy string `yaml:"canceled_by,omitempty" json:"canceled_by,omitempty"` + JobAttributes map[string]Attribute `yaml:"job_attributes,omitempty" json:"job_attributes,omitempty"` + Result *result.Result `yaml:"result,omitempty" json:"result,omitempty"` + outputs map[string]string +} + +// Attribute kinds for JobAttributes. Kind tells the UI how to render Value. +const ( + AttributeKindLink = "link" + AttributeKindText = "text" +) + +// Attribute is a generic, plugin-agnostic value surfaced on a job in the UI. +type Attribute struct { + Kind string `yaml:"kind,omitempty" json:"kind,omitempty"` + Value string `yaml:"value,omitempty" json:"value,omitempty"` } func (j *Job) Init() error { @@ -41,3 +55,14 @@ func (j *Job) Init() error { return nil } + +func (j *Job) SetOutput(key, value string) { + if j.outputs == nil { + j.outputs = make(map[string]string) + } + j.outputs[key] = value +} + +func (j *Job) Outputs() map[string]string { + return j.outputs +} diff --git a/web/src/modules/Jobs/Helper.tsx b/web/src/modules/Jobs/Helper.tsx index 39a308d3..610b30f6 100644 --- a/web/src/modules/Jobs/Helper.tsx +++ b/web/src/modules/Jobs/Helper.tsx @@ -77,6 +77,7 @@ export type JobType = { cluster_id: string cluster_name: string canceled_by?: string + job_attributes?: Record error?: string context?: { properties: { diff --git a/web/src/modules/Jobs/JobDetails/JobInformationPane.tsx b/web/src/modules/Jobs/JobDetails/JobInformationPane.tsx index 55608078..59e5fe98 100644 --- a/web/src/modules/Jobs/JobDetails/JobInformationPane.tsx +++ b/web/src/modules/Jobs/JobDetails/JobInformationPane.tsx @@ -13,6 +13,7 @@ const JobInformationPane = ({ isLoading, }: JobDataTypesProps): React.JSX.Element => { const router = useRouter() + const jobAttributes = Object.entries(jobData?.job_attributes ?? {}) const jobdetailsData = [ { label: 'Version', data: jobData?.version, check: !!jobData?.version }, { @@ -124,6 +125,38 @@ const JobInformationPane = ({ ]} isTwoColumns /> + {jobAttributes.length > 0 && ( + <> + + ({ + label: '', + data: + attr?.kind === 'link' ? ( + + ) : ( + + {label}: {attr?.value} + + ), + check: true, + }))} + isTwoColumns + /> + + )} )}