Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# @openfn/integration-tests-worker

## 1.1.0

### Minor Changes

- 8ebc086: Add support for state memory limit via worker

### Patch Changes

- Updated dependencies [8ebc086]
- @openfn/engine-multi@1.12.0
- @openfn/ws-worker@1.27.0
- @openfn/lightning-mock@2.4.20

## 1.0.96

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.96",
"version": "1.1.0",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <admin@openfn.org>",
"license": "ISC",
Expand Down
86 changes: 86 additions & 0 deletions integration-tests/worker/test/exit-reasons.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,92 @@ test.serial('kill: oom (large, kill vm)', async (t) => {
t.is(error_message, 'Run exceeded maximum memory usage');
});

test.serial('kill: state exceeds the configured state limit', async (t) => {
const attempt = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/language-common@latest',
// ~2mb string for a 1mb limit
body: `fn((s) => {
s.data = new Array(2 * 1024 * 1024).fill('a').join('');
return s;
})`,
},
],
options: {
state_limit_mb: 1,
},
};

const result = await run(attempt);

const { reason, error_type, error_message } = result;
t.is(reason, 'kill');
t.is(error_type, 'StateTooLargeError');
t.regex(error_message, /State exceeds the limit of 1mb/);
});

test.serial(
'kill: state limit is enforced between jobs (downstream job does not run)',
async (t) => {
const jobOne = {
id: crypto.randomUUID(),
adaptor: '@openfn/language-common@latest',
// ~2mb state, over the 1mb limit set below
body: `fn((s) => {
s.data = new Array(2 * 1024 * 1024).fill('a').join('');
return s;
})`,
};

// not expected to run because the first job is expected to trigger state size crash
const jobTwo = {
id: crypto.randomUUID(),
adaptor: '@openfn/language-common@latest',
body: `fn(() => ({ data: 'ok' }))`,
};

const attempt = {
id: crypto.randomUUID(),
jobs: [jobOne, jobTwo],
edges: [
{
id: crypto.randomUUID(),
source_job_id: jobOne.id,
target_job_id: jobTwo.id,
condition: 'always',
},
],
options: {
state_limit_mb: 1,
},
};

const startedJobs: string[] = [];
const unsubscribe = lightning.onSocketEvent(
'step:start',
attempt.id,
(evt) => {
if (evt.runId === attempt.id) {
startedJobs.push(evt.payload.job_id);
}
},
false
);

const result = await run(attempt);
unsubscribe();

const { reason, error_type, error_message } = result;
t.is(reason, 'kill');
t.is(error_type, 'StateTooLargeError');
t.regex(error_message, /State exceeds the limit of 1mb/);

t.deepEqual(startedJobs, [jobOne.id]);
}
);

