diff --git a/src/runtime/process-worker.ts b/src/runtime/process-worker.ts index eadc2cb..34a7653 100644 --- a/src/runtime/process-worker.ts +++ b/src/runtime/process-worker.ts @@ -26,12 +26,19 @@ export default class ProcessWorker implements TinypoolWorker { options.argv, { ...options, + stdio: 'pipe', env: { ...options.env, TINYPOOL_WORKER_ID: options.workerData[0].workerId.toString(), }, } ) + + process.stdout.setMaxListeners(1 + process.stdout.getMaxListeners()) + process.stderr.setMaxListeners(1 + process.stderr.getMaxListeners()) + this.process.stdout?.pipe(process.stdout) + this.process.stderr?.pipe(process.stderr) + this.threadId = this.process.pid! this.process.on('exit', this.onUnexpectedExit) @@ -54,6 +61,8 @@ export default class ProcessWorker implements TinypoolWorker { this.process.kill() await this.waitForExit + this.process.stdout?.unpipe(process.stdout) + this.process.stderr?.unpipe(process.stderr) this.port?.close() clearTimeout(sigkillTimeout) } @@ -136,6 +145,21 @@ export default class ProcessWorker implements TinypoolWorker { // This requires manual unreffing of its channel. this.process.channel?.unref() + if (hasUnref(this.process.stdout)) { + this.process.stdout.unref() + } + + if (hasUnref(this.process.stderr)) { + this.process.stderr.unref() + } + return this.process.unref() } } + +// unref is untyped for some reason +function hasUnref(stream: null | object): stream is { unref: () => void } { + return ( + stream != null && 'unref' in stream && typeof stream.unref === 'function' + ) +} diff --git a/test/fixtures/stdio.mjs b/test/fixtures/stdio.mjs new file mode 100644 index 0000000..96e4206 --- /dev/null +++ b/test/fixtures/stdio.mjs @@ -0,0 +1,4 @@ +export default function run() { + process.stdout.write('Worker message') + process.stderr.write('Worker error') +} diff --git a/test/worker-stdio.test.ts b/test/worker-stdio.test.ts new file mode 100644 index 0000000..2f7880d --- /dev/null +++ b/test/worker-stdio.test.ts @@ -0,0 +1,56 @@ +import * as path from 'node:path' +import { fileURLToPath } from 'node:url' +import { stripVTControlCharacters } from 'node:util' +import { Tinypool } from 'tinypool' + +const runtimes = ['worker_threads', 'child_process'] as const +const __dirname = path.dirname(fileURLToPath(import.meta.url)) + +test.each(runtimes)( + "worker's stdout and stderr are piped to main thread when { runtime: '%s' }", + async (runtime) => { + const pool = createPool({ + runtime, + minThreads: 1, + maxThreads: 1, + }) + + const getStdout = captureStandardStream('stdout') + const getStderr = captureStandardStream('stderr') + + await pool.run({}) + + const stdout = getStdout() + const stderr = getStderr() + + expect(stdout).toMatch('Worker message') + + expect(stderr).toMatch('Worker error') + } +) + +function createPool(options: Partial) { + const pool = new Tinypool({ + filename: path.resolve(__dirname, 'fixtures/stdio.mjs'), + minThreads: 1, + maxThreads: 1, + ...options, + }) + + return pool +} + +function captureStandardStream(type: 'stdout' | 'stderr') { + const spy = vi.fn() + + // eslint-disable-next-line @typescript-eslint/unbound-method + const original = process[type].write + process[type].write = spy + + return function collect() { + process[type].write = original + return stripVTControlCharacters( + spy.mock.calls.map((call) => call[0]).join('') + ) + } +}