diff --git a/Cargo.toml b/Cargo.toml index 1d5d87e..7011950 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,3 +8,11 @@ edition = "2021" [dependencies] diesel = {version = "2.1.3", features = ["postgres"]} dotenvy = "0.15.7" +serde = {version = "1.0.192", features = ["derive"]} +serde_json = "1.0.108" +tokio = {version = "1.34.0", features = ["macros", "sync", "rt-multi-thread"] } +websockets = "0.3.0" +tokio-stream = "0.1.14" +warp = "0.3" +futures = { version = "0.3", default-features = false } +uuid = { version = "1.1.2", features = ["serde", "v4"] } diff --git a/src/bin/handlers.rs b/src/bin/handlers.rs new file mode 100644 index 0000000..ca7809c --- /dev/null +++ b/src/bin/handlers.rs @@ -0,0 +1,6 @@ +use crate::{ws, Clients, Result}; +use warp::Reply; +pub async fn ws_handler(ws: warp::ws::Ws, clients: Clients) -> Result { + println!("ws_handler"); + Ok(ws.on_upgrade(move |socket| ws::client_connection(socket, clients))) +} diff --git a/src/bin/ws.rs b/src/bin/ws.rs new file mode 100644 index 0000000..a059374 --- /dev/null +++ b/src/bin/ws.rs @@ -0,0 +1,61 @@ +use self::modelos::*; +use rust_diesel_blogs::*; +use crate::{Client, Clients}; +use futures::{FutureExt, StreamExt}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; +use uuid::Uuid; +use warp::ws::{Message, WebSocket}; +pub async fn client_connection(ws: WebSocket, clients: Clients) { + println!("establishing client connection... {:?}", ws); + let (client_ws_sender, mut client_ws_rcv) = ws.split(); + let (client_sender, client_rcv) = mpsc::unbounded_channel(); + let client_rcv = UnboundedReceiverStream::new(client_rcv); + tokio::task::spawn(client_rcv.forward(client_ws_sender).map(|result| { + if let Err(e) = result { + println!("error sending websocket msg: {}", e); + } + })); + let uuid = Uuid::new_v4().simple().to_string(); + let new_client = Client { + client_id: uuid.clone(), + sender: Some(client_sender), + }; + clients.lock().await.insert(uuid.clone(), new_client); + while let Some(result) = client_ws_rcv.next().await { + let msg = match result { + Ok(msg) => msg, + Err(e) => { + println!("error receiving message for id {}): {}", uuid.clone(), e); + break; + } + }; + client_msg(&uuid, msg, &clients).await; + } + clients.lock().await.remove(&uuid); + println!("{} disconnected", uuid); +} +async fn client_msg(client_id: &str, msg: Message, clients: &Clients) { + println!("received message from {}: {:?}", client_id, msg); + let message = match msg.to_str() { + Ok(v) => v, + Err(_) => return, + }; + + println!("{}", &message); + let nuevopost: NuevoPost = serde_json::from_str(&message).unwrap(); + let connection = &mut establecer_coneccion(); + let post = crear_post(connection, nuevopost.title, nuevopost.body); + + let locked = clients.lock().await; + match locked.get(client_id) { + Some(v) => { + if let Some(sender) = &v.sender { + println!("Dando Respuesta"); + let _ = sender.send(Ok(Message::text(format!("{} fue cargado",nuevopost.title)))); + } + } + None => return, + } + return; +} diff --git a/src/bin/ws_escribir_post.rs b/src/bin/ws_escribir_post.rs new file mode 100644 index 0000000..bcf0033 --- /dev/null +++ b/src/bin/ws_escribir_post.rs @@ -0,0 +1,27 @@ +use std::{collections::HashMap, convert::Infallible, sync::Arc}; +use tokio::sync::{mpsc, Mutex}; +use warp::{ws::Message, Filter, Rejection}; +mod handlers; +mod ws; +#[derive(Debug, Clone)] +pub struct Client { + pub client_id: String, + pub sender: Option>>, +} +type Clients = Arc>>; +type Result = std::result::Result; +#[tokio::main] +async fn main() { + let clients: Clients = Arc::new(Mutex::new(HashMap::new())); + println!("Configuring websocket route"); + let ws_route = warp::path("ws") + .and(warp::ws()) + .and(with_clients(clients.clone())) + .and_then(handlers::ws_handler); + let routes = ws_route.with(warp::cors().allow_any_origin()); + println!("Starting server"); + warp::serve(routes).run(([192,168, 1, 4], 8000)).await; +} +fn with_clients(clients: Clients) -> impl Filter + Clone { + warp::any().map(move || clients.clone()) +} diff --git a/src/lib.rs b/src/lib.rs index a4ba4cb..742ef3d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,4 +28,4 @@ pub fn crear_post(conn: &mut PgConnection, title: &str, body: &str) -> Post { .returning(Post::as_returning()) .get_result(conn) .expect("Fallo la carga") -} \ No newline at end of file +} diff --git a/src/modelos.rs b/src/modelos.rs index a24c5c4..260489a 100644 --- a/src/modelos.rs +++ b/src/modelos.rs @@ -1,5 +1,7 @@ use diesel::{prelude::{Queryable, Insertable}, Selectable}; +use serde::{Serialize, Deserialize}; + use crate::schema::posts; #[derive(Queryable, Selectable)] @@ -12,9 +14,10 @@ pub struct Post { pub published: bool, } +#[derive(Serialize, Deserialize)] #[derive(Insertable)] #[diesel(table_name = posts)] pub struct NuevoPost<'a> { pub title: &'a str, pub body: &'a str, -} \ No newline at end of file +}