Crow  0.3
A C++ microframework for the web
websocket.h
1#pragma once
2#include <boost/algorithm/string/predicate.hpp>
3#include <boost/array.hpp>
4#include "crow/socket_adaptors.h"
5#include "crow/http_request.h"
6#include "crow/TinySHA1.hpp"
7
8namespace crow
9{
10 namespace websocket
11 {
/// Which part of an incoming frame the reader will request next.
enum class WebSocketReadState
{
    MiniHeader, ///< The two leading header bytes.
    Len16,      ///< The 2-byte extended payload length.
    Len64,      ///< The 8-byte extended payload length.
    Mask,       ///< The 4-byte masking key.
    Payload     ///< The payload bytes.
};
20
/// A base class for websocket connection.
///
/// Declares the send/close operations a connection must implement and carries one opaque
/// user pointer. This struct only stores and returns that pointer; it never dereferences
/// or frees it.
struct connection
{
    virtual void send_binary(const std::string& msg) = 0;
    virtual void send_text(const std::string& msg) = 0;
    virtual void send_ping(const std::string& msg) = 0;
    virtual void send_pong(const std::string& msg) = 0;
    virtual void close(const std::string& msg = "quit") = 0;
    virtual ~connection(){}

    /// Attach an arbitrary user pointer to this connection.
    void userdata(void* u) { userdata_ = u; }
    /// Return the pointer last passed to userdata(void*), or nullptr if none was set.
    void* userdata() { return userdata_; }

private:
    // Value-initialised so the getter never returns an indeterminate pointer.
    void* userdata_{nullptr};
};
37
//   0               1               2               3             -byte
//   0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 -bit
//  +-+-+-+-+-------+-+-------------+-------------------------------+
//  |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
//  |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
//  |N|V|V|V|       |S|             |   (if payload len==126/127)   |
//  | |1|2|3|       |K|             |                               |
//  +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
//  |     Extended payload length continued, if payload len == 127  |
//  + - - - - - - - - - - - - - - - +-------------------------------+
//  |                               |Masking-key, if MASK set to 1  |
//  +-------------------------------+-------------------------------+
//  | Masking-key (continued)       |          Payload Data         |
//  +-------------------------------- - - - - - - - - - - - - - - - +
//  :                     Payload Data continued ...                :
//  + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
//  |                     Payload Data continued ...                |
//  +---------------------------------------------------------------+
56
57 /// A websocket connection.
58 template <typename Adaptor>
59 class Connection : public connection
60 {
61 public:
62 /// Constructor for a connection.
63
64 ///
65 /// Requires a request with an "Upgrade: websocket" header.<br>
66 /// Automatically handles the handshake.
67 Connection(const crow::request& req, Adaptor&& adaptor,
68 std::function<void(crow::websocket::connection&)> open_handler,
69 std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler,
70 std::function<void(crow::websocket::connection&, const std::string&)> close_handler,
71 std::function<void(crow::websocket::connection&)> error_handler,
72 std::function<bool(const crow::request&)> accept_handler)
73 : adaptor_(std::move(adaptor)), open_handler_(std::move(open_handler)), message_handler_(std::move(message_handler)), close_handler_(std::move(close_handler)), error_handler_(std::move(error_handler))
74 , accept_handler_(std::move(accept_handler))
75 {
76 if (!boost::iequals(req.get_header_value("upgrade"), "websocket"))
77 {
78 adaptor.close();
79 delete this;
80 return;
81 }
82
83 if (accept_handler_)
84 {
85 if (!accept_handler_(req))
86 {
87 adaptor.close();
88 delete this;
89 return;
90 }
91 }
92
93 // Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
94 // Sec-WebSocket-Version: 13
95 std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
96 sha1::SHA1 s;
97 s.processBytes(magic.data(), magic.size());
98 uint8_t digest[20];
99 s.getDigestBytes(digest);
100 start(crow::utility::base64encode((char*)digest, 20));
101 }
102
103 /// Send data through the socket.
104 template<typename CompletionHandler>
105 void dispatch(CompletionHandler handler)
106 {
107 adaptor_.get_io_service().dispatch(handler);
108 }
109
110 /// Send data through the socket and return immediately.
111 template<typename CompletionHandler>
112 void post(CompletionHandler handler)
113 {
114 adaptor_.get_io_service().post(handler);
115 }
116
117 /// Send a "Ping" message.
118
119 ///
120 /// Usually invoked to check if the other point is still online.
121 void send_ping(const std::string& msg) override
122 {
123 dispatch([this, msg]{
124 auto header = build_header(0x9, msg.size());
125 write_buffers_.emplace_back(std::move(header));
126 write_buffers_.emplace_back(msg);
127 do_write();
128 });
129 }
130
131 /// Send a "Pong" message.
132
133 ///
134 /// Usually automatically invoked as a response to a "Ping" message.
135 void send_pong(const std::string& msg) override
136 {
137 dispatch([this, msg]{
138 auto header = build_header(0xA, msg.size());
139 write_buffers_.emplace_back(std::move(header));
140 write_buffers_.emplace_back(msg);
141 do_write();
142 });
143 }
144
145 /// Send a binary encoded message.
146 void send_binary(const std::string& msg) override
147 {
148 dispatch([this, msg]{
149 auto header = build_header(2, msg.size());
150 write_buffers_.emplace_back(std::move(header));
151 write_buffers_.emplace_back(msg);
152 do_write();
153 });
154 }
155
156 /// Send a plaintext message.
157 void send_text(const std::string& msg) override
158 {
159 dispatch([this, msg]{
160 auto header = build_header(1, msg.size());
161 write_buffers_.emplace_back(std::move(header));
162 write_buffers_.emplace_back(msg);
163 do_write();
164 });
165 }
166
167 /// Send a close signal.
168
169 ///
170 /// Sets a flag to destroy the object once the message is sent.
171 void close(const std::string& msg) override
172 {
173 dispatch([this, msg]{
174 has_sent_close_ = true;
175 if (has_recv_close_ && !is_close_handler_called_)
176 {
177 is_close_handler_called_ = true;
178 if (close_handler_)
179 close_handler_(*this, msg);
180 }
181 auto header = build_header(0x8, msg.size());
182 write_buffers_.emplace_back(std::move(header));
183 write_buffers_.emplace_back(msg);
184 do_write();
185 });
186 }
187
188 protected:
189
/// Generate the websocket headers using an opcode and the message size (in bytes).
///
/// The result is an unmasked, single-frame (FIN set) header:
///  - byte 0: 0x80 | opcode (only the low four opcode bits are used),
///  - byte 1: the size itself when below 126, otherwise the marker 126 or 127,
///  - then the size as a 2-byte (marker 126) or 8-byte (marker 127) big-endian integer.
///
/// @param opcode frame opcode (0x0-0xF).
/// @param size   payload length in bytes.
/// @return the 2-, 4- or 10-byte header.
std::string build_header(int opcode, size_t size)
{
    std::string header;
    header.reserve(2 + 8);
    header.push_back(static_cast<char>(0x80 | (opcode & 0x0F)));

    int extended_bytes = 0;
    if (size < 126)
    {
        header.push_back(static_cast<char>(size));
    }
    else if (size < 0x10000)
    {
        header.push_back(static_cast<char>(126));
        extended_bytes = 2;
    }
    else
    {
        header.push_back(static_cast<char>(127));
        extended_bytes = 8;
    }

    // Emit the extended length most-significant byte first. Writing byte by byte avoids
    // storing through a misaligned uint16_t*/uint64_t* alias of the character buffer and
    // is independent of host endianness.
    const uint64_t length = static_cast<uint64_t>(size);
    for (int shift = (extended_bytes - 1) * 8; shift >= 0; shift -= 8)
    {
        header.push_back(static_cast<char>((length >> shift) & 0xFF));
    }
    return header;
}
213
214 /// Send the HTTP upgrade response.
215
216 ///
217 /// Finishes the handshake process, then starts reading messages from the socket.
218 void start(std::string&& hello)
219 {
220 static std::string header = "HTTP/1.1 101 Switching Protocols\r\n"
221 "Upgrade: websocket\r\n"
222 "Connection: Upgrade\r\n"
223 "Sec-WebSocket-Accept: ";
224 static std::string crlf = "\r\n";
225 write_buffers_.emplace_back(header);
226 write_buffers_.emplace_back(std::move(hello));
227 write_buffers_.emplace_back(crlf);
228 write_buffers_.emplace_back(crlf);
229 do_write();
230 if (open_handler_)
231 open_handler_(*this);
232 do_read();
233 }
234
235 /// Read a websocket message.
236
237 ///
238 /// Involves:<br>
239 /// Handling headers (opcodes, size).<br>
240 /// Unmasking the payload.<br>
241 /// Reading the actual payload.<br>
242 void do_read()
243 {
244 is_reading = true;
245 switch(state_)
246 {
247 case WebSocketReadState::MiniHeader:
248 {
249 mini_header_ = 0;
250 //boost::asio::async_read(adaptor_.socket(), boost::asio::buffer(&mini_header_, 1),
251 adaptor_.socket().async_read_some(boost::asio::buffer(&mini_header_, 2),
252 [this](const boost::system::error_code& ec, std::size_t
253#ifdef CROW_ENABLE_DEBUG
254 bytes_transferred
255#endif
256 )
257
258 {
259 is_reading = false;
260 mini_header_ = ntohs(mini_header_);
261#ifdef CROW_ENABLE_DEBUG
262
263 if (!ec && bytes_transferred != 2)
264 {
265 throw std::runtime_error("WebSocket:MiniHeader:async_read fail:asio bug?");
266 }
267#endif
268
269 if (!ec)
270 {
271 if ((mini_header_ & 0x80) == 0x80)
272 has_mask_ = true;
273
274 if ((mini_header_ & 0x7f) == 127)
275 {
276 state_ = WebSocketReadState::Len64;
277 }
278 else if ((mini_header_ & 0x7f) == 126)
279 {
280 state_ = WebSocketReadState::Len16;
281 }
282 else
283 {
284 remaining_length_ = mini_header_ & 0x7f;
285 state_ = WebSocketReadState::Mask;
286 }
287 do_read();
288 }
289 else
290 {
291 close_connection_ = true;
292 adaptor_.close();
293 if (error_handler_)
294 error_handler_(*this);
296 }
297 });
298 }
299 break;
300 case WebSocketReadState::Len16:
301 {
302 remaining_length_ = 0;
303 remaining_length16_ = 0;
304 boost::asio::async_read(adaptor_.socket(), boost::asio::buffer(&remaining_length16_, 2),
305 [this](const boost::system::error_code& ec, std::size_t
306#ifdef CROW_ENABLE_DEBUG
307 bytes_transferred
308#endif
309 )
310 {
311 is_reading = false;
312 remaining_length16_ = ntohs(remaining_length16_);
313 remaining_length_ = remaining_length16_;
314#ifdef CROW_ENABLE_DEBUG
315 if (!ec && bytes_transferred != 2)
316 {
317 throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
318 }
319#endif
320
321 if (!ec)
322 {
323 state_ = WebSocketReadState::Mask;
324 do_read();
325 }
326 else
327 {
328 close_connection_ = true;
329 adaptor_.close();
330 if (error_handler_)
331 error_handler_(*this);
333 }
334 });
335 }
336 break;
337 case WebSocketReadState::Len64:
338 {
339 boost::asio::async_read(adaptor_.socket(), boost::asio::buffer(&remaining_length_, 8),
340 [this](const boost::system::error_code& ec, std::size_t
341#ifdef CROW_ENABLE_DEBUG
342 bytes_transferred
343#endif
344 )
345 {
346 is_reading = false;
347 remaining_length_ = ((1==ntohl(1)) ? (remaining_length_) : (static_cast<uint64_t>(ntohl((remaining_length_) & 0xFFFFFFFF)) << 32) | ntohl((remaining_length_) >> 32));
348#ifdef CROW_ENABLE_DEBUG
349 if (!ec && bytes_transferred != 8)
350 {
351 throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
352 }
353#endif
354
355 if (!ec)
356 {
357 state_ = WebSocketReadState::Mask;
358 do_read();
359 }
360 else
361 {
362 close_connection_ = true;
363 adaptor_.close();
364 if (error_handler_)
365 error_handler_(*this);
367 }
368 });
369 }
370 break;
371 case WebSocketReadState::Mask:
372 if (has_mask_)
373 {
374 boost::asio::async_read(adaptor_.socket(), boost::asio::buffer((char*)&mask_, 4),
375 [this](const boost::system::error_code& ec, std::size_t
376#ifdef CROW_ENABLE_DEBUG
377 bytes_transferred
378#endif
379 )
380 {
381 is_reading = false;
382#ifdef CROW_ENABLE_DEBUG
383 if (!ec && bytes_transferred != 4)
384 {
385 throw std::runtime_error("WebSocket:Mask:async_read fail:asio bug?");
386 }
387#endif
388
389 if (!ec)
390 {
391 state_ = WebSocketReadState::Payload;
392 do_read();
393 }
394 else
395 {
396 close_connection_ = true;
397 if (error_handler_)
398 error_handler_(*this);
399 adaptor_.close();
400 }
401 });
402 }
403 else
404 {
405 state_ = WebSocketReadState::Payload;
406 do_read();
407 }
408 break;
409 case WebSocketReadState::Payload:
410 {
411 auto to_read = static_cast<std::uint64_t>(buffer_.size());
412 if (remaining_length_ < to_read)
413 to_read = remaining_length_;
414 adaptor_.socket().async_read_some(boost::asio::buffer(buffer_, static_cast<std::size_t>(to_read)),
415 [this](const boost::system::error_code& ec, std::size_t bytes_transferred)
416 {
417 is_reading = false;
418
419 if (!ec)
420 {
421 fragment_.insert(fragment_.end(), buffer_.begin(), buffer_.begin() + bytes_transferred);
422 remaining_length_ -= bytes_transferred;
423 if (remaining_length_ == 0)
424 {
425 handle_fragment();
426 state_ = WebSocketReadState::MiniHeader;
427 do_read();
428 }
429 else
430 do_read();
431 }
432 else
433 {
434 close_connection_ = true;
435 if (error_handler_)
436 error_handler_(*this);
437 adaptor_.close();
438 }
439 });
440 }
441 break;
442 }
443 }
444
445 /// Check if the FIN bit is set.
446 bool is_FIN()
447 {
448 return mini_header_ & 0x8000;
449 }
450
451 /// Extract the opcode from the header.
452 int opcode()
453 {
454 return (mini_header_ & 0x0f00) >> 8;
455 }
456
457 /// Process the payload fragment.
458
459 ///
460 /// Unmasks the fragment, checks the opcode, merges fragments into 1 message body, and calls the appropriate handler.
462 {
463 if (has_mask_)
464 {
465 for(decltype(fragment_.length()) i = 0; i < fragment_.length(); i ++)
466 {
467 fragment_[i] ^= ((char*)&mask_)[i%4];
468 }
469 }
470 switch(opcode())
471 {
472 case 0: // Continuation
473 {
474 message_ += fragment_;
475 if (is_FIN())
476 {
477 if (message_handler_)
478 message_handler_(*this, message_, is_binary_);
479 message_.clear();
480 }
481 }
482 break;
483 case 1: // Text
484 {
485 is_binary_ = false;
486 message_ += fragment_;
487 if (is_FIN())
488 {
489 if (message_handler_)
490 message_handler_(*this, message_, is_binary_);
491 message_.clear();
492 }
493 }
494 break;
495 case 2: // Binary
496 {
497 is_binary_ = true;
498 message_ += fragment_;
499 if (is_FIN())
500 {
501 if (message_handler_)
502 message_handler_(*this, message_, is_binary_);
503 message_.clear();
504 }
505 }
506 break;
507 case 0x8: // Close
508 {
509 has_recv_close_ = true;
510 if (!has_sent_close_)
511 {
512 close(fragment_);
513 }
514 else
515 {
516 adaptor_.close();
517 close_connection_ = true;
518 if (!is_close_handler_called_)
519 {
520 if (close_handler_)
521 close_handler_(*this, fragment_);
522 is_close_handler_called_ = true;
523 }
524 check_destroy();
525 }
526 }
527 break;
528 case 0x9: // Ping
529 {
530 send_pong(fragment_);
531 }
532 break;
533 case 0xA: // Pong
534 {
535 pong_received_ = true;
536 }
537 break;
538 }
539
540 fragment_.clear();
541 }
542
543 /// Send the buffers' data through the socket.
544
545 ///
546 /// Also destroyes the object if the Close flag is set.
547 void do_write()
548 {
549 if (sending_buffers_.empty())
550 {
551 sending_buffers_.swap(write_buffers_);
552 std::vector<boost::asio::const_buffer> buffers;
553 buffers.reserve(sending_buffers_.size());
554 for(auto& s:sending_buffers_)
555 {
556 buffers.emplace_back(boost::asio::buffer(s));
557 }
558 boost::asio::async_write(adaptor_.socket(), buffers,
559 [&](const boost::system::error_code& ec, std::size_t /*bytes_transferred*/)
560 {
561 sending_buffers_.clear();
562 if (!ec && !close_connection_)
563 {
564 if (!write_buffers_.empty())
565 do_write();
566 if (has_sent_close_)
567 close_connection_ = true;
568 }
569 else
570 {
571 close_connection_ = true;
572 check_destroy();
573 }
574 });
575 }
576 }
577
578 /// Destroy the Connection.
580 {
581 //if (has_sent_close_ && has_recv_close_)
582 if (!is_close_handler_called_)
583 if (close_handler_)
584 close_handler_(*this, "uncleanly");
585 if (sending_buffers_.empty() && !is_reading)
586 delete this;
587 }
588 private:
589 Adaptor adaptor_;
590
591 std::vector<std::string> sending_buffers_;
592 std::vector<std::string> write_buffers_;
593
594 boost::array<char, 4096> buffer_;
595 bool is_binary_;
596 std::string message_;
597 std::string fragment_;
598 WebSocketReadState state_{WebSocketReadState::MiniHeader};
599 uint16_t remaining_length16_{0};
600 uint64_t remaining_length_{0};
601 bool close_connection_{false};
602 bool is_reading{false};
603 bool has_mask_{false};
604 uint32_t mask_;
605 uint16_t mini_header_;
606 bool has_sent_close_{false};
607 bool has_recv_close_{false};
608 bool error_occured_{false};
609 bool pong_received_{false};
610 bool is_close_handler_called_{false};
611
612 std::function<void(crow::websocket::connection&)> open_handler_;
613 std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_;
614 std::function<void(crow::websocket::connection&, const std::string&)> close_handler_;
615 std::function<void(crow::websocket::connection&)> error_handler_;
616 std::function<bool(const crow::request&)> accept_handler_;
617 };
618 }
619}
A websocket connection.
Definition: websocket.h:60
void check_destroy()
Destroy the Connection.
Definition: websocket.h:579
void close(const std::string &msg) override
Send a close signal.
Definition: websocket.h:171
void start(std::string &&hello)
Send the HTTP upgrade response.
Definition: websocket.h:218
void handle_fragment()
Process the payload fragment.
Definition: websocket.h:461
int opcode()
Extract the opcode from the header.
Definition: websocket.h:452
void send_ping(const std::string &msg) override
Send a "Ping" message.
Definition: websocket.h:121
void do_write()
Send the buffers' data through the socket.
Definition: websocket.h:547
void dispatch(CompletionHandler handler)
Send data through the socket.
Definition: websocket.h:105
Connection(const crow::request &req, Adaptor &&adaptor, std::function< void(crow::websocket::connection &)> open_handler, std::function< void(crow::websocket::connection &, const std::string &, bool)> message_handler, std::function< void(crow::websocket::connection &, const std::string &)> close_handler, std::function< void(crow::websocket::connection &)> error_handler, std::function< bool(const crow::request &)> accept_handler)
Constructor for a connection.
Definition: websocket.h:67
void send_text(const std::string &msg) override
Send a plaintext message.
Definition: websocket.h:157
void post(CompletionHandler handler)
Send data through the socket and return immediately.
Definition: websocket.h:112
std::string build_header(int opcode, size_t size)
Generate the websocket headers using an opcode and the message size (in bytes).
Definition: websocket.h:191
void do_read()
Read a websocket message.
Definition: websocket.h:242
void send_binary(const std::string &msg) override
Send a binary encoded message.
Definition: websocket.h:146
void send_pong(const std::string &msg) override
Send a "Pong" message.
Definition: websocket.h:135
bool is_FIN()
Check if the FIN bit is set.
Definition: websocket.h:446
An HTTP request.
Definition: http_request.h:27
A base class for websocket connection.
Definition: websocket.h:23