1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
//! The asynchronous implementation of a websocket server.
use bytes::BytesMut;
use futures::{Future, Stream};
use server::upgrade::async::{IntoWs, Upgrade};
use server::InvalidConnection;
use server::{NoTlsAcceptor, WsServer};
use std::io;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use tokio_core::net::{TcpListener, TcpStream};
pub use tokio_core::reactor::Handle;

#[cfg(any(feature = "async-ssl"))]
use native_tls::TlsAcceptor;
#[cfg(any(feature = "async-ssl"))]
use tokio_tls::{TlsAcceptor as TlsAcceptorExt, TlsStream};

/// The asynchronous specialization of a websocket server.
/// Use this struct to create asynchronous servers.
///
/// The listener is always a `tokio_core` `TcpListener`. The type parameter `S` is the
/// type held in the server's `ssl_acceptor` field: `NoTlsAcceptor` for servers created
/// with `bind`, or `TlsAcceptor` for servers created with `bind_secure` (only available
/// with the `async-ssl` feature).
pub type Server<S> = WsServer<S, TcpListener>;

/// A stream of websocket connections and addresses the server generates.
///
/// Each item of the stream is a tuple: first an `Upgrade` struct, which lets the user
/// decide whether to turn the connection into a websocket connection or reject it,
/// and second the address of the incoming connection.
///
/// Failures are reported through the stream's error type, `InvalidConnection`.
/// The stream is boxed as a trait object and is `Send`.
pub type Incoming<S> =
	Box<Stream<Item = (Upgrade<S>, SocketAddr), Error = InvalidConnection<S, BytesMut>> + Send>;

/// Asynchronous methods for creating an async server and accepting incoming connections.
impl WsServer<NoTlsAcceptor, TcpListener> {
	/// Bind a websocket server to an address.
	/// Creating a websocket server can be done immediately so this does not
	/// return a `Future` but a simple `Result`.
	pub fn bind<A: ToSocketAddrs>(addr: A, handle: &Handle) -> io::Result<Self> {
		let tcp = ::std::net::TcpListener::bind(addr)?;
		let address = tcp.local_addr()?;
		Ok(Server {
			listener: TcpListener::from_listener(tcp, &address, handle)?,
			ssl_acceptor: NoTlsAcceptor,
		})
	}

	/// Turns the server into a stream of connection objects.
	///
	/// Each item of the stream is the address of the incoming connection and an `Upgrade`
	/// struct which lets the user decide whether to turn the connection into a websocket
	/// connection or reject it.
	///
	/// See the [`examples/async-server.rs`]
	/// (https://github.com/cyderize/rust-websocket/blob/master/examples/async-server.rs)
	/// example for a good echo server example.
	pub fn incoming(self) -> Incoming<TcpStream> {
		let future = self
			.listener
			.incoming()
			.map_err(|e| InvalidConnection {
				stream: None,
				parsed: None,
				buffer: None,
				error: e.into(),
			})
			.and_then(|(stream, a)| {
				stream
					.into_ws()
					.map_err(|(stream, req, buf, err)| InvalidConnection {
						stream: Some(stream),
						parsed: req,
						buffer: Some(buf),
						error: err,
					})
					.map(move |u| (u, a))
			});
		Box::new(future)
	}
}

/// Constructors and the connection stream for the SSL/TLS asynchronous server.
#[cfg(any(feature = "async-ssl"))]
impl WsServer<TlsAcceptor, TcpListener> {
	/// Binds an SSL websocket server to `addr`.
	///
	/// Binding completes immediately, so the result is a plain `io::Result`
	/// rather than a `Future`.
	///
	/// Because this is an SSL server, the caller supplies the `TlsAcceptor` that
	/// carries the server's SSL information; it is stored in the returned server.
	///
	/// # Errors
	///
	/// Returns the `io::Error` produced while binding the socket, reading its local
	/// address, or handing the listener over to `tokio_core` together with `handle`.
	pub fn bind_secure<A: ToSocketAddrs>(
		addr: A,
		acceptor: TlsAcceptor,
		handle: &Handle,
	) -> io::Result<Self> {
		let std_listener = ::std::net::TcpListener::bind(addr)?;
		let local_addr = std_listener.local_addr()?;
		let listener = TcpListener::from_listener(std_listener, &local_addr, handle)?;
		Ok(Server {
			listener,
			ssl_acceptor: acceptor,
		})
	}

	/// Consumes the server and yields its incoming connections as a stream.
	///
	/// Every item is an `(Upgrade, SocketAddr)` pair: the `Upgrade` lets the user decide
	/// whether to turn the connection into a websocket connection or reject it, and the
	/// `SocketAddr` is the address of the incoming connection.
	///
	/// Errors are `InvalidConnection` values. When accepting the TCP connection or the
	/// TLS session fails, the `stream`, `parsed` and `buffer` fields are all `None`; a
	/// TLS failure is wrapped in an `io::Error` of kind `Other`. When `into_ws` fails,
	/// the stream and buffer it hands back are included, along with the parsed request
	/// if there is one.
	///
	/// See the [`examples/async-server.rs`][async-server-example] example for a good
	/// echo server example.
	///
	/// [async-server-example]: https://github.com/cyderize/rust-websocket/blob/master/examples/async-server.rs
	pub fn incoming(self) -> Incoming<TlsStream<TcpStream>> {
		let tls = TlsAcceptorExt::from(self.ssl_acceptor);

		// Stage 1: accepted TCP connections, with accept failures wrapped.
		let accepted = self.listener.incoming().map_err(|io_err| InvalidConnection {
			stream: None,
			parsed: None,
			buffer: None,
			error: io_err.into(),
		});

		// Stage 2: wrap each TCP connection in a TLS session.
		// NOTE(review): `and_then` appears to finish one connection's handshake before
		// polling the listener again, so a stalled client may delay later ones —
		// confirm against the futures 0.1 `Stream::and_then` documentation.
		let secured = accepted.and_then(move |(tcp, peer)| {
			tls.accept(tcp)
				.map(move |tls_stream| (tls_stream, peer))
				.map_err(|tls_err| InvalidConnection {
					stream: None,
					parsed: None,
					buffer: None,
					// TODO: better error types
					error: io::Error::new(io::ErrorKind::Other, tls_err).into(),
				})
		});

		// Stage 3: run the websocket handshake over the encrypted stream.
		let upgrades = secured.and_then(|(tls_stream, peer)| {
			tls_stream
				.into_ws()
				.map(move |upgrade| (upgrade, peer))
				.map_err(|(tls_stream, request, buffer, error)| InvalidConnection {
					stream: Some(tls_stream),
					parsed: request,
					buffer: Some(buffer),
					error,
				})
		});

		Box::new(upgrades)
	}
}