Node.jsにworkerが入った

Node.jsにworkerが入った

2018 / 06 / 26

目次
この記事は1年以上更新されていません
この記事は Hatena Blog からの移行記事です

[email protected]で入った worker の話です。

この記事は、 Roppongi.js #4 の登壇資料です。 5min で話しきれないので記事にまとめました。

実は、自分が Node.js に関わって、最初から最後(今現在)までずっと追っている珍しいモジュールです。

worker_threads とは?


worker: initial implementation by addaleax · Pull Request #20876 · nodejs/node

Hi everyone! 👋 This PR adds threading support for to Node.js. I realize that this is not exactly a small PR and is going to take a while to review, so: I appreciate comments, questions (any kind, a...

github.com
worker: initial implementation by addaleax · Pull Request #20876 · nodejs/node

実装著者は Anna (この PR は io.js 時代に petkaantonov が実装したのをベースに現環境へ移した)

worker_threads は独立したスレッドで動作する環境を構築し、それらの間にメッセージチャンネルを構築をする手段を提供します。 注意点として、Node.js の非同期は worker よりも効率的なため、I/O には使用しないほうが良いです。

目的

Node.js において、大量に負荷の高い処理することは苦手です。 なので、CPU 負荷の高い作業を別のスレッドに委ねて、負荷を分散させることが目的です。

child_process や cluster と違う部分

worker_threads の場合、ArrayBuffer のインスタンス間の転送をしたり、 それらの間で SharedArrayBuffer のインスタンスを共有をすることによりメモリを効率的に共有することが可能です。 child_process.fork()cluster.fork() と比べるとマルチスレッドに特化したモジュールということです。

会話する方法

child_process の IPC と異なります。 ブラウザ同様に、postMessage を使用し、会話します。 また、シリアライズされたデータはプロセスを離れる必要がないため全体的に通信に伴うオーバーヘッドが少なくなります。

// child_process
// parent
const { fork } = require("child_process");
const child = fork("child.js");
child.on("message", (message) => {
console.log("message", message);
child.send("from parent");
});
// child
process.on("message", (message) => {
console.log("message", message);
});
if (process.send) process.send("from child");

require("worker_threads");
const { port1, port2 } = new MessageChannel();
port1.on("message", (message) => console.log(message));
port2.postMessage("hi");
port2.on("message", (message) => console.log(message));
port1.postMessage("bye");

共有範囲

各々の woker は自身のイベントループを持ちますが、いくつかのリソースに関しては worker 間で共有されます。 (e.g. libuv のスレッドプール、V8 Isolate、V8 Environment)

これは現時点では実装されてないだけで、今後変わる可能性はあります。

またグローバルはスレッド間で共有されません。

API 制限

禁止
  • process.chdir() 及びグループまたはユーザー ID を設定するプロセスメソッド
  • process.abort()
  • domain
  • 親プロセスからの IPC チャンネルのアクセス
変更
  • process.env は読み取り専用
  • process.titleは変更不可
  • process.exit()は単一スレッドのみが処理対象
  • process.stdin, process.stdout, process.stderr は null
  • シグナルは行われない(process.on)

歴史

実は、worker とは Ayo.js から来ており、 2017/10/23 の時点で初期の実装は完了していました。

Page Not Found

blog.hiroppy.me

上記の記事と大幅な変化は無いため、ある程度説明は省きます。

また、当時はworker という名前で決定するつもりでしたが、ユーザーランドのライブラリ名と衝突してしまい、最終的(?)にworker_threadsという名前になりました。

npm module name / Node.js built-in `worker` module · Issue #1 · blake-regalia/worker.js

Hi there! I’m currently working on built-in Worker support (as in, threaded workers rather than cluster/child process) in Node.js, and there is only little left to do for that. (It’s basically just...

github.com
npm module name / Node.js built-in `worker` module · Issue #1 · blake-regalia/worker.js

Node.jsのビルトインモジュールに名前空間が使われるかもしれない - hiroppy's site

Node.jsにビルトイン専用の名前空間が提案されている話

blog.hiroppy.me
Node.jsのビルトインモジュールに名前空間が使われるかもしれない - hiroppy's site

このように、長い期間や様々な問題がありながらもようやく入った珍しいモジュールです。

使い方

現時点では実験段階(stability:1 )なので、起動時に --experimental-worker が必要です。

const {
threadId, isMainThread,
Worker, workerData, parentPort
MessageChannel, MessagePort
} = require('worker_threads');

長くなるので、変数、メソッド、クラス等の説明は以下を参照してください。

Page Not Found

blog.hiroppy.me

複数の worker を動かす

const { Worker, isMainThread, workerData } = require("worker_threads");
let current = 0;
function counter(title, cnt) {
console.log(`| ${title} |: ${cnt}`);
}
if (isMainThread) {
console.log("Main Thread");
// 4つ作成する
for (let i = 0; i < 4; i++) {
// 相対パスはダメ
// この場合(__filename)はworkerも同じこのファイルを参照する
// 第二引数はグローバルパラメーター
new Worker(__filename, { workerData: i });
}
setInterval(
(title) => {
counter(title, ++current);
},
1000,
"MainThread",
);
} else {
console.log(`worker: ${workerData}`);
setInterval(
(title) => {
counter(title, ++current);
},
1000,
`worker: ${workerData}`,
);
}

worker: 1 worker: 2 worker: 3 | MainThread |: 1 | worker: 0 |: 1 | worker: 1 |:
1 | worker: 2 |: 1 | worker: 3 |: 1 | MainThread |: 2 | worker: 0 |: 2 | worker:
1 |: 2 | worker: 2 |: 2 | worker: 3 |: 2 | MainThread |: 3 | worker: 0 |: 3 |
worker: 1 |: 3 | worker: 2 |: 3 | worker: 3 |: 3 ```
### スレッド間でメッセージングを行う
```javascript
// parent.js
// main thread
const { resolve } = require("path");
const { Worker, workerData } = require("worker_threads");
console.log("| Main Thread |");
function createWorker(path, cb) {
const worker = new Worker(path, { workerData: null });
worker.on("message", (msg) => {
cb(null, msg);
});
worker.on("error", cb);
worker.on("exit", (code) => {
if (code !== 0) throw new Error("worker stopped");
console.log("| Main Thread | worker stopped");
});
return worker;
}
const w = createWorker(resolve("child.js"), (err, res) => {
// workerから結果を受け取る
if (err) {
console.error(err);
process.exit(1);
}
console.log(`| Main Thread | execution time: ${res}ms from Worker Thread`);
// workerを停止させるためにメッセージを送る
w.postMessage("thx;)");
});

const { PerformanceObserver, performance } = require("perf_hooks");
const { parentPort } = require("worker_threads");
console.log("| Worker |");
const obs = new PerformanceObserver((items) => {
const time = items.getEntries()[0].duration;
// main threadへ処理時間の結果を返す
parentPort.postMessage(time);
performance.clearMarks();
});
obs.observe({ entryTypes: ["measure"] });
const len = 64 _ 1024 _ 1024;
const b = Buffer.allocUnsafe(len);
let s = "";
// 重い処理なためこの箇所の実行時間を計測する
performance.mark("A");
for (let i = 0; i < 256; ++i) s += String.fromCharCode(i);
for (let i = 0; i < 64 _ 1024 _ 1024; i += 256) b.write(s, i, 256, "ascii");
for (let i = 0; i < 32; ++i) b.toString("base64");
performance.mark("B");
performance.measure("A to B", "A", "B");
// main threadから返答が来たらこのworkerを終了する
parentPort.on("message", (msg) => {
console.log(`| Worker | ${msg} from Main Thread`);
process.exit();
});

# output
$ node --experimental-worker parent.js
| Main Thread |
| Worker |
| Main Thread | execution time: 3128.782205ms from Worker Thread
| Worker | thx;) from Main Thread
| Main Thread | worker stopped

パフォーマンス

$ ./node benchmark/cluster/echo.js
cluster/echo.js n=100000 sendsPerBroadcast=1 payload="string" workers=1: 33,647.30473442063
cluster/echo.js n=100000 sendsPerBroadcast=10 payload="string" workers=1: 12,927.907405288383
cluster/echo.js n=100000 sendsPerBroadcast=1 payload="object" workers=1: 28,496.37373941151
cluster/echo.js n=100000 sendsPerBroadcast=10 payload="object" workers=1: 8,975.53747186485
$ ./node --experimental-worker benchmark/worker/echo.js
worker/echo.js n=100000 sendsPerBroadcast=1 payload="string" workers=1: 88,044.32902365089
worker/echo.js n=100000 sendsPerBroadcast=10 payload="string" workers=1: 39,873.33697018837
worker/echo.js n=100000 sendsPerBroadcast=1 payload="object" workers=1: 64,451.29132425621
worker/echo.js n=100000 sendsPerBroadcast=10 payload="object" workers=1: 22,325.635443739284

まだオーバーヘッドが大きく、パフォーマンスには改善の余地があります。

まだできないこと

  • ネイティブアドオンを worker からロードする
  • inspector のサポート
  • handle(ポインタを示す)はネットワークソケット同様に転送ができない
  • File クラスや Blob クラスや File API などのサポート(対応するかは未定)
  • …等々

また具体的な計画が出てないため、今後に期待。

さいごに

まだ入ったばかりでこれからどんどんおもしろくなるモジュールです! パフォーマンスのチューニングには欠かせないモジュールなので、stability2 に入ると今後使う機会が増えると思います。


前後の記事

関連する記事