Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,25 @@ health_check:

---

## Config-Driven Job Attributes (for static or runtime-generated metadata)

Surface extra links or text on a job in the UI, with minimal changes to plugin code, by declaring `attributes` on a cluster. Each attribute is a template that Heimdall renders after the job runs and stores in the job's `job_attributes`, which the UI then displays. See `configs/local.yaml` for an example.

Each entry is `label → { kind, value }`:

- **`kind`** tells the UI how to render `value`: `link` (clickable hyperlink) or `text`.
- **`value`** is a Go [`text/template`](https://pkg.go.dev/text/template) rendered over four namespaces:
- `.Job` — the job object (e.g. `.Job.ID`)
- `.Command` / `.Cluster` — the matched command's and cluster's `context` maps
- `.Outputs` — runtime values published by the plugin during execution

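Static attributes (built only from `.Job`, `.Command`, `.Cluster`) need no plugin changes. An attribute whose template references a key that is missing at render time is omitted from the job. For runtime-discovered values, a plugin publishes the value to the job's outputs with a single call, for example: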

```go
job.SetOutput("some_runtime_metadata", value)
```
---

## 👥 Credits

**Heimdall** was created at **Pattern, Inc** by Stan Babourine, with contributions from Will Graham, Gaurav Warale and Josh Diaz.
1 change: 1 addition & 0 deletions assets/databases/heimdall/tables/jobs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ create table if not exists jobs

alter table jobs add column if not exists store_result_sync boolean not null default false;
alter table jobs add column if not exists canceled_by varchar(64) null;
alter table jobs add column if not exists job_attributes jsonb not null default '{}'::jsonb;

-- Originally had "cancelled_by" column and "cancelling" status, but we aren't british. Whoops.
do $$ begin
Expand Down
11 changes: 10 additions & 1 deletion configs/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,19 @@ commands:
# supported clusters
clusters:
- name: localhost-0.0.1
attributes:
Test Link Label:
kind: link
value: "{{ .Cluster.test_url }}/remaining_static_or_runtime_generated_path"
Test Text Label:
kind: text
value: "static_or_runtime_generated_value_via_templating"
status: active
version: 0.0.1
description: Just a localhost
health_check: true
context:
test_url: http://localhost:8080
tags:
- type:localhost
- data:local
- data:local
53 changes: 53 additions & 0 deletions internal/pkg/heimdall/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
_ "embed"
"encoding/json"
"fmt"
"io"
"math/big"
"net/http"
"os"
"strings"
"text/template"
"time"

"github.com/babourine/x/pkg/set"
Expand Down Expand Up @@ -42,6 +44,7 @@ var (
ErrCallerNotAllowed = fmt.Errorf(`caller is not allowed to run this command`)
runJobMethod = telemetry.NewMethod("runJob", "heimdall")
cancelJobMethod = telemetry.NewMethod("db_connection", "cancel_job")
renderAttributesMethod = telemetry.NewMethod("renderAttributes", "heimdall")
)

//go:embed queries/job/status_cancel_update.sql
Expand All @@ -52,6 +55,13 @@ type commandOnCluster struct {
cluster *cluster.Cluster
}

// attributeTemplateData is the root value passed to each cluster attribute
// template when renderJobAttributes executes it; its exported fields are the
// names a template can reach (.Job, .Outputs, .Command, .Cluster).
type attributeTemplateData struct {
// Job is the job the attributes are being rendered for.
Job *job.Job
// Outputs holds the values returned by the job's Outputs method;
// renderJobAttributes substitutes an empty map when that is nil.
Outputs map[string]string
// Command is the command's context converted to a plain map; it stays nil
// when the command has no context.
Command map[string]any
// Cluster is the cluster's context converted to a plain map; it stays nil
// when the cluster has no context.
Cluster map[string]any
}

func (h *Heimdall) submitJob(ctx context.Context, j *job.Job) (any, error) {

// set / add job properties
Expand Down Expand Up @@ -174,6 +184,8 @@ func (h *Heimdall) runJob(ctx context.Context, j *job.Job, command *command.Comm
return nil
}

renderJobAttributes(j, command, cluster)

// Handle plugin execution result (only if not canceled)
if jobErr != nil {
j.Status = jobStatus.Failed
Expand All @@ -195,6 +207,47 @@ func (h *Heimdall) runJob(ctx context.Context, j *job.Job, command *command.Comm

}

// renderJobAttributes executes every attribute template declared on the
// cluster and stores each successfully rendered value in j.JobAttributes
// under the attribute's label, carrying over the attribute's Kind.
//
// Templates are executed against an attributeTemplateData built from the job,
// the job's outputs, and the command and cluster context maps. The function
// never fails the job: an attribute that cannot be parsed or rendered is
// skipped. j.JobAttributes is only allocated once the first attribute renders,
// so it stays nil when nothing was rendered.
func renderJobAttributes(j *job.Job, command *command.Command, cluster *cluster.Cluster) {

	if len(cluster.Attributes) == 0 {
		return
	}

	// templates always see a non-nil outputs map
	outputs := j.Outputs()
	if outputs == nil {
		outputs = map[string]string{}
	}

	templateData := attributeTemplateData{Job: j, Outputs: outputs}
	if c := command.Context; c != nil {
		templateData.Command = map[string]any(*c)
	}
	if c := cluster.Context; c != nil {
		templateData.Cluster = map[string]any(*c)
	}

	for label, attr := range cluster.Attributes {

		tmpl, parseErr := template.New(`attribute`).Parse(attr.Value)
		if parseErr != nil {
			renderAttributesMethod.LogAndCountError(fmt.Errorf("attribute %q: invalid template: %w", label, parseErr))
			continue
		}

		// strict pass: a reference to a missing map key is an execution error
		var rendered strings.Builder
		strictErr := tmpl.Option(`missingkey=error`).Execute(&rendered, templateData)
		if strictErr == nil {
			if j.JobAttributes == nil {
				j.JobAttributes = make(map[string]job.Attribute)
			}
			j.JobAttributes[label] = job.Attribute{Kind: attr.Kind, Value: rendered.String()}
			continue
		}

		// The strict pass failed. Re-run leniently with the output discarded:
		// if the template now succeeds, the only problem was a missing key and
		// the attribute is dropped without logging (presumably because a
		// value that is not available for this job is expected, not a config
		// error). Anything that still fails is logged and counted.
		if lenientErr := tmpl.Option(`missingkey=zero`).Execute(io.Discard, templateData); lenientErr != nil {
			renderAttributesMethod.LogAndCountError(fmt.Errorf("attribute %q: %w", label, lenientErr))
		}
	}

}

func (h *Heimdall) storeResults(runtime *plugin.Runtime, j *job.Job) error {
// do we have result to be written?
if j.Result == nil {
Expand Down
28 changes: 24 additions & 4 deletions internal/pkg/heimdall/job_dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,22 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) {
},
}

var jobContext string
var jobContext, jobAttributes string

if err := row.Scan(&r.SystemID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync,
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy); err != nil {
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy, &jobAttributes); err != nil {
if err == sql.ErrNoRows {
return nil, ErrUnknownJobID
} else {
return nil, err
}
}

if err := parseJobAttributes(r, jobAttributes); err != nil {
getJobMethod.LogAndCountError(err, "parse_job_attributes")
return nil, err
}

if err := jobParseContextAndTags(r, jobContext, sess); err != nil {
getJobMethod.LogAndCountError(err, "job_parse_context_and_tags")
return nil, err
Expand Down Expand Up @@ -439,15 +444,20 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error)

for rows.Next() {

jobContext := ``
jobContext, jobAttributes := ``, ``
r := &job.Job{}

if err := rows.Scan(&r.SystemID, &r.ID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync,
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy); err != nil {
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy, &jobAttributes); err != nil {
getJobsMethod.LogAndCountError(err, "scan")
return nil, err
}

if err := parseJobAttributes(r, jobAttributes); err != nil {
getJobsMethod.LogAndCountError(err, "parse_job_attributes")
return nil, err
}

if err := jobParseContextAndTags(r, jobContext, sess); err != nil {
getJobsMethod.LogAndCountError(err, "job_parse_context_and_tags")
return nil, err
Expand Down Expand Up @@ -522,6 +532,16 @@ func (h *Heimdall) getJobStatus(ctx context.Context, j *jobRequest) (any, error)

}

// parseJobAttributes decodes the JSON text read from the job_attributes
// column into j.JobAttributes.
//
// An empty string or an empty JSON object leaves the job untouched. Decoding
// is best-effort: if the text cannot be unmarshaled, the attributes are reset
// to nil rather than left partially populated. The returned error is always
// nil, so a bad attributes value never prevents the job from being returned.
func parseJobAttributes(j *job.Job, jobAttributes string) error {
	switch jobAttributes {
	case ``, `{}`:
		// nothing stored for this job
		return nil
	}

	if decodeErr := json.Unmarshal([]byte(jobAttributes), &j.JobAttributes); decodeErr != nil {
		j.JobAttributes = nil
	}

	return nil
}

func jobParseContextAndTags(j *job.Job, jobContext string, sess *database.Session) (err error) {

// ...and add job context
Expand Down
16 changes: 14 additions & 2 deletions internal/pkg/heimdall/jobs_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package heimdall
import (
"context"
_ "embed"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -35,6 +36,17 @@ var queryJobStatusUpdate string
//go:embed queries/job/active_delete.sql
var queryActiveJobDelete string

// jobAttributesJSON returns the job's attributes encoded as a JSON object, for
// use as the attributes argument of the job status update query.
//
// It returns an empty string when the job has no attributes or when encoding
// fails; the status update query turns an empty string into "keep the stored
// job_attributes value" via coalesce(nullif(...)).
func jobAttributesJSON(j *job.Job) string {
	if len(j.JobAttributes) > 0 {
		if encoded, err := json.Marshal(j.JobAttributes); err == nil {
			return string(encoded)
		}
	}
	return ``
}

func (h *Heimdall) getAsyncJobs(limit int) ([]*job.Job, error) {

// Track DB connection for async jobs retrieval
Expand Down Expand Up @@ -118,7 +130,7 @@ func (h *Heimdall) runAsyncJob(ctx context.Context, j *job.Job) error {
}
defer sess.Close()

if _, err := sess.Exec(queryJobStatusUpdate, status.Running, ``, j.SystemID); err != nil {
if _, err := sess.Exec(queryJobStatusUpdate, status.Running, ``, jobAttributesJSON(j), j.SystemID); err != nil {
return h.updateAsyncJobStatus(j, err)
}

Expand Down Expand Up @@ -154,7 +166,7 @@ func (h *Heimdall) updateAsyncJobStatus(j *job.Job, jobError error) error {
}
defer sess.Close()

if _, err := sess.Exec(queryJobStatusUpdate, j.Status, j.Error, j.SystemID); err != nil {
if _, err := sess.Exec(queryJobStatusUpdate, j.Status, j.Error, jobAttributesJSON(j), j.SystemID); err != nil {
// TODO: implement proper logging
fmt.Println(`job status update error:`, err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/heimdall/queries/job/select.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ select
cl.cluster_id,
cl.cluster_name,
j.store_result_sync,
j.canceled_by
j.canceled_by,
j.job_attributes
from
jobs j
left join commands cm on cm.system_command_id = j.job_command_id
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/heimdall/queries/job/select_jobs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ select
cl.cluster_id,
cl.cluster_name,
j.store_result_sync,
j.canceled_by
j.canceled_by,
j.job_attributes
from
jobs j
join job_statuses js on js.job_status_id = j.job_status_id
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/heimdall/queries/job/status_update.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ update jobs
set
job_status_id = $1,
job_error = left($2::text, 1024),
job_attributes = coalesce(nullif($3::text, '')::jsonb, job_attributes),
updated_at = extract(epoch from now())::int
where
system_job_id = $3;
system_job_id = $4;
5 changes: 5 additions & 0 deletions internal/pkg/object/command/sparkeks/sparkeks.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
statusReportInterval = 30 * time.Second
defaultNamespace = "default"
applicationPrefix = "spark-sql-job"
sparkApplicationIDOutput = "spark_application_id"
awsRegionEnvVar = "AWS_REGION"
s3Prefix = "s3://"
s3aPrefix = "s3a://"
Expand Down Expand Up @@ -929,6 +930,10 @@ func (e *executionContext) monitorJobAndCollectLogs(ctx context.Context) error {
}

finalSparkApp = sparkApp

if id := sparkApp.Status.SparkApplicationID; id != "" {
e.job.SetOutput(sparkApplicationIDOutput, id)
}
state := sparkApp.Status.AppState.State

if time.Since(lastReport) >= statusReportInterval || isTerminalState(state) {
Expand Down
10 changes: 6 additions & 4 deletions pkg/object/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/patterninc/heimdall/pkg/object"
"github.com/patterninc/heimdall/pkg/object/job"
"github.com/patterninc/heimdall/pkg/object/status"
"github.com/patterninc/heimdall/pkg/rbac"
)
Expand All @@ -14,10 +15,11 @@ var (

// Cluster is a cluster definition that can be loaded from YAML or JSON (see
// the field tags). It embeds object.Object inline.
type Cluster struct {
object.Object `yaml:",inline" json:",inline"`
Status status.Status `yaml:"status,omitempty" json:"status,omitempty"`
HealthCheck bool `yaml:"health_check,omitempty" json:"health_check,omitempty"`
RBACNames []string `yaml:"rbacs,omitempty" json:"rbacs,omitempty"`
RBACs []rbac.RBAC `yaml:"-" json:"-"`
Status status.Status `yaml:"status,omitempty" json:"status,omitempty"`
HealthCheck bool `yaml:"health_check,omitempty" json:"health_check,omitempty"`
RBACNames []string `yaml:"rbacs,omitempty" json:"rbacs,omitempty"`
// RBACs is excluded from both YAML and JSON (tag "-").
RBACs []rbac.RBAC `yaml:"-" json:"-"`
// Attributes maps a display label to an attribute declared under the
// cluster's "attributes" key; each entry carries a kind and a value.
Attributes map[string]job.Attribute `yaml:"attributes,omitempty" json:"attributes,omitempty"`
}

type Clusters map[string]*Cluster
Expand Down
51 changes: 38 additions & 13 deletions pkg/object/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,33 @@ import (
)

// Job is the job object serialized as YAML or JSON (see the field tags). It
// embeds object.Object inline.
type Job struct {
object.Object `yaml:",inline" json:",inline"`
Status status.Status `yaml:"status,omitempty" json:"status,omitempty"`
IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"`
StoreResultSync bool `yaml:"store_result_sync,omitempty" json:"store_result_sync,omitempty"`
Error string `yaml:"error,omitempty" json:"error,omitempty"`
CommandCriteria *set.Set[string] `yaml:"command_criteria,omitempty" json:"command_criteria,omitempty"`
ClusterCriteria *set.Set[string] `yaml:"cluster_criteria,omitempty" json:"cluster_criteria,omitempty"`
CommandID string `yaml:"command_id,omitempty" json:"command_id,omitempty"`
CommandName string `yaml:"command_name,omitempty" json:"command_name,omitempty"`
ClusterID string `yaml:"cluster_id,omitempty" json:"cluster_id,omitempty"`
ClusterName string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"`
CanceledBy string `yaml:"canceled_by,omitempty" json:"canceled_by,omitempty"`
Result *result.Result `yaml:"result,omitempty" json:"result,omitempty"`
object.Object `yaml:",inline" json:",inline"`
Status status.Status `yaml:"status,omitempty" json:"status,omitempty"`
IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"`
StoreResultSync bool `yaml:"store_result_sync,omitempty" json:"store_result_sync,omitempty"`
Error string `yaml:"error,omitempty" json:"error,omitempty"`
CommandCriteria *set.Set[string] `yaml:"command_criteria,omitempty" json:"command_criteria,omitempty"`
ClusterCriteria *set.Set[string] `yaml:"cluster_criteria,omitempty" json:"cluster_criteria,omitempty"`
CommandID string `yaml:"command_id,omitempty" json:"command_id,omitempty"`
CommandName string `yaml:"command_name,omitempty" json:"command_name,omitempty"`
ClusterID string `yaml:"cluster_id,omitempty" json:"cluster_id,omitempty"`
ClusterName string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"`
CanceledBy string `yaml:"canceled_by,omitempty" json:"canceled_by,omitempty"`
// JobAttributes maps a display label to a rendered attribute (kind plus
// value); it is omitted from YAML/JSON when empty.
JobAttributes map[string]Attribute `yaml:"job_attributes,omitempty" json:"job_attributes,omitempty"`
Result *result.Result `yaml:"result,omitempty" json:"result,omitempty"`
// outputs holds the key/value pairs written by SetOutput and returned by
// Outputs. It is unexported and untagged, so it is not serialized.
outputs map[string]string
}

// Attribute kinds for JobAttributes. Kind tells the UI how to render Value.
const (
// AttributeKindLink marks a value the UI renders as a clickable hyperlink.
AttributeKindLink = "link"
// AttributeKindText marks a value the UI renders as text.
AttributeKindText = "text"
)

// Attribute is a generic, plugin-agnostic value surfaced on a job in the UI.
// The same type is used for a cluster's declared attributes and for a job's
// rendered attributes.
type Attribute struct {
// Kind tells the UI how to render Value (see the AttributeKind constants).
Kind string `yaml:"kind,omitempty" json:"kind,omitempty"`
// Value is the attribute content: a template string when declared on a
// cluster, the rendered result when stored on a job.
Value string `yaml:"value,omitempty" json:"value,omitempty"`
}

func (j *Job) Init() error {
Expand All @@ -41,3 +55,14 @@ func (j *Job) Init() error {
return nil

}

// SetOutput records value under key in the job's outputs, replacing any value
// previously stored for that key. The outputs map is created on first use.
func (j *Job) SetOutput(key, value string) {
	if j.outputs != nil {
		j.outputs[key] = value
		return
	}
	// first output for this job
	j.outputs = map[string]string{key: value}
}

// Outputs returns the key/value pairs recorded with SetOutput. The result is
// the job's own map, not a copy, and is nil when SetOutput was never called.
func (j *Job) Outputs() map[string]string {
return j.outputs
}
1 change: 1 addition & 0 deletions web/src/modules/Jobs/Helper.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export type JobType = {
cluster_id: string
cluster_name: string
canceled_by?: string
job_attributes?: Record<string, { kind?: string; value?: string }>
error?: string
context?: {
properties: {
Expand Down
Loading
Loading