1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use std::io::Read;
use std::io::Result as IoResult;
use hyper::buffer::BufReader;
use dataframe::{DataFrame, Opcode};
use message::OwnedMessage;
use result::{WebSocketError, WebSocketResult};
pub use stream::sync::Shutdown;
use stream::sync::{AsTcpStream, Stream};
use ws;
use ws::receiver::Receiver as ReceiverTrait;
use ws::receiver::{DataFrameIterator, MessageIterator};
/// Pairs a buffered input stream with the `Receiver` state used to read
/// data frames and messages from it.
pub struct Reader<R: Read> {
    /// Buffered reader wrapping the underlying `R`; every receive call reads from it.
    pub stream: BufReader<R>,
    /// Receiver state that is driven against `stream` by the methods of this type.
    pub receiver: Receiver,
}
impl<R: Read> Reader<R> {
    /// Reads a single data frame by handing this reader's buffered stream to
    /// the receiver.
    ///
    /// # Errors
    /// Returns whatever error the receiver's `recv_dataframe` reports.
    pub fn recv_dataframe(&mut self) -> WebSocketResult<DataFrame> {
        let source = &mut self.stream;
        self.receiver.recv_dataframe(source)
    }

    /// Returns the `DataFrameIterator` the receiver builds over this reader's
    /// buffered stream.
    pub fn incoming_dataframes(&mut self) -> DataFrameIterator<Receiver, BufReader<R>> {
        let source = &mut self.stream;
        self.receiver.incoming_dataframes(source)
    }

    /// Reads a single message by handing this reader's buffered stream to the
    /// receiver.
    ///
    /// # Errors
    /// Returns whatever error the receiver's `recv_message` reports.
    pub fn recv_message(&mut self) -> WebSocketResult<OwnedMessage> {
        let source = &mut self.stream;
        self.receiver.recv_message(source)
    }

    /// Returns the `MessageIterator` the receiver builds over this reader's
    /// buffered stream; the iterator borrows the reader for `'a`.
    pub fn incoming_messages<'a>(&'a mut self) -> MessageIterator<'a, Receiver, BufReader<R>> {
        let source = &mut self.stream;
        self.receiver.incoming_messages(source)
    }
}
impl<S: AsTcpStream + Stream + Read> Reader<S> {
    /// Requests `Shutdown::Read` on the TCP stream obtained from the wrapped
    /// stream via `as_tcp()`.
    ///
    /// # Errors
    /// Returns the I/O error reported by the underlying `shutdown` call.
    pub fn shutdown(&self) -> IoResult<()> {
        let tcp = self.stream.get_ref().as_tcp();
        tcp.shutdown(Shutdown::Read)
    }

    /// Requests `Shutdown::Both` on the TCP stream obtained from the wrapped
    /// stream via `as_tcp()`.
    ///
    /// # Errors
    /// Returns the I/O error reported by the underlying `shutdown` call.
    pub fn shutdown_all(&self) -> IoResult<()> {
        let tcp = self.stream.get_ref().as_tcp();
        tcp.shutdown(Shutdown::Both)
    }
}
/// Receiver state used by this file's `ws::Receiver` implementation, which
/// produces `DataFrame`s and `OwnedMessage`s.
pub struct Receiver {
/// Data frames collected so far for a message that has not been completed
/// yet; kept between calls to `recv_message_dataframes`.
buffer: Vec<DataFrame>,
/// Flag forwarded unchanged to `DataFrame::read_dataframe`.
/// NOTE(review): presumably says whether incoming frames are expected to be
/// masked — confirm against `DataFrame::read_dataframe`.
mask: bool,
}
impl Receiver {
    /// Creates a receiver with an empty frame buffer and the given `mask`
    /// setting.
    ///
    /// `mask` is stored as-is and later passed to `DataFrame::read_dataframe`.
    pub fn new(mask: bool) -> Receiver {
        let buffer = Vec::new();
        Receiver { buffer, mask }
    }
}
/// `ws::Receiver` implementation that reads `DataFrame`s and groups them into
/// the frames of an `OwnedMessage`.
impl ws::Receiver for Receiver {
    type F = DataFrame;
    type M = OwnedMessage;

    /// Reads a single data frame from `reader`.
    ///
    /// Delegates to `DataFrame::read_dataframe`, passing this receiver's
    /// `mask` setting along.
    ///
    /// # Errors
    /// Propagates any error returned by `DataFrame::read_dataframe`.
    fn recv_dataframe<R>(&mut self, reader: &mut R) -> WebSocketResult<DataFrame>
    where
        R: Read,
    {
        DataFrame::read_dataframe(reader, self.mask)
    }

    /// Reads data frames from `reader` until a whole message has been
    /// collected, then returns the frames that make it up.
    ///
    /// When the internal buffer is empty a new message is started with the
    /// next frame read. Otherwise the call resumes the partially received
    /// message left in the buffer by an earlier call. Frames are then read
    /// until one has `finished` set:
    ///
    /// * opcode `0` frames are appended to the buffer;
    /// * opcode `8..=15` frames are returned immediately, alone, while the
    ///   buffered frames are kept so a later call can resume the message;
    /// * any other opcode is rejected.
    ///
    /// NOTE(review): the numeric opcodes are assumed to follow the RFC 6455
    /// wire values (0 = continuation, 8-15 = control) — confirm against
    /// `Opcode`.
    ///
    /// # Errors
    /// * `WebSocketError::ProtocolError` if a new message starts with a
    ///   continuation frame, or if a frame with an opcode other than `0` or
    ///   `8..=15` arrives while a message is still incomplete.
    /// * Any error returned by `recv_dataframe`.
    ///
    /// Frames already buffered are left in place when an error is returned.
    fn recv_message_dataframes<R>(&mut self, reader: &mut R) -> WebSocketResult<Vec<DataFrame>>
    where
        R: Read,
    {
        let mut finished = if self.buffer.is_empty() {
            // Nothing pending: the next frame opens a new message.
            let first = self.recv_dataframe(reader)?;

            // A continuation frame cannot open a message.
            if first.opcode == Opcode::Continuation {
                return Err(WebSocketError::ProtocolError(
                    "Unexpected continuation data frame opcode",
                ));
            }

            let finished = first.finished;
            self.buffer.push(first);
            finished
        } else {
            // Frames left over from an earlier call: that message is still open.
            false
        };

        while !finished {
            let next = self.recv_dataframe(reader)?;
            finished = next.finished;

            match next.opcode as u8 {
                // Continuation frame: belongs to the message being assembled.
                0 => self.buffer.push(next),
                // Control frame arriving mid-message: hand it out on its own
                // and keep the partial message buffered for the next call.
                8..=15 => {
                    return Ok(vec![next]);
                }
                // Anything else would start a new message before this one ended.
                _ => {
                    return Err(WebSocketError::ProtocolError(
                        "Unexpected data frame opcode",
                    ))
                }
            }
        }

        // Hand out the completed message and leave an empty buffer behind.
        Ok(::std::mem::replace(&mut self.buffer, Vec::new()))
    }
}