Crow  1.1
A C++ microframework for the web
 
Loading...
Searching...
No Matches
websocket.h
1#pragma once
2#include <array>
3#include <memory>
4#include <optional>
5#include <string>
6#include <thread>
7#include "crow/http_response.h"
8#include "crow/logging.h"
9#include "crow/socket_adaptors.h"
10#include "crow/http_request.h"
11#include "crow/TinySHA1.hpp"
12#include "crow/utility.h"
13
14namespace crow // NOTE: Already documented in "crow/app.h"
15{
16#ifdef CROW_USE_BOOST
17 namespace asio = boost::asio;
18 using error_code = boost::system::error_code;
19#else
20 using error_code = asio::error_code;
21#endif
22
23 /**
24 * \namespace crow::websocket
25 * \brief Namespace that includes the \ref Connection class
26 * and \ref connection struct. Useful for WebSockets connection.
27 *
28 * Used specially in crow/websocket.h, crow/app.h and crow/routing.h
29 */
30 namespace websocket
31 {
32 enum class WebSocketReadState
33 {
34 MiniHeader,
35 Len16,
36 Len64,
37 Mask,
38 Payload,
39 };
40
41 // Codes taken from https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1
42 enum CloseStatusCode : uint16_t {
43 NormalClosure = 1000,
44 EndpointGoingAway = 1001,
45 ProtocolError = 1002,
46 UnacceptableData = 1003,
47 InconsistentData = 1007,
48 PolicyViolated = 1008,
49 MessageTooBig = 1009,
50 ExtensionsNotNegotiated = 1010,
51 UnexpectedCondition = 1011,
52
53 // Reserved for applications only, should not send/receive these to/from clients
54 NoStatusCodePresent = 1005,
55 ClosedAbnormally = 1006,
56 TLSHandshakeFailure = 1015,
57
58 StartStatusCodesForLibraries = 3000,
59 StartStatusCodesForPrivateUse = 4000,
60 // Status code should be between 1000 and 4999 inclusive
61 StartStatusCodes = NormalClosure,
62 EndStatusCodes = 4999,
63 };
64
65 /// A base class for websocket connection.
67 {
68 virtual void send_binary(std::string msg) = 0;
69 virtual void send_text(std::string msg) = 0;
70 virtual void send_ping(std::string msg) = 0;
71 virtual void send_pong(std::string msg) = 0;
72 virtual void close(std::string const& msg = "quit", uint16_t status_code = CloseStatusCode::NormalClosure) = 0;
73 virtual std::string get_remote_ip() = 0;
74 virtual std::string get_subprotocol() const = 0;
75 virtual ~connection() = default;
76
77 void userdata(void* u) { userdata_ = u; }
78 void* userdata() { return userdata_; }
79
80 private:
81 void* userdata_;
82 };
83
84 // Modified version of the illustration in RFC6455 Section-5.2
85 //
86 //
87 // 0 1 2 3 -byte
88 // 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 -bit
89 // +-+-+-+-+-------+-+-------------+-------------------------------+
90 // |F|R|R|R| opcode|M| Payload len | Extended payload length |
91 // |I|S|S|S| (4) |A| (7) | (16/64) |
92 // |N|V|V|V| |S| | (if payload len==126/127) |
93 // | |1|2|3| |K| | |
94 // +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
95 // | Extended payload length continued, if payload len == 127 |
96 // + - - - - - - - - - - - - - - - +-------------------------------+
97 // | |Masking-key, if MASK set to 1 |
98 // +-------------------------------+-------------------------------+
99 // | Masking-key (continued) | Payload Data |
100 // +-------------------------------- - - - - - - - - - - - - - - - +
101 // : Payload Data continued ... :
102 // + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
103 // | Payload Data continued ... |
104 // +---------------------------------------------------------------+
105 //
106
107 /// A websocket connection.
108
109 template<typename Adaptor, typename Handler>
110 class Connection : public connection, public std::enable_shared_from_this<Connection<Adaptor, Handler>>
111 {
112 public:
113 /// Factory for a connection.
114 ///
115 /// Requires a request with an "Upgrade: websocket" header.<br>
116 /// Automatically handles the handshake.
117 static void create(const crow::request& req, Adaptor adaptor, Handler* handler,
118 uint64_t max_payload, const std::vector<std::string>& subprotocols,
119 std::function<void(crow::websocket::connection&)> open_handler,
120 std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler,
121 std::function<void(crow::websocket::connection&, const std::string&, uint16_t)> close_handler,
122 std::function<void(crow::websocket::connection&, const std::string&)> error_handler,
123 std::function<void(const crow::request&, std::optional<crow::response>&, void**)> accept_handler,
124 bool mirror_protocols)
125 {
126 auto conn = std::shared_ptr<Connection>(new Connection(std::move(adaptor),
127 handler, max_payload,
128 std::move(open_handler),
129 std::move(message_handler),
130 std::move(close_handler),
131 std::move(error_handler),
132 std::move(accept_handler)));
133
134 // Perform handshake validation
135 if (!utility::string_equals(req.get_header_value("upgrade"), "websocket"))
136 {
137 conn->adaptor_.close();
138 return;
139 }
140
141 std::string requested_subprotocols_header = req.get_header_value("Sec-WebSocket-Protocol");
142 if (!subprotocols.empty() || !requested_subprotocols_header.empty())
143 {
144 auto requested_subprotocols = utility::split(requested_subprotocols_header, ", ");
145 auto subprotocol = utility::find_first_of(subprotocols.begin(), subprotocols.end(), requested_subprotocols.begin(), requested_subprotocols.end());
146 if (subprotocol != subprotocols.end())
147 {
148 conn->subprotocol_ = *subprotocol;
149 }
150 }
151
152 if (mirror_protocols & !requested_subprotocols_header.empty())
153 {
154 conn->subprotocol_ = requested_subprotocols_header;
155 }
156
157 if (conn->accept_handler_)
158 {
159 void* ud = nullptr;
160 std::optional<crow::response> res;
161 conn->accept_handler_(req, res, &ud);
162 if (res)
163 {
164 std::vector<asio::const_buffer> buffers;
165 auto server_name = "";
166 std::string content_length_buffer;
167 res->write_header_into_buffer(buffers, content_length_buffer, req.keep_alive, server_name);
168 buffers.emplace_back(res->body.data(), res->body.size());
169 error_code ec;
170 asio::write(conn->adaptor_.socket(), buffers, ec);
171 conn->adaptor_.close();
172 return;
173 }
174 conn->userdata(ud);
175 }
176
177 // Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
178 // Sec-WebSocket-Version: 13
179 std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
180 sha1::SHA1 s;
181 s.processBytes(magic.data(), magic.size());
182 uint8_t digest[20];
183 s.getDigestBytes(digest);
184
185 conn->handler_->add_websocket(conn);
186 conn->start(crow::utility::base64encode((unsigned char*)digest, 20));
187 }
188
189 ~Connection() noexcept override = default;
190
191 template<typename Callable>
193 {
194 Callable callable;
195 std::weak_ptr<void> watch;
196
197 void operator()()
198 {
199 if (auto anchor = watch.lock())
200 {
201 std::move(callable)();
202 }
203 }
204 };
205
206 /// Send data through the socket.
207 template<typename CompletionHandler>
208 void dispatch(CompletionHandler&& handler)
209 {
210 asio::dispatch(adaptor_.get_io_context(),
211 WeakWrappedMessage<typename std::decay<CompletionHandler>::type>{
212 std::forward<CompletionHandler>(handler), anchor_});
213 }
214
215 /// Send data through the socket and return immediately.
216 template<typename CompletionHandler>
217 void post(CompletionHandler&& handler)
218 {
219 asio::post(adaptor_.get_io_context(),
220 WeakWrappedMessage<typename std::decay<CompletionHandler>::type>{
221 std::forward<CompletionHandler>(handler), anchor_});
222 }
223
224 /// Send a "Ping" message.
225
226 ///
227 /// Usually invoked to check if the other point is still online.
228 void send_ping(std::string msg) override
229 {
230 send_data(0x9, std::move(msg));
231 }
232
233 /// Send a "Pong" message.
234
235 ///
236 /// Usually automatically invoked as a response to a "Ping" message.
237 void send_pong(std::string msg) override
238 {
239 send_data(0xA, std::move(msg));
240 }
241
242 /// Send a binary encoded message.
243 void send_binary(std::string msg) override
244 {
245 send_data(0x2, std::move(msg));
246 }
247
248 /// Send a plaintext message.
249 void send_text(std::string msg) override
250 {
251 send_data(0x1, std::move(msg));
252 }
253
254 /// Send a close signal.
255
256 ///
257 /// Sets a flag to destroy the object once the message is sent.
258 void close(std::string const& msg, uint16_t status_code) override
259 {
260 dispatch([shared_this = this->shared_from_this(), msg, status_code]() mutable {
261 shared_this->has_sent_close_ = true;
262 if (shared_this->has_recv_close_ && !shared_this->is_close_handler_called_)
263 {
264 shared_this->is_close_handler_called_ = true;
265 if (shared_this->close_handler_)
266 shared_this->close_handler_(*shared_this, msg, status_code);
267 }
268 auto header = shared_this->build_header(0x8, msg.size() + 2);
269 char status_buf[2];
270 *(uint16_t*)(status_buf) = htons(status_code);
271
272 shared_this->write_buffers_.emplace_back(std::move(header));
273 shared_this->write_buffers_.emplace_back(std::string(status_buf, 2));
274 shared_this->write_buffers_.emplace_back(msg);
275 shared_this->do_write();
276 });
277 }
278
279 std::string get_remote_ip() override
280 {
281 return adaptor_.address();
282 }
283
284 void set_max_payload_size(uint64_t payload)
285 {
286 max_payload_bytes_ = payload;
287 }
288
289 /// Returns the matching client/server subprotocol, empty string if none matched.
290 std::string get_subprotocol() const override
291 {
292 return subprotocol_;
293 }
294
295 protected:
296 /// Generate the websocket headers using an opcode and the message size (in bytes).
297 std::string build_header(int opcode, size_t size)
298 {
299 char buf[2 + 8] = "\x80\x00";
300 buf[0] += opcode;
301 if (size < 126)
302 {
303 buf[1] += static_cast<char>(size);
304 return {buf, buf + 2};
305 }
306 else if (size < 0x10000)
307 {
308 buf[1] += 126;
309 *(uint16_t*)(buf + 2) = htons(static_cast<uint16_t>(size));
310 return {buf, buf + 4};
311 }
312 else
313 {
314 buf[1] += 127;
315 *reinterpret_cast<uint64_t*>(buf + 2) = ((1 == htonl(1)) ? static_cast<uint64_t>(size) : (static_cast<uint64_t>(htonl((size)&0xFFFFFFFF)) << 32) | htonl(static_cast<uint64_t>(size) >> 32));
316 return {buf, buf + 10};
317 }
318 }
319
320 /// Send the HTTP upgrade response.
321
322 ///
323 /// Finishes the handshake process, then starts reading messages from the socket.
324 void start(std::string&& hello)
325 {
326 static const std::string header =
327 "HTTP/1.1 101 Switching Protocols\r\n"
328 "Upgrade: websocket\r\n"
329 "Connection: Upgrade\r\n"
330 "Sec-WebSocket-Accept: ";
331 write_buffers_.emplace_back(header);
332 write_buffers_.emplace_back(std::move(hello));
333 write_buffers_.emplace_back(crlf);
334 if (!subprotocol_.empty())
335 {
336 write_buffers_.emplace_back("Sec-WebSocket-Protocol: ");
337 write_buffers_.emplace_back(subprotocol_);
338 write_buffers_.emplace_back(crlf);
339 }
340 write_buffers_.emplace_back(crlf);
341 do_write();
342 if (open_handler_)
343 open_handler_(*this);
344 do_read();
345 }
346
347 /// Read a websocket message.
348
349 ///
350 /// Involves:<br>
351 /// Handling headers (opcodes, size).<br>
352 /// Unmasking the payload.<br>
353 /// Reading the actual payload.<br>
354 void do_read()
355 {
356 if (has_sent_close_ && has_recv_close_)
357 {
358 close_connection_ = true;
359 adaptor_.shutdown_readwrite();
360 adaptor_.close();
362 return;
363 }
364
365 is_reading = true;
366 switch (state_)
367 {
368 case WebSocketReadState::MiniHeader:
369 {
370 mini_header_ = 0;
371 //asio::async_read(adaptor_.socket(), asio::buffer(&mini_header_, 1),
372 adaptor_.socket().async_read_some(
373 asio::buffer(&mini_header_, 2),
374 [shared_this = this->shared_from_this()](const error_code& ec, std::size_t
375#ifdef CROW_ENABLE_DEBUG
376 bytes_transferred
377#endif
378 )
379
380 {
381 shared_this->is_reading = false;
382 shared_this->mini_header_ = ntohs(shared_this->mini_header_);
383#ifdef CROW_ENABLE_DEBUG
384
385 if (!ec && bytes_transferred != 2)
386 {
387 throw std::runtime_error("WebSocket:MiniHeader:async_read fail:asio bug?");
388 }
389#endif
390
391 if (!ec)
392 {
393 if ((shared_this->mini_header_ & 0x80) == 0x80)
394 shared_this->has_mask_ = true;
395 else //if the websocket specification is enforced and the message isn't masked, terminate the connection
396 {
397#ifndef CROW_ENFORCE_WS_SPEC
398 shared_this->has_mask_ = false;
399#else
400 shared_this->close_connection_ = true;
401 shared_this->adaptor_.shutdown_readwrite();
402 shared_this->adaptor_.close();
403 if (shared_this->error_handler_)
404 shared_this->error_handler_(*shared_this, "Client connection not masked.");
405 shared_this->check_destroy(CloseStatusCode::UnacceptableData);
406#endif
407 }
408
409 if ((shared_this->mini_header_ & 0x7f) == 127)
410 {
411 shared_this->state_ = WebSocketReadState::Len64;
412 }
413 else if ((shared_this->mini_header_ & 0x7f) == 126)
414 {
415 shared_this->state_ = WebSocketReadState::Len16;
416 }
417 else
418 {
419 shared_this->remaining_length_ = shared_this->mini_header_ & 0x7f;
420 shared_this->state_ = WebSocketReadState::Mask;
421 }
422 shared_this->do_read();
423 }
424 else
425 {
426 shared_this->close_connection_ = true;
427 shared_this->adaptor_.shutdown_readwrite();
428 shared_this->adaptor_.close();
429 if (shared_this->error_handler_)
430 shared_this->error_handler_(*shared_this, ec.message());
431 shared_this->check_destroy();
432 }
433 });
434 }
435 break;
436 case WebSocketReadState::Len16:
437 {
438 remaining_length_ = 0;
439 remaining_length16_ = 0;
440 asio::async_read(
441 adaptor_.socket(), asio::buffer(&remaining_length16_, 2),
442 [shared_this = this->shared_from_this()](const error_code& ec, std::size_t
443#ifdef CROW_ENABLE_DEBUG
444 bytes_transferred
445#endif
446 ) {
447 shared_this->is_reading = false;
448 shared_this->remaining_length16_ = ntohs(shared_this->remaining_length16_);
449 shared_this->remaining_length_ = shared_this->remaining_length16_;
450#ifdef CROW_ENABLE_DEBUG
451 if (!ec && bytes_transferred != 2)
452 {
453 throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
454 }
455#endif
456
457 if (!ec)
458 {
459 shared_this->state_ = WebSocketReadState::Mask;
460 shared_this->do_read();
461 }
462 else
463 {
464 shared_this->close_connection_ = true;
465 shared_this->adaptor_.shutdown_readwrite();
466 shared_this->adaptor_.close();
467 if (shared_this->error_handler_)
468 shared_this->error_handler_(*shared_this, ec.message());
469 shared_this->check_destroy();
470 }
471 });
472 }
473 break;
474 case WebSocketReadState::Len64:
475 {
476 asio::async_read(
477 adaptor_.socket(), asio::buffer(&remaining_length_, 8),
478 [shared_this = this->shared_from_this()](const error_code& ec, std::size_t
479#ifdef CROW_ENABLE_DEBUG
480 bytes_transferred
481#endif
482 ) {
483 shared_this->is_reading = false;
484 shared_this->remaining_length_ = ((1 == ntohl(1)) ? (shared_this->remaining_length_) : (static_cast<uint64_t>(ntohl((shared_this->remaining_length_)&0xFFFFFFFF)) << 32) | ntohl((shared_this->remaining_length_) >> 32));
485#ifdef CROW_ENABLE_DEBUG
486 if (!ec && bytes_transferred != 8)
487 {
488 throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
489 }
490#endif
491
492 if (!ec)
493 {
494 shared_this->state_ = WebSocketReadState::Mask;
495 shared_this->do_read();
496 }
497 else
498 {
499 shared_this->close_connection_ = true;
500 shared_this->adaptor_.shutdown_readwrite();
501 shared_this->adaptor_.close();
502 if (shared_this->error_handler_)
503 shared_this->error_handler_(*shared_this, ec.message());
504 shared_this->check_destroy();
505 }
506 });
507 }
508 break;
509 case WebSocketReadState::Mask:
510 if (remaining_length_ > max_payload_bytes_)
511 {
512 close_connection_ = true;
513 adaptor_.close();
514 if (error_handler_)
515 error_handler_(*this, "Message length exceeds maximum payload.");
516 check_destroy(MessageTooBig);
517 }
518 else if (has_mask_)
519 {
520 asio::async_read(
521 adaptor_.socket(), asio::buffer((char*)&mask_, 4),
522 [shared_this = this->shared_from_this()](const error_code& ec, std::size_t
523#ifdef CROW_ENABLE_DEBUG
524 bytes_transferred
525#endif
526 ) {
527 shared_this->is_reading = false;
528#ifdef CROW_ENABLE_DEBUG
529 if (!ec && bytes_transferred != 4)
530 {
531 throw std::runtime_error("WebSocket:Mask:async_read fail:asio bug?");
532 }
533#endif
534
535 if (!ec)
536 {
537 shared_this->state_ = WebSocketReadState::Payload;
538 shared_this->do_read();
539 }
540 else
541 {
542 shared_this->close_connection_ = true;
543 if (shared_this->error_handler_)
544 shared_this->error_handler_(*shared_this, ec.message());
545 shared_this->adaptor_.shutdown_readwrite();
546 shared_this->adaptor_.close();
547 shared_this->check_destroy();
548 }
549 });
550 }
551 else
552 {
553 state_ = WebSocketReadState::Payload;
554 do_read();
555 }
556 break;
557 case WebSocketReadState::Payload:
558 {
559 auto to_read = static_cast<std::uint64_t>(buffer_.size());
560 if (remaining_length_ < to_read)
561 to_read = remaining_length_;
562 adaptor_.socket().async_read_some(
563 asio::buffer(buffer_, static_cast<std::size_t>(to_read)),
564 [shared_this = this->shared_from_this()](const error_code& ec, std::size_t bytes_transferred) {
565 shared_this->is_reading = false;
566
567 if (!ec)
568 {
569 shared_this->fragment_.insert(shared_this->fragment_.end(), shared_this->buffer_.begin(), shared_this->buffer_.begin() + bytes_transferred);
570 shared_this->remaining_length_ -= bytes_transferred;
571 if (shared_this->remaining_length_ == 0)
572 {
573 if (shared_this->handle_fragment())
574 {
575 shared_this->state_ = WebSocketReadState::MiniHeader;
576 shared_this->do_read();
577 }
578 }
579 else
580 shared_this->do_read();
581 }
582 else
583 {
584 shared_this->close_connection_ = true;
585 if (shared_this->error_handler_)
586 shared_this->error_handler_(*shared_this, ec.message());
587 shared_this->adaptor_.shutdown_readwrite();
588 shared_this->adaptor_.close();
589 shared_this->check_destroy();
590 }
591 });
592 }
593 break;
594 }
595 }
596
597 /// Check if the FIN bit is set.
598 bool is_FIN()
599 {
600 return mini_header_ & 0x8000;
601 }
602
603 /// Extract the opcode from the header.
604 int opcode()
605 {
606 return (mini_header_ & 0x0f00) >> 8;
607 }
608
609 /// Process the payload fragment.
610
611 ///
612 /// Unmasks the fragment, checks the opcode, merges fragments into 1 message body, and calls the appropriate handler.
614 {
615 if (has_mask_)
616 {
617 for (decltype(fragment_.length()) i = 0; i < fragment_.length(); i++)
618 {
619 fragment_[i] ^= ((char*)&mask_)[i % 4];
620 }
621 }
622 switch (opcode())
623 {
624 case 0: // Continuation
625 {
626 message_ += fragment_;
627 if (is_FIN())
628 {
629 if (message_handler_)
630 message_handler_(*this, message_, is_binary_);
631 message_.clear();
632 }
633 }
634 break;
635 case 1: // Text
636 {
637 is_binary_ = false;
638 message_ += fragment_;
639 if (is_FIN())
640 {
641 if (message_handler_)
642 message_handler_(*this, message_, is_binary_);
643 message_.clear();
644 }
645 }
646 break;
647 case 2: // Binary
648 {
649 is_binary_ = true;
650 message_ += fragment_;
651 if (is_FIN())
652 {
653 if (message_handler_)
654 message_handler_(*this, message_, is_binary_);
655 message_.clear();
656 }
657 }
658 break;
659 case 0x8: // Close
660 {
661 has_recv_close_ = true;
662
663
664 uint16_t status_code = NoStatusCodePresent;
665 std::string::size_type message_start = 2;
666 if (fragment_.size() >= 2)
667 {
668 status_code = ntohs(((uint16_t*)fragment_.data())[0]);
669 } else {
670 // no message will crash substr
671 message_start = 0;
672 }
673
674 if (!has_sent_close_)
675 {
676 close(fragment_.substr(message_start), status_code);
677 }
678 else
679 {
680
681 close_connection_ = true;
682 if (!is_close_handler_called_)
683 {
684 if (close_handler_)
685 close_handler_(*this, fragment_.substr(message_start), status_code);
686 is_close_handler_called_ = true;
687 }
688 adaptor_.shutdown_readwrite();
689 adaptor_.close();
690
691 // Close handler must have been called at this point so code does not matter
692 check_destroy();
693 return false;
694 }
695 }
696 break;
697 case 0x9: // Ping
698 {
699 send_pong(fragment_);
700 }
701 break;
702 case 0xA: // Pong
703 {
704 pong_received_ = true;
705 }
706 break;
707 }
708
709 fragment_.clear();
710 return true;
711 }
712
713 /// Send the buffers' data through the socket.
714
715 ///
716 /// Also destroys the object if the Close flag is set.
717 void do_write()
718 {
719 if (write_buffers_.empty()) return;
720
721 sending_buffers_.swap(write_buffers_);
722 std::vector<asio::const_buffer> buffers;
723 buffers.reserve(sending_buffers_.size());
724 for (auto& s : sending_buffers_)
725 {
726 buffers.emplace_back(asio::buffer(s));
727 }
728 auto watch = std::weak_ptr<void>{anchor_};
729 asio::async_write(
730 adaptor_.socket(), buffers,
731 [shared_this = this->shared_from_this(), watch](const error_code& ec, std::size_t /*bytes_transferred*/) {
732 auto anchor = watch.lock();
733 if (anchor == nullptr)
734 return;
735
736 if (!ec && !shared_this->close_connection_)
737 {
738 shared_this->sending_buffers_.clear();
739 if (!shared_this->write_buffers_.empty())
740 shared_this->do_write();
741 if (shared_this->has_sent_close_)
742 shared_this->close_connection_ = true;
743 }
744 else
745 {
746 shared_this->sending_buffers_.clear();
747 shared_this->close_connection_ = true;
748 shared_this->check_destroy();
749 }
750 });
751 }
752
753 /// Destroy the Connection.
754 void check_destroy(websocket::CloseStatusCode code = CloseStatusCode::ClosedAbnormally)
755 {
756 // Note that if the close handler was not yet called at this point we did not receive a close packet (or send one)
757 // and thus we use ClosedAbnormally unless instructed otherwise
758 if (!is_close_handler_called_)
759 {
760 if (close_handler_)
761 {
762 close_handler_(*this, "uncleanly", code);
763 }
764 }
765
766 handler_->remove_websocket(this->shared_from_this());
767 }
768
769
771 {
772 std::string payload;
773 Connection* self;
774 int opcode;
775
776 void operator()()
777 {
778 self->send_data_impl(this);
779 }
780 };
781
782 void send_data_impl(SendMessageType* s)
783 {
784 auto header = build_header(s->opcode, s->payload.size());
785 write_buffers_.emplace_back(std::move(header));
786 write_buffers_.emplace_back(std::move(s->payload));
787 do_write();
788 }
789
790 void send_data(int opcode, std::string&& msg)
791 {
792 SendMessageType event_arg{
793 std::move(msg),
794 this,
795 opcode};
796
797 post(std::move(event_arg));
798 }
799
800 private:
801 Connection(Adaptor&& adaptor, Handler* handler, uint64_t max_payload,
802 std::function<void(crow::websocket::connection&)> open_handler,
803 std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler,
804 std::function<void(crow::websocket::connection&, const std::string&, uint16_t)> close_handler,
805 std::function<void(crow::websocket::connection&, const std::string&)> error_handler,
806 std::function<void(const crow::request&, std::optional<crow::response>&, void**)> accept_handler):
807 adaptor_(std::move(adaptor)),
808 handler_(handler),
809 max_payload_bytes_(max_payload),
810 open_handler_(std::move(open_handler)),
811 message_handler_(std::move(message_handler)),
812 close_handler_(std::move(close_handler)),
813 error_handler_(std::move(error_handler)),
814 accept_handler_(std::move(accept_handler))
815 {}
816
817 Adaptor adaptor_;
818 Handler* handler_;
819
820 std::vector<std::string> sending_buffers_;
821 std::vector<std::string> write_buffers_;
822
823 std::array<char, 4096> buffer_;
824 bool is_binary_;
825 std::string message_;
826 std::string fragment_;
827 WebSocketReadState state_{WebSocketReadState::MiniHeader};
828 uint16_t remaining_length16_{0};
829 uint64_t remaining_length_{0};
830 uint64_t max_payload_bytes_{UINT64_MAX};
831 std::string subprotocol_;
832 bool close_connection_{false};
833 bool is_reading{false};
834 bool has_mask_{false};
835 uint32_t mask_;
836 uint16_t mini_header_;
837 bool has_sent_close_{false};
838 bool has_recv_close_{false};
839 bool error_occurred_{false};
840 bool pong_received_{false};
841 bool is_close_handler_called_{false};
842
843 std::shared_ptr<void> anchor_ = std::make_shared<int>(); // Value is just for placeholding
844
845 std::function<void(crow::websocket::connection&)> open_handler_;
846 std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_;
847 std::function<void(crow::websocket::connection&, const std::string&, uint16_t status_code)> close_handler_;
848 std::function<void(crow::websocket::connection&, const std::string&)> error_handler_;
849 std::function<void(const crow::request&, std::optional<crow::response>&, void**)> accept_handler_;
850 };
851 } // namespace websocket
852} // namespace crow
TinySHA1 - a header only implementation of the SHA1 algorithm in C++. Based on the implementation in ...
A websocket connection.
Definition websocket.h:111
void dispatch(CompletionHandler &&handler)
Send data through the socket.
Definition websocket.h:208
void do_read()
Read a websocket message.
Definition websocket.h:354
void send_pong(std::string msg) override
Send a "Pong" message.
Definition websocket.h:237
bool handle_fragment()
Process the payload fragment.
Definition websocket.h:613
std::string build_header(int opcode, size_t size)
Generate the websocket headers using an opcode and the message size (in bytes).
Definition websocket.h:297
void close(std::string const &msg, uint16_t status_code) override
Send a close signal.
Definition websocket.h:258
void send_text(std::string msg) override
Send a plaintext message.
Definition websocket.h:249
std::string get_subprotocol() const override
Returns the matching client/server subprotocol, empty string if none matched.
Definition websocket.h:290
static void create(const crow::request &req, Adaptor adaptor, Handler *handler, uint64_t max_payload, const std::vector< std::string > &subprotocols, std::function< void(crow::websocket::connection &)> open_handler, std::function< void(crow::websocket::connection &, const std::string &, bool)> message_handler, std::function< void(crow::websocket::connection &, const std::string &, uint16_t)> close_handler, std::function< void(crow::websocket::connection &, const std::string &)> error_handler, std::function< void(const crow::request &, std::optional< crow::response > &, void **)> accept_handler, bool mirror_protocols)
Definition websocket.h:117
int opcode()
Extract the opcode from the header.
Definition websocket.h:604
void do_write()
Send the buffers' data through the socket.
Definition websocket.h:717
void start(std::string &&hello)
Send the HTTP upgrade response.
Definition websocket.h:324
bool is_FIN()
Check if the FIN bit is set.
Definition websocket.h:598
void send_ping(std::string msg) override
Send a "Ping" message.
Definition websocket.h:228
void send_binary(std::string msg) override
Send a binary encoded message.
Definition websocket.h:243
void check_destroy(websocket::CloseStatusCode code=CloseStatusCode::ClosedAbnormally)
Destroy the Connection.
Definition websocket.h:754
void post(CompletionHandler &&handler)
Send data through the socket and return immediately.
Definition websocket.h:217
A tiny SHA1 algorithm implementation used internally in the Crow server (specifically in crow/websock...
Definition TinySHA1.hpp:48
The main namespace of the library. In this namespace is defined the most important classes and functi...
An HTTP request.
Definition http_request.h:36
bool keep_alive
Whether or not the server should send a connection: Keep-Alive header to the client.
Definition http_request.h:45
A base class for websocket connection.
Definition websocket.h:67