1
0
mirror of https://github.com/spacebarchat/server.git synced 2024-11-11 05:02:37 +01:00

🐛 fix rabbit mq -> fanout instead of work queue

This commit is contained in:
Flam3rboy 2021-08-12 17:43:53 +02:00
parent 421cf4c3db
commit f4e49a34ad
2 changed files with 24 additions and 10 deletions

View File

@ -46,6 +46,17 @@ function getPipeline(this: WebSocket, guilds: string[], channels: string[] = [])
];
}
async function rabbitListen(this: WebSocket, id: string) {
await this.rabbitCh!.assertExchange(id, "fanout", { durable: false });
const q = await this.rabbitCh!.assertQueue("", { exclusive: true, autoDelete: true });
this.rabbitCh!.bindQueue(q.queue, id, "");
this.rabbitCh!.consume(q.queue, consume.bind(this), {
noAck: false,
});
this.rabbitCh!.queues[id] = q.queue;
}
// TODO: use already required guilds/channels of Identify and don't fetch them again
export async function setupListener(this: WebSocket) {
const user = await UserModel.findOne({ id: this.user_id }, { guilds: true }).exec();
@ -57,11 +68,14 @@ export async function setupListener(this: WebSocket) {
const guild_channels = channels.filter((x) => x.guild_id);
if (RabbitMQ.connection) {
// @ts-ignore
this.rabbitCh = await RabbitMQ.connection.createChannel();
this.rabbitCh!.assertQueue(this.user_id).then(() => this.rabbitCh!.consume(this.user_id, consume.bind(this)));
this.rabbitCh!.queues = {};
rabbitListen.call(this, this.user_id);
for (const channel of dm_channels) {
this.rabbitCh!.assertQueue(channel.id).then(() => this.rabbitCh!.consume(channel.id, consume.bind(this)));
rabbitListen.call(this, channel.id);
}
for (const guild of user.guilds) {
// contains guild and dm channels
@ -69,12 +83,10 @@ export async function setupListener(this: WebSocket) {
getPermission(this.user_id, guild)
.then((x) => {
this.permissions[guild] = x;
this.rabbitCh!.assertQueue(guild).then(() => this.rabbitCh!.consume(guild, consume.bind(this)));
rabbitListen.call(this, guild);
for (const channel of guild_channels) {
if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) {
this.rabbitCh!.assertQueue(channel.id).then(() =>
this.rabbitCh!.consume(channel.id, consume.bind(this))
);
rabbitListen.call(this, channel.id);
}
}
})
@ -126,17 +138,19 @@ function consume(this: WebSocket, opts: ConsumeMessage | null) {
case "CHANNEL_CREATE":
// TODO: check if user has permission to channel
case "GUILD_CREATE":
this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this)));
rabbitListen.call(this, id);
break;
case "CHANNEL_UPDATE":
const queue_id = this.rabbitCh.queues[id];
// @ts-ignore
const exists = this.rabbitCh.consumers[id];
if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) {
if (exists) break;
this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this)));
rabbitListen.call(this, id);
} else {
if (!exists) break;
this.rabbitCh.cancel(id);
this.rabbitCh.cancel(queue_id);
this.rabbitCh.unbindQueue(queue_id, id, "");
}
break;
}

View File

@ -15,7 +15,7 @@ interface WebSocket extends WS {
readyTimeout: NodeJS.Timeout;
intents: Intents;
sequence: number;
rabbitCh?: Channel;
rabbitCh?: Channel & { queues: Record<string, string> };
permissions: Record<string, Permissions>;
}