diff --git a/src/listener/listener.ts b/src/listener/listener.ts index 692c12b6..6a6967d6 100644 --- a/src/listener/listener.ts +++ b/src/listener/listener.ts @@ -46,6 +46,17 @@ function getPipeline(this: WebSocket, guilds: string[], channels: string[] = []) ]; } +async function rabbitListen(this: WebSocket, id: string) { + await this.rabbitCh!.assertExchange(id, "fanout", { durable: false }); + const q = await this.rabbitCh!.assertQueue("", { exclusive: true, autoDelete: true }); + + this.rabbitCh!.bindQueue(q.queue, id, ""); + this.rabbitCh!.consume(q.queue, consume.bind(this), { + noAck: false, + }); + this.rabbitCh!.queues[id] = q.queue; +} + // TODO: use already required guilds/channels of Identify and don't fetch them again export async function setupListener(this: WebSocket) { const user = await UserModel.findOne({ id: this.user_id }, { guilds: true }).exec(); @@ -57,11 +68,14 @@ export async function setupListener(this: WebSocket) { const guild_channels = channels.filter((x) => x.guild_id); if (RabbitMQ.connection) { + // @ts-ignore this.rabbitCh = await RabbitMQ.connection.createChannel(); - this.rabbitCh!.assertQueue(this.user_id).then(() => this.rabbitCh!.consume(this.user_id, consume.bind(this))); + this.rabbitCh!.queues = {}; + + rabbitListen.call(this, this.user_id); for (const channel of dm_channels) { - this.rabbitCh!.assertQueue(channel.id).then(() => this.rabbitCh!.consume(channel.id, consume.bind(this))); + rabbitListen.call(this, channel.id); } for (const guild of user.guilds) { // contains guild and dm channels @@ -69,12 +83,10 @@ export async function setupListener(this: WebSocket) { getPermission(this.user_id, guild) .then((x) => { this.permissions[guild] = x; - this.rabbitCh!.assertQueue(guild).then(() => this.rabbitCh!.consume(guild, consume.bind(this))); + rabbitListen.call(this, guild); for (const channel of guild_channels) { if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) { - this.rabbitCh!.assertQueue(channel.id).then(() => - this.rabbitCh!.consume(channel.id, consume.bind(this)) - ); + rabbitListen.call(this, channel.id); } } }) @@ -126,17 +138,19 @@ function consume(this: WebSocket, opts: ConsumeMessage | null) { case "CHANNEL_CREATE": // TODO: check if user has permission to channel case "GUILD_CREATE": - this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this))); + rabbitListen.call(this, id); break; case "CHANNEL_UPDATE": + const queue_id = this.rabbitCh.queues[id]; // @ts-ignore const exists = this.rabbitCh.consumers[id]; if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) { if (exists) break; - this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this))); + rabbitListen.call(this, id); } else { if (!exists) break; - this.rabbitCh.cancel(id); + this.rabbitCh.cancel(queue_id); + this.rabbitCh.unbindQueue(queue_id, id, ""); } break; } diff --git a/src/util/WebSocket.ts b/src/util/WebSocket.ts index 11db47e0..1bd0ff2f 100644 --- a/src/util/WebSocket.ts +++ b/src/util/WebSocket.ts @@ -15,7 +15,7 @@ interface WebSocket extends WS { readyTimeout: NodeJS.Timeout; intents: Intents; sequence: number; - rabbitCh?: Channel; + rabbitCh?: Channel & { queues: Record }; permissions: Record; }