From 48cc31a59f3b069eb5869fde7954bff38e99d764 Mon Sep 17 00:00:00 2001 From: BlackDex Date: Wed, 12 Apr 2023 15:59:05 +0200 Subject: [PATCH] Small update to Rocket WebSockets Switched from channels to stream. This is able to use yield, and the code looks a bit nicer this way. Also updated all the crates. --- Cargo.lock | 46 ++++++++++++++++++++-------------------- Cargo.toml | 4 ++-- src/api/notifications.rs | 26 +++++++++++------------ 3 files changed, 37 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 01e0309f..39f9f084 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -199,7 +199,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -216,7 +216,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -356,9 +356,9 @@ checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" [[package]] name = "cached" -version = "0.42.0" +version = "0.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e5877db5d1af7fae60d06b5db9430b68056a69b3582a0be8e3691e87654aeb6" +checksum = "bc2fafddf188d13788e7099295a59b99e99b2148ab2195cae454e754cc099925" dependencies = [ "async-trait", "async_once", @@ -610,7 +610,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -627,7 +627,7 @@ checksum = "2345488264226bf682893e25de0769f3360aac9957980ec49361b083ddaa5bc5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -720,7 +720,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -1018,7 +1018,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -1868,7 +1868,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -1980,7 +1980,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -2028,7 +2028,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -2176,7 +2176,7 @@ checksum = "606c4ba35817e2922a308af55ad51bab3645b59eae5c570d4a6cf07e36bd493b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.13", + "syn 2.0.14", "version_check", "yansi", ] @@ -2319,7 +2319,7 @@ checksum = "8d2275aab483050ab2a7364c1a46604865ee7d6906684e08db0f090acf74f9e7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -2487,7 +2487,7 @@ dependencies = [ "proc-macro2", "quote", "rocket_http", - "syn 2.0.13", + "syn 2.0.14", "unicode-xid", ] @@ -2693,9 +2693,9 @@ checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" [[package]] name = "serde" -version = "1.0.159" +version = "1.0.160" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c04e8343c3daeec41f58990b9d77068df31209f2af111e059e9fe9646693065" +checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c" dependencies = [ "serde_derive", ] @@ -2712,13 +2712,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.159" +version = "1.0.160" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c614d17805b093df4b147b51339e7e44bf05ef59fba1e45d83500bcfb4d8585" +checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -2903,9 +2903,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c9da457c5285ac1f936ebd076af6dac17a61cfe7826f2076b4d015cf47bc8ec" +checksum = "fcf316d5356ed6847742d036f8a39c3b8435cac10bd528a4bd461928a6ab34d5" dependencies = [ "proc-macro2", "quote", @@ -2964,7 +2964,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -3057,7 +3057,7 @@ checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" dependencies = [ "proc-macro2", "quote", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a5167a26..013a1e41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,7 +71,7 @@ futures = "0.3.28" tokio = { version = "1.27.0", features = ["rt-multi-thread", "fs", "io-util", "parking_lot", "time", "signal"] } # A generic serialization/deserialization framework -serde = { version = "1.0.159", features = ["derive"] } +serde = { version = "1.0.160", features = ["derive"] } serde_json = "1.0.95" # A safe, extensible ORM and Query builder @@ -133,7 +133,7 @@ data-url = "0.2.0" bytes = "1.4.0" # Cache function results (Used for version check and favicon fetching) -cached = "0.42.0" +cached = "0.43.0" # Used for custom short lived cookie jar during favicon extraction cookie = "0.16.2" diff --git a/src/api/notifications.rs b/src/api/notifications.rs index 22c3bf1b..7bc502ee 100644 --- a/src/api/notifications.rs +++ b/src/api/notifications.rs @@ -74,7 +74,7 @@ async fn websockets_hub<'r>( ws: rocket_ws::WebSocket, data: WsAccessToken, ip: ClientIp, -) -> Result, Error> { +) -> Result { let addr = ip.ip; info!("Accepting Rocket WS connection from {addr}"); @@ -93,19 +93,19 @@ async fn websockets_hub<'r>( (rx, WSEntryMapGuard::new(users, claims.sub, entry_uuid, addr)) }; - Ok(ws.channel(move |mut stream| { - Box::pin(async move { - // Make sure the guard is moved into the channel future so it's not dropped earlier + Ok({ + rocket_ws::Stream! { ws => { + let mut ws = ws; let _guard = guard; let mut interval = tokio::time::interval(Duration::from_secs(15)); loop { tokio::select! { - res = stream.next() => { + res = ws.next() => { match res { Some(Ok(message)) => { match message { // Respond to any pings - Message::Ping(ping) => stream.send(Message::Pong(ping)).await?, + Message::Ping(ping) => yield Message::Pong(ping), Message::Pong(_) => {/* Ignored */}, // We should receive an initial message with the protocol and version, and we will reply to it @@ -113,12 +113,12 @@ async fn websockets_hub<'r>( let msg = message.strip_suffix(RECORD_SEPARATOR as char).unwrap_or(message); if serde_json::from_str(msg).ok() == Some(INITIAL_MESSAGE) { - stream.send(Message::binary(INITIAL_RESPONSE)).await?; + yield Message::binary(INITIAL_RESPONSE); continue; } } // Just echo anything else the client sends - _ => stream.send(message).await?, + _ => yield message, } } _ => break, @@ -127,18 +127,16 @@ async fn websockets_hub<'r>( res = rx.recv() => { match res { - Some(res) => stream.send(res).await?, + Some(res) => yield res, None => break, } } - _ = interval.tick() => stream.send(Message::Ping(create_ping())).await? + _ = interval.tick() => yield Message::Ping(create_ping()) } } - - Ok(()) - }) - })) + }} + }) } //