From c8051294b0a6556b99519eb93ff50266e1b63125 Mon Sep 17 00:00:00 2001 From: Boris-Chengbiao Zhou Date: Sat, 2 Apr 2016 20:04:51 +0200 Subject: [PATCH] Switch from rust-websocket to ws-rs --- Cargo.toml | 4 +- src/bin/livereload.rs | 210 ------------------------------------------ src/bin/mdbook.rs | 37 +++++--- 3 files changed, 26 insertions(+), 225 deletions(-) delete mode 100644 src/bin/livereload.rs diff --git a/Cargo.toml b/Cargo.toml index 504d042f..43b186e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ crossbeam = { version = "0.2.8", optional = true } # Serve feature iron = { version = "0.3", optional = true } staticfile = { version = "0.2", optional = true } -websocket = { version = "0.16.1", optional = true} +ws = { version = "0.4.6", optional = true} # Tests @@ -42,7 +42,7 @@ debug = [] output = [] regenerate-css = [] watch = ["notify", "time", "crossbeam"] -serve = ["iron", "staticfile", "websocket"] +serve = ["iron", "staticfile", "ws"] [[bin]] doc = false diff --git a/src/bin/livereload.rs b/src/bin/livereload.rs deleted file mode 100644 index 23b15b2d..00000000 --- a/src/bin/livereload.rs +++ /dev/null @@ -1,210 +0,0 @@ -extern crate websocket; -extern crate crossbeam; - -use std::sync::mpsc::channel; -use std::sync::mpsc; -use std::io; -use std::thread; -use std::sync::{Arc, Mutex}; -use std::ops::Deref; -use std::marker::PhantomData; - -use self::websocket::header::WebSocketProtocol; -use self::websocket::ws::sender::Sender; -use self::websocket::ws::receiver::Receiver; -use self::websocket::message::Type; -use self::websocket::{Server, Message}; - -const WS_PROTOCOL: &'static str = "livereload"; -const RELOAD_COMMAND: &'static str = "reload"; - - -#[derive(Debug, Clone, PartialEq)] -enum MessageType { - Reload, - Close, -} - - -#[derive(Clone)] -struct ComparableSender { - sender: mpsc::Sender, - id: usize, -} - -impl PartialEq for ComparableSender { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Deref for ComparableSender { - type Target = mpsc::Sender; - - fn deref(&self) -> &mpsc::Sender { - &self.sender - } -} - - -struct ComparableSenderFactory { - next_id: usize, - sender_type: PhantomData, -} - -impl ComparableSenderFactory { - fn generate(&mut self, sender: mpsc::Sender) -> ComparableSender { - let tx = ComparableSender { - sender: sender, - id: self.next_id, - }; - self.next_id += 1; - tx - } - - fn new() -> ComparableSenderFactory { - ComparableSenderFactory { - next_id: 0, - sender_type: PhantomData, - } - } -} - - -pub struct LiveReload { - senders: Arc>>>, -} - -impl LiveReload { - pub fn new(address: &str) -> io::Result { - let server = try!(Server::bind(address)); - - let senders: Arc>>> = Arc::new(Mutex::new(vec![])); - let senders_clone = senders.clone(); - - let mut factory = ComparableSenderFactory::new(); - - let lr = LiveReload { senders: senders_clone }; - - // handle connection attempts on a separate thread - thread::spawn(move || { - for connection in server { - let mut senders = senders.clone(); - let (tx, rx) = channel(); - - let tx = factory.generate(tx); - - senders.lock().unwrap().push(tx.clone()); - - // each connection gets a separate thread - thread::spawn(move || { - let request = connection.unwrap().read_request().unwrap(); - let headers = request.headers.clone(); - - let mut valid = false; - if let Some(&WebSocketProtocol(ref protocols)) = headers.get() { - if protocols.contains(&(WS_PROTOCOL.to_owned())) { - valid = true; - } - } - - let client; - if valid { - let mut response = request.accept(); - response.headers.set(WebSocketProtocol(vec![WS_PROTOCOL.to_owned()])); - client = response.send().unwrap(); - } else { - request.fail().send().unwrap(); - println!("{:?}", "Rejecting invalid websocket request."); - return; - } - - let (mut ws_tx, mut ws_rx) = client.split(); - - // handle receiving and sending (websocket) in two separate threads - crossbeam::scope(|scope| { - let tx_clone = tx.clone(); - scope.spawn(move || { - let tx = tx_clone; - loop { - match rx.recv() { - Ok(msg) => { - match msg { - MessageType::Reload => { - let message: Message = Message::text(RELOAD_COMMAND.to_owned()); - let mut senders = senders.clone(); - if ws_tx.send_message(&message).is_err() { - // the receiver isn't available anymore - // remove the tx from senders and exit - LiveReload::remove_sender(&mut senders, &tx); - break; - } - }, - MessageType::Close => { - LiveReload::remove_sender(&mut senders, &tx); - break; - }, - } - }, - Err(e) => { - println!("{:?}", e); - break; - }, - } - } - }); - - for message in ws_rx.incoming_messages() { - match message { - Ok(message) => { - let message: Message = message; - match message.opcode { - Type::Close => { - tx.send(MessageType::Close).unwrap(); - break; - }, - // TODO ? - // Type::Ping => { - // let message = websocket::Message::pong(message.payload); - // ws_tx.send_message(&message).unwrap(); - // }, - _ => { - println!("{:?}", message.opcode); - unimplemented!() - }, - } - }, - Err(err) => { - println!("Error: {}", err); - break; - }, - } - } - }); - }); - } - }); - - Ok(lr) - } - - fn remove_sender(senders: &mut Arc>>>, el: &ComparableSender) { - let mut senders = senders.lock().unwrap(); - let mut index = 0; - for i in 0..senders.len() { - if &senders[i] == el { - index = i; - break; - } - } - senders.remove(index); - } - - pub fn trigger_reload(&self) { - let senders = self.senders.lock().unwrap(); - println!("Reloading {} client(s).", senders.len()); - for sender in senders.iter() { - sender.send(MessageType::Reload).unwrap(); - } - } -} diff --git a/src/bin/mdbook.rs b/src/bin/mdbook.rs index 5fde4750..be69f8b4 100644 --- a/src/bin/mdbook.rs +++ b/src/bin/mdbook.rs @@ -15,9 +15,8 @@ extern crate time; extern crate iron; #[cfg(feature = "serve")] extern crate staticfile; - #[cfg(feature = "serve")] -mod livereload; +extern crate ws; use std::env; use std::error::Error; @@ -32,10 +31,6 @@ use notify::Watcher; #[cfg(feature = "watch")] use std::sync::mpsc::channel; -// Uses for the Serve feature -#[cfg(feature = "serve")] -use livereload::LiveReload; - use mdbook::MDBook; @@ -187,16 +182,21 @@ fn watch(args: &ArgMatches) -> Result<(), Box> { // Watch command implementation #[cfg(feature = "serve")] fn serve(args: &ArgMatches) -> Result<(), Box> { + const RELOAD_COMMAND: &'static str = "reload"; + let book_dir = get_book_dir(args); let mut book = MDBook::new(&book_dir).read_config(); let port = args.value_of("port").unwrap_or("3000"); let ws_port = args.value_of("ws-port").unwrap_or("3001"); + let address = format!("localhost:{}", port); + let ws_address = format!("localhost:{}", ws_port); + book.set_livereload(format!(r#" - "#, ws_port).to_owned()); + "#, ws_port, RELOAD_COMMAND).to_owned()); try!(book.build()); let staticfile = staticfile::Static::new(book.get_dest()); let iron = iron::Iron::new(staticfile); - let _iron = iron.http(&*format!("localhost:{}", port)).unwrap(); + let _iron = iron.http(&*address).unwrap(); - let lr = LiveReload::new(&format!("localhost:{}", ws_port)).unwrap(); + let ws_server = ws::WebSocket::new(|_| { + |_| { + Ok(()) + } + }).unwrap(); + + let broadcaster = ws_server.broadcaster(); + + std::thread::spawn(move || { + ws_server.listen(&*ws_address).unwrap(); + }); - println!("{:?}", "Registering change trigger"); trigger_on_change(&mut book, move |event, book| { if let Some(path) = event.path { println!("File changed: {:?}\nBuilding book...\n", path); match book.build() { Err(e) => println!("Error while building: {:?}", e), - _ => lr.trigger_reload(), + _ => broadcaster.send(RELOAD_COMMAND).unwrap(), } println!(""); } @@ -282,6 +291,8 @@ fn trigger_on_change(book: &mut MDBook, closure: F) -> () let mut previous_time = time::get_time(); + println!("\nListening for changes...\n"); + loop { match rx.recv() { Ok(event) => {