connection-manager.js 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. "use strict";
  2. const { Pool, TimeoutError } = require("sequelize-pool");
  3. const _ = require("lodash");
  4. const semver = require("semver");
  5. const errors = require("../../errors");
  6. const { logger } = require("../../utils/logger");
  7. const deprecations = require("../../utils/deprecations");
  8. const debug = logger.debugContext("pool");
  9. class ConnectionManager {
  10. constructor(dialect, sequelize) {
  11. const config = _.cloneDeep(sequelize.config);
  12. this.sequelize = sequelize;
  13. this.config = config;
  14. this.dialect = dialect;
  15. this.versionPromise = null;
  16. this.dialectName = this.sequelize.options.dialect;
  17. if (config.pool === false) {
  18. throw new Error("Support for pool:false was removed in v4.0");
  19. }
  20. config.pool = _.defaults(config.pool || {}, {
  21. max: 5,
  22. min: 0,
  23. idle: 1e4,
  24. acquire: 6e4,
  25. evict: 1e3,
  26. validate: this._validate.bind(this)
  27. });
  28. this.initPools();
  29. }
  30. refreshTypeParser(dataTypes) {
  31. _.each(dataTypes, (dataType) => {
  32. if (Object.prototype.hasOwnProperty.call(dataType, "parse")) {
  33. if (dataType.types[this.dialectName]) {
  34. this._refreshTypeParser(dataType);
  35. } else {
  36. throw new Error(`Parse function not supported for type ${dataType.key} in dialect ${this.dialectName}`);
  37. }
  38. }
  39. });
  40. }
  41. _loadDialectModule(moduleName) {
  42. try {
  43. if (this.sequelize.config.dialectModulePath) {
  44. return require(this.sequelize.config.dialectModulePath);
  45. }
  46. if (this.sequelize.config.dialectModule) {
  47. return this.sequelize.config.dialectModule;
  48. }
  49. return require(moduleName);
  50. } catch (err) {
  51. if (err.code === "MODULE_NOT_FOUND") {
  52. if (this.sequelize.config.dialectModulePath) {
  53. throw new Error(`Unable to find dialect at ${this.sequelize.config.dialectModulePath}`);
  54. }
  55. throw new Error(`Please install ${moduleName} package manually`);
  56. }
  57. throw err;
  58. }
  59. }
  60. async _onProcessExit() {
  61. if (!this.pool) {
  62. return;
  63. }
  64. await this.pool.drain();
  65. debug("connection drain due to process exit");
  66. return await this.pool.destroyAllNow();
  67. }
  68. async close() {
  69. this.getConnection = async function getConnection() {
  70. throw new Error("ConnectionManager.getConnection was called after the connection manager was closed!");
  71. };
  72. return await this._onProcessExit();
  73. }
  74. initPools() {
  75. const config = this.config;
  76. if (!config.replication) {
  77. this.pool = new Pool({
  78. name: "sequelize",
  79. create: () => this._connect(config),
  80. destroy: async (connection) => {
  81. const result = await this._disconnect(connection);
  82. debug("connection destroy");
  83. return result;
  84. },
  85. validate: config.pool.validate,
  86. max: config.pool.max,
  87. min: config.pool.min,
  88. acquireTimeoutMillis: config.pool.acquire,
  89. idleTimeoutMillis: config.pool.idle,
  90. reapIntervalMillis: config.pool.evict,
  91. maxUses: config.pool.maxUses
  92. });
  93. debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, no replication`);
  94. return;
  95. }
  96. if (!Array.isArray(config.replication.read)) {
  97. config.replication.read = [config.replication.read];
  98. }
  99. config.replication.write = _.defaults(config.replication.write, _.omit(config, "replication"));
  100. config.replication.read = config.replication.read.map((readConfig) => _.defaults(readConfig, _.omit(this.config, "replication")));
  101. let reads = 0;
  102. this.pool = {
  103. release: (client) => {
  104. if (client.queryType === "read") {
  105. this.pool.read.release(client);
  106. } else {
  107. this.pool.write.release(client);
  108. }
  109. },
  110. acquire: (queryType, useMaster) => {
  111. useMaster = useMaster === void 0 ? false : useMaster;
  112. if (queryType === "SELECT" && !useMaster) {
  113. return this.pool.read.acquire();
  114. }
  115. return this.pool.write.acquire();
  116. },
  117. destroy: (connection) => {
  118. this.pool[connection.queryType].destroy(connection);
  119. debug("connection destroy");
  120. },
  121. destroyAllNow: async () => {
  122. await Promise.all([
  123. this.pool.read.destroyAllNow(),
  124. this.pool.write.destroyAllNow()
  125. ]);
  126. debug("all connections destroyed");
  127. },
  128. drain: async () => Promise.all([
  129. this.pool.write.drain(),
  130. this.pool.read.drain()
  131. ]),
  132. read: new Pool({
  133. name: "sequelize:read",
  134. create: async () => {
  135. const nextRead = reads++ % config.replication.read.length;
  136. const connection = await this._connect(config.replication.read[nextRead]);
  137. connection.queryType = "read";
  138. return connection;
  139. },
  140. destroy: (connection) => this._disconnect(connection),
  141. validate: config.pool.validate,
  142. max: config.pool.max,
  143. min: config.pool.min,
  144. acquireTimeoutMillis: config.pool.acquire,
  145. idleTimeoutMillis: config.pool.idle,
  146. reapIntervalMillis: config.pool.evict,
  147. maxUses: config.pool.maxUses
  148. }),
  149. write: new Pool({
  150. name: "sequelize:write",
  151. create: async () => {
  152. const connection = await this._connect(config.replication.write);
  153. connection.queryType = "write";
  154. return connection;
  155. },
  156. destroy: (connection) => this._disconnect(connection),
  157. validate: config.pool.validate,
  158. max: config.pool.max,
  159. min: config.pool.min,
  160. acquireTimeoutMillis: config.pool.acquire,
  161. idleTimeoutMillis: config.pool.idle,
  162. reapIntervalMillis: config.pool.evict,
  163. maxUses: config.pool.maxUses
  164. })
  165. };
  166. debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, with replication`);
  167. }
  168. async getConnection(options) {
  169. options = options || {};
  170. if (this.sequelize.options.databaseVersion === 0) {
  171. if (!this.versionPromise) {
  172. this.versionPromise = (async () => {
  173. try {
  174. const connection = await this._connect(this.config.replication.write || this.config);
  175. const _options = {};
  176. _options.transaction = { connection };
  177. _options.logging = () => {
  178. };
  179. _options.logging.__testLoggingFn = true;
  180. if (this.sequelize.options.databaseVersion === 0) {
  181. const version = await this.sequelize.databaseVersion(_options);
  182. const parsedVersion = _.get(semver.coerce(version), "version") || version;
  183. this.sequelize.options.databaseVersion = semver.valid(parsedVersion) ? parsedVersion : this.dialect.defaultVersion;
  184. }
  185. if (semver.lt(this.sequelize.options.databaseVersion, this.dialect.defaultVersion)) {
  186. deprecations.unsupportedEngine();
  187. debug(`Unsupported database engine version ${this.sequelize.options.databaseVersion}`);
  188. }
  189. this.versionPromise = null;
  190. return await this._disconnect(connection);
  191. } catch (err) {
  192. this.versionPromise = null;
  193. throw err;
  194. }
  195. })();
  196. }
  197. await this.versionPromise;
  198. }
  199. let result;
  200. try {
  201. await this.sequelize.runHooks("beforePoolAcquire", options);
  202. result = await this.pool.acquire(options.type, options.useMaster);
  203. await this.sequelize.runHooks("afterPoolAcquire", result, options);
  204. } catch (error) {
  205. if (error instanceof TimeoutError)
  206. throw new errors.ConnectionAcquireTimeoutError(error);
  207. throw error;
  208. }
  209. debug("connection acquired");
  210. return result;
  211. }
  212. releaseConnection(connection) {
  213. this.pool.release(connection);
  214. debug("connection released");
  215. }
  216. async destroyConnection(connection) {
  217. await this.pool.destroy(connection);
  218. debug(`connection ${connection.uuid} destroyed`);
  219. }
  220. async _connect(config) {
  221. await this.sequelize.runHooks("beforeConnect", config);
  222. const connection = await this.dialect.connectionManager.connect(config);
  223. await this.sequelize.runHooks("afterConnect", connection, config);
  224. return connection;
  225. }
  226. async _disconnect(connection) {
  227. await this.sequelize.runHooks("beforeDisconnect", connection);
  228. await this.dialect.connectionManager.disconnect(connection);
  229. return this.sequelize.runHooks("afterDisconnect", connection);
  230. }
  231. _validate(connection) {
  232. if (!this.dialect.connectionManager.validate) {
  233. return true;
  234. }
  235. return this.dialect.connectionManager.validate(connection);
  236. }
  237. }
  238. module.exports = ConnectionManager;
  239. module.exports.ConnectionManager = ConnectionManager;
  240. module.exports.default = ConnectionManager;
  241. //# sourceMappingURL=connection-manager.js.map