Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/handling emitters #34188

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
343 changes: 211 additions & 132 deletions app/authorization/server/startup.js

Large diffs are not rendered by default.

29 changes: 26 additions & 3 deletions app/authorization/server/streamer/permissions/emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ import { Notifications } from '../../../../notifications/server';
import { CONSTANTS } from '../../../lib';
import Permissions from '../../../../models/server/models/Permissions';
import { clearCache } from '../../functions/hasPermission';
import { settings } from '/app/settings/server';
import { publishToRedis } from '/app/redis/redisPublisher';
import { redisMessageHandlers } from '/app/redis/handleRedisMessage';

Permissions.on('change', ({ clientAction, id, data, diff }) => {
const handlePermissions = (clientAction, id, data, diff) => {
if (diff && Object.keys(diff).length === 1 && diff._updatedAt) {
// avoid useless changes
return;
Expand All @@ -25,7 +28,7 @@ Permissions.on('change', ({ clientAction, id, data, diff }) => {
Notifications.notifyLoggedInThisInstance(
'permissions-changed',
clientAction,
data,
data
);

if (data.level && data.level === CONSTANTS.SETTINGS_LEVEL) {
Expand All @@ -39,4 +42,24 @@ Permissions.on('change', ({ clientAction, id, data, diff }) => {
setting,
);
}
});
};

const handlePermissionsRedis = (data) =>
handlePermissions(data.clientAction, data._id, data, data.diff);

if (settings.get('Use_Oplog_As_Real_Time')) {
Permissions.on('change', ({ clientAction, id, data, diff }) => {
handlePermissions(clientAction, id, data, diff);
});
} else {
Permissions.on('change', ({ clientAction, id, data, diff }) => {
data = data || Permissions.findOneById(id);
const newdata = {
...data,
ns: 'rocketchat_permissions',
clientAction,
};
publishToRedis(`all`, newdata);
});
}
redisMessageHandlers['rocketchat_permissions'] = handlePermissionsRedis;
12 changes: 12 additions & 0 deletions app/interfaces/redisMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Timestamp } from "bson";

export interface redisMessage {
ts: Timestamp;
h: number;
op: "i" | "u" | "d" | "c" | "n";
ns: string;
_id: string;

[key: string]: any;
}

30 changes: 30 additions & 0 deletions app/lib/server/lib/loginStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Meteor } from 'meteor/meteor';

import { Users } from '/app/models/server';
import ChannelHandler from '/app/ws/channelHandler';

class LoginStream extends Meteor.Streamer {
_publish(publication, eventName, options) {
super._publish(publication, eventName, options);
const connectionId = publication?._session?.id;
console.log(publication?.id, publication?._session?.id);
publication.onStop(() => {
console.log('Publication onStop', connectionId);
ChannelHandler.unsubscribe(Meteor.userId(), connectionId);
});
}
}

const loginStream = new LoginStream('channel-subscriber');

loginStream.allowRead(function(eventName, extraData) {
const [userId, token] = eventName.split('/');
if (!userId || !token) { return false; }
const user = Users.findOneByIdAndLoginToken(userId, token);
if (!user) { return false; }

ChannelHandler.subscribe(userId, this.connection.id);
return true;
}); // TODO-Hi: think what do it in client if the sub wasn't completed

export default loginStream;
41 changes: 36 additions & 5 deletions app/lib/server/startup/userDataStream.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,47 @@
import { Users } from '../../../models/server';
import { Notifications } from '../../../notifications/server';
import { redisMessageHandlers } from '/app/redis/handleRedisMessage';
import { publishToRedis } from '/app/redis/redisPublisher';
import { settings } from '/app/settings/server';

const handleUsers = (clientAction, id, data, diff) => {
console.log('handling users changed');

Users.on('change', ({ clientAction, id, data, diff }) => {
switch (clientAction) {
case 'updated':
Notifications.notifyUserInThisInstance(id, 'userData', { diff, type: clientAction });
Notifications.notifyUserInThisInstance(id, 'userData', {
diff,
type: clientAction,
});
break;
case 'inserted':
Notifications.notifyUserInThisInstance(id, 'userData', { data, type: clientAction });
Notifications.notifyUserInThisInstance(id, 'userData', {
data,
type: clientAction,
});
break;
case 'removed':
Notifications.notifyUserInThisInstance(id, 'userData', { id, type: clientAction });
Notifications.notifyUserInThisInstance(id, 'userData', {
id,
type: clientAction,
});
break;
}
});
};
const redisHandleusers = (data) =>
handleUsers(data.clientAction, data._id, data, data.diff);
if (settings.get('Use_Oplog_As_Real_Time')) {
Users.on('change', ({ clientAction, id, data, diff }) => {
handleUsers(clientAction, data, id, diff);
});
} else {
Users.on('change', ({ clientAction, id, data, diff }) => {
const newdata = {
...data,
ns: 'users',
clientAction,
};
publishToRedis(`user-${id}`, newdata);
});
}
redisMessageHandlers['users'] = redisHandleusers;
5 changes: 5 additions & 0 deletions app/notifications/client/lib/Notifications.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Notifications {
this.streamAll = new Meteor.Streamer('notify-all');
this.streamLogged = new Meteor.Streamer('notify-logged');
this.streamRoom = new Meteor.Streamer('notify-room');
this.streamBind = new Meteor.Streamer('channel-subscriber');
this.streamRoomUsers = new Meteor.Streamer('notify-room-users');
this.streamUser = new Meteor.Streamer('notify-user');
if (this.debug === true) {
Expand Down Expand Up @@ -66,6 +67,10 @@ class Notifications {
return this.onLogin(() => this.streamLogged.on(eventName, callback));
}

bindChannels(eventName, callback) {
return this.streamBind.on(eventName, callback);
}

onRoom(room, eventName, callback) {
if (this.debug === true) {
this.streamRoom.on(room, function() {
Expand Down
9 changes: 6 additions & 3 deletions app/notifications/server/lib/Notifications.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Meteor } from 'meteor/meteor';
import { DDPCommon } from 'meteor/ddp-common';
import { Meteor } from 'meteor/meteor';

import { WEB_RTC_EVENTS } from '../../../webrtc';
import { Subscriptions, Rooms } from '../../../models/server';
import { Rooms, Subscriptions } from '../../../models/server';
import { settings } from '../../../settings/server';
import { WEB_RTC_EVENTS } from '../../../webrtc';

import { loginStream } from '/app/lib/server/lib/loginStream';

const changedPayload = function(collection, id, fields) {
return DDPCommon.stringifyDDP({
Expand Down Expand Up @@ -66,6 +68,7 @@ class Notifications {
this.streamAll = new Meteor.Streamer('notify-all');
this.streamLogged = new Meteor.Streamer('notify-logged');
this.streamRoom = new Meteor.Streamer('notify-room');
this.streamBind = loginStream;
this.streamRoomUsers = new Meteor.Streamer('notify-room-users');
this.streamUser = new RoomStreamer('notify-user');
this.streamAll.allowWrite('none');
Expand Down
11 changes: 11 additions & 0 deletions app/redis/handleRedisMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import redis from './redis';
import superjson from 'superjson';
export const redisMessageHandlers: object = {};

redis.on('message', (channel: string, msg: string) => {
console.log('new message from redis');

const message = superjson.parse(msg);
const ns = message.ns;
return redisMessageHandlers[ns](message);
});
36 changes: 36 additions & 0 deletions app/redis/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import Redis from 'ioredis';

import { settings } from '../settings/server';
import './settings';

// Meteor.startup(() => {
const redis = new Redis({
host: settings.get('Redis_url') as string, // Redis server hostname
port: 6379, // Redis server port
// password: 'your_password', // Redis server password (if any)
db: 0, // Redis database index
autoResubscribe: true,
maxRetriesPerRequest: 3,
});
console.log('Running redis startup');

redis.on('connect', () => {
console.log('Connected to Redis');
});

redis.on('error', (err) => {
console.error('Redis error', err);
});
// });

export const isRedisHealthy = async () => {
try {
return redis.ping();
} catch (err) {
console.error(err);

return false;
}
};

export default redis;
30 changes: 30 additions & 0 deletions app/redis/redisPublisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import Redis from 'ioredis';

import { settings } from '../settings/server';
import './settings';
// import { stringify } from 'flatted';
import superjson from 'superjson';


const redis = new Redis({
host: settings.get('Redis_url') as string, // Redis server hostname
port: 6379, // Redis server port
// password: 'your_password', // Redis server password (if any)
db: 0, // Redis database index
autoResubscribe: true,
maxRetriesPerRequest: 3,
});
//vrfg
console.log('Running redis startup');

redis.on('connect', () => {
console.log('Connected to Redis');
});

redis.on('error', (err) => {
console.error('Redis error', err);
});

export const publishToRedis = (channel: string, message: object) => {
redis.publish(channel, superjson.stringify(message));
}
20 changes: 20 additions & 0 deletions app/redis/settings.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Meteor } from 'meteor/meteor';

import { settings } from '../settings';

Meteor.startup(() => {
settings.addGroup('Real Time Architecture', function addSettings() {
this.add('Use_Oplog_As_Real_Time', true, {
type: 'boolean',
public: false,
i18nLabel: 'Should Use Default Oplog Observing or the new Architecure',
});
this.section('Redis', function() {
this.add('Redis_url', 'localhost', {
type: 'string',
public: false,
i18nLabel: 'Redis URL',
});
});
});
});
81 changes: 81 additions & 0 deletions app/ws/channelHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { Mutex } from 'async-mutex';

import { Subscriptions } from '../models/server';
import redis from '../redis/redis';

const channelListeners: Map<string, number> = new Map();
const connectionToChannels: Map<string, string[]> = new Map(); // TODO-Hi: Change to a Set and check if the complexity if fine
const locks = new Map<string, Mutex>();

async function acquireLock(key: string): Promise<() => void> {
if (!locks.has(key)) {
locks.set(key, new Mutex());
}

const mutex = locks.get(key) as Mutex;

const release = await mutex.acquire(); // Acquire the lock and return the release function
return release;
}


const addToMap = (connectionId: string, channels: string[]): void => {
connectionToChannels.set(connectionId, (connectionToChannels.get(connectionId) || []).concat(channels));
channels.forEach(async (channel: string) => {
const release = await acquireLock(channel);
channelListeners.set(channel, (channelListeners.get(channel) || 0) + 1);
release();
});
};

const updateConnectionChannels = (connectionId: string): void => {
const connectionChannels = connectionToChannels.get(connectionId);
console.log('connectionChannels: ', connectionChannels);
connectionChannels?.forEach(async (channel: string) => {
const release = await acquireLock(channel);
try {
const listeners = channelListeners.get(channel) as number;
console.log('listeners ', listeners);
if (listeners === 1) {
console.log(`Unsubscribing to channel: ${ channel }`);
channelListeners.delete(channel);
redis.unsubscribe(channel);
} else {
channelListeners.set(channel, listeners - 1);
}
} finally {
release();
}
});
};

const subscribe = (userId: string, connectionId: string): void => {
console.log('Subscribing to ', userId);
const channels = Subscriptions.findByUserId(userId, { rid: 1 }).map(({ rid }: { rid: string }) => `room-${ rid }`);
channels.push(`user-${ userId }`);
channels.push('all');

channels.forEach((channel: string) => {
console.log('SUBSCRIBING TO ', channel);
redis.subscribe(channel); // TODO-Hi: What to do if the subsribe fails
});
addToMap(connectionId, channels);

// TODO: subscribe to all channels
};

// setInterval(() => {
// isRedisHealthy()
// }, 5000);

const unsubscribe = (userId: string, connectionId: string): void => {
console.log(`Unsubscribing connectionId: ${ connectionId }, userId: ${ userId }`);
updateConnectionChannels(connectionId);
connectionToChannels.delete(connectionId); // TODO-Hi: Maybe had debounce/something to handle user refreshes
};
// TODO-Hi: Add isalive
// TODO-Hi: Check in carosulle

const ChannelHandler = { subscribe, unsubscribe };

export default ChannelHandler;
1 change: 1 addition & 0 deletions imports/startup/client/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import '../../message-read-receipt/client';
import '../../personal-access-tokens/client';
import './listenActiveUsers';
import './listenToChannels';
13 changes: 13 additions & 0 deletions imports/startup/client/listenToChannels.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Accounts } from 'meteor/accounts-base';
import { Meteor } from 'meteor/meteor';
import { Tracker } from 'meteor/tracker';

import { Notifications } from '../../../app/notifications/client';

Meteor.startup(() => {
Tracker.autorun(() => {
Notifications.bindChannels(`${ Meteor.userId() }/${ Accounts._storedLoginToken() }`, () => {
console.log('client Notifications.bindChannels');
});
});
});
Loading
Loading