1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
//! Allows taking an existing stream of data and asynchronously convert it to a
//! websocket client.
//!
//! This module contains the trait that transforms stream into
//! an intermediate struct called `Upgrade` and the `Upgrade` struct itself.
//! The `Upgrade` struct is used to inspect details of the websocket connection
//! (e.g. what protocols it wants to use) and decide whether to accept or reject it.
use super::{validate, HyperIntoWsError, Request, WsUpgrade};
use bytes::BytesMut;
use client::async::ClientNew;
use codec::http::HttpServerCodec;
use codec::ws::{Context, MessageCodec};
use futures::sink::Send as SinkSend;
use futures::Stream as StreamTrait;
use futures::{Future, Sink};
use hyper::header::Headers;
use hyper::http::h1::Incoming;
use hyper::status::StatusCode;
use std::io::{self, ErrorKind};
use stream::async::Stream;
use tokio_io::codec::{Framed, FramedParts};

/// An asynchronous websocket upgrade.
///
/// This struct is given when a connection is being upgraded to a websocket
/// request. It implements everything that a normal `WsUpgrade` struct does
/// along with the final functions to create websocket clients (although this
/// is done asynchronously).
///
/// # Example
///
/// ```rust,no_run
/// use websocket::async::{Core, TcpListener, TcpStream};
/// use websocket::async::futures::{Stream, Future};
/// use websocket::async::server::upgrade::IntoWs;
/// use websocket::sync::Client;
///
/// let mut core = Core::new().unwrap();
/// let handle = core.handle();
/// let addr = "127.0.0.1:80".parse().unwrap();
/// let listener = TcpListener::bind(&addr, &handle).unwrap();
///
/// let websocket_clients = listener
///     .incoming().map_err(|e| e.into())
///     .and_then(|(stream, _)| stream.into_ws().map_err(|e| e.3))
///     .map(|upgrade| {
///         if upgrade.protocols().iter().any(|p| p == "super-cool-proto") {
///             let accepted = upgrade
///                 .use_protocol("super-cool-proto")
///                 .accept()
///                 .map(|_| ()).map_err(|_| ());
///
///             handle.spawn(accepted);
///         } else {
///             let rejected = upgrade.reject()
///                 .map(|_| ()).map_err(|_| ());
///
///             handle.spawn(rejected);
///         }
///     });
/// ```
pub type Upgrade<S> = WsUpgrade<S, BytesMut>;

/// These are the extra functions given to `WsUpgrade` with the `async` feature
/// turned on. A type alias for this specialization of `WsUpgrade` lives in this
/// module under the name `Upgrade`.
impl<S> WsUpgrade<S, BytesMut>
where
	S: Stream + Send + 'static,
{
	/// Asynchronously accept the websocket handshake, then create a client.
	/// This will asynchronously send a response accepting the connection
	/// and create a websocket client.
	pub fn accept(self) -> ClientNew<S> {
		self.internal_accept(None)
	}

	/// Asynchronously accept the websocket handshake, then create a client.
	/// This will asynchronously send a response accepting the connection
	/// with custom headers in the response and create a websocket client.
	pub fn accept_with(self, custom_headers: &Headers) -> ClientNew<S> {
		self.internal_accept(Some(custom_headers))
	}

	fn internal_accept(mut self, custom_headers: Option<&Headers>) -> ClientNew<S> {
		let status = self.prepare_headers(custom_headers);
		let WsUpgrade {
			headers,
			stream,
			request,
			buffer,
		} = self;

		let duplex = Framed::from_parts(
			FramedParts {
				inner: stream,
				readbuf: buffer,
				writebuf: BytesMut::with_capacity(0),
			},
			HttpServerCodec,
		);

		let future = duplex
			.send(Incoming {
				version: request.version,
				subject: status,
				headers: headers.clone(),
			})
			.map(move |s| {
				let codec = MessageCodec::default(Context::Server);
				let client = Framed::from_parts(s.into_parts(), codec);
				(client, headers)
			})
			.map_err(|e| e.into());
		Box::new(future)
	}

	/// Asynchronously send a rejection message and deconstruct `self`
	/// into it's original stream. The stream being returned is framed with the
	/// `HttpServerCodec` since that was used to send the rejection message.
	pub fn reject(self) -> SinkSend<Framed<S, HttpServerCodec>> {
		self.internal_reject(None)
	}

	/// Asynchronously send a rejection message with custom headers and
	/// deconstruct `self` into it's original stream.
	///  The stream being returned is framed with the
	/// `HttpServerCodec` since that was used to send the rejection message.
	pub fn reject_with(self, headers: &Headers) -> SinkSend<Framed<S, HttpServerCodec>> {
		self.internal_reject(Some(headers))
	}

	fn internal_reject(
		mut self,
		headers: Option<&Headers>,
	) -> SinkSend<Framed<S, HttpServerCodec>> {
		if let Some(custom) = headers {
			self.headers.extend(custom.iter());
		}
		let duplex = Framed::from_parts(
			FramedParts {
				inner: self.stream,
				readbuf: self.buffer,
				writebuf: BytesMut::with_capacity(0),
			},
			HttpServerCodec,
		);
		duplex.send(Incoming {
			version: self.request.version,
			subject: StatusCode::BadRequest,
			headers: self.headers,
		})
	}
}

