/
multiprocess.js
190 lines (168 loc) · 5.22 KB
/
multiprocess.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
const debug = require('debug')('log4js:multiprocess');
const net = require('net');
const LoggingEvent = require('../LoggingEvent');
const END_MSG = '__LOG4JS__';
/**
* Creates a server, listening on config.loggerPort, config.loggerHost.
* Output goes to config.actualAppender (config.appender is used to
* set up that appender).
*/
function logServer(config, actualAppender, levels) {
/**
* Takes a utf-8 string, returns an object with
* the correct log properties.
*/
function deserializeLoggingEvent(clientSocket, msg) {
debug('(master) deserialising log event');
const loggingEvent = LoggingEvent.deserialise(msg);
loggingEvent.remoteAddress = clientSocket.remoteAddress;
loggingEvent.remotePort = clientSocket.remotePort;
return loggingEvent;
}
const server = net.createServer((clientSocket) => {
debug('(master) connection received');
clientSocket.setEncoding('utf8');
let logMessage = '';
function logTheMessage(msg) {
debug('(master) deserialising log event and sending to actual appender');
actualAppender(deserializeLoggingEvent(clientSocket, msg));
}
function chunkReceived(chunk) {
debug('(master) chunk of data received');
let event;
logMessage += chunk || '';
if (logMessage.indexOf(END_MSG) > -1) {
event = logMessage.slice(0, logMessage.indexOf(END_MSG));
logTheMessage(event);
logMessage = logMessage.slice(event.length + END_MSG.length) || '';
// check for more, maybe it was a big chunk
chunkReceived();
}
}
function handleError(error) {
const loggingEvent = {
startTime: new Date(),
categoryName: 'log4js',
level: levels.ERROR,
data: ['A worker log process hung up unexpectedly', error],
remoteAddress: clientSocket.remoteAddress,
remotePort: clientSocket.remotePort,
};
actualAppender(loggingEvent);
}
clientSocket.on('data', chunkReceived);
clientSocket.on('end', chunkReceived);
clientSocket.on('error', handleError);
});
server.listen(
config.loggerPort || 5000,
config.loggerHost || 'localhost',
(e) => {
debug('(master) master server listening, error was ', e);
// allow the process to exit, if this is the only socket active
server.unref();
}
);
function app(event) {
debug('(master) log event sent directly to actual appender (local event)');
return actualAppender(event);
}
app.shutdown = function(cb) {
debug('(master) master shutdown called, closing server');
server.close(cb);
};
return app;
}
function workerAppender(config) {
let canWrite = false;
const buffer = [];
let socket;
let shutdownAttempts = 3;
function write(loggingEvent) {
debug('(worker) Writing log event to socket');
socket.write(loggingEvent.serialise(), 'utf8');
socket.write(END_MSG, 'utf8');
}
function emptyBuffer() {
let evt;
debug('(worker) emptying worker buffer');
while ((evt = buffer.shift())) {
write(evt);
}
}
function createSocket() {
debug(
`(worker) worker appender creating socket to ${config.loggerHost ||
'localhost'}:${config.loggerPort || 5000}`
);
socket = net.createConnection(
config.loggerPort || 5000,
config.loggerHost || 'localhost'
);
socket.on('connect', () => {
debug('(worker) worker socket connected');
emptyBuffer();
canWrite = true;
});
socket.on('timeout', socket.end.bind(socket));
socket.on('error', (e) => {
debug('connection error', e);
canWrite = false;
emptyBuffer();
});
socket.on('close', createSocket);
}
createSocket();
function log(loggingEvent) {
if (canWrite) {
write(loggingEvent);
} else {
debug(
'(worker) worker buffering log event because it cannot write at the moment'
);
buffer.push(loggingEvent);
}
}
log.shutdown = function(cb) {
debug('(worker) worker shutdown called');
if (buffer.length && shutdownAttempts) {
debug('(worker) worker buffer has items, waiting 100ms to empty');
shutdownAttempts -= 1;
setTimeout(() => {
log.shutdown(cb);
}, 100);
} else {
socket.removeAllListeners('close');
socket.end(cb);
}
};
return log;
}
function createAppender(config, appender, levels) {
if (config.mode === 'master') {
debug('Creating master appender');
return logServer(config, appender, levels);
}
debug('Creating worker appender');
return workerAppender(config);
}
function configure(config, layouts, findAppender, levels) {
let appender;
debug(`configure with mode = ${config.mode}`);
if (config.mode === 'master') {
if (!config.appender) {
debug(`no appender found in config ${config}`);
throw new Error('multiprocess master must have an "appender" defined');
}
debug(`actual appender is ${config.appender}`);
appender = findAppender(config.appender);
if (!appender) {
debug(`actual appender "${config.appender}" not found`);
throw new Error(
`multiprocess master appender "${config.appender}" not defined`
);
}
}
return createAppender(config, appender, levels);
}
module.exports.configure = configure;