//! Allows taking an existing stream of data and asynchronously converting it to a
//! websocket client.
//!
//! This module contains the trait that transforms a stream into
//! an intermediate struct called `Upgrade` and the `Upgrade` struct itself.
//! The `Upgrade` struct is used to inspect details of the websocket connection
//! (e.g. what protocols it wants to use) and decide whether to accept or reject it.
use super::{validate, HyperIntoWsError, Request, WsUpgrade};
use bytes::BytesMut;
use client::async::ClientNew;
use codec::http::HttpServerCodec;
use codec::ws::{Context, MessageCodec};
use futures::sink::Send as SinkSend;
use futures::Stream as StreamTrait;
use futures::{Future, Sink};
use hyper::header::Headers;
use hyper::http::h1::Incoming;
use hyper::status::StatusCode;
use std::io::{self, ErrorKind};
use stream::async::Stream;
use tokio_io::codec::{Framed, FramedParts};

/// An asynchronous websocket upgrade.
///
/// This struct is given when a connection is being upgraded to a websocket
/// request. It implements everything that a normal `WsUpgrade` struct does
/// along with the final functions to create websocket clients (although this
/// is done asynchronously).
///
/// # Example
///
/// ```rust,no_run
/// use websocket::async::{Core, TcpListener, TcpStream};
/// use websocket::async::futures::{Stream, Future};
/// use websocket::async::server::upgrade::IntoWs;
/// use websocket::sync::Client;
///
/// let mut core = Core::new().unwrap();
/// let handle = core.handle();
/// let addr = "127.0.0.1:80".parse().unwrap();
/// let listener = TcpListener::bind(&addr, &handle).unwrap();
///
/// let websocket_clients = listener
///     .incoming().map_err(|e| e.into())
///     .and_then(|(stream, _)| stream.into_ws().map_err(|e| e.3))
///     .map(|upgrade| {
///         if upgrade.protocols().iter().any(|p| p == "super-cool-proto") {
///             let accepted = upgrade
///                 .use_protocol("super-cool-proto")
///                 .accept()
///                 .map(|_| ()).map_err(|_| ());
///
///             handle.spawn(accepted);
///         } else {
///             let rejected = upgrade.reject()
///                 .map(|_| ()).map_err(|_| ());
///
///             handle.spawn(rejected);
///         }
///     });
/// ```
pub type Upgrade<S> = WsUpgrade<S, BytesMut>;

/// These are the extra functions given to `WsUpgrade` with the `async` feature
/// turned on. A type alias for this specialization of `WsUpgrade` lives in this
/// module under the name `Upgrade`.
impl<S> WsUpgrade<S, BytesMut>
where
    S: Stream + Send + 'static,
{
    /// Asynchronously accept the websocket handshake, then create a client.
    /// This will asynchronously send a response accepting the connection
    /// and create a websocket client.
    pub fn accept(self) -> ClientNew<S> {
        self.internal_accept(None)
    }

    /// Asynchronously accept the websocket handshake, then create a client.
    /// This will asynchronously send a response accepting the connection
    /// with custom headers in the response and create a websocket client.
    pub fn accept_with(self, custom_headers: &Headers) -> ClientNew<S> {
        self.internal_accept(Some(custom_headers))
    }

    fn internal_accept(mut self, custom_headers: Option<&Headers>) -> ClientNew<S> {
        let status = self.prepare_headers(custom_headers);
        let WsUpgrade {
            headers,
            stream,
            request,
            buffer,
        } = self;

        // Frame the stream with the HTTP codec so the handshake response can be sent.
        let duplex = Framed::from_parts(
            FramedParts {
                inner: stream,
                readbuf: buffer,
                writebuf: BytesMut::with_capacity(0),
            },
            HttpServerCodec,
        );

        let future = duplex
            .send(Incoming {
                version: request.version,
                subject: status,
                headers: headers.clone(),
            })
            .map(move |s| {
                // Once the response is sent, swap the HTTP codec for the websocket message codec.
                let codec = MessageCodec::default(Context::Server);
                let client = Framed::from_parts(s.into_parts(), codec);
                (client, headers)
            })
            .map_err(|e| e.into());
        Box::new(future)
    }

    /// Asynchronously send a rejection message and deconstruct `self`
    /// into its original stream. The stream being returned is framed with the
    /// `HttpServerCodec` since that was used to send the rejection message.
    pub fn reject(self) -> SinkSend<Framed<S, HttpServerCodec>> {
        self.internal_reject(None)
    }

    /// Asynchronously send a rejection message with custom headers and
    /// deconstruct `self` into its original stream.
    /// The stream being returned is framed with the
    /// `HttpServerCodec` since that was used to send the rejection message.
    pub fn reject_with(self, headers: &Headers) -> SinkSend<Framed<S, HttpServerCodec>> {
        self.internal_reject(Some(headers))
    }

    fn internal_reject(
        mut self,
        headers: Option<&Headers>,
    ) -> SinkSend<Framed<S, HttpServerCodec>> {
        // Merge any custom headers into the response headers.
        if let Some(custom) = headers {
            self.headers.extend(custom.iter());
        }
        let duplex = Framed::from_parts(
            FramedParts {
                inner: self.stream,
                readbuf: self.buffer,
                writebuf: BytesMut::with_capacity(0),
            },
            HttpServerCodec,
        );
        duplex.send(Incoming {
            version: self.request.version,
            subject: StatusCode::BadRequest,
            headers: self.headers,
        })
    }
}

/// Trait to take a stream or similar and attempt to recover the start of a
/// websocket handshake from it (asynchronously).
/// Should be used when a stream might contain a request for a websocket session.
///
/// If an upgrade request can be parsed, one can accept or deny the handshake with
/// the `WsUpgrade` struct.
/// Otherwise the original stream is returned along with an error.
///
/// Note: the stream is owned because the websocket client expects to own its stream.
///
/// This is already implemented for all async streams, which means all types with
/// `AsyncRead + AsyncWrite + 'static` (`'static` because the future wants to own
/// the stream).
///
/// # Example
///
/// ```rust,no_run
/// use websocket::async::{Core, TcpListener, TcpStream};
/// use websocket::async::futures::{Stream, Future};
/// use websocket::async::server::upgrade::IntoWs;
/// use websocket::sync::Client;
///
/// let mut core = Core::new().unwrap();
/// let handle = core.handle();
/// let addr = "127.0.0.1:80".parse().unwrap();
/// let listener = TcpListener::bind(&addr, &handle).unwrap();
///
/// let websocket_clients = listener
///     .incoming().map_err(|e| e.into())
///     .and_then(|(stream, _)| stream.into_ws().map_err(|e| e.3))
///     .map(|upgrade| {
///         if upgrade.protocols().iter().any(|p| p == "super-cool-proto") {
///             let accepted = upgrade
///                 .use_protocol("super-cool-proto")
///                 .accept()
///                 .map(|_| ()).map_err(|_| ());
///
///             handle.spawn(accepted);
///         } else {
///             let rejected = upgrade.reject()
///                 .map(|_| ()).map_err(|_| ());
///
///             handle.spawn(rejected);
///         }
///     });
/// ```
pub trait IntoWs {
    /// The type of stream this upgrade process is working with (TcpStream, etc.)
    type Stream: Stream;
    /// An error value in case the stream is not asking for a websocket connection
    /// or something went wrong. It is common to also include the stream here.
    type Error;
    /// Attempt to read and parse the start of a websocket handshake. Later,
    /// with the returned `WsUpgrade` struct, call `accept` to start a
    /// websocket client or `reject` to send a handshake rejection response.
    ///
    /// Note: this is the asynchronous version, meaning it will not block when
    /// trying to read a request.
    fn into_ws(self) -> Box<Future<Item = Upgrade<Self::Stream>, Error = Self::Error> + Send>;
}

impl<S> IntoWs for S
where
    S: Stream + Send + 'static,
{
    type Stream = S;
    type Error = (S, Option<Request>, BytesMut, HyperIntoWsError);

    fn into_ws(self) -> Box<Future<Item = Upgrade<Self::Stream>, Error = Self::Error> + Send> {
        let future = self
            .framed(HttpServerCodec)
            .into_future()
            .map_err(|(e, s)| {
                let FramedParts { inner, readbuf, .. } = s.into_parts();
                (inner, None, readbuf, e.into())
            })
            .and_then(|(m, s)| {
                let FramedParts { inner, readbuf, .. } = s.into_parts();
                if let Some(msg) = m {
                    match validate(&msg.subject.0, msg.version, &msg.headers) {
                        Ok(()) => Ok((msg, inner, readbuf)),
                        Err(e) => Err((inner, Some(msg), readbuf, e)),
                    }
                } else {
                    let err = HyperIntoWsError::Io(io::Error::new(
                        ErrorKind::ConnectionReset,
                        "Connection dropped before handshake could be read",
                    ));
                    Err((inner, None, readbuf, err))
                }
            })
            .map(|(m, stream, buffer)| WsUpgrade {
                headers: Headers::new(),
                stream,
                request: m,
                buffer,
            });
        Box::new(future)
    }
}
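
// Illustrative sketch only; nothing below is part of this crate's API.
// `IntoWs::Error` above is a tuple that hands the original stream back to the
// caller together with the parsed request (if any) and the error, so a stream
// that did not ask for a websocket can still be reused. The std-only code
// below imitates that shape under simplifying assumptions. Every name in it
// (`SketchError`, `sketch_into_upgrade`) is hypothetical, and the "GET "
// prefix check is only a stand-in for the real `validate` call.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum SketchError {
    /// A request line was read, but it does not look like an upgrade request.
    NotAnUpgrade,
    /// The input ended before any request line was read.
    ConnectionDropped,
}

/// Returns the stream and the request line on success. On failure, returns the
/// stream again alongside whatever was parsed and the reason for the failure.
#[allow(dead_code)]
fn sketch_into_upgrade<S>(
    stream: S,
    request_line: Option<&str>,
) -> Result<(S, String), (S, Option<String>, SketchError)> {
    match request_line {
        Some(line) if line.starts_with("GET ") => Ok((stream, line.to_owned())),
        Some(line) => Err((stream, Some(line.to_owned()), SketchError::NotAnUpgrade)),
        None => Err((stream, None, SketchError::ConnectionDropped)),
    }
}

// Usage note: on `Err((stream, _, _))` the caller still owns `stream` and
// could, for example, answer with a plain HTTP response instead of dropping
// the connection.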