Crow  1.1
A C++ microframework for the web
 
Loading...
Searching...
No Matches
websocket.h
1#pragma once
2#include <array>
3#include "crow/logging.h"
4#include "crow/socket_adaptors.h"
5#include "crow/http_request.h"
6#include "crow/TinySHA1.hpp"
7#include "crow/utility.h"
8
9namespace crow // NOTE: Already documented in "crow/app.h"
10{
11#ifdef CROW_USE_BOOST
12 namespace asio = boost::asio;
13 using error_code = boost::system::error_code;
14#else
15 using error_code = asio::error_code;
16#endif
17
18 /**
19 * \namespace crow::websocket
20 * \brief Namespace that contains the \ref Connection class
21 * and the \ref connection struct, which implement WebSocket connections.
22 *
23 * Used mainly in crow/websocket.h, crow/app.h and crow/routing.h
24 */
25 namespace websocket
26 {
/// Stages of the frame parser driven by Connection::do_read(); each value
/// selects which part of the incoming frame the next read targets.
27 enum class WebSocketReadState
28 {
29 MiniHeader, ///< The first two header bytes (stored in mini_header_).
30 Len16, ///< The 2-byte extended payload length.
31 Len64, ///< The 8-byte extended payload length.
32 Mask, ///< The 4-byte masking key (read only when the frame is masked).
33 Payload, ///< The payload bytes, accumulated into fragment_.
34 };
35
/// A base class for websocket connection.
///
/// Declares the operations a websocket connection exposes (sending text,
/// binary, ping and pong messages, closing, querying the remote address) and
/// carries one opaque user pointer per connection.
struct connection
{
    virtual void send_binary(std::string msg) = 0;
    virtual void send_text(std::string msg) = 0;
    virtual void send_ping(std::string msg) = 0;
    virtual void send_pong(std::string msg) = 0;
    virtual void close(std::string const& msg = "quit") = 0;
    virtual std::string get_remote_ip() = 0;
    virtual ~connection() = default;

    /// Store an opaque pointer on this connection. The pointer is only
    /// stored; this struct never dereferences or frees it.
    void userdata(void* u) { userdata_ = u; }

    /// Return the pointer stored by userdata(void*), or nullptr if none was set.
    void* userdata() { return userdata_; }

private:
    // Value-initialised so userdata() is well-defined before any setter call.
    void* userdata_{nullptr};
};
53
54 // Modified version of the illustration in RFC6455 Section-5.2
55 //
56 //
57 //  0               1               2               3               -byte
58 //  0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 -bit
59 // +-+-+-+-+-------+-+-------------+-------------------------------+
60 // |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
61 // |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
62 // |N|V|V|V|       |S|             |   (if payload len==126/127)   |
63 // | |1|2|3|       |K|             |                               |
64 // +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
65 // |     Extended payload length continued, if payload len == 127  |
66 // + - - - - - - - - - - - - - - - +-------------------------------+
67 // |                               |Masking-key, if MASK set to 1  |
68 // +-------------------------------+-------------------------------+
69 // | Masking-key (continued)       |          Payload Data         |
70 // +-------------------------------- - - - - - - - - - - - - - - - +
71 // :                     Payload Data continued ...                :
72 // + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
73 // |                     Payload Data continued ...                |
74 // +---------------------------------------------------------------+
75 //
76
77 /// A websocket connection.
78
79 template<typename Adaptor, typename Handler>
80 class Connection : public connection
81 {
82 public:
83 /// Constructor for a connection.
84
85 ///
86 /// Requires a request with an "Upgrade: websocket" header.<br>
87 /// Automatically handles the handshake.
///
/// The adaptor is moved into the connection and the five handlers are stored.
/// If the "upgrade" header does not match "websocket", or if an accept handler
/// is set and returns false, the adaptor is closed, the connection is removed
/// from \p handler and the object deletes itself before the constructor returns.
/// Otherwise the accept value is derived from the "Sec-WebSocket-Key" header and
/// passed to start().
88 Connection(const crow::request& req, Adaptor&& adaptor, Handler* handler, uint64_t max_payload,
89 std::function<void(crow::websocket::connection&)> open_handler,
90 std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler,
91 std::function<void(crow::websocket::connection&, const std::string&)> close_handler,
92 std::function<void(crow::websocket::connection&, const std::string&)> error_handler,
93 std::function<bool(const crow::request&, void**)> accept_handler):
94 adaptor_(std::move(adaptor)),
95 handler_(handler),
96 max_payload_bytes_(max_payload),
97 open_handler_(std::move(open_handler)),
98 message_handler_(std::move(message_handler)),
99 close_handler_(std::move(close_handler)),
100 error_handler_(std::move(error_handler)),
101 accept_handler_(std::move(accept_handler))
102 {
// Reject requests that do not ask for a websocket upgrade.
103 if (!utility::string_equals(req.get_header_value("upgrade"), "websocket"))
104 {
105 adaptor_.close();
106 handler_->remove_websocket(this);
// NOTE(review): `delete this` inside a constructor assumes the object was
// created with `new` and that the creator never touches the returned pointer
// after a rejection - confirm against the call sites.
107 delete this;
108 return;
109 }
110
// Let the application veto the connection; on acceptance, whatever pointer the
// handler wrote through its second argument becomes this connection's userdata.
111 if (accept_handler_)
112 {
113 void* ud = nullptr;
114 if (!accept_handler_(req, &ud))
115 {
116 adaptor_.close();
117 handler_->remove_websocket(this);
118 delete this;
119 return;
120 }
121 userdata(ud);
122 }
123
124 // Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
125 // Sec-WebSocket-Version: 13
// Accept value = base64(SHA-1(client key + fixed GUID string)).
126 std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
127 sha1::SHA1 s;
128 s.processBytes(magic.data(), magic.size());
129 uint8_t digest[20];
130 s.getDigestBytes(digest);
131
132 start(crow::utility::base64encode((unsigned char*)digest, 20));
133 }
134
/// Destructor: spins (yielding the thread) until the number of owners of the
/// anchor drops to two or fewer, i.e. until callbacks that locked the anchor
/// have released it.
135 ~Connection() noexcept override
136 {
137 // Do not modify anchor_ here since writing shared_ptr is not atomic.
138 auto watch = std::weak_ptr<void>{anchor_};
139
140 // Wait until all unhandled asynchronous operations to join.
141 // As the deletion occurs inside 'check_destroy()', which already locks
142 // anchor, use count can be 1 on valid deletion context.
// NOTE(review): the comment above mentions a count of 1 while the loop
// tolerates up to 2; the inline comment on the loop explains the 2 - confirm
// which statement is current.
143 while (watch.use_count() > 2) // 1 for 'check_destroy() routine', 1 for 'this->anchor_'
144 {
145 std::this_thread::yield();
146 }
147 }
148
/// Callable wrapper that runs `callable` only while the object observed by
/// `watch` is still alive.
///
/// \tparam Callable a type invocable with no arguments.
template<typename Callable>
struct WeakWrappedMessage
{
    Callable callable;         ///< The wrapped work item.
    std::weak_ptr<void> watch; ///< Liveness token; work is skipped once it has expired.

    /// Invoke the wrapped callable (as an rvalue) if `watch` can still be locked.
    void operator()()
    {
        // Holding the locked pointer keeps the token alive for the whole call.
        if (auto anchor = watch.lock())
        {
            std::move(callable)();
        }
    }
};
163
164 /// Send data through the socket.
165 template<typename CompletionHandler>
166 void dispatch(CompletionHandler&& handler)
167 {
168 asio::dispatch(adaptor_.get_io_service(),
169 WeakWrappedMessage<typename std::decay<CompletionHandler>::type>{
170 std::forward<CompletionHandler>(handler), anchor_});
171 }
172
173 /// Send data through the socket and return immediately.
174 template<typename CompletionHandler>
175 void post(CompletionHandler&& handler)
176 {
177 asio::post(adaptor_.get_io_service(),
178 WeakWrappedMessage<typename std::decay<CompletionHandler>::type>{
179 std::forward<CompletionHandler>(handler), anchor_});
180 }
181
182 /// Send a "Ping" message.
183
184 ///
185 /// Usually invoked to check if the other point is still online.
186 void send_ping(std::string msg) override
187 {
188 send_data(0x9, std::move(msg));
189 }
190
191 /// Send a "Pong" message.
192
193 ///
194 /// Usually automatically invoked as a response to a "Ping" message.
195 void send_pong(std::string msg) override
196 {
197 send_data(0xA, std::move(msg));
198 }
199
200 /// Send a binary encoded message.
201 void send_binary(std::string msg) override
202 {
203 send_data(0x2, std::move(msg));
204 }
205
206 /// Send a plaintext message.
207 void send_text(std::string msg) override
208 {
209 send_data(0x1, std::move(msg));
210 }
211
212 /// Send a close signal.
213
214 ///
215 /// Sets a flag to destroy the object once the message is sent.
216 void close(std::string const& msg) override
217 {
218 dispatch([this, msg]() mutable {
219 has_sent_close_ = true;
220 if (has_recv_close_ && !is_close_handler_called_)
221 {
222 is_close_handler_called_ = true;
223 if (close_handler_)
224 close_handler_(*this, msg);
225 }
226 auto header = build_header(0x8, msg.size());
227 write_buffers_.emplace_back(std::move(header));
228 write_buffers_.emplace_back(msg);
229 do_write();
230 });
231 }
232
233 std::string get_remote_ip() override
234 {
235 return adaptor_.remote_endpoint().address().to_string();
236 }
237
238 void set_max_payload_size(uint64_t payload)
239 {
240 max_payload_bytes_ = payload;
241 }
242
243 protected:
/// Generate the websocket headers using an opcode and the message size (in bytes).
///
/// The result is an unmasked frame header with the FIN bit set:
///  - byte 0: 0x80 + opcode
///  - byte 1: the size itself when size < 126, otherwise the marker 126
///    (followed by a 2-byte length) or 127 (followed by an 8-byte length)
///  - extended length bytes, most significant byte first.
///
/// \param opcode frame opcode (e.g. 0x1 text, 0x2 binary, 0x8 close).
/// \param size   payload length in bytes.
/// \return the 2-, 4- or 10-byte header.
std::string build_header(int opcode, size_t size)
{
    std::string header;
    header.reserve(2 + 8);
    header.push_back(static_cast<char>(0x80 + opcode));

    const auto length = static_cast<std::uint64_t>(size);
    unsigned extended_bytes = 0;
    if (length < 126)
    {
        header.push_back(static_cast<char>(length));
    }
    else if (length < 0x10000)
    {
        header.push_back(static_cast<char>(126));
        extended_bytes = 2;
    }
    else
    {
        header.push_back(static_cast<char>(127));
        extended_bytes = 8;
    }

    // Emit the extended length one byte at a time in network (big-endian) order.
    // This avoids writing wider integers through a char buffer (misaligned
    // access / aliasing) and does not depend on the host's byte order.
    for (unsigned shift = extended_bytes * 8; shift != 0;)
    {
        shift -= 8;
        header.push_back(static_cast<char>((length >> shift) & 0xFF));
    }
    return header;
}
267
268 /// Send the HTTP upgrade response.
269
270 ///
271 /// Finishes the handshake process, then starts reading messages from the socket.
272 void start(std::string&& hello)
273 {
274 static const std::string header =
275 "HTTP/1.1 101 Switching Protocols\r\n"
276 "Upgrade: websocket\r\n"
277 "Connection: Upgrade\r\n"
278 "Sec-WebSocket-Accept: ";
279 write_buffers_.emplace_back(header);
280 write_buffers_.emplace_back(std::move(hello));
281 write_buffers_.emplace_back(crlf);
282 write_buffers_.emplace_back(crlf);
283 do_write();
284 if (open_handler_)
285 open_handler_(*this);
286 do_read();
287 }
288
289 /// Read a websocket message.
290
291 ///
292 /// Involves:<br>
293 /// Handling headers (opcodes, size).<br>
294 /// Unmasking the payload.<br>
295 /// Reading the actual payload.<br>
296 void do_read()
297 {
298 if (has_sent_close_ && has_recv_close_)
299 {
300 close_connection_ = true;
301 adaptor_.shutdown_readwrite();
302 adaptor_.close();
304 return;
305 }
306
307 is_reading = true;
308 switch (state_)
309 {
310 case WebSocketReadState::MiniHeader:
311 {
312 mini_header_ = 0;
313 //asio::async_read(adaptor_.socket(), asio::buffer(&mini_header_, 1),
314 adaptor_.socket().async_read_some(
315 asio::buffer(&mini_header_, 2),
316 [this](const error_code& ec, std::size_t
317#ifdef CROW_ENABLE_DEBUG
318 bytes_transferred
319#endif
320 )
321
322 {
323 is_reading = false;
324 mini_header_ = ntohs(mini_header_);
325#ifdef CROW_ENABLE_DEBUG
326
327 if (!ec && bytes_transferred != 2)
328 {
329 throw std::runtime_error("WebSocket:MiniHeader:async_read fail:asio bug?");
330 }
331#endif
332
333 if (!ec)
334 {
335 if ((mini_header_ & 0x80) == 0x80)
336 has_mask_ = true;
337 else //if the websocket specification is enforced and the message isn't masked, terminate the connection
338 {
339#ifndef CROW_ENFORCE_WS_SPEC
340 has_mask_ = false;
341#else
342 close_connection_ = true;
343 adaptor_.shutdown_readwrite();
344 adaptor_.close();
345 if (error_handler_)
346 error_handler_(*this, "Client connection not masked.");
348#endif
349 }
350
351 if ((mini_header_ & 0x7f) == 127)
352 {
353 state_ = WebSocketReadState::Len64;
354 }
355 else if ((mini_header_ & 0x7f) == 126)
356 {
357 state_ = WebSocketReadState::Len16;
358 }
359 else
360 {
361 remaining_length_ = mini_header_ & 0x7f;
362 state_ = WebSocketReadState::Mask;
363 }
364 do_read();
365 }
366 else
367 {
368 close_connection_ = true;
369 adaptor_.shutdown_readwrite();
370 adaptor_.close();
371 if (error_handler_)
372 error_handler_(*this, ec.message());
374 }
375 });
376 }
377 break;
378 case WebSocketReadState::Len16:
379 {
380 remaining_length_ = 0;
381 remaining_length16_ = 0;
382 asio::async_read(
383 adaptor_.socket(), asio::buffer(&remaining_length16_, 2),
384 [this](const error_code& ec, std::size_t
385#ifdef CROW_ENABLE_DEBUG
386 bytes_transferred
387#endif
388 ) {
389 is_reading = false;
390 remaining_length16_ = ntohs(remaining_length16_);
391 remaining_length_ = remaining_length16_;
392#ifdef CROW_ENABLE_DEBUG
393 if (!ec && bytes_transferred != 2)
394 {
395 throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
396 }
397#endif
398
399 if (!ec)
400 {
401 state_ = WebSocketReadState::Mask;
402 do_read();
403 }
404 else
405 {
406 close_connection_ = true;
407 adaptor_.shutdown_readwrite();
408 adaptor_.close();
409 if (error_handler_)
410 error_handler_(*this, ec.message());
412 }
413 });
414 }
415 break;
416 case WebSocketReadState::Len64:
417 {
418 asio::async_read(
419 adaptor_.socket(), asio::buffer(&remaining_length_, 8),
420 [this](const error_code& ec, std::size_t
421#ifdef CROW_ENABLE_DEBUG
422 bytes_transferred
423#endif
424 ) {
425 is_reading = false;
426 remaining_length_ = ((1 == ntohl(1)) ? (remaining_length_) : (static_cast<uint64_t>(ntohl((remaining_length_)&0xFFFFFFFF)) << 32) | ntohl((remaining_length_) >> 32));
427#ifdef CROW_ENABLE_DEBUG
428 if (!ec && bytes_transferred != 8)
429 {
430 throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
431 }
432#endif
433
434 if (!ec)
435 {
436 state_ = WebSocketReadState::Mask;
437 do_read();
438 }
439 else
440 {
441 close_connection_ = true;
442 adaptor_.shutdown_readwrite();
443 adaptor_.close();
444 if (error_handler_)
445 error_handler_(*this, ec.message());
447 }
448 });
449 }
450 break;
451 case WebSocketReadState::Mask:
452 if (remaining_length_ > max_payload_bytes_)
453 {
454 close_connection_ = true;
455 adaptor_.close();
456 if (error_handler_)
457 error_handler_(*this, "Message length exceeds maximum payload.");
459 }
460 else if (has_mask_)
461 {
462 asio::async_read(
463 adaptor_.socket(), asio::buffer((char*)&mask_, 4),
464 [this](const error_code& ec, std::size_t
465#ifdef CROW_ENABLE_DEBUG
466 bytes_transferred
467#endif
468 ) {
469 is_reading = false;
470#ifdef CROW_ENABLE_DEBUG
471 if (!ec && bytes_transferred != 4)
472 {
473 throw std::runtime_error("WebSocket:Mask:async_read fail:asio bug?");
474 }
475#endif
476
477 if (!ec)
478 {
479 state_ = WebSocketReadState::Payload;
480 do_read();
481 }
482 else
483 {
484 close_connection_ = true;
485 if (error_handler_)
486 error_handler_(*this, ec.message());
487 adaptor_.shutdown_readwrite();
488 adaptor_.close();
490 }
491 });
492 }
493 else
494 {
495 state_ = WebSocketReadState::Payload;
496 do_read();
497 }
498 break;
499 case WebSocketReadState::Payload:
500 {
501 auto to_read = static_cast<std::uint64_t>(buffer_.size());
502 if (remaining_length_ < to_read)
503 to_read = remaining_length_;
504 adaptor_.socket().async_read_some(
505 asio::buffer(buffer_, static_cast<std::size_t>(to_read)),
506 [this](const error_code& ec, std::size_t bytes_transferred) {
507 is_reading = false;
508
509 if (!ec)
510 {
511 fragment_.insert(fragment_.end(), buffer_.begin(), buffer_.begin() + bytes_transferred);
512 remaining_length_ -= bytes_transferred;
513 if (remaining_length_ == 0)
514 {
515 if (handle_fragment())
516 {
517 state_ = WebSocketReadState::MiniHeader;
518 do_read();
519 }
520 }
521 else
522 do_read();
523 }
524 else
525 {
526 close_connection_ = true;
527 if (error_handler_)
528 error_handler_(*this, ec.message());
529 adaptor_.shutdown_readwrite();
530 adaptor_.close();
531 check_destroy();
532 }
533 });
534 }
535 break;
536 }
537 }
538
539 /// Check if the FIN bit is set.
540 bool is_FIN()
541 {
542 return mini_header_ & 0x8000;
543 }
544
545 /// Extract the opcode from the header.
546 int opcode()
547 {
548 return (mini_header_ & 0x0f00) >> 8;
549 }
550
551 /// Process the payload fragment.
552
553 ///
554 /// Unmasks the fragment, checks the opcode, merges fragments into 1 message body, and calls the appropriate handler.
556 {
557 if (has_mask_)
558 {
559 for (decltype(fragment_.length()) i = 0; i < fragment_.length(); i++)
560 {
561 fragment_[i] ^= ((char*)&mask_)[i % 4];
562 }
563 }
564 switch (opcode())
565 {
566 case 0: // Continuation
567 {
568 message_ += fragment_;
569 if (is_FIN())
570 {
571 if (message_handler_)
572 message_handler_(*this, message_, is_binary_);
573 message_.clear();
574 }
575 }
576 break;
577 case 1: // Text
578 {
579 is_binary_ = false;
580 message_ += fragment_;
581 if (is_FIN())
582 {
583 if (message_handler_)
584 message_handler_(*this, message_, is_binary_);
585 message_.clear();
586 }
587 }
588 break;
589 case 2: // Binary
590 {
591 is_binary_ = true;
592 message_ += fragment_;
593 if (is_FIN())
594 {
595 if (message_handler_)
596 message_handler_(*this, message_, is_binary_);
597 message_.clear();
598 }
599 }
600 break;
601 case 0x8: // Close
602 {
603 has_recv_close_ = true;
604 if (!has_sent_close_)
605 {
606 close(fragment_);
607 }
608 else
609 {
610 adaptor_.shutdown_readwrite();
611 adaptor_.close();
612 close_connection_ = true;
613 if (!is_close_handler_called_)
614 {
615 if (close_handler_)
616 close_handler_(*this, fragment_);
617 is_close_handler_called_ = true;
618 }
619 check_destroy();
620 return false;
621 }
622 }
623 break;
624 case 0x9: // Ping
625 {
626 send_pong(fragment_);
627 }
628 break;
629 case 0xA: // Pong
630 {
631 pong_received_ = true;
632 }
633 break;
634 }
635
636 fragment_.clear();
637 return true;
638 }
639
640 /// Send the buffers' data through the socket.
641
642 ///
643 /// Also destroys the object if the Close flag is set.
///
/// Does nothing while a previous batch is still being sent. Otherwise the
/// pending write_buffers_ become the sending batch and one asio::async_write
/// is issued for all of them. check_destroy() is only reached from the
/// completion handler's failure branch (write error, or close_connection_
/// already set).
644 void do_write()
645 {
// A non-empty sending_buffers_ means a write is already in flight.
646 if (sending_buffers_.empty())
647 {
648 sending_buffers_.swap(write_buffers_);
649 std::vector<asio::const_buffer> buffers;
650 buffers.reserve(sending_buffers_.size());
651 for (auto& s : sending_buffers_)
652 {
653 buffers.emplace_back(asio::buffer(s));
654 }
// Weak handle on the anchor, used below to skip teardown if it has expired.
655 auto watch = std::weak_ptr<void>{anchor_};
656 asio::async_write(
657 adaptor_.socket(), buffers,
658 [&, watch](const error_code& ec, std::size_t /*bytes_transferred*/) {
659 if (!ec && !close_connection_)
660 {
// Batch sent: release it and flush anything queued in the meantime.
661 sending_buffers_.clear();
662 if (!write_buffers_.empty())
663 do_write();
// Once a close has been requested, later completions take the else branch.
664 if (has_sent_close_)
665 close_connection_ = true;
666 }
667 else
668 {
// NOTE(review): the success branch above touches members without locking
// `watch`; only this branch checks that the anchor is still alive.
669 auto anchor = watch.lock();
670 if (anchor == nullptr) { return; }
671
672 sending_buffers_.clear();
673 close_connection_ = true;
674 check_destroy();
675 }
676 });
677 }
678 }
679
680 /// Destroy the Connection.
682 {
683 //if (has_sent_close_ && has_recv_close_)
684 if (!is_close_handler_called_)
685 if (close_handler_)
686 close_handler_(*this, "uncleanly");
687 handler_->remove_websocket(this);
688 if (sending_buffers_.empty() && !is_reading)
689 delete this;
690 }
691
692
694 {
695 std::string payload;
696 Connection* self;
697 int opcode;
698
699 void operator()()
700 {
701 self->send_data_impl(this);
702 }
703 };
704
705 void send_data_impl(SendMessageType* s)
706 {
707 auto header = build_header(s->opcode, s->payload.size());
708 write_buffers_.emplace_back(std::move(header));
709 write_buffers_.emplace_back(std::move(s->payload));
710 do_write();
711 }
712
713 void send_data(int opcode, std::string&& msg)
714 {
715 SendMessageType event_arg{
716 std::move(msg),
717 this,
718 opcode};
719
720 post(std::move(event_arg));
721 }
722
723 private:
724 Adaptor adaptor_;
725 Handler* handler_;
726
727 std::vector<std::string> sending_buffers_;
728 std::vector<std::string> write_buffers_;
729
730 std::array<char, 4096> buffer_;
731 bool is_binary_;
732 std::string message_;
733 std::string fragment_;
734 WebSocketReadState state_{WebSocketReadState::MiniHeader};
735 uint16_t remaining_length16_{0};
736 uint64_t remaining_length_{0};
737 uint64_t max_payload_bytes_{UINT64_MAX};
738 bool close_connection_{false};
739 bool is_reading{false};
740 bool has_mask_{false};
741 uint32_t mask_;
742 uint16_t mini_header_;
743 bool has_sent_close_{false};
744 bool has_recv_close_{false};
745 bool error_occurred_{false};
746 bool pong_received_{false};
747 bool is_close_handler_called_{false};
748
749 std::shared_ptr<void> anchor_ = std::make_shared<int>(); // Value is just for placeholding
750
751 std::function<void(crow::websocket::connection&)> open_handler_;
752 std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_;
753 std::function<void(crow::websocket::connection&, const std::string&)> close_handler_;
754 std::function<void(crow::websocket::connection&, const std::string&)> error_handler_;
755 std::function<bool(const crow::request&, void**)> accept_handler_;
756 };
757 } // namespace websocket
758} // namespace crow
TinySHA1 - a header only implementation of the SHA1 algorithm in C++. Based on the implementation in ...
A websocket connection.
Definition websocket.h:81
void dispatch(CompletionHandler &&handler)
Send data through the socket.
Definition websocket.h:166
void do_read()
Read a websocket message.
Definition websocket.h:296
void send_pong(std::string msg) override
Send a "Pong" message.
Definition websocket.h:195
bool handle_fragment()
Process the payload fragment.
Definition websocket.h:555
std::string build_header(int opcode, size_t size)
Generate the websocket headers using an opcode and the message size (in bytes).
Definition websocket.h:245
void send_text(std::string msg) override
Send a plaintext message.
Definition websocket.h:207
void close(std::string const &msg) override
Send a close signal.
Definition websocket.h:216
int opcode()
Extract the opcode from the header.
Definition websocket.h:546
void do_write()
Send the buffers' data through the socket.
Definition websocket.h:644
void check_destroy()
Destroy the Connection.
Definition websocket.h:681
void start(std::string &&hello)
Send the HTTP upgrade response.
Definition websocket.h:272
bool is_FIN()
Check if the FIN bit is set.
Definition websocket.h:540
void send_ping(std::string msg) override
Send a "Ping" message.
Definition websocket.h:186
void send_binary(std::string msg) override
Send a binary encoded message.
Definition websocket.h:201
Connection(const crow::request &req, Adaptor &&adaptor, Handler *handler, uint64_t max_payload, std::function< void(crow::websocket::connection &)> open_handler, std::function< void(crow::websocket::connection &, const std::string &, bool)> message_handler, std::function< void(crow::websocket::connection &, const std::string &)> close_handler, std::function< void(crow::websocket::connection &, const std::string &)> error_handler, std::function< bool(const crow::request &, void **)> accept_handler)
Constructor for a connection.
Definition websocket.h:88
void post(CompletionHandler &&handler)
Send data through the socket and return immediately.
Definition websocket.h:175
A tiny SHA1 algorithm implementation used internally in the Crow server (specifically in crow/websock...
Definition TinySHA1.hpp:48
The main namespace of the library. In this namespace is defined the most important classes and functi...
An HTTP request.
Definition http_request.h:36
A base class for websocket connection.
Definition websocket.h:38