diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index 0dde8b32c..c9890bd50 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,18 @@ # @openfn/integration-tests-worker +## 1.1.0 + +### Minor Changes + +- 8ebc086: Add support for state memory limit via worker + +### Patch Changes + +- Updated dependencies [8ebc086] + - @openfn/engine-multi@1.12.0 + - @openfn/ws-worker@1.27.0 + - @openfn/lightning-mock@2.4.20 + ## 1.0.96 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 7b88e3a83..01cd47ca2 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.96", + "version": "1.1.0", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/worker/test/exit-reasons.test.ts b/integration-tests/worker/test/exit-reasons.test.ts index f378c6ecd..d2bc91fa6 100644 --- a/integration-tests/worker/test/exit-reasons.test.ts +++ b/integration-tests/worker/test/exit-reasons.test.ts @@ -226,6 +226,92 @@ test.serial('kill: oom (large, kill vm)', async (t) => { t.is(error_message, 'Run exceeded maximum memory usage'); }); +test.serial('kill: state exceeds the configured state limit', async (t) => { + const attempt = { + id: crypto.randomUUID(), + jobs: [ + { + adaptor: '@openfn/language-common@latest', + // ~2mb string for a 1mb limit + body: `fn((s) => { + s.data = new Array(2 * 1024 * 1024).fill('a').join(''); + return s; + })`, + }, + ], + options: { + state_limit_mb: 1, + }, + }; + + const result = await run(attempt); + + const { reason, error_type, error_message } = result; + t.is(reason, 'kill'); + t.is(error_type, 'StateTooLargeError'); + t.regex(error_message, /State exceeds the limit of 1mb/); +}); + +test.serial( + 'kill: state limit is enforced between jobs (downstream job does not run)', + async (t) => { + const jobOne = { + id: crypto.randomUUID(), + adaptor: '@openfn/language-common@latest', + // ~2mb state, over the 1mb limit set below + body: `fn((s) => { + s.data = new Array(2 * 1024 * 1024).fill('a').join(''); + return s; + })`, + }; + + // not expected to run because the first job is expected to trigger state size crash + const jobTwo = { + id: crypto.randomUUID(), + adaptor: '@openfn/language-common@latest', + body: `fn(() => ({ data: 'ok' }))`, + }; + + const attempt = { + id: crypto.randomUUID(), + jobs: [jobOne, jobTwo], + edges: [ + { + id: crypto.randomUUID(), + source_job_id: jobOne.id, + target_job_id: jobTwo.id, + condition: 'always', + }, + ], + options: { + state_limit_mb: 1, + }, + }; + + const startedJobs: string[] = []; + const unsubscribe = lightning.onSocketEvent( + 'step:start', + attempt.id, + (evt) => { + if (evt.runId === attempt.id) { + startedJobs.push(evt.payload.job_id); + } + }, + false + ); + + const result = await run(attempt); + unsubscribe(); + + const { reason, error_type, error_message } = result; + t.is(reason, 'kill'); + t.is(error_type, 'StateTooLargeError'); + t.regex(error_message, /State exceeds the limit of 1mb/); + + t.deepEqual(startedJobs, [jobOne.id]); + } +); + test.serial('crash: process.exit() triggered by postgres', async (t) => { const attempt = { id: crypto.randomUUID(), diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index 7687e6f44..2e47b95ae 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,17 @@ # engine-multi +## 1.12.0 + +### Minor Changes + +- 8ebc086: Add support for state memory limit via worker +- Reduce support size of state objects + +### Patch Changes + +- Updated dependencies [8ebc086] + - @openfn/lexicon@2.3.0 + ## 1.11.4 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 1e4a1f6c4..35ef12543 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.11.4", + "version": "1.12.0", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/engine-multi/src/api/execute.ts b/packages/engine-multi/src/api/execute.ts index 4281eae9f..57f5fa561 100644 --- a/packages/engine-multi/src/api/execute.ts +++ b/packages/engine-multi/src/api/execute.ts @@ -50,7 +50,7 @@ const execute = async (context: ExecutionContext) => { // This must be fairly high to prevent crashes stateLimitMb: options.stateLimitMb ?? - Math.max((options.memoryLimitMb ?? 1000) * 0.25), + Math.max(50, options.memoryLimitMb ?? 1000 * 0.15), } as RunOptions; logger.debug( diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index 9360d5c82..38467b97c 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -74,6 +74,7 @@ export type EngineOptions = { logger: Logger; maxWorkers?: number; memoryLimitMb?: number; + stateLimitMb?: number; payloadLimitMb?: number; logPayloadLimitMb?: number; repoDir: string; @@ -169,7 +170,7 @@ const createEngine = async ( callWorker, options: { ...options, - stateLimitMb: opts.stateLimitMb, + stateLimitMb: opts.stateLimitMb ?? options.stateLimitMb, sanitize: opts.sanitize, resolvers: opts.resolvers, runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout, diff --git a/packages/lexicon/CHANGELOG.md b/packages/lexicon/CHANGELOG.md index a7a38c52e..85715c9ec 100644 --- a/packages/lexicon/CHANGELOG.md +++ b/packages/lexicon/CHANGELOG.md @@ -1,5 +1,11 @@ # lexicon +## 2.3.0 + +### Minor Changes + +- 8ebc086: Add support for state memory limit via worker + ## 2.2.1 ### Patch Changes diff --git a/packages/lexicon/lightning.d.ts b/packages/lexicon/lightning.d.ts index aa93a7304..8554c0fb6 100644 --- a/packages/lexicon/lightning.d.ts +++ b/packages/lexicon/lightning.d.ts @@ -59,6 +59,7 @@ export type LightningPlanOptions = { output_dataclips?: boolean; run_memory_limit_mb?: number; + state_limit_mb?: number; payload_limit_mb?: number; log_payload_limit_mb?: number; job_log_level?: LogLevel; diff --git a/packages/lexicon/package.json b/packages/lexicon/package.json index b8671c947..7ea4cc990 100644 --- a/packages/lexicon/package.json +++ b/packages/lexicon/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lexicon", - "version": "2.2.1", + "version": "2.3.0", "description": "Central repo of names and type definitions", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index bef369a56..b26ec2745 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/lightning-mock +## 2.4.20 + +### Patch Changes + +- Updated dependencies [8ebc086] + - @openfn/engine-multi@1.12.0 + - @openfn/lexicon@2.3.0 + ## 2.4.19 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 649bfbb18..6f92f16ed 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.4.19", + "version": "2.4.20", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/runtime/package.json b/packages/runtime/package.json index 32967ce27..0f9ded42a 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -53,6 +53,7 @@ "fast-safe-stringify": "^2.1.1", "json-stream-stringify": "^3.1.6", "semver": "^7.7.4", - "source-map": "^0.7.6" + "source-map": "^0.7.6", + "stream-json": "^3.4.0" } } diff --git a/packages/runtime/src/execute/step.ts b/packages/runtime/src/execute/step.ts index e5ef79a52..cb1ae4871 100644 --- a/packages/runtime/src/execute/step.ts +++ b/packages/runtime/src/execute/step.ts @@ -90,13 +90,6 @@ const prepareFinalState = async ( ) => { if (isNullState(state)) return undefined; if (state) { - try { - await ensureStateSize(state, stateLimit_mb); - } catch (e: any) { - logger.error('Critical error processing state: ', e.message); - throw e; - } - if (!statePropsToRemove) { // As a strict default, remove the configuration key // tbh this should happen higher up in the stack but it causes havoc in unit testing @@ -115,8 +108,16 @@ const prepareFinalState = async ( `Cleaning up state. Removing keys: ${removedProps.join(', ')}` ); - return clone(state); + try { + // TODO we're now using this function to + // safely serialize + return ensureStateSize(state, stateLimit_mb, logger); + } catch (e: any) { + logger.error('Critical error processing state: ', e.message); + throw e; + } } + return state; }; // The job handler is responsible for preparing the job diff --git a/packages/runtime/src/stream-json.d.ts b/packages/runtime/src/stream-json.d.ts new file mode 100644 index 000000000..cb2bc8fa9 --- /dev/null +++ b/packages/runtime/src/stream-json.d.ts @@ -0,0 +1,3 @@ +declare module 'stream-json/assembler.js' { + export { Assembler as default, Assembler, AssemblerOptions } from 'stream-json/src/assembler'; +} diff --git a/packages/runtime/src/util/ensure-state-size.ts b/packages/runtime/src/util/ensure-state-size.ts index 84ab9744b..52947f266 100644 --- a/packages/runtime/src/util/ensure-state-size.ts +++ b/packages/runtime/src/util/ensure-state-size.ts @@ -1,5 +1,10 @@ import { JsonStreamStringify } from 'json-stream-stringify'; import { StateTooLargeError } from '../errors'; +import { Logger } from '@openfn/logger'; +import { parser } from 'stream-json'; +import Assembler from 'stream-json/assembler.js'; +import { Transform } from 'stream'; +import { pipeline } from 'stream/promises'; const replacer = (_key: string, value: any) => { // Ignore non serializable keys @@ -15,18 +20,65 @@ const replacer = (_key: string, value: any) => { }; // throws if state exceeds a particular size limit -export default async (value: any, limit_mb: number = 500) => { +export default async (value: any, limit_mb: number = 500, logger?: Logger) => { if (value && !isNaN(limit_mb) && limit_mb > 0) { const limitBytes = limit_mb * 1024 * 1024; let size_bytes = 0; - const stream = new JsonStreamStringify(value, replacer, 0, true); - for await (const chunk of stream) { - // Each chunk is a string token from the JSON output - size_bytes += Buffer.byteLength(chunk, 'utf8'); - if (size_bytes > limitBytes) { - throw new StateTooLargeError(limit_mb); + const start = Date.now(); + const source = new JsonStreamStringify(value, replacer, 0, true); + + const sizeGuard = new Transform({ + transform(chunk, _enc, cb) { + size_bytes += Buffer.byteLength(chunk, 'utf8'); + if (size_bytes > limitBytes) { + return cb(new StateTooLargeError(limit_mb)); + } + + cb(null, chunk); + }, + }); + + const jsonParser = parser.asStream(); + const asm = Assembler.connectTo(jsonParser, { + reviver: (_key, value) => { + if ( + value && + typeof value === 'object' && + typeof value.$ref === 'string' + ) { + return '[Circular]'; + } + return value; + }, + }); + + try { + await pipeline(source, sizeGuard, jsonParser); + } catch (e) { + if (e instanceof StateTooLargeError) { + logger?.info( + `state object exceeds limit ${limit_mb} (${( + size_bytes / + 1024 / + 1024 + ).toFixed(2)}mb)` + ); } + throw e; } + const duration = Date.now() - start; + + if (size_bytes < 1024 * 1024) { + logger?.debug(`State object serializes to less than 1mb (${duration}ms)`); + } else { + logger?.debug( + `State object serializes to ${(size_bytes / 1024 / 1024).toFixed( + 2 + )}mb (${duration}ms)` + ); + } + + return asm.current; } }; diff --git a/packages/runtime/test/util/ensure-state-size.test.ts b/packages/runtime/test/util/ensure-state-size.test.ts index dfb5616e3..8b1f52f5d 100644 --- a/packages/runtime/test/util/ensure-state-size.test.ts +++ b/packages/runtime/test/util/ensure-state-size.test.ts @@ -106,3 +106,34 @@ test('handle Set in state', async (t) => { }; await t.notThrowsAsync(() => ensureStateSize(state, 2 / 1024)); }); + +test('returns the state object', async (t) => { + const state = { data: 'hello', count: 42 }; + const result = await ensureStateSize(state, 2 / 1024); + t.deepEqual(result, { data: 'hello', count: 42 }); +}); + +test('circular references become [Circular] in the result', async (t) => { + const state: any = { data: 'test' }; + state.self = state; + const result: any = await ensureStateSize(state, 2 / 1024); + t.is(result.self, '[Circular]'); +}); + +test('functions are stripped from the result', async (t) => { + const state = { data: 'test', fn: () => 'hello' }; + const result: any = await ensureStateSize(state, 2 / 1024); + t.false('fn' in result); +}); + +test('undefined values are stripped from the result', async (t) => { + const state = { data: 'test', undef: undefined }; + const result: any = await ensureStateSize(state, 2 / 1024); + t.false('undef' in result); +}); + +test('promises are stripped from the result', async (t) => { + const state = { data: 'test', promise: new Promise((r) => r(null)) }; + const result: any = await ensureStateSize(state, 2 / 1024); + t.false('promise' in result); +}); diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 479a569e1..86a2dbd26 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,18 @@ # ws-worker +## 1.27.0 + +### Minor Changes + +- 8ebc086: Add support for state memory limit via worker +- Reduce support size of state objects + +### Patch Changes + +- Updated dependencies [8ebc086] + - @openfn/engine-multi@1.12.0 + - @openfn/lexicon@2.3.0 + ## 1.26.1 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 8739662d3..affd051da 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.26.1", + "version": "1.27.0", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/start.ts b/packages/ws-worker/src/start.ts index 9d901c7af..4e08e2601 100644 --- a/packages/ws-worker/src/start.ts +++ b/packages/ws-worker/src/start.ts @@ -111,6 +111,7 @@ if (args.mock) { const engineOptions = { repoDir: args.repoDir, memoryLimitMb: args.runMemory, + stateLimitMb: args.stateMemory, maxWorkers: effectiveCapacity, statePropsToRemove: args.statePropsToRemove, runTimeoutMs: args.maxRunDurationSeconds * 1000, diff --git a/packages/ws-worker/src/util/cli.ts b/packages/ws-worker/src/util/cli.ts index ebae7ccb5..f2850a383 100644 --- a/packages/ws-worker/src/util/cli.ts +++ b/packages/ws-worker/src/util/cli.ts @@ -38,6 +38,7 @@ type Args = { profilePollIntervalMs?: number; repoDir?: string; runMemory?: number; + stateMemory?: number; secret?: string; sentryDsn?: string; sentryEnv?: string; @@ -91,6 +92,7 @@ export default function parseArgs(argv: string[]): Args { WORKER_MAX_LOG_PAYLOAD_MB, WORKER_MAX_RUN_DURATION_SECONDS, WORKER_MAX_RUN_MEMORY_MB, + WORKER_MAX_STATE_MEMORY_MB, WORKER_MESSAGE_TIMEOUT_SECONDS, WORKER_PORT, WORKER_PROFILE_POLL_INTERVAL_MS, @@ -219,6 +221,11 @@ export default function parseArgs(argv: string[]): Args { 'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_RUN_MEMORY_MB', type: 'number', }) + .option('state-memory', { + description: + 'Maximum size of the state object returned by each step, in mb. Defaults to 25% of run-memory. Env: WORKER_MAX_STATE_MEMORY_MB', + type: 'number', + }) .option('payload-memory', { description: 'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_PAYLOAD_MB', @@ -326,6 +333,12 @@ export default function parseArgs(argv: string[]): Args { ['configuration', 'response'] ), runMemory: setArg(args.runMemory, WORKER_MAX_RUN_MEMORY_MB, 500), + // No default: when unset the engine derives the limit from run-memory (25%) + stateMemory: + args.stateMemory ?? + (WORKER_MAX_STATE_MEMORY_MB + ? parseInt(WORKER_MAX_STATE_MEMORY_MB, 10) + : undefined), payloadMemory: setArg(args.payloadMemory, WORKER_MAX_PAYLOAD_MB, 10), logPayloadMemory: setArg( args.logPayloadMemory, diff --git a/packages/ws-worker/src/util/convert-lightning-plan.ts b/packages/ws-worker/src/util/convert-lightning-plan.ts index 4307e7dbe..f9fe2180b 100644 --- a/packages/ws-worker/src/util/convert-lightning-plan.ts +++ b/packages/ws-worker/src/util/convert-lightning-plan.ts @@ -146,6 +146,9 @@ export default ( if ('run_memory_limit_mb' in run.options) { engineOpts.memoryLimitMb = run.options.run_memory_limit_mb; } + if ('state_limit_mb' in run.options) { + engineOpts.stateLimitMb = run.options.state_limit_mb; + } if ('sanitize' in run.options) { engineOpts.sanitize = run.options.sanitize; } diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index 8d0e70686..1e33e615d 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -301,8 +301,11 @@ test('loadDataclip report to sentry on fail', async (t) => { } catch (e) {} const reports = await waitForSentryReport(testkit); - t.is(reports.length, 1); - t.is(reports[0].error.name, 'LightningSocketError'); + const dataclip_report = reports.find((r: any) => + /fetch\:dataclip.+not_found/i.test(r.error.message) + ); + t.truthy(dataclip_report); + t.is(dataclip_report.error.name, 'LightningSocketError'); }); test('loadCredential should fetch a credential', async (t) => { diff --git a/packages/ws-worker/test/util/cli.test.ts b/packages/ws-worker/test/util/cli.test.ts index 698008dc8..be156dd72 100644 --- a/packages/ws-worker/test/util/cli.test.ts +++ b/packages/ws-worker/test/util/cli.test.ts @@ -62,6 +62,7 @@ test('cli should set default values for unspecified options', (t) => { t.falsy(args.sentryDsn); t.deepEqual(args.statePropsToRemove, ['configuration', 'response']); t.is(args.runMemory, 500); + t.is(args.stateMemory, undefined); t.is(args.maxRunDurationSeconds, 300); t.is(args.engineValidationRetries, 3); t.is(args.engineValidationTimeoutMs, 5000); diff --git a/packages/ws-worker/test/util/convert-lightning-plan.test.ts b/packages/ws-worker/test/util/convert-lightning-plan.test.ts index 12a11a0e4..224855125 100644 --- a/packages/ws-worker/test/util/convert-lightning-plan.test.ts +++ b/packages/ws-worker/test/util/convert-lightning-plan.test.ts @@ -123,6 +123,25 @@ test('convert a single job with options', (t) => { }); }); +test('convert a single job with state_limit_mb', (t) => { + const run: Partial = { + id: 'w', + jobs: [createNode()], + triggers: [], + edges: [], + options: { + run_memory_limit_mb: 500, + state_limit_mb: 50, + }, + }; + const { options } = convertPlan(run as LightningPlan); + + t.deepEqual(options, { + memoryLimitMb: 500, + stateLimitMb: 50, + }); +}); + test('convert a single job with log_payload_limit_mb', (t) => { const run: Partial = { id: 'w', diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f6c86ac69..b8cce651d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -668,6 +668,9 @@ importers: source-map: specifier: ^0.7.6 version: 0.7.6 + stream-json: + specifier: ^3.4.0 + version: 3.4.0 devDependencies: '@openfn/compiler': specifier: workspace:^ @@ -4011,9 +4014,17 @@ packages: stream-chain@2.2.5: resolution: {integrity: sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA==} + stream-chain@4.2.5: + resolution: {integrity: sha512-Wtyq3bNE3ggLR0v2vftqvuhltym3WbZAkZpfIrkr5F/6vpeUmWmwTgXa16zD87gpahwJ/Qulq3zVfUlgIc0J2A==} + engines: {node: '>=22'} + stream-json@1.9.1: resolution: {integrity: sha512-uWkjJ+2Nt/LO9Z/JyKZbMusL8Dkh97uUBTv3AJQ74y07lVahLY4eEFsPsE97pxYBwr8nnjMAIch5eqI0gPShyw==} + stream-json@3.4.0: + resolution: {integrity: sha512-7NEiOhtS0W7ZrwnqJc/fugsGQMdpNJng+v1s2xLTrmfBN2vcYDyBglCK2+eITif0VkHLXnTdALgQI9TnZMBhYA==} + engines: {node: '>=22'} + stream-shift@1.0.3: resolution: {integrity: sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==} @@ -7641,10 +7652,16 @@ snapshots: stream-chain@2.2.5: {} + stream-chain@4.2.5: {} + stream-json@1.9.1: dependencies: stream-chain: 2.2.5 + stream-json@3.4.0: + dependencies: + stream-chain: 4.2.5 + stream-shift@1.0.3: {} streamx@2.23.0: