diff --git a/.changeset/forty-areas-remain.md b/.changeset/forty-areas-remain.md new file mode 100644 index 000000000..c01a3887d --- /dev/null +++ b/.changeset/forty-areas-remain.md @@ -0,0 +1,5 @@ +--- +'@openfn/lightning-mock': patch +--- + +Add validation to the provisioner endpoint diff --git a/integration-tests/cli/test/deploy.test.ts b/integration-tests/cli/test/deploy.test.ts index 2425d0177..3f07a7ec8 100644 --- a/integration-tests/cli/test/deploy.test.ts +++ b/integration-tests/cli/test/deploy.test.ts @@ -12,6 +12,26 @@ const port = 8967; const endpoint = `http://localhost:${port}`; let tmpDir = path.resolve('tmp/deploy'); +const testProjectV2 = ` +id: my-project +name: My Project +schema_version: '4.0' +workflows: + - id: my-workflow + name: My Workflow + start: webhook + steps: + - id: webhook + type: webhook + enabled: true + next: + transform-data: {} + - id: transform-data + name: Transform data + expression: 'fn(s => s)' + adaptor: '@openfn/language-common@latest' +`.trim(); + const testProject = ` name: test-project workflows: @@ -97,7 +117,6 @@ test.serial('deploy a local project', async (t) => { --log-json \ -l debug` ); - t.falsy(stderr); const logs = extractLogs(stdout); @@ -269,7 +288,7 @@ test.serial('redirect to v2 protocol if openfn.yaml is present', async (t) => { ); }); -test.serial('deploy a v2 spec file', async (t) => { +test.serial.only('deploy a v2 spec file', async (t) => { const testProjectV2 = ` name: test-project schema_version: '4.0' @@ -301,7 +320,7 @@ workflows: --log-json \ -l debug` ); - + console.log(stdout); t.falsy(stderr); const logs = extractLogs(stdout); @@ -378,3 +397,25 @@ test.serial('deploy then pull, changes one workflow, deploy', async (t) => { t.is(Object.keys(server.state.projects).length, 1); t.truthy(server.state.projects[projectId]); }); + +test.serial('deploy a v2 project.yaml', async (t) => { + await fs.writeFile(path.join(tmpDir, 'project.yaml'), testProjectV2); + + const { stdout, stderr } = await run( + `openfn deploy \ + --project-path ${tmpDir}/project.yaml \ + --state-path ${tmpDir}/.state.json \ + --no-confirm \ + --log-json \ + -l debug` + ); + + t.falsy(stderr); + + const logs = extractLogs(stdout); + assertLog(t, logs, /Deployed/); + + t.is(Object.keys(server.state.projects).length, 1); + const [project] = Object.values(server.state.projects) as any[]; + t.is(project.name, 'My Project'); +}); diff --git a/packages/cli/src/deploy/handler.ts b/packages/cli/src/deploy/handler.ts index 4c04b9c2d..9620e4079 100644 --- a/packages/cli/src/deploy/handler.ts +++ b/packages/cli/src/deploy/handler.ts @@ -150,7 +150,7 @@ export const maybeConvertV2spec = async (yaml: string): Promise => { const json = yamlToJson(yaml) as any; if (detectVersion(json) > 1) { const project = await Project.from('project', json); - return project.serialize('state', { format: 'yaml' }) as string; + return project.serialize('state', { format: 'yaml', asSpec: true }) as string; } return yaml; }; diff --git a/packages/cli/test/deploy/deploy.test.ts b/packages/cli/test/deploy/deploy.test.ts index f47633d9a..cec8b1dbe 100644 --- a/packages/cli/test/deploy/deploy.test.ts +++ b/packages/cli/test/deploy/deploy.test.ts @@ -260,6 +260,24 @@ test('maybeConvertV2spec: converts v2 (schema_version) to v1', async (t) => { t.falsy(json.schema_version); }); +test('maybeConvertV2spec: converted edges use key references, not UUIDs', async (t) => { + const result = await maybeConvertV2spec(v2Yaml); + const json = yamlToJson(result) as any; + + const workflow = Object.values(json.workflows)[0] as any; + const edge = Object.values(workflow.edges)[0] as any; + + // edge must use spec format (key references) so mergeSpecIntoState can resolve them + t.truthy(edge.source_trigger); + t.truthy(edge.target_job); + t.falsy(edge.source_trigger_id); + t.falsy(edge.target_job_id); + + // source_trigger must match a trigger key; target_job must match a job key + t.truthy(workflow.triggers[edge.source_trigger]); + t.truthy(workflow.jobs[edge.target_job]); +}); + test('maybeConvertV2spec: converts legacy v2 (cli.version: 2) to v1', async (t) => { const legacyV2Yaml = `id: my-project name: My Project diff --git a/packages/deploy/src/index.ts b/packages/deploy/src/index.ts index 5d3ecc558..215aceb38 100644 --- a/packages/deploy/src/index.ts +++ b/packages/deploy/src/index.ts @@ -118,7 +118,6 @@ export async function deploy(config: DeployConfig, logger: Logger) { throw new DeployError(`${config.specPath} has errors`, 'VALIDATION_ERROR'); } const nextState = mergeSpecIntoState(state, spec.doc); - validateProjectState(nextState); // Convert the state to a payload for the API. diff --git a/packages/lightning-mock/src/api-rest.ts b/packages/lightning-mock/src/api-rest.ts index 0727c8f74..342bfa1c4 100644 --- a/packages/lightning-mock/src/api-rest.ts +++ b/packages/lightning-mock/src/api-rest.ts @@ -83,6 +83,43 @@ workflows: enabled: true `; +// Validates a provisioner payload, returning an error body if invalid or null if valid. +// Mirrors Lightning's error format so deploy code sees realistic rejection responses. +export function validateProvisionPayload(incoming: any): Record | null { + const workflowErrors: Record = {}; + + const wfList: any[] = Array.isArray(incoming.workflows) + ? incoming.workflows + : Object.values(incoming.workflows ?? {}); + + for (const wf of wfList) { + const edgeErrors: Record = {}; + const edgeList: any[] = Array.isArray(wf.edges) + ? wf.edges + : Object.values(wf.edges ?? {}); + + for (const edge of edgeList) { + if (!edge.source_trigger_id && !edge.source_job_id) { + const key = edge.id ?? '->'; + edgeErrors[key] = { + source_job_id: ['source_job_id or source_trigger_id must be present'], + }; + } + } + + if (Object.keys(edgeErrors).length > 0) { + const wfKey = wf.name ?? wf.id ?? 'unknown'; + workflowErrors[wfKey] = { edges: edgeErrors }; + } + } + + if (Object.keys(workflowErrors).length > 0) { + return { errors: { workflows: workflowErrors } }; + } + + return null; +} + export default ( app: DevServer, state: ServerState, @@ -121,6 +158,14 @@ export default ( router.post('/api/provision', (ctx) => { const incoming: any = ctx.request.body; + + const validationErrors = validateProvisionPayload(incoming); + if (validationErrors) { + ctx.response.status = 422; + ctx.response.body = validationErrors; + return; + } + const now = new Date().toISOString(); if (!state.projects[incoming.id]) { diff --git a/packages/lightning-mock/test/rest.test.ts b/packages/lightning-mock/test/rest.test.ts index d5dc67ed9..7a03f0f5a 100644 --- a/packages/lightning-mock/test/rest.test.ts +++ b/packages/lightning-mock/test/rest.test.ts @@ -2,7 +2,7 @@ import test from 'ava'; import { setup } from './util'; -import { DEFAULT_PROJECT_ID } from '../src/api-rest'; +import { DEFAULT_PROJECT_ID, validateProvisionPayload } from '../src/api-rest'; // @ts-ignore let server: any; @@ -84,3 +84,121 @@ test.serial("should return 404 if a collection isn't found", async (t) => { }); test.todo("should return 403 if a collection isn't authorized"); + +test('validateProvisionPayload: returns null for a valid edge with source_trigger_id', (t) => { + const payload = { + id: 'proj-1', + workflows: [ + { + name: 'wf1', + edges: [ + { + id: 'e1', + source_trigger_id: 'trig-uuid', + target_job_id: 'job-uuid', + enabled: true, + }, + ], + }, + ], + }; + t.is(validateProvisionPayload(payload), null); +}); + +test('validateProvisionPayload: returns null for a valid edge with source_job_id', (t) => { + const payload = { + id: 'proj-1', + workflows: [ + { + name: 'wf1', + edges: [ + { + id: 'e1', + source_job_id: 'job-uuid', + target_job_id: 'job-uuid-2', + enabled: true, + }, + ], + }, + ], + }; + t.is(validateProvisionPayload(payload), null); +}); + +test('validateProvisionPayload: returns errors when edge has no source', (t) => { + const payload = { + id: 'proj-1', + workflows: [ + { + name: 'wf1', + edges: [ + { + id: 'edge-1', + source_trigger_id: null, + target_job_id: '', + enabled: true, + }, + ], + }, + ], + }; + const result = validateProvisionPayload(payload); + t.truthy(result); + t.deepEqual(result, { + errors: { + workflows: { + wf1: { + edges: { + 'edge-1': { + source_job_id: [ + 'source_job_id or source_trigger_id must be present', + ], + }, + }, + }, + }, + }, + }); +}); + +test('validateProvisionPayload: returns null when there are no edges', (t) => { + const payload = { + id: 'proj-1', + workflows: [{ name: 'wf1', edges: [] }], + }; + t.is(validateProvisionPayload(payload), null); +}); + +test.serial( + 'should return 422 when a workflow edge has no source', + async (t) => { + const response = await fetch(`${endpoint}/api/provision`, { + method: 'POST', + body: JSON.stringify({ + id: 'bad-proj', + name: 'Bad Project', + workflows: [ + { + id: 'wf-uuid', + name: 'wf1', + jobs: [], + triggers: [], + edges: [ + { + id: 'e1', + source_trigger_id: null, + target_job_id: '', + enabled: true, + }, + ], + }, + ], + }), + headers: { 'content-type': 'application/json' }, + }); + + t.is(response.status, 422); + const body = await response.json(); + t.truthy(body.errors?.workflows?.wf1?.edges); + } +); diff --git a/packages/project/src/serialize/to-app-state.ts b/packages/project/src/serialize/to-app-state.ts index 7805910a1..6fc7e61de 100644 --- a/packages/project/src/serialize/to-app-state.ts +++ b/packages/project/src/serialize/to-app-state.ts @@ -10,7 +10,7 @@ import Workflow from '../Workflow'; import slugify from '../util/slugify'; import getCredentialName from '../util/get-credential-name'; -type Options = { format?: 'json' | 'yaml' }; +type Options = { format?: 'json' | 'yaml'; asSpec?: boolean }; const defaultJobProps = { // TODO why does the provisioner throw if these keys are not set? @@ -56,7 +56,7 @@ export default function ( })); state.workflows = project.workflows - .map((w) => mapWorkflow(w, credentialsWithUuids)) + .map((w) => mapWorkflow(w, credentialsWithUuids, options)) .reduce((obj: any, wf) => { obj[slugify(wf.name ?? wf.id)] = wf; return obj; @@ -75,8 +75,11 @@ export default function ( export const mapWorkflow = ( workflow: Workflow, - credentials: CredentialState[] = [] + credentials: CredentialState[] = [], + options: Options = {} ) => { + const useUuids = !options.asSpec; + if (workflow instanceof Workflow) { // @ts-ignore workflow = workflow.toJSON(); @@ -85,7 +88,7 @@ export const mapWorkflow = ( const { uuid, ...originalOpenfnProps } = workflow.openfn ?? {}; const wfState = { ...originalOpenfnProps, - id: workflow.openfn?.uuid ?? randomUUID(), + ...(useUuids ? { id: workflow.openfn?.uuid ?? randomUUID() } : {}), jobs: {}, triggers: {}, edges: {}, @@ -96,17 +99,17 @@ export const mapWorkflow = ( wfState.name = workflow.name; } - // lookup of local-ids to project-ids + // lookup of local-ids to project-ids (only needed when using UUIDs) const lookup = workflow.steps.reduce((obj, next) => { - if (!next.openfn?.uuid) { - // If there's no tracked id, we generate one here - // TODO there is no unit test on this - next.openfn ??= {}; - next.openfn.uuid = randomUUID(); + if (useUuids) { + if (!next.openfn?.uuid) { + // If there's no tracked id, we generate one here + next.openfn ??= {}; + next.openfn.uuid = randomUUID(); + } + // @ts-ignore + obj[next.id] = next.openfn.uuid; } - - // @ts-ignore - obj[next.id] = next.openfn.uuid; return obj; }, {}) as Record; @@ -122,13 +125,15 @@ export const mapWorkflow = ( node = { ...rest, type: s.type ?? 'webhook', // this is mostly for tests - ...renameKeys(openfn, { uuid: 'id' }), + ...(useUuids ? renameKeys(openfn, { uuid: 'id' }) : {}), } as Provisioner.Trigger; wfState.triggers[node.type] = node; } else { node = omitBy(pick(s, ['name', 'adaptor']), isNil) as Provisioner.Job; const { uuid, ...otherOpenFnProps } = s.openfn ?? {}; - node.id = uuid; + if (useUuids) { + node.id = uuid; + } if (s.expression) { node.body = s.expression; } @@ -145,16 +150,11 @@ export const mapWorkflow = ( if (mappedCredential) { projectCredentialId = mappedCredential.uuid; } - // else { - // console.warn(`WARING! Failed to map credential ${projectCredentialId} - Lightning may throw an error. - - // Ensure the credential exists in project.yaml and try again (maybe ensure the credential is attached to the project in the app and run project fetch)`); - // } otherOpenFnProps.project_credential_id = projectCredentialId; } } - Object.assign(node, defaultJobProps, otherOpenFnProps); + Object.assign(node, useUuids ? defaultJobProps : {}, otherOpenFnProps); wfState.jobs[s.id ?? slugify(s.name)] = node; } @@ -165,18 +165,31 @@ export const mapWorkflow = ( const { uuid, ...otherOpenFnProps } = rules.openfn ?? {}; - const e = { - id: uuid ?? randomUUID(), - target_job_id: lookup[next], - enabled: !rules.disabled, - source_trigger_id: null, // lightning complains if this isn't set, even if its falsy :( - } as Provisioner.Edge; - Object.assign(e, otherOpenFnProps); - - if (isTrigger) { - e.source_trigger_id = node.id; + let e: any; + if (useUuids) { + e = { + id: uuid ?? randomUUID(), + target_job_id: lookup[next], + enabled: !rules.disabled, + source_trigger_id: null, // lightning complains if this isn't set, even if its falsy :( + } as Provisioner.Edge; + Object.assign(e, otherOpenFnProps); + if (isTrigger) { + e.source_trigger_id = node.id; + } else { + e.source_job_id = node.id; + } } else { - e.source_job_id = node.id; + e = { + enabled: !rules.disabled, + target_job: next, + }; + Object.assign(e, otherOpenFnProps); + if (isTrigger) { + e.source_trigger = s.type; + } else { + e.source_job = s.id; + } } if (rules.label) { @@ -202,16 +215,18 @@ export const mapWorkflow = ( }); }); - // Sort edges by UUID (for more predictable comparisons in test) - wfState.edges = Object.keys(wfState.edges) - // convert edge ids to strings just in case a number creeps in (it might in test) - .sort((a, b) => - `${wfState.edges[a].id}`.localeCompare('' + wfState.edges[b].id) - ) - .reduce((obj: any, key) => { - obj[key] = wfState.edges[key]; - return obj; - }, {}); + if (useUuids) { + // Sort edges by UUID (for more predictable comparisons in test) + wfState.edges = Object.keys(wfState.edges) + // convert edge ids to strings just in case a number creeps in (it might in test) + .sort((a, b) => + `${wfState.edges[a].id}`.localeCompare('' + wfState.edges[b].id) + ) + .reduce((obj: any, key) => { + obj[key] = wfState.edges[key]; + return obj; + }, {}); + } return wfState; }; diff --git a/packages/project/test/serialize/to-app-state.test.ts b/packages/project/test/serialize/to-app-state.test.ts index c007f114c..bcfd7ea76 100644 --- a/packages/project/test/serialize/to-app-state.test.ts +++ b/packages/project/test/serialize/to-app-state.test.ts @@ -612,6 +612,76 @@ test('should convert a project back to app state in json', (t) => { // TODO this test is failing because the order of keys in the yaml have changed! // We probably need to force alphabetical sorting on yaml keys +// asSpec: true — spec format for deploy pipeline + +const v2ProjectData: any = { + id: 'my-project', + name: 'My Project', + schema_version: '4.0', + workflows: [ + { + id: 'my-workflow', + name: 'My Workflow', + start: 'webhook', + steps: [ + { + id: 'webhook', + type: 'webhook', + enabled: true, + next: { 'transform-data': {} }, + }, + { + id: 'transform-data', + name: 'Transform data', + expression: 'fn(s => s)', + adaptor: '@openfn/language-common@latest', + }, + ], + }, + ], +}; + +test('asSpec:true - edges use source_trigger/target_job keys, not UUIDs', (t) => { + const project = new Project(v2ProjectData, { formats: { project: 'json' } }); + const result = toAppState(project, { format: 'json', asSpec: true }) as any; + + const edge = Object.values(result.workflows['my-workflow'].edges)[0] as any; + t.truthy(edge.source_trigger); + t.truthy(edge.target_job); + t.falsy(edge.source_trigger_id); + t.falsy(edge.target_job_id); + t.falsy(edge.id); +}); + +test('asSpec:true - source_trigger matches the trigger key', (t) => { + const project = new Project(v2ProjectData, { formats: { project: 'json' } }); + const result = toAppState(project, { format: 'json', asSpec: true }) as any; + + const wf = result.workflows['my-workflow']; + const edge = Object.values(wf.edges)[0] as any; + t.truthy(wf.triggers[edge.source_trigger]); +}); + +test('asSpec:true - target_job matches the job key', (t) => { + const project = new Project(v2ProjectData, { formats: { project: 'json' } }); + const result = toAppState(project, { format: 'json', asSpec: true }) as any; + + const wf = result.workflows['my-workflow']; + const edge = Object.values(wf.edges)[0] as any; + t.truthy(wf.jobs[edge.target_job]); +}); + +test('asSpec:true - triggers and jobs have no generated id', (t) => { + const project = new Project(v2ProjectData, { formats: { project: 'json' } }); + const result = toAppState(project, { format: 'json', asSpec: true }) as any; + + const wf = result.workflows['my-workflow']; + const trigger = Object.values(wf.triggers)[0] as any; + const job = Object.values(wf.jobs)[0] as any; + t.falsy(trigger.id); + t.falsy(job.id); +}); + test.skip('should convert a project back to app state in yaml', (t) => { // this is a serialized project file const data: any = {