Crow  0.3
A C++ microframework for the web
websocket.h
1 #pragma once
2 #include <boost/algorithm/string/predicate.hpp>
3 #include <boost/array.hpp>
4 #include "crow/socket_adaptors.h"
5 #include "crow/http_request.h"
6 #include "crow/TinySHA1.hpp"
7 
8 namespace crow
9 {
10  namespace websocket
11  {
/// Stage of the incoming frame that Connection::do_read() reads next.
enum class WebSocketReadState
{
    MiniHeader, ///< The first two header bytes (FIN/opcode byte and MASK/7-bit length byte).
    Len16,      ///< The 16-bit extended payload length (7-bit length field was 126).
    Len64,      ///< The 64-bit extended payload length (7-bit length field was 127).
    Mask,       ///< The 4-byte masking key, read only when the MASK bit was set.
    Payload     ///< The payload bytes of the frame.
};
20 
/// A base class for websocket connection.
///
/// Declares the send/close operations a websocket connection provides and
/// carries one opaque user pointer that callers can attach to a connection.
struct connection
{
    /// Send \p msg as a binary message.
    virtual void send_binary(const std::string& msg) = 0;
    /// Send \p msg as a text message.
    virtual void send_text(const std::string& msg) = 0;
    /// Send a ping carrying \p msg.
    virtual void send_ping(const std::string& msg) = 0;
    /// Send a pong carrying \p msg.
    virtual void send_pong(const std::string& msg) = 0;
    /// Send a close message carrying \p msg ("quit" when omitted).
    virtual void close(const std::string& msg = "quit") = 0;
    virtual ~connection() = default;

    /// Attach an arbitrary pointer to this connection; it is stored as-is and never dereferenced here.
    void userdata(void* u) { userdata_ = u; }
    /// Return the pointer last set with userdata(void*), or nullptr if none was set.
    void* userdata() { return userdata_; }

private:
    // Initialised so that userdata() is well defined before the first set.
    void* userdata_{nullptr};
};
37 
// Frame layout (RFC 6455, section 5.2):
//
//  0               1               2               3               -byte
//  0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 -bit
// +-+-+-+-+-------+-+-------------+-------------------------------+
// |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
// |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
// |N|V|V|V|       |S|             |   (if payload len==126/127)   |
// | |1|2|3|       |K|             |                               |
// +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
// |     Extended payload length continued, if payload len == 127  |
// + - - - - - - - - - - - - - - - +-------------------------------+
// |                               |Masking-key, if MASK set to 1  |
// +-------------------------------+-------------------------------+
// | Masking-key (continued)       |          Payload Data         |
// +-------------------------------- - - - - - - - - - - - - - - - +
// :                     Payload Data continued ...                :
// + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
// |                     Payload Data continued ...                |
// +---------------------------------------------------------------+
56 
57  /// A websocket connection.
58  template <typename Adaptor>
59  class Connection : public connection
60  {
61  public:
62  /// Constructor for a connection.
63 
64  ///
65  /// Requires a request with an "Upgrade: websocket" header.<br>
66  /// Automatically handles the handshake.
67  Connection(const crow::request& req, Adaptor&& adaptor,
68  std::function<void(crow::websocket::connection&)> open_handler,
69  std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler,
70  std::function<void(crow::websocket::connection&, const std::string&)> close_handler,
71  std::function<void(crow::websocket::connection&)> error_handler,
72  std::function<bool(const crow::request&)> accept_handler)
73  : adaptor_(std::move(adaptor)), open_handler_(std::move(open_handler)), message_handler_(std::move(message_handler)), close_handler_(std::move(close_handler)), error_handler_(std::move(error_handler))
74  , accept_handler_(std::move(accept_handler))
75  {
76  if (!boost::iequals(req.get_header_value("upgrade"), "websocket"))
77  {
78  adaptor.close();
79  delete this;
80  return;
81  }
82 
83  if (accept_handler_)
84  {
85  if (!accept_handler_(req))
86  {
87  adaptor.close();
88  delete this;
89  return;
90  }
91  }
92 
93  // Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
94  // Sec-WebSocket-Version: 13
95  std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
96  sha1::SHA1 s;
97  s.processBytes(magic.data(), magic.size());
98  uint8_t digest[20];
99  s.getDigestBytes(digest);
100  start(crow::utility::base64encode((char*)digest, 20));
101  }
102 
103  /// Send data through the socket.
104  template<typename CompletionHandler>
105  void dispatch(CompletionHandler handler)
106  {
107  adaptor_.get_io_service().dispatch(handler);
108  }
109 
110  /// Send data through the socket and return immediately.
111  template<typename CompletionHandler>
112  void post(CompletionHandler handler)
113  {
114  adaptor_.get_io_service().post(handler);
115  }
116 
117  /// Send a "Ping" message.
118 
119  ///
120  /// Usually invoked to check if the other point is still online.
121  void send_ping(const std::string& msg) override
122  {
123  dispatch([this, msg]{
124  auto header = build_header(0x9, msg.size());
125  write_buffers_.emplace_back(std::move(header));
126  write_buffers_.emplace_back(msg);
127  do_write();
128  });
129  }
130 
131  /// Send a "Pong" message.
132 
133  ///
134  /// Usually automatically invoked as a response to a "Ping" message.
135  void send_pong(const std::string& msg) override
136  {
137  dispatch([this, msg]{
138  auto header = build_header(0xA, msg.size());
139  write_buffers_.emplace_back(std::move(header));
140  write_buffers_.emplace_back(msg);
141  do_write();
142  });
143  }
144 
145  /// Send a binary encoded message.
146  void send_binary(const std::string& msg) override
147  {
148  dispatch([this, msg]{
149  auto header = build_header(2, msg.size());
150  write_buffers_.emplace_back(std::move(header));
151  write_buffers_.emplace_back(msg);
152  do_write();
153  });
154  }
155 
156  /// Send a plaintext message.
157  void send_text(const std::string& msg) override
158  {
159  dispatch([this, msg]{
160  auto header = build_header(1, msg.size());
161  write_buffers_.emplace_back(std::move(header));
162  write_buffers_.emplace_back(msg);
163  do_write();
164  });
165  }
166 
167  /// Send a close signal.
168 
169  ///
170  /// Sets a flag to destroy the object once the message is sent.
171  void close(const std::string& msg) override
172  {
173  dispatch([this, msg]{
174  has_sent_close_ = true;
175  if (has_recv_close_ && !is_close_handler_called_)
176  {
177  is_close_handler_called_ = true;
178  if (close_handler_)
179  close_handler_(*this, msg);
180  }
181  auto header = build_header(0x8, msg.size());
182  write_buffers_.emplace_back(std::move(header));
183  write_buffers_.emplace_back(msg);
184  do_write();
185  });
186  }
187 
188  protected:
189 
/// Generate the websocket headers using an opcode and the message size (in bytes).
///
/// The first byte is 0x80 (FIN) plus \p opcode. The second byte holds the
/// payload length with the MASK bit clear: the length itself when it is below
/// 126, otherwise 126 followed by a 16-bit length, or 127 followed by a 64-bit
/// length. Extended lengths are written most-significant byte first.
///
/// \return the 2, 4 or 10 header bytes.
std::string build_header(int opcode, size_t size)
{
    std::string header;
    header.reserve(2 + 8);
    header.push_back(static_cast<char>(0x80 + opcode));

    int extended_bytes = 0;
    if (size < 126)
    {
        header.push_back(static_cast<char>(size));
    }
    else if (size < 0x10000)
    {
        header.push_back(static_cast<char>(126));
        extended_bytes = 2;
    }
    else
    {
        header.push_back(static_cast<char>(127));
        extended_bytes = 8;
    }

    // Emit the extended length byte by byte instead of storing through a
    // casted integer pointer: no alignment or aliasing assumptions, and no
    // dependence on the host byte order.
    const uint64_t length = static_cast<uint64_t>(size);
    for (int shift = (extended_bytes - 1) * 8; shift >= 0; shift -= 8)
    {
        header.push_back(static_cast<char>((length >> shift) & 0xff));
    }
    return header;
}
213 
214  /// Send the HTTP upgrade response.
215 
216  ///
217  /// Finishes the handshake process, then starts reading messages from the socket.
218  void start(std::string&& hello)
219  {
220  static std::string header = "HTTP/1.1 101 Switching Protocols\r\n"
221  "Upgrade: websocket\r\n"
222  "Connection: Upgrade\r\n"
223  "Sec-WebSocket-Accept: ";
224  static std::string crlf = "\r\n";
225  write_buffers_.emplace_back(header);
226  write_buffers_.emplace_back(std::move(hello));
227  write_buffers_.emplace_back(crlf);
228  write_buffers_.emplace_back(crlf);
229  do_write();
230  if (open_handler_)
231  open_handler_(*this);
232  do_read();
233  }
234 
235  /// Read a websocket message.
236 
237  ///
238  /// Involves:<br>
239  /// Handling headers (opcodes, size).<br>
240  /// Unmasking the payload.<br>
241  /// Reading the actual payload.<br>
242  void do_read()
243  {
244  is_reading = true;
245  switch(state_)
246  {
247  case WebSocketReadState::MiniHeader:
248  {
249  mini_header_ = 0;
250  //boost::asio::async_read(adaptor_.socket(), boost::asio::buffer(&mini_header_, 1),
251  adaptor_.socket().async_read_some(boost::asio::buffer(&mini_header_, 2),
252  [this](const boost::system::error_code& ec, std::size_t
253 #ifdef CROW_ENABLE_DEBUG
254  bytes_transferred
255 #endif
256  )
257 
258  {
259  is_reading = false;
260  mini_header_ = ntohs(mini_header_);
261 #ifdef CROW_ENABLE_DEBUG
262 
263  if (!ec && bytes_transferred != 2)
264  {
265  throw std::runtime_error("WebSocket:MiniHeader:async_read fail:asio bug?");
266  }
267 #endif
268 
269  if (!ec)
270  {
271  if ((mini_header_ & 0x80) == 0x80)
272  has_mask_ = true;
273 
274  if ((mini_header_ & 0x7f) == 127)
275  {
276  state_ = WebSocketReadState::Len64;
277  }
278  else if ((mini_header_ & 0x7f) == 126)
279  {
280  state_ = WebSocketReadState::Len16;
281  }
282  else
283  {
284  remaining_length_ = mini_header_ & 0x7f;
285  state_ = WebSocketReadState::Mask;
286  }
287  do_read();
288  }
289  else
290  {
291  close_connection_ = true;
292  adaptor_.close();
293  if (error_handler_)
294  error_handler_(*this);
295  check_destroy();
296  }
297  });
298  }
299  break;
300  case WebSocketReadState::Len16:
301  {
302  remaining_length_ = 0;
303  remaining_length16_ = 0;
304  boost::asio::async_read(adaptor_.socket(), boost::asio::buffer(&remaining_length16_, 2),
305  [this](const boost::system::error_code& ec, std::size_t
306 #ifdef CROW_ENABLE_DEBUG
307  bytes_transferred
308 #endif
309  )
310  {
311  is_reading = false;
312  remaining_length16_ = ntohs(remaining_length16_);
313  remaining_length_ = remaining_length16_;
314 #ifdef CROW_ENABLE_DEBUG
315  if (!ec && bytes_transferred != 2)
316  {
317  throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
318  }
319 #endif
320 
321  if (!ec)
322  {
323  state_ = WebSocketReadState::Mask;
324  do_read();
325  }
326  else
327  {
328  close_connection_ = true;
329  adaptor_.close();
330  if (error_handler_)
331  error_handler_(*this);
332  check_destroy();
333  }
334  });
335  }
336  break;
337  case WebSocketReadState::Len64:
338  {
339  boost::asio::async_read(adaptor_.socket(), boost::asio::buffer(&remaining_length_, 8),
340  [this](const boost::system::error_code& ec, std::size_t
341 #ifdef CROW_ENABLE_DEBUG
342  bytes_transferred
343 #endif
344  )
345  {
346  is_reading = false;
347  remaining_length_ = ((1==ntohl(1)) ? (remaining_length_) : (static_cast<uint64_t>(ntohl((remaining_length_) & 0xFFFFFFFF)) << 32) | ntohl((remaining_length_) >> 32));
348 #ifdef CROW_ENABLE_DEBUG
349  if (!ec && bytes_transferred != 8)
350  {
351  throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
352  }
353 #endif
354 
355  if (!ec)
356  {
357  state_ = WebSocketReadState::Mask;
358  do_read();
359  }
360  else
361  {
362  close_connection_ = true;
363  adaptor_.close();
364  if (error_handler_)
365  error_handler_(*this);
366  check_destroy();
367  }
368  });
369  }
370  break;
371  case WebSocketReadState::Mask:
372  if (has_mask_)
373  {
374  boost::asio::async_read(adaptor_.socket(), boost::asio::buffer((char*)&mask_, 4),
375  [this](const boost::system::error_code& ec, std::size_t
376 #ifdef CROW_ENABLE_DEBUG
377  bytes_transferred
378 #endif
379  )
380  {
381  is_reading = false;
382 #ifdef CROW_ENABLE_DEBUG
383  if (!ec && bytes_transferred != 4)
384  {
385  throw std::runtime_error("WebSocket:Mask:async_read fail:asio bug?");
386  }
387 #endif
388 
389  if (!ec)
390  {
391  state_ = WebSocketReadState::Payload;
392  do_read();
393  }
394  else
395  {
396  close_connection_ = true;
397  if (error_handler_)
398  error_handler_(*this);
399  adaptor_.close();
400  }
401  });
402  }
403  else
404  {
405  state_ = WebSocketReadState::Payload;
406  do_read();
407  }
408  break;
409  case WebSocketReadState::Payload:
410  {
411  auto to_read = static_cast<std::uint64_t>(buffer_.size());
412  if (remaining_length_ < to_read)
413  to_read = remaining_length_;
414  adaptor_.socket().async_read_some(boost::asio::buffer(buffer_, static_cast<std::size_t>(to_read)),
415  [this](const boost::system::error_code& ec, std::size_t bytes_transferred)
416  {
417  is_reading = false;
418 
419  if (!ec)
420  {
421  fragment_.insert(fragment_.end(), buffer_.begin(), buffer_.begin() + bytes_transferred);
422  remaining_length_ -= bytes_transferred;
423  if (remaining_length_ == 0)
424  {
425  handle_fragment();
426  state_ = WebSocketReadState::MiniHeader;
427  do_read();
428  }
429  else
430  do_read();
431  }
432  else
433  {
434  close_connection_ = true;
435  if (error_handler_)
436  error_handler_(*this);
437  adaptor_.close();
438  }
439  });
440  }
441  break;
442  }
443  }
444 
445  /// Check if the FIN bit is set.
446  bool is_FIN()
447  {
448  return mini_header_ & 0x8000;
449  }
450 
451  /// Extract the opcode from the header.
452  int opcode()
453  {
454  return (mini_header_ & 0x0f00) >> 8;
455  }
456 
457  /// Process the payload fragment.
458 
459  ///
460  /// Unmasks the fragment, checks the opcode, merges fragments into 1 message body, and calls the appropriate handler.
462  {
463  if (has_mask_)
464  {
465  for(decltype(fragment_.length()) i = 0; i < fragment_.length(); i ++)
466  {
467  fragment_[i] ^= ((char*)&mask_)[i%4];
468  }
469  }
470  switch(opcode())
471  {
472  case 0: // Continuation
473  {
474  message_ += fragment_;
475  if (is_FIN())
476  {
477  if (message_handler_)
478  message_handler_(*this, message_, is_binary_);
479  message_.clear();
480  }
481  }
482  break;
483  case 1: // Text
484  {
485  is_binary_ = false;
486  message_ += fragment_;
487  if (is_FIN())
488  {
489  if (message_handler_)
490  message_handler_(*this, message_, is_binary_);
491  message_.clear();
492  }
493  }
494  break;
495  case 2: // Binary
496  {
497  is_binary_ = true;
498  message_ += fragment_;
499  if (is_FIN())
500  {
501  if (message_handler_)
502  message_handler_(*this, message_, is_binary_);
503  message_.clear();
504  }
505  }
506  break;
507  case 0x8: // Close
508  {
509  has_recv_close_ = true;
510  if (!has_sent_close_)
511  {
512  close(fragment_);
513  }
514  else
515  {
516  adaptor_.close();
517  close_connection_ = true;
518  if (!is_close_handler_called_)
519  {
520  if (close_handler_)
521  close_handler_(*this, fragment_);
522  is_close_handler_called_ = true;
523  }
524  check_destroy();
525  }
526  }
527  break;
528  case 0x9: // Ping
529  {
530  send_pong(fragment_);
531  }
532  break;
533  case 0xA: // Pong
534  {
535  pong_received_ = true;
536  }
537  break;
538  }
539 
540  fragment_.clear();
541  }
542 
543  /// Send the buffers' data through the socket.
544 
545  ///
546  /// Also destroyes the object if the Close flag is set.
547  void do_write()
548  {
549  if (sending_buffers_.empty())
550  {
551  sending_buffers_.swap(write_buffers_);
552  std::vector<boost::asio::const_buffer> buffers;
553  buffers.reserve(sending_buffers_.size());
554  for(auto& s:sending_buffers_)
555  {
556  buffers.emplace_back(boost::asio::buffer(s));
557  }
558  boost::asio::async_write(adaptor_.socket(), buffers,
559  [&](const boost::system::error_code& ec, std::size_t /*bytes_transferred*/)
560  {
561  sending_buffers_.clear();
562  if (!ec && !close_connection_)
563  {
564  if (!write_buffers_.empty())
565  do_write();
566  if (has_sent_close_)
567  close_connection_ = true;
568  }
569  else
570  {
571  close_connection_ = true;
572  check_destroy();
573  }
574  });
575  }
576  }
577 
578  /// Destroy the Connection.
580  {
581  //if (has_sent_close_ && has_recv_close_)
582  if (!is_close_handler_called_)
583  if (close_handler_)
584  close_handler_(*this, "uncleanly");
585  if (sending_buffers_.empty() && !is_reading)
586  delete this;
587  }
588  private:
589  Adaptor adaptor_;
590 
591  std::vector<std::string> sending_buffers_;
592  std::vector<std::string> write_buffers_;
593 
594  boost::array<char, 4096> buffer_;
595  bool is_binary_;
596  std::string message_;
597  std::string fragment_;
598  WebSocketReadState state_{WebSocketReadState::MiniHeader};
599  uint16_t remaining_length16_{0};
600  uint64_t remaining_length_{0};
601  bool close_connection_{false};
602  bool is_reading{false};
603  bool has_mask_{false};
604  uint32_t mask_;
605  uint16_t mini_header_;
606  bool has_sent_close_{false};
607  bool has_recv_close_{false};
608  bool error_occured_{false};
609  bool pong_received_{false};
610  bool is_close_handler_called_{false};
611 
612  std::function<void(crow::websocket::connection&)> open_handler_;
613  std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_;
614  std::function<void(crow::websocket::connection&, const std::string&)> close_handler_;
615  std::function<void(crow::websocket::connection&)> error_handler_;
616  std::function<bool(const crow::request&)> accept_handler_;
617  };
618  }
619 }
crow::websocket::Connection::is_FIN
bool is_FIN()
Check if the FIN bit is set.
Definition: websocket.h:446
crow::websocket::Connection::build_header
std::string build_header(int opcode, size_t size)
Generate the websocket headers using an opcode and the message size (in bytes).
Definition: websocket.h:191
crow::websocket::Connection::start
void start(std::string &&hello)
Send the HTTP upgrade response.
Definition: websocket.h:218
crow::websocket::Connection::close
void close(const std::string &msg) override
Send a close signal.
Definition: websocket.h:171
crow::websocket::Connection::send_ping
void send_ping(const std::string &msg) override
Send a "Ping" message.
Definition: websocket.h:121
crow::websocket::connection
A base class for websocket connection.
Definition: websocket.h:22
crow::websocket::Connection::send_pong
void send_pong(const std::string &msg) override
Send a "Pong" message.
Definition: websocket.h:135
crow::websocket::Connection::handle_fragment
void handle_fragment()
Process the payload fragment.
Definition: websocket.h:461
crow::request
An HTTP request.
Definition: http_request.h:26
crow::websocket::Connection::Connection
Connection(const crow::request &req, Adaptor &&adaptor, std::function< void(crow::websocket::connection &)> open_handler, std::function< void(crow::websocket::connection &, const std::string &, bool)> message_handler, std::function< void(crow::websocket::connection &, const std::string &)> close_handler, std::function< void(crow::websocket::connection &)> error_handler, std::function< bool(const crow::request &)> accept_handler)
Constructor for a connection.
Definition: websocket.h:67
crow::websocket::Connection::post
void post(CompletionHandler handler)
Send data through the socket and return immediately.
Definition: websocket.h:112
crow::websocket::Connection::send_text
void send_text(const std::string &msg) override
Send a plaintext message.
Definition: websocket.h:157
crow::websocket::Connection::dispatch
void dispatch(CompletionHandler handler)
Send data through the socket.
Definition: websocket.h:105
crow::websocket::Connection::do_write
void do_write()
Send the buffers' data through the socket.
Definition: websocket.h:547
crow::websocket::Connection::do_read
void do_read()
Read a websocket message.
Definition: websocket.h:242
crow::websocket::Connection::check_destroy
void check_destroy()
Destroy the Connection.
Definition: websocket.h:579
crow::websocket::Connection::opcode
int opcode()
Extract the opcode from the header.
Definition: websocket.h:452
crow::websocket::Connection::send_binary
void send_binary(const std::string &msg) override
Send a binary encoded message.
Definition: websocket.h:146
crow::websocket::Connection
A websocket connection.
Definition: websocket.h:59