Skip to content

Commit a73c91a

Browse files
Add new hooks in the replication write (pubkey#4754)
* add new hooks in the replication write * Fix build not passing * Fix code style
1 parent 73a69eb commit a73c91a

File tree

3 files changed

+48
-2
lines changed

3 files changed

+48
-2
lines changed

src/hooks.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,21 @@ export const HOOKS: { [k: string]: any[]; } = {
7979
* runs after a database has been removed
8080
* @async
8181
*/
82-
postRemoveRxDatabase: []
82+
postRemoveRxDatabase: [],
83+
84+
/**
85+
* runs before the replication writes the rows to master
86+
* but before the rows have been modified
87+
* @async
88+
*/
89+
preReplicationMasterWrite: [],
90+
91+
/**
92+
* runs after the replication has been sent to the server
93+
* but before the new documents have been handled
94+
* @async
95+
*/
96+
preReplicationMasterWriteDocumentsHandle: [],
8397
};
8498

8599
export function runPluginHooks(hookKey: string, obj: any) {

src/plugins/replication/index.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ import {
5959
import { addRxPlugin } from '../../plugin';
6060
import { hasEncryption } from '../../rx-storage-helper';
6161
import { overwritable } from '../../overwritable';
62+
import {
63+
runAsyncPluginHooks
64+
} from '../../hooks';
6265

6366

6467
export const REPLICATION_STATE_BY_COLLECTION: WeakMap<RxCollection, RxReplicationState<any, any>[]> = new WeakMap();
@@ -240,6 +243,12 @@ export class RxReplicationState<RxDocType, CheckpointType> {
240243
return [];
241244
}
242245
let done = false;
246+
247+
await runAsyncPluginHooks('preReplicationMasterWrite', {
248+
rows,
249+
collection: this.collection
250+
});
251+
243252
const useRows = await Promise.all(
244253
rows.map(async (row) => {
245254
row.newDocumentState = await pushModifier(row.newDocumentState);
@@ -257,6 +266,13 @@ export class RxReplicationState<RxDocType, CheckpointType> {
257266
);
258267

259268
let result: WithDeleted<RxDocType>[] = null as any;
269+
270+
// In case all the rows have been filtered and nothing has to be sent
271+
if (useRows.length === 0) {
272+
done = true;
273+
result = [];
274+
}
275+
260276
while (!done && !this.isStopped()) {
261277
try {
262278
result = await this.push.handler(useRows);
@@ -290,6 +306,12 @@ export class RxReplicationState<RxDocType, CheckpointType> {
290306
if (this.isStopped()) {
291307
return [];
292308
}
309+
310+
await runAsyncPluginHooks('preReplicationMasterWriteDocumentsHandle', {
311+
result,
312+
collection: this.collection
313+
});
314+
293315
const conflicts = handlePulledDocuments(this.collection, this.deletedField, ensureNotFalsy(result));
294316
return conflicts;
295317
}

src/types/rx-plugin.d.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ import type {
1616
RxDatabase,
1717
RxDatabaseCreator,
1818
RxDocument,
19-
RxStorage
19+
RxStorage,
20+
RxReplicationWriteToMasterRow,
21+
WithDeleted
2022
} from '../types';
2123
import type { RxSchema } from '../rx-schema';
2224

@@ -128,5 +130,13 @@ export interface RxPlugin {
128130
preCreateRxStorageInstance?: RxPluginHooks<RxStorageInstanceCreationParams<any, any>>;
129131
preMigrateDocument?: RxPluginHooks<any>;
130132
postMigrateDocument?: RxPluginHooks<any>;
133+
preReplicationMasterWrite?: RxPluginHooks<{
134+
rows: RxReplicationWriteToMasterRow<any>[];
135+
collection: RxCollection;
136+
}>;
137+
preReplicationMasterWriteDocumentsHandle?: RxPluginHooks<{
138+
result: WithDeleted<any>[];
139+
collection: RxCollection;
140+
}>;
131141
};
132142
}

0 commit comments

Comments
 (0)