-
-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
connection-manager.js
151 lines (132 loc) · 4.73 KB
/
connection-manager.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
'use strict';
const { ConnectionManager } = require('../abstract/connection-manager');
const SequelizeErrors = require('../../errors');
const { logger } = require('../../utils/logger');
const DataTypes = require('../../data-types').mysql;
const dayjs = require('dayjs');
const debug = logger.debugContext('connection:mysql');
const parserStore = require('../parserStore')('mysql');
const { promisify } = require('util');
/**
* MySQL Connection Manager
*
* Get connections, validate and disconnect them.
* AbstractConnectionManager pooling use it to handle MySQL specific connections
* Use https://github.com/sidorares/node-mysql2 to connect with MySQL server
*
* @private
*/
export class MySqlConnectionManager extends ConnectionManager {
constructor(dialect, sequelize) {
sequelize.config.port = sequelize.config.port || 3306;
super(dialect, sequelize);
this.lib = this._loadDialectModule('mysql2');
this.refreshTypeParser(DataTypes);
}
_refreshTypeParser(dataType) {
parserStore.refresh(dataType);
}
_clearTypeParser() {
parserStore.clear();
}
static _typecast(field, next) {
if (parserStore.get(field.type)) {
return parserStore.get(field.type)(field, this.sequelize.options, next);
}
return next();
}
/**
* Connect with MySQL database based on config, Handle any errors in connection
* Set the pool handlers on connection.error
* Also set proper timezone once connection is connected.
*
* @param {object} config
* @returns {Promise<Connection>}
* @private
*/
async connect(config) {
const connectionConfig = {
host: config.host,
port: config.port,
user: config.username,
flags: '-FOUND_ROWS',
password: config.password,
database: config.database,
timezone: this.sequelize.options.timezone,
typeCast: MySqlConnectionManager._typecast.bind(this),
bigNumberStrings: false,
supportBigNumbers: true,
...config.dialectOptions,
};
try {
const connection = await new Promise((resolve, reject) => {
const connection = this.lib.createConnection(connectionConfig);
const errorHandler = e => {
// clean up connect & error event if there is error
connection.removeListener('connect', connectHandler);
connection.removeListener('error', connectHandler);
reject(e);
};
const connectHandler = () => {
// clean up error event if connected
connection.removeListener('error', errorHandler);
resolve(connection);
};
// don't use connection.once for error event handling here
// mysql2 emit error two times in case handshake was failed
// first error is protocol_lost and second is timeout
// if we will use `once.error` node process will crash on 2nd error emit
connection.on('error', errorHandler);
connection.once('connect', connectHandler);
});
debug('connection acquired');
connection.on('error', error => {
switch (error.code) {
case 'ESOCKET':
case 'ECONNRESET':
case 'EPIPE':
case 'PROTOCOL_CONNECTION_LOST':
this.pool.destroy(connection);
}
});
if (!this.sequelize.config.keepDefaultTimezone) {
// set timezone for this connection
// but named timezone are not directly supported in mysql, so get its offset first
let tzOffset = this.sequelize.options.timezone;
tzOffset = /\//.test(tzOffset) ? dayjs.tz(undefined, tzOffset).format('Z') : tzOffset;
await promisify(cb => connection.query(`SET time_zone = '${tzOffset}'`, cb))();
}
return connection;
} catch (error) {
switch (error.code) {
case 'ECONNREFUSED':
throw new SequelizeErrors.ConnectionRefusedError(error);
case 'ER_ACCESS_DENIED_ERROR':
throw new SequelizeErrors.AccessDeniedError(error);
case 'ENOTFOUND':
throw new SequelizeErrors.HostNotFoundError(error);
case 'EHOSTUNREACH':
throw new SequelizeErrors.HostNotReachableError(error);
case 'EINVAL':
throw new SequelizeErrors.InvalidConnectionError(error);
default:
throw new SequelizeErrors.ConnectionError(error);
}
}
}
async disconnect(connection) {
// Don't disconnect connections with CLOSED state
if (connection._closing) {
debug('connection tried to disconnect but was already at CLOSED state');
return;
}
return await promisify(callback => connection.end(callback))();
}
validate(connection) {
return connection
&& !connection._fatalError
&& !connection._protocolError
&& !connection._closing
&& !connection.stream.destroyed;
}
}