-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.js
119 lines (99 loc) · 4.13 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import { Meteor } from 'meteor/meteor';
import { Mongo, MongoInternals } from 'meteor/mongo';
import { DDP } from 'meteor/ddp-client';
// Per-execution-context slot holding the session of the Transaction that is currently running (if any).
const currentSession = new Meteor.EnvironmentVariable();

// Client object of Meteor's default remote collection driver; sessions are started from it.
const client = MongoInternals.defaultRemoteCollectionDriver().mongo.client;

// Collection class of the bundled mongodb npm module; its prototype methods are patched further down.
const RawCollection = MongoInternals.NpmModules.mongodb.module.Collection;
// Methods whose second positional argument is a full replacement document rather than options.
const replaceMethods = ['replaceOne', 'findOneAndReplace'];

/**
 * Tells whether a value looks like an update specification.
 *
 * - A plain object counts when at least one of its top-level keys contains `$` (e.g. `{ $set: ... }`).
 * - An array is treated as an aggregation-pipeline update and counts when any of its stages does.
 * - `null`, `undefined` and primitives never count (and no longer throw).
 *
 * @param {*} obj - The candidate second argument of a collection method call.
 * @returns {boolean} `true` if the value contains an update operator, `false` otherwise.
 */
const hasOperator = obj => {
  if (Array.isArray(obj)) {
    // Pipeline form: [{ $set: ... }, { $unset: ... }]
    return obj.some(stage => hasOperator(stage));
  }
  if (obj === null || typeof obj !== 'object') {
    return false;
  }
  return Object.keys(obj).some(k => k.includes('$'));
};

/**
 * Decides whether a two-argument call is `(filter, update)` / `(filter, replacement)` as opposed to
 * `(filter, options)`. Calls with any other number of arguments are never classified as such.
 *
 * @param {string} methodName - Name of the collection method being invoked.
 * @param {Array} args - Positional arguments of the call.
 * @returns {boolean} `true` when the second argument must be kept as data and not treated as options.
 */
const isUpdateOrReplace = (methodName, args) => args.length === 2 && (replaceMethods.includes(methodName) || hasOperator(args[1]));
/**
 * Builds the argument list for a raw collection method call so that it carries the session of the
 * Transaction currently in progress.
 *
 * When there is no current session, or it has already ended, the arguments are returned untouched.
 * Otherwise the trailing options argument (if one was passed) is merged with `{ session }` and put
 * back in last position; when no arguments were passed at all, an empty first argument is supplied
 * so the options land in second position. The input array is never mutated.
 *
 * @param {string} methodName - Name of the collection method being invoked.
 * @param {Array} args - Positional arguments the method was called with.
 * @returns {Array} The arguments to forward to the original method.
 */
function wrapWithSession(methodName, args) {
  const session = currentSession.get();
  if (!session || session.hasEnded) {
    return args;
  }

  // Work on a copy so the caller's array is left intact.
  const positional = [...args];

  // 2.X support, eventually can remove. Callbacks are not supported, but *Async methods in 2.X may
  // still hand one over as the last argument; it is set aside here and re-appended at the very end.
  let callback;
  if (positional.length > 1 && typeof positional[positional.length - 1] === 'function') {
    callback = positional.pop();
  }

  // Only an argument beyond the first one can be an options object. Re-checking the length after
  // the callback was removed keeps a lone first argument (filter / document) from being mistaken
  // for options in calls shaped like `(filter, callback)`.
  let options;
  if (positional.length > 1 && !isUpdateOrReplace(methodName, positional)) {
    options = positional.pop();
  }

  const finalOptions = { ...options, session };
  const finalArgs = positional.length ? [...positional, finalOptions] : [{}, finalOptions];
  if (callback) {
    finalArgs.push(callback); // 2.X support, eventually can remove.
  }
  return finalArgs;
}
/**
 * Collects the names of the functions defined directly on a constructor's prototype.
 *
 * Accessors, non-function values, symbol-keyed members and the `constructor` itself are left out;
 * inherited members are not considered.
 *
 * @param {Function} obj - A class / constructor function.
 * @returns {string[]} Names of the prototype's own function-valued properties.
 */
function getMethodNames(obj) {
  const ownDescriptors = Object.getOwnPropertyDescriptors(obj.prototype);
  return Object.entries(ownDescriptors)
    .filter(([name, descriptor]) => name !== 'constructor' && typeof descriptor.value === 'function')
    .map(([name]) => name);
}
// Replace every method on the raw Collection prototype with a wrapper that routes the call's
// arguments through wrapWithSession, so the session gets added for Transactions.
for (const methodName of getMethodNames(RawCollection)) {
  const originalMethod = RawCollection.prototype[methodName];
  RawCollection.prototype[methodName] = function(...args) {
    const forwardedArgs = wrapWithSession(methodName, args);
    return originalMethod.apply(this, forwardedArgs);
  };
}
/**
 * Reports whether the code currently executing is running inside a Transaction.
 *
 * @function
 * @returns {boolean} `true` when a current session exists and reports being in a Transaction, `false` otherwise.
 */
export const inTransaction = () => {
  const session = currentSession.get();
  if (session == null) {
    // No session has been set for this execution context.
    return false;
  }
  return session.inTransaction() ?? false;
};
/**
 * Executes a function within a MongoDB Transaction, providing error handling and optional retry functionality.
 *
 * A new session is started for every call and made the current session while `fn` runs, so the
 * patched raw collection methods invoked by `fn` receive it. The session is always ended afterwards,
 * whether `fn` succeeded or not.
 *
 * @async
 * @function
 * @template T
 * @param {() => Promise<T>} fn - The function to be executed within the Transaction.
 * @param {Object} [options] - `autoRetry` plus any options specific to MongoDB Transactions (writeConcern, readConcern, etc). See the Mongo Docs for more details. `readPreference` defaults to `'primary'`.
 * @param {boolean} [options.autoRetry=true] - If true, uses the Mongo Transactions Callback API for automatic retry on certain errors (refer to Mongo Docs); otherwise, uses the Core API.
 * @returns {Promise<T>} - A promise resolving to the result of the provided function.
 * @throws {Error} - Throws an error if the Transaction encounters an issue and cannot be committed.
 */
export const withTransaction = async (fn, { autoRetry = true, ...options } = {}) => {
  const txnOptions = { readPreference: 'primary', ...options };
  const session = client.startSession();

  return await currentSession.withValue(session, async function () {
    try {
      // Captured through the closure so the value is available regardless of what
      // session.withTransaction itself resolves to.
      let result;
      const txnFn = async () => {
        result = await fn();
      };

      if (autoRetry) {
        await session.withTransaction(txnFn, txnOptions);
      } else {
        try {
          session.startTransaction(txnOptions);
          await txnFn();
          await session.commitTransaction();
        } catch (error) {
          // Abort only while a Transaction is still active: if starting or committing is what
          // failed there is nothing to abort, and calling abort anyway would raise a second
          // error that hides the real one.
          if (session.inTransaction()) {
            try {
              await session.abortTransaction();
            } catch {
              // Deliberately ignored: the original error below is the one callers need to see.
            }
          }
          throw error;
        }
      }

      return result;
    } finally {
      await session.endSession();
      currentSession._set(undefined);
    }
  });
};
// Expose the Transaction helpers on the Mongo namespace as well as through the named exports.
Object.assign(Mongo, { withTransaction, inTransaction });