Skip to content

Commit

Permalink
fix(child_process): pipe stdout and stderr to main thread (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
AriPerkkio authored Nov 14, 2024
1 parent 26ebae1 commit 331151c
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 0 deletions.
24 changes: 24 additions & 0 deletions src/runtime/process-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@ export default class ProcessWorker implements TinypoolWorker {
options.argv,
{
...options,
stdio: 'pipe',
env: {
...options.env,
TINYPOOL_WORKER_ID: options.workerData[0].workerId.toString(),
},
}
)

process.stdout.setMaxListeners(1 + process.stdout.getMaxListeners())
process.stderr.setMaxListeners(1 + process.stderr.getMaxListeners())
this.process.stdout?.pipe(process.stdout)
this.process.stderr?.pipe(process.stderr)

this.threadId = this.process.pid!

this.process.on('exit', this.onUnexpectedExit)
Expand All @@ -54,6 +61,8 @@ export default class ProcessWorker implements TinypoolWorker {
this.process.kill()
await this.waitForExit

this.process.stdout?.unpipe(process.stdout)
this.process.stderr?.unpipe(process.stderr)
this.port?.close()
clearTimeout(sigkillTimeout)
}
Expand Down Expand Up @@ -136,6 +145,21 @@ export default class ProcessWorker implements TinypoolWorker {
// This requires manual unreffing of its channel.
this.process.channel?.unref()

if (hasUnref(this.process.stdout)) {
this.process.stdout.unref()
}

if (hasUnref(this.process.stderr)) {
this.process.stderr.unref()
}

return this.process.unref()
}
}

// unref is untyped for some reason
function hasUnref(stream: null | object): stream is { unref: () => void } {
return (
stream != null && 'unref' in stream && typeof stream.unref === 'function'
)
}
4 changes: 4 additions & 0 deletions test/fixtures/stdio.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export default function run() {
process.stdout.write('Worker message')
process.stderr.write('Worker error')
}
56 changes: 56 additions & 0 deletions test/worker-stdio.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import * as path from 'node:path'
import { fileURLToPath } from 'node:url'
import { stripVTControlCharacters } from 'node:util'
import { Tinypool } from 'tinypool'

const runtimes = ['worker_threads', 'child_process'] as const
const __dirname = path.dirname(fileURLToPath(import.meta.url))

test.each(runtimes)(
"worker's stdout and stderr are piped to main thread when { runtime: '%s' }",
async (runtime) => {
const pool = createPool({
runtime,
minThreads: 1,
maxThreads: 1,
})

const getStdout = captureStandardStream('stdout')
const getStderr = captureStandardStream('stderr')

await pool.run({})

const stdout = getStdout()
const stderr = getStderr()

expect(stdout).toMatch('Worker message')

expect(stderr).toMatch('Worker error')
}
)

function createPool(options: Partial<Tinypool['options']>) {
const pool = new Tinypool({
filename: path.resolve(__dirname, 'fixtures/stdio.mjs'),
minThreads: 1,
maxThreads: 1,
...options,
})

return pool
}

function captureStandardStream(type: 'stdout' | 'stderr') {
const spy = vi.fn()

// eslint-disable-next-line @typescript-eslint/unbound-method
const original = process[type].write
process[type].write = spy

return function collect() {
process[type].write = original
return stripVTControlCharacters(
spy.mock.calls.map((call) => call[0]).join('')
)
}
}

0 comments on commit 331151c

Please sign in to comment.