Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/forty-areas-remain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/lightning-mock': patch
---

Add validation to the provisioner endpoint
47 changes: 44 additions & 3 deletions integration-tests/cli/test/deploy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,26 @@ const port = 8967;
const endpoint = `http://localhost:${port}`;
let tmpDir = path.resolve('tmp/deploy');

const testProjectV2 = `
id: my-project
name: My Project
schema_version: '4.0'
workflows:
- id: my-workflow
name: My Workflow
start: webhook
steps:
- id: webhook
type: webhook
enabled: true
next:
transform-data: {}
- id: transform-data
name: Transform data
expression: 'fn(s => s)'
adaptor: '@openfn/language-common@latest'
`.trim();

const testProject = `
name: test-project
workflows:
Expand Down Expand Up @@ -97,7 +117,6 @@ test.serial('deploy a local project', async (t) => {
--log-json \
-l debug`
);

t.falsy(stderr);

const logs = extractLogs(stdout);
Expand Down Expand Up @@ -269,7 +288,7 @@ test.serial('redirect to v2 protocol if openfn.yaml is present', async (t) => {
);
});

test.serial('deploy a v2 spec file', async (t) => {
test.serial.only('deploy a v2 spec file', async (t) => {
const testProjectV2 = `
name: test-project
schema_version: '4.0'
Expand Down Expand Up @@ -301,7 +320,7 @@ workflows:
--log-json \
-l debug`
);

console.log(stdout);
t.falsy(stderr);

const logs = extractLogs(stdout);
Expand Down Expand Up @@ -378,3 +397,25 @@ test.serial('deploy then pull, changes one workflow, deploy', async (t) => {
t.is(Object.keys(server.state.projects).length, 1);
t.truthy(server.state.projects[projectId]);
});

test.serial('deploy a v2 project.yaml', async (t) => {
await fs.writeFile(path.join(tmpDir, 'project.yaml'), testProjectV2);

const { stdout, stderr } = await run(
`openfn deploy \
--project-path ${tmpDir}/project.yaml \
--state-path ${tmpDir}/.state.json \
--no-confirm \
--log-json \
-l debug`
);

t.falsy(stderr);

const logs = extractLogs(stdout);
assertLog(t, logs, /Deployed/);

t.is(Object.keys(server.state.projects).length, 1);
const [project] = Object.values(server.state.projects) as any[];
t.is(project.name, 'My Project');
});
2 changes: 1 addition & 1 deletion packages/cli/src/deploy/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ export const maybeConvertV2spec = async (yaml: string): Promise<string> => {
const json = yamlToJson(yaml) as any;
if (detectVersion(json) > 1) {
const project = await Project.from('project', json);
return project.serialize('state', { format: 'yaml' }) as string;
return project.serialize('state', { format: 'yaml', asSpec: true }) as string;
}
return yaml;
};
Expand Down
18 changes: 18 additions & 0 deletions packages/cli/test/deploy/deploy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,24 @@ test('maybeConvertV2spec: converts v2 (schema_version) to v1', async (t) => {
t.falsy(json.schema_version);
});

test('maybeConvertV2spec: converted edges use key references, not UUIDs', async (t) => {
const result = await maybeConvertV2spec(v2Yaml);
const json = yamlToJson(result) as any;

const workflow = Object.values(json.workflows)[0] as any;
const edge = Object.values(workflow.edges)[0] as any;

// edge must use spec format (key references) so mergeSpecIntoState can resolve them
t.truthy(edge.source_trigger);
t.truthy(edge.target_job);
t.falsy(edge.source_trigger_id);
t.falsy(edge.target_job_id);

// source_trigger must match a trigger key; target_job must match a job key
t.truthy(workflow.triggers[edge.source_trigger]);
t.truthy(workflow.jobs[edge.target_job]);
});

test('maybeConvertV2spec: converts legacy v2 (cli.version: 2) to v1', async (t) => {
const legacyV2Yaml = `id: my-project
name: My Project
Expand Down
1 change: 0 additions & 1 deletion packages/deploy/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ export async function deploy(config: DeployConfig, logger: Logger) {
throw new DeployError(`${config.specPath} has errors`, 'VALIDATION_ERROR');
}
const nextState = mergeSpecIntoState(state, spec.doc);

validateProjectState(nextState);

// Convert the state to a payload for the API.
Expand Down
45 changes: 45 additions & 0 deletions packages/lightning-mock/src/api-rest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,43 @@ workflows:
enabled: true
`;

// Validates a provisioner payload, returning an error body if invalid or null if valid.
// Mirrors Lightning's error format so deploy code sees realistic rejection responses.
export function validateProvisionPayload(incoming: any): Record<string, any> | null {
const workflowErrors: Record<string, any> = {};

const wfList: any[] = Array.isArray(incoming.workflows)
? incoming.workflows
: Object.values(incoming.workflows ?? {});

for (const wf of wfList) {
const edgeErrors: Record<string, any> = {};
const edgeList: any[] = Array.isArray(wf.edges)
? wf.edges
: Object.values(wf.edges ?? {});

for (const edge of edgeList) {
if (!edge.source_trigger_id && !edge.source_job_id) {
const key = edge.id ?? '->';
edgeErrors[key] = {
source_job_id: ['source_job_id or source_trigger_id must be present'],
};
}
}

if (Object.keys(edgeErrors).length > 0) {
const wfKey = wf.name ?? wf.id ?? 'unknown';
workflowErrors[wfKey] = { edges: edgeErrors };
}
}

if (Object.keys(workflowErrors).length > 0) {
return { errors: { workflows: workflowErrors } };
}

return null;
}

export default (
app: DevServer,
state: ServerState,
Expand Down Expand Up @@ -121,6 +158,14 @@ export default (

router.post('/api/provision', (ctx) => {
const incoming: any = ctx.request.body;

const validationErrors = validateProvisionPayload(incoming);
if (validationErrors) {
ctx.response.status = 422;
ctx.response.body = validationErrors;
return;
}

const now = new Date().toISOString();

if (!state.projects[incoming.id]) {
Expand Down
120 changes: 119 additions & 1 deletion packages/lightning-mock/test/rest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import test from 'ava';

import { setup } from './util';
import { DEFAULT_PROJECT_ID } from '../src/api-rest';
import { DEFAULT_PROJECT_ID, validateProvisionPayload } from '../src/api-rest';

// @ts-ignore
let server: any;
Expand Down Expand Up @@ -84,3 +84,121 @@ test.serial("should return 404 if a collection isn't found", async (t) => {
});

test.todo("should return 403 if a collection isn't authorized");

test('validateProvisionPayload: returns null for a valid edge with source_trigger_id', (t) => {
const payload = {
id: 'proj-1',
workflows: [
{
name: 'wf1',
edges: [
{
id: 'e1',
source_trigger_id: 'trig-uuid',
target_job_id: 'job-uuid',
enabled: true,
},
],
},
],
};
t.is(validateProvisionPayload(payload), null);
});

test('validateProvisionPayload: returns null for a valid edge with source_job_id', (t) => {
const payload = {
id: 'proj-1',
workflows: [
{
name: 'wf1',
edges: [
{
id: 'e1',
source_job_id: 'job-uuid',
target_job_id: 'job-uuid-2',
enabled: true,
},
],
},
],
};
t.is(validateProvisionPayload(payload), null);
});

test('validateProvisionPayload: returns errors when edge has no source', (t) => {
const payload = {
id: 'proj-1',
workflows: [
{
name: 'wf1',
edges: [
{
id: 'edge-1',
source_trigger_id: null,
target_job_id: '',
enabled: true,
},
],
},
],
};
const result = validateProvisionPayload(payload);
t.truthy(result);
t.deepEqual(result, {
errors: {
workflows: {
wf1: {
edges: {
'edge-1': {
source_job_id: [
'source_job_id or source_trigger_id must be present',
],
},
},
},
},
},
});
});

test('validateProvisionPayload: returns null when there are no edges', (t) => {
const payload = {
id: 'proj-1',
workflows: [{ name: 'wf1', edges: [] }],
};
t.is(validateProvisionPayload(payload), null);
});

test.serial(
'should return 422 when a workflow edge has no source',
async (t) => {
const response = await fetch(`${endpoint}/api/provision`, {
method: 'POST',
body: JSON.stringify({
id: 'bad-proj',
name: 'Bad Project',
workflows: [
{
id: 'wf-uuid',
name: 'wf1',
jobs: [],
triggers: [],
edges: [
{
id: 'e1',
source_trigger_id: null,
target_job_id: '',
enabled: true,
},
],
},
],
}),
headers: { 'content-type': 'application/json' },
});

t.is(response.status, 422);
const body = await response.json();
t.truthy(body.errors?.workflows?.wf1?.edges);
}
);
Loading
Loading