Implement login-with-device

This commit is contained in:
Bernd Schoolmann
2023-08-04 21:12:23 +02:00
committed by Mathijs van Veluw
parent e9ec3741ae
commit 8d7b3db33d
20 changed files with 842 additions and 8 deletions

View File

@ -36,10 +36,19 @@ static WS_USERS: Lazy<Arc<WebSocketUsers>> = Lazy::new(|| {
})
});
use super::{push_cipher_update, push_folder_update, push_logout, push_send_update, push_user_update};
pub static WS_ANONYMOUS_SUBSCRIPTIONS: Lazy<Arc<AnonymousWebSocketSubscriptions>> = Lazy::new(|| {
Arc::new(AnonymousWebSocketSubscriptions {
map: Arc::new(dashmap::DashMap::new()),
})
});
use super::{
push::push_auth_request, push::push_auth_response, push_cipher_update, push_folder_update, push_logout,
push_send_update, push_user_update,
};
pub fn routes() -> Vec<Route> {
routes![websockets_hub]
routes![websockets_hub, anonymous_websockets_hub]
}
#[derive(FromForm, Debug)]
@ -74,6 +83,29 @@ impl Drop for WSEntryMapGuard {
}
}
struct WSAnonymousEntryMapGuard {
subscriptions: Arc<AnonymousWebSocketSubscriptions>,
token: String,
addr: IpAddr,
}
impl WSAnonymousEntryMapGuard {
fn new(subscriptions: Arc<AnonymousWebSocketSubscriptions>, token: String, addr: IpAddr) -> Self {
Self {
subscriptions,
token,
addr,
}
}
}
impl Drop for WSAnonymousEntryMapGuard {
fn drop(&mut self) {
info!("Closing WS connection from {}", self.addr);
self.subscriptions.map.remove(&self.token);
}
}
#[get("/hub?<data..>")]
fn websockets_hub<'r>(
ws: rocket_ws::WebSocket,
@ -144,6 +176,72 @@ fn websockets_hub<'r>(
})
}
#[get("/anonymous-hub?<token..>")]
fn anonymous_websockets_hub<'r>(
ws: rocket_ws::WebSocket,
token: String,
ip: ClientIp,
) -> Result<rocket_ws::Stream!['r], Error> {
let addr = ip.ip;
info!("Accepting Anonymous Rocket WS connection from {addr}");
let (mut rx, guard) = {
let subscriptions = Arc::clone(&WS_ANONYMOUS_SUBSCRIPTIONS);
// Add a channel to send messages to this client to the map
let (tx, rx) = tokio::sync::mpsc::channel::<Message>(100);
subscriptions.map.insert(token.clone(), tx);
// Once the guard goes out of scope, the connection will have been closed and the entry will be deleted from the map
(rx, WSAnonymousEntryMapGuard::new(subscriptions, token, addr))
};
Ok({
rocket_ws::Stream! { ws => {
let mut ws = ws;
let _guard = guard;
let mut interval = tokio::time::interval(Duration::from_secs(15));
loop {
tokio::select! {
res = ws.next() => {
match res {
Some(Ok(message)) => {
match message {
// Respond to any pings
Message::Ping(ping) => yield Message::Pong(ping),
Message::Pong(_) => {/* Ignored */},
// We should receive an initial message with the protocol and version, and we will reply to it
Message::Text(ref message) => {
let msg = message.strip_suffix(RECORD_SEPARATOR as char).unwrap_or(message);
if serde_json::from_str(msg).ok() == Some(INITIAL_MESSAGE) {
yield Message::binary(INITIAL_RESPONSE);
continue;
}
}
// Just echo anything else the client sends
_ => yield message,
}
}
_ => break,
}
}
res = rx.recv() => {
match res {
Some(res) => yield res,
None => break,
}
}
_ = interval.tick() => yield Message::Ping(create_ping())
}
}
}}
})
}
//
// Websockets server
//
@ -352,6 +450,69 @@ impl WebSocketUsers {
push_send_update(ut, send, acting_device_uuid, conn).await;
}
}
pub async fn send_auth_request(
&self,
user_uuid: &String,
auth_request_uuid: &String,
acting_device_uuid: &String,
conn: &mut DbConn,
) {
let data = create_update(
vec![("Id".into(), auth_request_uuid.clone().into()), ("UserId".into(), user_uuid.clone().into())],
UpdateType::AuthRequest,
Some(acting_device_uuid.to_string()),
);
self.send_update(user_uuid, &data).await;
if CONFIG.push_enabled() {
push_auth_request(user_uuid.to_string(), auth_request_uuid.to_string(), conn).await;
}
}
pub async fn send_auth_response(
&self,
user_uuid: &String,
auth_response_uuid: &str,
approving_device_uuid: String,
conn: &mut DbConn,
) {
let data = create_update(
vec![("Id".into(), auth_response_uuid.to_owned().into()), ("UserId".into(), user_uuid.clone().into())],
UpdateType::AuthRequestResponse,
approving_device_uuid.clone().into(),
);
self.send_update(auth_response_uuid, &data).await;
if CONFIG.push_enabled() {
push_auth_response(user_uuid.to_string(), auth_response_uuid.to_string(), approving_device_uuid, conn)
.await;
}
}
}
#[derive(Clone)]
pub struct AnonymousWebSocketSubscriptions {
map: Arc<dashmap::DashMap<String, Sender<Message>>>,
}
impl AnonymousWebSocketSubscriptions {
async fn send_update(&self, token: &str, data: &[u8]) {
if let Some(sender) = self.map.get(token).map(|v| v.clone()) {
if let Err(e) = sender.send(Message::binary(data)).await {
error!("Error sending WS update {e}");
}
}
}
pub async fn send_auth_response(&self, user_uuid: &String, auth_response_uuid: &str) {
let data = create_anonymous_update(
vec![("Id".into(), auth_response_uuid.to_owned().into()), ("UserId".into(), user_uuid.clone().into())],
UpdateType::AuthRequestResponse,
user_uuid.to_string(),
);
self.send_update(auth_response_uuid, &data).await;
}
}
/* Message Structure
@ -387,6 +548,24 @@ fn create_update(payload: Vec<(Value, Value)>, ut: UpdateType, acting_device_uui
serialize(value)
}
fn create_anonymous_update(payload: Vec<(Value, Value)>, ut: UpdateType, user_id: String) -> Vec<u8> {
use rmpv::Value as V;
let value = V::Array(vec![
1.into(),
V::Map(vec![]),
V::Nil,
"AuthRequestResponseRecieved".into(),
V::Array(vec![V::Map(vec![
("Type".into(), (ut as i32).into()),
("Payload".into(), payload.into()),
("UserId".into(), user_id.into()),
])]),
]);
serialize(value)
}
fn create_ping() -> Vec<u8> {
serialize(Value::Array(vec![6.into()]))
}
@ -420,6 +599,7 @@ pub enum UpdateType {
}
pub type Notify<'a> = &'a rocket::State<Arc<WebSocketUsers>>;
pub type AnonymousNotify<'a> = &'a rocket::State<Arc<AnonymousWebSocketSubscriptions>>;
pub fn start_notification_server() -> Arc<WebSocketUsers> {
let users = Arc::clone(&WS_USERS);