/// Trait to take a stream or similar and attempt to recover the start of a
/// websocket handshake from it (asynchronously).
/// Should be used when a stream might contain a request for a websocket session.
///
/// If an upgrade request can be parsed, one can accept or deny the handshake with
/// the `WsUpgrade` struct.
/// Otherwise the original stream is returned along with an error.
///
/// Note: the stream is owned because the websocket client expects to own its stream.
///
/// This is already implemented for all async streams, which means all types with
/// `AsyncRead + AsyncWrite + 'static` (`'static` because the future wants to own
/// the stream).
///
/// # Example
///
/// ```rust,no_run
/// use websocket::async::{Core, TcpListener, TcpStream};
/// use websocket::async::futures::{Stream, Future};
/// use websocket::async::server::upgrade::IntoWs;
/// use websocket::sync::Client;
///
/// let mut core = Core::new().unwrap();
/// let handle = core.handle();
/// let addr = "127.0.0.1:80".parse().unwrap();
/// let listener = TcpListener::bind(&addr, &handle).unwrap();
///
/// let websocket_clients = listener
///     .incoming().map_err(|e| e.into())
///     .and_then(|(stream, _)| stream.into_ws().map_err(|e| e.3))
///     .map(|upgrade| {
///         if upgrade.protocols().iter().any(|p| p == "super-cool-proto") {
///             let accepted = upgrade
///                 .use_protocol("super-cool-proto")
///                 .accept()
///                 .map(|_| ()).map_err(|_| ());
///
///             handle.spawn(accepted);
///         } else {
///             let rejected = upgrade.reject()
///                 .map(|_| ()).map_err(|_| ());
///
///             handle.spawn(rejected);
///         }
///     });
/// ```
pub trait IntoWs {
	/// The type of stream this upgrade process is working with (TcpStream, etc.)
	type Stream: Stream;
	/// An error value in case the stream is not asking for a websocket connection
	/// or something went wrong. It is common to also include the stream here.
	type Error;
	/// Attempt to read and parse the start of a Websocket handshake, later
	/// with the  returned `WsUpgrade` struct, call `accept to start a
	/// websocket client, and `reject` to send a handshake rejection response.
	///
	/// Note: this is the asynchronous version, meaning it will not block when
	/// trying to read a request.
	fn into_ws(self) -> Box<Future<Item = Upgrade<Self::Stream>, Error = Self::Error> + Send>;
}

impl<S> IntoWs for S
where
	S: Stream + Send + 'static,
{
	type Stream = S;
	type Error = (S, Option<Request>, BytesMut, HyperIntoWsError);

	fn into_ws(self) -> Box<Future<Item = Upgrade<Self::Stream>, Error = Self::Error> + Send> {
		let future = self
			.framed(HttpServerCodec)
			.into_future()
			.map_err(|(e, s)| {
				let FramedParts { inner, readbuf, .. } = s.into_parts();
				(inner, None, readbuf, e.into())
			})
			.and_then(|(m, s)| {
				let FramedParts { inner, readbuf, .. } = s.into_parts();
				if let Some(msg) = m {
					match validate(&msg.subject.0, msg.version, &msg.headers) {
						Ok(()) => Ok((msg, inner, readbuf)),
						Err(e) => Err((inner, Some(msg), readbuf, e)),
					}
				} else {
					let err = HyperIntoWsError::Io(io::Error::new(
						ErrorKind::ConnectionReset,
						"Connection dropped before handshake could be read",
					));
					Err((inner, None, readbuf, err))
				}
			})
			.map(|(m, stream, buffer)| WsUpgrade {
				headers: Headers::new(),
				stream,
				request: m,
				buffer,
			});
		Box::new(future)
	}
}