test.serial('crash: process.exit() triggered by postgres', async (t) => {
const attempt = {
id: crypto.randomUUID(),
Expand Down
12 changes: 12 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# engine-multi

## 1.12.0

### Minor Changes

- 8ebc086: Add support for state memory limit via worker
- Reduce support size of state objects

### Patch Changes

- Updated dependencies [8ebc086]
- @openfn/lexicon@2.3.0

## 1.11.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.11.4",
"version": "1.12.0",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const execute = async (context: ExecutionContext) => {
// This must be fairly high to prevent crashes
stateLimitMb:
options.stateLimitMb ??
Math.max((options.memoryLimitMb ?? 1000) * 0.25),
Math.max(50, options.memoryLimitMb ?? 1000 * 0.15),
} as RunOptions;

logger.debug(
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export type EngineOptions = {
logger: Logger;
maxWorkers?: number;
memoryLimitMb?: number;
stateLimitMb?: number;
payloadLimitMb?: number;
logPayloadLimitMb?: number;
repoDir: string;
Expand Down Expand Up @@ -169,7 +170,7 @@ const createEngine = async (
callWorker,
options: {
...options,
stateLimitMb: opts.stateLimitMb,
stateLimitMb: opts.stateLimitMb ?? options.stateLimitMb,
sanitize: opts.sanitize,
resolvers: opts.resolvers,
runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout,
Expand Down
6 changes: 6 additions & 0 deletions packages/lexicon/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# lexicon

## 2.3.0

### Minor Changes

- 8ebc086: Add support for state memory limit via worker

## 2.2.1

### Patch Changes
Expand Down
1 change: 1 addition & 0 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export type LightningPlanOptions = {
output_dataclips?: boolean;

run_memory_limit_mb?: number;
state_limit_mb?: number;
payload_limit_mb?: number;
log_payload_limit_mb?: number;
job_log_level?: LogLevel;
Expand Down
2 changes: 1 addition & 1 deletion packages/lexicon/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lexicon",
"version": "2.2.1",
"version": "2.3.0",
"description": "Central repo of names and type definitions",
"author": "Open Function Group <admin@openfn.org>",
"license": "ISC",
Expand Down
8 changes: 8 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/lightning-mock

## 2.4.20

### Patch Changes

- Updated dependencies [8ebc086]
- @openfn/engine-multi@1.12.0
- @openfn/lexicon@2.3.0

## 2.4.19

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.4.19",
"version": "2.4.20",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
3 changes: 2 additions & 1 deletion packages/runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"fast-safe-stringify": "^2.1.1",
"json-stream-stringify": "^3.1.6",
"semver": "^7.7.4",
"source-map": "^0.7.6"
"source-map": "^0.7.6",
"stream-json": "^3.4.0"
}
}
17 changes: 9 additions & 8 deletions packages/runtime/src/execute/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,6 @@ const prepareFinalState = async (
) => {
if (isNullState(state)) return undefined;
if (state) {
try {
await ensureStateSize(state, stateLimit_mb);
} catch (e: any) {
logger.error('Critical error processing state: ', e.message);
throw e;
}

if (!statePropsToRemove) {
// As a strict default, remove the configuration key
// tbh this should happen higher up in the stack but it causes havoc in unit testing
Expand All @@ -115,8 +108,16 @@ const prepareFinalState = async (
`Cleaning up state. Removing keys: ${removedProps.join(', ')}`
);

return clone(state);
try {
// TODO we're now using this function to
// safely serialize
return ensureStateSize(state, stateLimit_mb, logger);
} catch (e: any) {
logger.error('Critical error processing state: ', e.message);
throw e;
}
}

return state;
};
// The job handler is responsible for preparing the job
Expand Down
3 changes: 3 additions & 0 deletions packages/runtime/src/stream-json.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
declare module 'stream-json/assembler.js' {
export { Assembler as default, Assembler, AssemblerOptions } from 'stream-json/src/assembler';
}
66 changes: 59 additions & 7 deletions packages/runtime/src/util/ensure-state-size.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { JsonStreamStringify } from 'json-stream-stringify';
import { StateTooLargeError } from '../errors';
import { Logger } from '@openfn/logger';
import { parser } from 'stream-json';
import Assembler from 'stream-json/assembler.js';
import { Transform } from 'stream';
import { pipeline } from 'stream/promises';

const replacer = (_key: string, value: any) => {
// Ignore non serializable keys
Expand All @@ -15,18 +20,65 @@ const replacer = (_key: string, value: any) => {
};

// throws if state exceeds a particular size limit
export default async (value: any, limit_mb: number = 500) => {
export default async (value: any, limit_mb: number = 500, logger?: Logger) => {
if (value && !isNaN(limit_mb) && limit_mb > 0) {
const limitBytes = limit_mb * 1024 * 1024;
let size_bytes = 0;
const stream = new JsonStreamStringify(value, replacer, 0, true);
for await (const chunk of stream) {
// Each chunk is a string token from the JSON output
size_bytes += Buffer.byteLength(chunk, 'utf8');

if (size_bytes > limitBytes) {
throw new StateTooLargeError(limit_mb);
const start = Date.now();
const source = new JsonStreamStringify(value, replacer, 0, true);

const sizeGuard = new Transform({
transform(chunk, _enc, cb) {
size_bytes += Buffer.byteLength(chunk, 'utf8');
if (size_bytes > limitBytes) {
return cb(new StateTooLargeError(limit_mb));
}

cb(null, chunk);
},
});

const jsonParser = parser.asStream();
const asm = Assembler.connectTo(jsonParser, {
reviver: (_key, value) => {
if (
value &&
typeof value === 'object' &&
typeof value.$ref === 'string'
) {
return '[Circular]';
}
return value;
},
});

try {
await pipeline(source, sizeGuard, jsonParser);
} catch (e) {
if (e instanceof StateTooLargeError) {
logger?.info(
`state object exceeds limit ${limit_mb} (${(
size_bytes /
1024 /
1024
).toFixed(2)}mb)`
);
}
throw e;
}
const duration = Date.now() - start;

if (size_bytes < 1024 * 1024) {
logger?.debug(`State object serializes to less than 1mb (${duration}ms)`);
} else {
logger?.debug(
`State object serializes to ${(size_bytes / 1024 / 1024).toFixed(
2
)}mb (${duration}ms)`
);
}

return asm.current;
}
};
31 changes: 31 additions & 0 deletions packages/runtime/test/util/ensure-state-size.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,34 @@ test('handle Set in state', async (t) => {
};
await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024));
});

test('returns the state object', async (t) => {
const state = { data: 'hello', count: 42 };
const result = await ensureStateSize(state, 2 / 1024);
t.deepEqual(result, { data: 'hello', count: 42 });
});

test('circular references become [Circular] in the result', async (t) => {
const state: any = { data: 'test' };
state.self = state;
const result: any = await ensureStateSize(state, 2 / 1024);
t.is(result.self, '[Circular]');
});

test('functions are stripped from the result', async (t) => {
const state = { data: 'test', fn: () => 'hello' };
const result: any = await ensureStateSize(state, 2 / 1024);
t.false('fn' in result);
});

test('undefined values are stripped from the result', async (t) => {
const state = { data: 'test', undef: undefined };
const result: any = await ensureStateSize(state, 2 / 1024);
t.false('undef' in result);
});

test('promises are stripped from the result', async (t) => {
const state = { data: 'test', promise: new Promise((r) => r(null)) };
const result: any = await ensureStateSize(state, 2 / 1024);
t.false('promise' in result);
});
Loading
Loading