diff --git a/Dockerfile b/Dockerfile index 72805704c..028e2b1f1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,7 @@ WORKDIR /app # TODO: remove simple build once prod optimized build is working --------------- FROM base AS ws-worker -RUN apk add --no-cache git +RUN apk add --no-cache git util-linux RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --frozen-lockfile RUN pnpm build WORKDIR /app/packages/ws-worker diff --git a/packages/engine-multi/src/api.ts b/packages/engine-multi/src/api.ts index 29450b16a..3a5a7ec59 100644 --- a/packages/engine-multi/src/api.ts +++ b/packages/engine-multi/src/api.ts @@ -27,7 +27,7 @@ export type APIOptions = Partial>; const DEFAULT_REPO_DIR = path.join(os.homedir(), '.openfn/worker/repo'); -const DEFAULT_MEMORY_LIMIT = 500; +const DEFAULT_RUN_MEMORY_LIMIT = 500; // Create the engine and handle user-facing stuff, like options parsing // and defaulting @@ -57,7 +57,7 @@ const createAPI = async function ( autoinstall: options.autoinstall, maxWorkers: options.maxWorkers, - memoryLimitMb: options.memoryLimitMb || DEFAULT_MEMORY_LIMIT, + memoryLimitMb: options.memoryLimitMb || DEFAULT_RUN_MEMORY_LIMIT, runTimeoutMs: options.runTimeoutMs, statePropsToRemove: options.statePropsToRemove ?? [ diff --git a/packages/engine-multi/src/api/call-worker.ts b/packages/engine-multi/src/api/call-worker.ts index c3a062407..6e9f7aff0 100644 --- a/packages/engine-multi/src/api/call-worker.ts +++ b/packages/engine-multi/src/api/call-worker.ts @@ -10,6 +10,7 @@ export type WorkerEvent = { type WorkerOptions = { maxWorkers?: number; + maxWorkerMemoryMb?: number; // kernel-level memory limit per child process (cgroup v2) env?: any; timeout?: number; // ms memoryLimitMb?: number; @@ -26,6 +27,7 @@ export default function initWorkers( const { env = {}, maxWorkers = 5, + maxWorkerMemoryMb, memoryLimitMb, proxyStdout = false, } = options; @@ -34,6 +36,7 @@ export default function initWorkers( workerPath, { maxWorkers, + maxWorkerMemoryMb, env, memoryLimitMb, proxyStdout, diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index 9360d5c82..93a030225 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -73,6 +73,11 @@ export type EngineOptions = { // compile?: { skip?: boolean } // TODO no support yet logger: Logger; maxWorkers?: number; + + /** Sets the maximum total memory usable by the engine. Otherwise uses all available memory */ + maxEngineMemoryMb?: number; + + /** Memory limit per run */ memoryLimitMb?: number; payloadLimitMb?: number; logPayloadLimitMb?: number; @@ -141,6 +146,7 @@ const createEngine = async ( { maxWorkers: options.maxWorkers, memoryLimitMb: defaultMemoryLimit, + maxWorkerMemoryMb: defaultMemoryLimit, proxyStdout: options.proxyStdout, }, options.logger diff --git a/packages/engine-multi/src/util/memory.ts b/packages/engine-multi/src/util/memory.ts new file mode 100644 index 000000000..ee813fbb8 --- /dev/null +++ b/packages/engine-multi/src/util/memory.ts @@ -0,0 +1,3 @@ +export const mb = (valueInBytes: number) => valueInBytes / 1024 / 1024; + +export const b = (valueInMb: number) => valueInMb * 1024 * 1024; diff --git a/packages/engine-multi/src/worker/limits.ts b/packages/engine-multi/src/worker/limits.ts new file mode 100644 index 000000000..8fd96ffdf --- /dev/null +++ b/packages/engine-multi/src/worker/limits.ts @@ -0,0 +1,118 @@ +import process from 'node:process'; +import os from 'node:os'; +import { execSync } from 'node:child_process'; +import type { Logger } from '@openfn/logger'; +import { ChildProcess } from 'node:child_process'; +import { b, mb } from '../util/memory'; +import type { PoolOptions } from './pool'; + +let prlimitAvailable: boolean | null = null; + +/** + * Check if the prlimit command is available (Linux with util-linux). + * Result is cached for the process lifetime. + */ +export function detectPrlimitSupport(): boolean { + if (prlimitAvailable === null) { + try { + execSync('prlimit', ['--version'], { stdio: 'ignore' }); + prlimitAvailable = true; + } catch { + prlimitAvailable = false; + } + } + + return prlimitAvailable; +} + +export const getAvailableMemory = (options: PoolOptions): number => { + if (options.totalMemoryMb) { + return b(options.totalMemoryMb); + } + const total = os.totalmem(); + const constrained = process.constrainedMemory?.() ?? total; + return Math.floor(Math.min(total, constrained)); +}; + +export const calculateLimits = ( + totalMemory_bytes: number, + mainProcessOverhead_bytes: number, + capacity: number +): number => { + return Math.floor((totalMemory_bytes - mainProcessOverhead_bytes) / capacity); +}; + +export function setHardMemoryLimit( + child: ChildProcess, + limit_bytes: number, + logger: Logger +) { + if (prlimitAvailable) { + logger.debug( + `pool: setting hard limit on pid ${child.pid} to ${Math.floor( + mb(limit_bytes) + )}mb` + ); + const roundLimit = Math.round(limit_bytes); + + // this will set the hard and soft limits at once + // The hard limit is the absolute maximum we can possibly set + // (but doesn't actually allocate memory, just sets a ceiling) + // The soft limit can be changed per run + execSync(`prlimit --pid=${child.pid} --as=${roundLimit}`); + + // Pretty print the result + const out = execSync( + `prlimit --pid=${child.pid} --as --noheadings -o "SOFT,HARD,UNITS"` + ); + logger.debug(' > ', out.toString()); + } +} + +// TODO before each run starst, we should set the soft limit +// to that run's limit +/** + * Set a memory limit on a child process + * If prlmit is available, this will set the soft limit + * + * Apply RLIMIT_AS (virtual address space limit) to a child process. + * When exceeded, mmap/brk fails with ENOMEM, causing the process to crash. + */ + +// TODO: it's plausible that du to bad config the hard limit +// is lower than the actual run memory limit +// if this true we'll get an error here +// we should probably od a check and raise a warning, then use the smallest of soft and hard limit +// most runs should be quite happy to run in this +export function applyMemoryLimit( + child: ChildProcess, + limitBytes: number, + logger: Logger +): boolean { + if (prlimitAvailable) { + const pid = child.pid; + try { + console.log({ hardLimit }); + console.log(`prlimit --pid ${child.pid} --as=${limitBytes}:`); + // note we still have to pass the hard limit here + const out = execSync(`prlimit --pid ${child.pid} --as=${limitBytes}:`); + console.log({ out: out.toString() }); + logger.debug( + `Soft memory limit on worker ${pid} set to ${Math.round( + mb(limitBytes) + )}MB` + ); + return true; + } catch (e: any) { + logger.warn(`Failed to set soft for worker ${pid}:`, e.message); + return false; + } + } + return false; +} + +// TODO rename this. maybe make it setprlimit rather than a global reset +// Exported for testing only +export function _resetCache(): void { + prlimitAvailable = null; +} diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 48a268803..565f40a16 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -13,16 +13,30 @@ import { import { HANDLED_EXIT_CODE } from '../events'; import { Logger } from '@openfn/logger'; import type { PayloadLimits } from './thread/runtime'; +import { + detectPrlimitSupport, + getAvailableMemory, + calculateLimits, + setHardMemoryLimit, + applyMemoryLimit, +} from './limits'; +import { b, mb } from '../util/memory'; export type PoolOptions = { capacity?: number; // defaults to 5 maxWorkers?: number; // alias for capacity. Which is best? + + /* Total memory available to the whole engine (will auto-detect if not set) */ + totalMemoryMb?: number; env?: Record; // default environment for workers memoryLimitMb?: number; // --max-old-space-size for child processes proxyStdout?: boolean; // print internal stdout to console }; +// TODO set through options +const ENGINE_MAIN_OVERHEAD_MB = 400; + type RunTaskEvent = { type: typeof ENGINE_RUN_TASK; task: string; @@ -72,7 +86,22 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug(`pool: Creating new child process pool | capacity: ${capacity}`); let destroyed = false; - // a pool of processes + const hasPrlimit = detectPrlimitSupport(); + let hardMemCap_bytes; + + if (hasPrlimit) { + hardMemCap_bytes = calculateLimits( + getAvailableMemory(options), + ENGINE_MAIN_OVERHEAD_MB, + capacity + ); + logger.info( + `Memory enforcement enabled | hard limit: ${Math.floor( + mb(hardMemCap_bytes) + )}mb per child` + ); + } + const pool: ChildProcessPool = new Array(capacity).fill(false); const queue: QueuedTask[] = []; @@ -86,6 +115,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { // create a new child process and load the module script into it const execArgv = ['--experimental-vm-modules', '--no-warnings']; if (options.memoryLimitMb) { + // TODO this is just a fallback value - prlimit is the primary + // mechanism for controlling memory use execArgv.push(`--max-old-space-size=${options.memoryLimitMb}`); } @@ -105,12 +136,20 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { }); } + setHardMemoryLimit(child, hardMemCap_bytes!, logger); + logger.debug('pool: Created new child process', child.pid); allWorkers[child.pid!] = child; } else { child = maybeChild as ChildProcess; logger.debug('pool: Using existing child process', child.pid); } + + // TODO: memory limit is currently set for all runs in the worker according to config + // We could now set this per-run by passing a run option + if (options.memoryLimitMb) { + applyMemoryLimit(child, b(options.memoryLimitMb), logger); + } return child; }; @@ -148,7 +187,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const promise = new Promise(async (resolve, reject) => { // TODO what should we do if a process in the pool dies, perhaps due to OOM? - const onExit = async (code: number) => { + const onExit = async (code: number, e) => { + console.log(code, e); if (code !== HANDLED_EXIT_CODE) { logger.debug(`pool: Worker exited unexpectedly with code ${code}`); clearTimeout(timeout); diff --git a/packages/engine-multi/test/util/memory.test.ts b/packages/engine-multi/test/util/memory.test.ts new file mode 100644 index 000000000..d28079d60 --- /dev/null +++ b/packages/engine-multi/test/util/memory.test.ts @@ -0,0 +1,17 @@ +import test from 'ava'; +import { b, mb } from '../../src/util/memory'; + +test('mb converts bytes to megabytes', (t) => { + t.is(mb(1024 * 1024), 1); + t.is(mb(512 * 1024 * 1024), 512); +}); + +test('b converts megabytes to bytes', (t) => { + t.is(b(1), 1024 * 1024); + t.is(b(512), 512 * 1024 * 1024); +}); + +test('b and mb are inverses', (t) => { + t.is(mb(b(256)), 256); + t.is(b(mb(256 * 1024 * 1024)), 256 * 1024 * 1024); +}); diff --git a/packages/engine-multi/test/worker/limit.test.ts b/packages/engine-multi/test/worker/limit.test.ts new file mode 100644 index 000000000..68d227dce --- /dev/null +++ b/packages/engine-multi/test/worker/limit.test.ts @@ -0,0 +1,66 @@ +import test from 'ava'; +import { createMockLogger } from '@openfn/logger'; + +import { + calculateLimits, + detectPrlimitSupport, + applyMemoryLimit, + _resetCache, +} from '../../src/worker/limits'; +const logger = createMockLogger(); + +test.beforeEach(() => { + _resetCache(); +}); + +test('calculateLimits divides available memory evenly across workers', (t) => { + const result = calculateLimits(100, 20, 2); + t.is(result, 40); +}); + +test('calculateLimits floors the result', (t) => { + const result = calculateLimits(100, 20, 3); + t.is(result, 26); +}); + +test('calculateLimits with capacity of 1', (t) => { + const result = calculateLimits(100, 20, 1); + t.is(result, 80); +}); + +test('detectPrlimitSupport caches the result across calls', (t) => { + const result1 = detectPrlimitSupport(logger); + const result2 = detectPrlimitSupport(logger); + t.is(result1, result2); +}); + +// On macOS, prlimit is not available +const isLinux = process.platform === 'linux'; + +if (!isLinux) { + test('detectPrlimitSupport returns false on non-Linux', (t) => { + const result = detectPrlimitSupport(logger); + t.false(result); + }); +} + +test('applyMemoryLimit returns false when prlimit is not available', (t) => { + if (detectPrlimitSupport(logger)) { + t.pass('prlimit is available — skipping negative test'); + return; + } + const result = applyMemoryLimit(99999, 500 * 1024 * 1024, logger); + t.false(result); +}); + +// Integration tests — only run on Linux with prlimit available +const hasPrlimit = isLinux && detectPrlimitSupport(createMockLogger()); +_resetCache(); // reset after the check so tests start clean + +const prlimitTest = hasPrlimit ? test : test.skip; + +prlimitTest('applyMemoryLimit succeeds on own process', (t) => { + // Apply a very generous limit to our own process (won't interfere with test) + const result = applyMemoryLimit(process.pid, 8 * 1024 * 1024 * 1024, logger); + t.true(result); +});