Crow  1.1
A C++ microframework for the web
websocket.h
1 #pragma once
2 #include <array>
3 #include "crow/logging.h"
4 #include "crow/socket_adaptors.h"
5 #include "crow/http_request.h"
6 #include "crow/TinySHA1.hpp"
7 #include "crow/utility.h"
8 
9 namespace crow // NOTE: Already documented in "crow/app.h"
10 {
11 #ifdef CROW_USE_BOOST
12  namespace asio = boost::asio;
13  using error_code = boost::system::error_code;
14 #else
15  using error_code = asio::error_code;
16 #endif
17 
18  /**
19  * \namespace crow::websocket
20  * \brief Namespace that includes the \ref Connection class
21  * and \ref connection struct. Useful for WebSockets connection.
22  *
23  * Used specially in crow/websocket.h, crow/app.h and crow/routing.h
24  */
25  namespace websocket
26  {
27  enum class WebSocketReadState
28  {
29  MiniHeader,
30  Len16,
31  Len64,
32  Mask,
33  Payload,
34  };
35 
36  /// A base class for websocket connection.
37  struct connection
38  {
39  virtual void send_binary(std::string msg) = 0;
40  virtual void send_text(std::string msg) = 0;
41  virtual void send_ping(std::string msg) = 0;
42  virtual void send_pong(std::string msg) = 0;
43  virtual void close(std::string const& msg = "quit") = 0;
44  virtual std::string get_remote_ip() = 0;
45  virtual ~connection() = default;
46 
47  void userdata(void* u) { userdata_ = u; }
48  void* userdata() { return userdata_; }
49 
50  private:
51  void* userdata_;
52  };
53 
54  // Modified version of the illustration in RFC6455 Section-5.2
55  //
56  //
57  // 0 1 2 3 -byte
58  // 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 -bit
59  // +-+-+-+-+-------+-+-------------+-------------------------------+
60  // |F|R|R|R| opcode|M| Payload len | Extended payload length |
61  // |I|S|S|S| (4) |A| (7) | (16/64) |
62  // |N|V|V|V| |S| | (if payload len==126/127) |
63  // | |1|2|3| |K| | |
64  // +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
65  // | Extended payload length continued, if payload len == 127 |
66  // + - - - - - - - - - - - - - - - +-------------------------------+
67  // | |Masking-key, if MASK set to 1 |
68  // +-------------------------------+-------------------------------+
69  // | Masking-key (continued) | Payload Data |
70  // +-------------------------------- - - - - - - - - - - - - - - - +
71  // : Payload Data continued ... :
72  // + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
73  // | Payload Data continued ... |
74  // +---------------------------------------------------------------+
75  //
76 
77  /// A websocket connection.
78 
79  template<typename Adaptor, typename Handler>
80  class Connection : public connection
81  {
82  public:
83  /// Constructor for a connection.
84 
85  ///
86  /// Requires a request with an "Upgrade: websocket" header.<br>
87  /// Automatically handles the handshake.
88  Connection(const crow::request& req, Adaptor&& adaptor, Handler* handler, uint64_t max_payload,
89  std::function<void(crow::websocket::connection&)> open_handler,
90  std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler,
91  std::function<void(crow::websocket::connection&, const std::string&)> close_handler,
92  std::function<void(crow::websocket::connection&, const std::string&)> error_handler,
93  std::function<bool(const crow::request&, void**)> accept_handler):
94  adaptor_(std::move(adaptor)),
95  handler_(handler),
96  max_payload_bytes_(max_payload),
97  open_handler_(std::move(open_handler)),
98  message_handler_(std::move(message_handler)),
99  close_handler_(std::move(close_handler)),
100  error_handler_(std::move(error_handler)),
101  accept_handler_(std::move(accept_handler))
102  {
103  if (!utility::string_equals(req.get_header_value("upgrade"), "websocket"))
104  {
105  adaptor_.close();
106  handler_->remove_websocket(this);
107  delete this;
108  return;
109  }
110 
111  if (accept_handler_)
112  {
113  void* ud = nullptr;
114  if (!accept_handler_(req, &ud))
115  {
116  adaptor_.close();
117  handler_->remove_websocket(this);
118  delete this;
119  return;
120  }
121  userdata(ud);
122  }
123 
124  // Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
125  // Sec-WebSocket-Version: 13
126  std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
127  sha1::SHA1 s;
128  s.processBytes(magic.data(), magic.size());
129  uint8_t digest[20];
130  s.getDigestBytes(digest);
131 
132  start(crow::utility::base64encode((unsigned char*)digest, 20));
133  }
134 
135  ~Connection() noexcept override
136  {
137  // Do not modify anchor_ here since writing shared_ptr is not atomic.
138  auto watch = std::weak_ptr<void>{anchor_};
139 
140  // Wait until all unhandled asynchronous operations to join.
141  // As the deletion occurs inside 'check_destroy()', which already locks
142  // anchor, use count can be 1 on valid deletion context.
143  while (watch.use_count() > 2) // 1 for 'check_destroy() routine', 1 for 'this->anchor_'
144  {
145  std::this_thread::yield();
146  }
147  }
148 
149  template<typename Callable>
151  {
152  Callable callable;
153  std::weak_ptr<void> watch;
154 
155  void operator()()
156  {
157  if (auto anchor = watch.lock())
158  {
159  std::move(callable)();
160  }
161  }
162  };
163 
164  /// Send data through the socket.
165  template<typename CompletionHandler>
166  void dispatch(CompletionHandler&& handler)
167  {
168  asio::dispatch(adaptor_.get_io_service(),
169  WeakWrappedMessage<typename std::decay<CompletionHandler>::type>{
170  std::forward<CompletionHandler>(handler), anchor_});
171  }
172 
173  /// Send data through the socket and return immediately.
174  template<typename CompletionHandler>
175  void post(CompletionHandler&& handler)
176  {
177  asio::post(adaptor_.get_io_service(),
178  WeakWrappedMessage<typename std::decay<CompletionHandler>::type>{
179  std::forward<CompletionHandler>(handler), anchor_});
180  }
181 
182  /// Send a "Ping" message.
183 
184  ///
185  /// Usually invoked to check if the other point is still online.
186  void send_ping(std::string msg) override
187  {
188  send_data(0x9, std::move(msg));
189  }
190 
191  /// Send a "Pong" message.
192 
193  ///
194  /// Usually automatically invoked as a response to a "Ping" message.
195  void send_pong(std::string msg) override
196  {
197  send_data(0xA, std::move(msg));
198  }
199 
200  /// Send a binary encoded message.
201  void send_binary(std::string msg) override
202  {
203  send_data(0x2, std::move(msg));
204  }
205 
206  /// Send a plaintext message.
207  void send_text(std::string msg) override
208  {
209  send_data(0x1, std::move(msg));
210  }
211 
212  /// Send a close signal.
213 
214  ///
215  /// Sets a flag to destroy the object once the message is sent.
216  void close(std::string const& msg) override
217  {
218  dispatch([this, msg]() mutable {
219  has_sent_close_ = true;
220  if (has_recv_close_ && !is_close_handler_called_)
221  {
222  is_close_handler_called_ = true;
223  if (close_handler_)
224  close_handler_(*this, msg);
225  }
226  auto header = build_header(0x8, msg.size());
227  write_buffers_.emplace_back(std::move(header));
228  write_buffers_.emplace_back(msg);
229  do_write();
230  });
231  }
232 
233  std::string get_remote_ip() override
234  {
235  return adaptor_.remote_endpoint().address().to_string();
236  }
237 
238  void set_max_payload_size(uint64_t payload)
239  {
240  max_payload_bytes_ = payload;
241  }
242 
243  protected:
244  /// Generate the websocket headers using an opcode and the message size (in bytes).
245  std::string build_header(int opcode, size_t size)
246  {
247  char buf[2 + 8] = "\x80\x00";
248  buf[0] += opcode;
249  if (size < 126)
250  {
251  buf[1] += static_cast<char>(size);
252  return {buf, buf + 2};
253  }
254  else if (size < 0x10000)
255  {
256  buf[1] += 126;
257  *(uint16_t*)(buf + 2) = htons(static_cast<uint16_t>(size));
258  return {buf, buf + 4};
259  }
260  else
261  {
262  buf[1] += 127;
263  *reinterpret_cast<uint64_t*>(buf + 2) = ((1 == htonl(1)) ? static_cast<uint64_t>(size) : (static_cast<uint64_t>(htonl((size)&0xFFFFFFFF)) << 32) | htonl(static_cast<uint64_t>(size) >> 32));
264  return {buf, buf + 10};
265  }
266  }
267 
268  /// Send the HTTP upgrade response.
269 
270  ///
271  /// Finishes the handshake process, then starts reading messages from the socket.
272  void start(std::string&& hello)
273  {
274  static const std::string header =
275  "HTTP/1.1 101 Switching Protocols\r\n"
276  "Upgrade: websocket\r\n"
277  "Connection: Upgrade\r\n"
278  "Sec-WebSocket-Accept: ";
279  write_buffers_.emplace_back(header);
280  write_buffers_.emplace_back(std::move(hello));
281  write_buffers_.emplace_back(crlf);
282  write_buffers_.emplace_back(crlf);
283  do_write();
284  if (open_handler_)
285  open_handler_(*this);
286  do_read();
287  }
288 
289  /// Read a websocket message.
290 
291  ///
292  /// Involves:<br>
293  /// Handling headers (opcodes, size).<br>
294  /// Unmasking the payload.<br>
295  /// Reading the actual payload.<br>
296  void do_read()
297  {
298  if (has_sent_close_ && has_recv_close_)
299  {
300  close_connection_ = true;
301  adaptor_.shutdown_readwrite();
302  adaptor_.close();
303  check_destroy();
304  return;
305  }
306 
307  is_reading = true;
308  switch (state_)
309  {
310  case WebSocketReadState::MiniHeader:
311  {
312  mini_header_ = 0;
313  //asio::async_read(adaptor_.socket(), asio::buffer(&mini_header_, 1),
314  adaptor_.socket().async_read_some(
315  asio::buffer(&mini_header_, 2),
316  [this](const error_code& ec, std::size_t
317 #ifdef CROW_ENABLE_DEBUG
318  bytes_transferred
319 #endif
320  )
321 
322  {
323  is_reading = false;
324  mini_header_ = ntohs(mini_header_);
325 #ifdef CROW_ENABLE_DEBUG
326 
327  if (!ec && bytes_transferred != 2)
328  {
329  throw std::runtime_error("WebSocket:MiniHeader:async_read fail:asio bug?");
330  }
331 #endif
332 
333  if (!ec)
334  {
335  if ((mini_header_ & 0x80) == 0x80)
336  has_mask_ = true;
337  else //if the websocket specification is enforced and the message isn't masked, terminate the connection
338  {
339 #ifndef CROW_ENFORCE_WS_SPEC
340  has_mask_ = false;
341 #else
342  close_connection_ = true;
343  adaptor_.shutdown_readwrite();
344  adaptor_.close();
345  if (error_handler_)
346  error_handler_(*this, "Client connection not masked.");
347  check_destroy();
348 #endif
349  }
350 
351  if ((mini_header_ & 0x7f) == 127)
352  {
353  state_ = WebSocketReadState::Len64;
354  }
355  else if ((mini_header_ & 0x7f) == 126)
356  {
357  state_ = WebSocketReadState::Len16;
358  }
359  else
360  {
361  remaining_length_ = mini_header_ & 0x7f;
362  state_ = WebSocketReadState::Mask;
363  }
364  do_read();
365  }
366  else
367  {
368  close_connection_ = true;
369  adaptor_.shutdown_readwrite();
370  adaptor_.close();
371  if (error_handler_)
372  error_handler_(*this, ec.message());
373  check_destroy();
374  }
375  });
376  }
377  break;
378  case WebSocketReadState::Len16:
379  {
380  remaining_length_ = 0;
381  remaining_length16_ = 0;
382  asio::async_read(
383  adaptor_.socket(), asio::buffer(&remaining_length16_, 2),
384  [this](const error_code& ec, std::size_t
385 #ifdef CROW_ENABLE_DEBUG
386  bytes_transferred
387 #endif
388  ) {
389  is_reading = false;
390  remaining_length16_ = ntohs(remaining_length16_);
391  remaining_length_ = remaining_length16_;
392 #ifdef CROW_ENABLE_DEBUG
393  if (!ec && bytes_transferred != 2)
394  {
395  throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
396  }
397 #endif
398 
399  if (!ec)
400  {
401  state_ = WebSocketReadState::Mask;
402  do_read();
403  }
404  else
405  {
406  close_connection_ = true;
407  adaptor_.shutdown_readwrite();
408  adaptor_.close();
409  if (error_handler_)
410  error_handler_(*this, ec.message());
411  check_destroy();
412  }
413  });
414  }
415  break;
416  case WebSocketReadState::Len64:
417  {
418  asio::async_read(
419  adaptor_.socket(), asio::buffer(&remaining_length_, 8),
420  [this](const error_code& ec, std::size_t
421 #ifdef CROW_ENABLE_DEBUG
422  bytes_transferred
423 #endif
424  ) {
425  is_reading = false;
426  remaining_length_ = ((1 == ntohl(1)) ? (remaining_length_) : (static_cast<uint64_t>(ntohl((remaining_length_)&0xFFFFFFFF)) << 32) | ntohl((remaining_length_) >> 32));
427 #ifdef CROW_ENABLE_DEBUG
428  if (!ec && bytes_transferred != 8)
429  {
430  throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
431  }
432 #endif
433 
434  if (!ec)
435  {
436  state_ = WebSocketReadState::Mask;
437  do_read();
438  }
439  else
440  {
441  close_connection_ = true;
442  adaptor_.shutdown_readwrite();
443  adaptor_.close();
444  if (error_handler_)
445  error_handler_(*this, ec.message());
446  check_destroy();
447  }
448  });
449  }
450  break;
451  case WebSocketReadState::Mask:
452  if (remaining_length_ > max_payload_bytes_)
453  {
454  close_connection_ = true;
455  adaptor_.close();
456  if (error_handler_)
457  error_handler_(*this, "Message length exceeds maximum payload.");
458  check_destroy();
459  }
460  else if (has_mask_)
461  {
462  asio::async_read(
463  adaptor_.socket(), asio::buffer((char*)&mask_, 4),
464  [this](const error_code& ec, std::size_t
465 #ifdef CROW_ENABLE_DEBUG
466  bytes_transferred
467 #endif
468  ) {
469  is_reading = false;
470 #ifdef CROW_ENABLE_DEBUG
471  if (!ec && bytes_transferred != 4)
472  {
473  throw std::runtime_error("WebSocket:Mask:async_read fail:asio bug?");
474  }
475 #endif
476 
477  if (!ec)
478  {
479  state_ = WebSocketReadState::Payload;
480  do_read();
481  }
482  else
483  {
484  close_connection_ = true;
485  if (error_handler_)
486  error_handler_(*this, ec.message());
487  adaptor_.shutdown_readwrite();
488  adaptor_.close();
489  check_destroy();
490  }
491  });
492  }
493  else
494  {
495  state_ = WebSocketReadState::Payload;
496  do_read();
497  }
498  break;
499  case WebSocketReadState::Payload:
500  {
501  auto to_read = static_cast<std::uint64_t>(buffer_.size());
502  if (remaining_length_ < to_read)
503  to_read = remaining_length_;
504  adaptor_.socket().async_read_some(
505  asio::buffer(buffer_, static_cast<std::size_t>(to_read)),
506  [this](const error_code& ec, std::size_t bytes_transferred) {
507  is_reading = false;
508 
509  if (!ec)
510  {
511  fragment_.insert(fragment_.end(), buffer_.begin(), buffer_.begin() + bytes_transferred);
512  remaining_length_ -= bytes_transferred;
513  if (remaining_length_ == 0)
514  {
515  if (handle_fragment())
516  {
517  state_ = WebSocketReadState::MiniHeader;
518  do_read();
519  }
520  }
521  else
522  do_read();
523  }
524  else
525  {
526  close_connection_ = true;
527  if (error_handler_)
528  error_handler_(*this, ec.message());
529  adaptor_.shutdown_readwrite();
530  adaptor_.close();
531  check_destroy();
532  }
533  });
534  }
535  break;
536  }
537  }
538 
539  /// Check if the FIN bit is set.
540  bool is_FIN()
541  {
542  return mini_header_ & 0x8000;
543  }
544 
545  /// Extract the opcode from the header.
546  int opcode()
547  {
548  return (mini_header_ & 0x0f00) >> 8;
549  }
550 
551  /// Process the payload fragment.
552 
553  ///
554  /// Unmasks the fragment, checks the opcode, merges fragments into 1 message body, and calls the appropriate handler.
556  {
557  if (has_mask_)
558  {
559  for (decltype(fragment_.length()) i = 0; i < fragment_.length(); i++)
560  {
561  fragment_[i] ^= ((char*)&mask_)[i % 4];
562  }
563  }
564  switch (opcode())
565  {
566  case 0: // Continuation
567  {
568  message_ += fragment_;
569  if (is_FIN())
570  {
571  if (message_handler_)
572  message_handler_(*this, message_, is_binary_);
573  message_.clear();
574  }
575  }
576  break;
577  case 1: // Text
578  {
579  is_binary_ = false;
580  message_ += fragment_;
581  if (is_FIN())
582  {
583  if (message_handler_)
584  message_handler_(*this, message_, is_binary_);
585  message_.clear();
586  }
587  }
588  break;
589  case 2: // Binary
590  {
591  is_binary_ = true;
592  message_ += fragment_;
593  if (is_FIN())
594  {
595  if (message_handler_)
596  message_handler_(*this, message_, is_binary_);
597  message_.clear();
598  }
599  }
600  break;
601  case 0x8: // Close
602  {
603  has_recv_close_ = true;
604  if (!has_sent_close_)
605  {
606  close(fragment_);
607  }
608  else
609  {
610  adaptor_.shutdown_readwrite();
611  adaptor_.close();
612  close_connection_ = true;
613  if (!is_close_handler_called_)
614  {
615  if (close_handler_)
616  close_handler_(*this, fragment_);
617  is_close_handler_called_ = true;
618  }
619  check_destroy();
620  return false;
621  }
622  }
623  break;
624  case 0x9: // Ping
625  {
626  send_pong(fragment_);
627  }
628  break;
629  case 0xA: // Pong
630  {
631  pong_received_ = true;
632  }
633  break;
634  }
635 
636  fragment_.clear();
637  return true;
638  }
639 
640  /// Send the buffers' data through the socket.
641 
642  ///
643  /// Also destroys the object if the Close flag is set.
644  void do_write()
645  {
646  if (sending_buffers_.empty())
647  {
648  sending_buffers_.swap(write_buffers_);
649  std::vector<asio::const_buffer> buffers;
650  buffers.reserve(sending_buffers_.size());
651  for (auto& s : sending_buffers_)
652  {
653  buffers.emplace_back(asio::buffer(s));
654  }
655  auto watch = std::weak_ptr<void>{anchor_};
656  asio::async_write(
657  adaptor_.socket(), buffers,
658  [&, watch](const error_code& ec, std::size_t /*bytes_transferred*/) {
659  if (!ec && !close_connection_)
660  {
661  sending_buffers_.clear();
662  if (!write_buffers_.empty())
663  do_write();
664  if (has_sent_close_)
665  close_connection_ = true;
666  }
667  else
668  {
669  auto anchor = watch.lock();
670  if (anchor == nullptr) { return; }
671 
672  sending_buffers_.clear();
673  close_connection_ = true;
674  check_destroy();
675  }
676  });
677  }
678  }
679 
680  /// Destroy the Connection.
682  {
683  //if (has_sent_close_ && has_recv_close_)
684  if (!is_close_handler_called_)
685  if (close_handler_)
686  close_handler_(*this, "uncleanly");
687  handler_->remove_websocket(this);
688  if (sending_buffers_.empty() && !is_reading)
689  delete this;
690  }
691 
692 
694  {
695  std::string payload;
696  Connection* self;
697  int opcode;
698 
699  void operator()()
700  {
701  self->send_data_impl(this);
702  }
703  };
704 
705  void send_data_impl(SendMessageType* s)
706  {
707  auto header = build_header(s->opcode, s->payload.size());
708  write_buffers_.emplace_back(std::move(header));
709  write_buffers_.emplace_back(std::move(s->payload));
710  do_write();
711  }
712 
713  void send_data(int opcode, std::string&& msg)
714  {
715  SendMessageType event_arg{
716  std::move(msg),
717  this,
718  opcode};
719 
720  post(std::move(event_arg));
721  }
722 
723  private:
724  Adaptor adaptor_;
725  Handler* handler_;
726 
727  std::vector<std::string> sending_buffers_;
728  std::vector<std::string> write_buffers_;
729 
730  std::array<char, 4096> buffer_;
731  bool is_binary_;
732  std::string message_;
733  std::string fragment_;
734  WebSocketReadState state_{WebSocketReadState::MiniHeader};
735  uint16_t remaining_length16_{0};
736  uint64_t remaining_length_{0};
737  uint64_t max_payload_bytes_{UINT64_MAX};
738  bool close_connection_{false};
739  bool is_reading{false};
740  bool has_mask_{false};
741  uint32_t mask_;
742  uint16_t mini_header_;
743  bool has_sent_close_{false};
744  bool has_recv_close_{false};
745  bool error_occurred_{false};
746  bool pong_received_{false};
747  bool is_close_handler_called_{false};
748 
749  std::shared_ptr<void> anchor_ = std::make_shared<int>(); // Value is just for placeholding
750 
751  std::function<void(crow::websocket::connection&)> open_handler_;
752  std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_;
753  std::function<void(crow::websocket::connection&, const std::string&)> close_handler_;
754  std::function<void(crow::websocket::connection&, const std::string&)> error_handler_;
755  std::function<bool(const crow::request&, void**)> accept_handler_;
756  };
757  } // namespace websocket
758 } // namespace crow
TinySHA1 - a header only implementation of the SHA1 algorithm in C++. Based on the implementation in ...
A websocket connection.
Definition: websocket.h:81
void dispatch(CompletionHandler &&handler)
Send data through the socket.
Definition: websocket.h:166
void do_read()
Read a websocket message.
Definition: websocket.h:296
void send_pong(std::string msg) override
Send a "Pong" message.
Definition: websocket.h:195
bool handle_fragment()
Process the payload fragment.
Definition: websocket.h:555
std::string build_header(int opcode, size_t size)
Generate the websocket headers using an opcode and the message size (in bytes).
Definition: websocket.h:245
void send_text(std::string msg) override
Send a plaintext message.
Definition: websocket.h:207
void close(std::string const &msg) override
Send a close signal.
Definition: websocket.h:216
int opcode()
Extract the opcode from the header.
Definition: websocket.h:546
void do_write()
Send the buffers' data through the socket.
Definition: websocket.h:644
void check_destroy()
Destroy the Connection.
Definition: websocket.h:681
void start(std::string &&hello)
Send the HTTP upgrade response.
Definition: websocket.h:272
bool is_FIN()
Check if the FIN bit is set.
Definition: websocket.h:540
void send_ping(std::string msg) override
Send a "Ping" message.
Definition: websocket.h:186
void send_binary(std::string msg) override
Send a binary encoded message.
Definition: websocket.h:201
Connection(const crow::request &req, Adaptor &&adaptor, Handler *handler, uint64_t max_payload, std::function< void(crow::websocket::connection &)> open_handler, std::function< void(crow::websocket::connection &, const std::string &, bool)> message_handler, std::function< void(crow::websocket::connection &, const std::string &)> close_handler, std::function< void(crow::websocket::connection &, const std::string &)> error_handler, std::function< bool(const crow::request &, void **)> accept_handler)
Constructor for a connection.
Definition: websocket.h:88
void post(CompletionHandler &&handler)
Send data through the socket and return immediately.
Definition: websocket.h:175
A tiny SHA1 algorithm implementation used internally in the Crow server (specifically in crow/websock...
Definition: TinySHA1.hpp:48
The main namespace of the library. In this namespace is defined the most important classes and functi...
An HTTP request.
Definition: http_request.h:36
A base class for websocket connection.
Definition: websocket.h:38