Crow  1.0
A C++ microframework for the web
websocket.h
1 #pragma once
2 #include <array>
3 #include "crow/logging.h"
4 #include "crow/socket_adaptors.h"
5 #include "crow/http_request.h"
6 #include "crow/TinySHA1.hpp"
7 #include "crow/utility.h"
8 
9 namespace crow
10 {
11  namespace websocket
12  {
/// Stages of the incremental frame parser; `Connection::do_read()` switches on
/// the current stage to decide what to read next.
enum class WebSocketReadState
{
    MiniHeader, ///< First two header bytes (FIN/opcode, MASK bit and 7-bit length).
    Len16,      ///< 16-bit extended payload length.
    Len64,      ///< 64-bit extended payload length.
    Mask,       ///< 4-byte masking key (only read when the MASK bit was set).
    Payload     ///< Payload bytes.
};
21 
/// A base class for websocket connection.
///
/// Declares the operations a websocket connection must provide and stores one
/// opaque `userdata` pointer per connection. The pointer is never dereferenced
/// or freed by this class.
struct connection
{
    /// Send `msg` as a binary message.
    virtual void send_binary(std::string msg) = 0;
    /// Send `msg` as a text message.
    virtual void send_text(std::string msg) = 0;
    /// Send a "Ping" message carrying `msg`.
    virtual void send_ping(std::string msg) = 0;
    /// Send a "Pong" message carrying `msg`.
    virtual void send_pong(std::string msg) = 0;
    /// Send a close signal carrying `msg` (defaults to "quit").
    virtual void close(std::string const& msg = "quit") = 0;
    /// Return the remote peer's IP address as text.
    virtual std::string get_remote_ip() = 0;
    virtual ~connection() = default;

    /// Attach an opaque user pointer to this connection.
    void userdata(void* u) { userdata_ = u; }
    /// Return the pointer last stored with `userdata(void*)`, or nullptr if none was stored.
    void* userdata() { return userdata_; }

private:
    // Initialised so that reading it before any setter call is well defined.
    void* userdata_{nullptr};
};
39 
40  // Modified version of the illustration in RFC6455 Section-5.2
41  //
42  //
43  // 0 1 2 3 -byte
44  // 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 -bit
45  // +-+-+-+-+-------+-+-------------+-------------------------------+
46  // |F|R|R|R| opcode|M| Payload len | Extended payload length |
47  // |I|S|S|S| (4) |A| (7) | (16/64) |
48  // |N|V|V|V| |S| | (if payload len==126/127) |
49  // | |1|2|3| |K| | |
50  // +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
51  // | Extended payload length continued, if payload len == 127 |
52  // + - - - - - - - - - - - - - - - +-------------------------------+
53  // | |Masking-key, if MASK set to 1 |
54  // +-------------------------------+-------------------------------+
55  // | Masking-key (continued) | Payload Data |
56  // +-------------------------------- - - - - - - - - - - - - - - - +
57  // : Payload Data continued ... :
58  // + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
59  // | Payload Data continued ... |
60  // +---------------------------------------------------------------+
61  //
62 
63  /// A websocket connection.
64 
65  template<typename Adaptor, typename Handler>
66  class Connection : public connection
67  {
68  public:
69  /// Constructor for a connection.
70 
71  ///
72  /// Requires a request with an "Upgrade: websocket" header.<br>
73  /// Automatically handles the handshake.
74  Connection(const crow::request& req, Adaptor&& adaptor, Handler* handler, uint64_t max_payload,
75  std::function<void(crow::websocket::connection&)> open_handler,
76  std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler,
77  std::function<void(crow::websocket::connection&, const std::string&)> close_handler,
78  std::function<void(crow::websocket::connection&, const std::string&)> error_handler,
79  std::function<bool(const crow::request&, void**)> accept_handler):
80  adaptor_(std::move(adaptor)),
81  handler_(handler),
82  max_payload_bytes_(max_payload),
83  open_handler_(std::move(open_handler)),
84  message_handler_(std::move(message_handler)),
85  close_handler_(std::move(close_handler)),
86  error_handler_(std::move(error_handler)),
87  accept_handler_(std::move(accept_handler))
88  {
89  if (!utility::string_equals(req.get_header_value("upgrade"), "websocket"))
90  {
91  adaptor_.close();
92  handler_->remove_websocket(this);
93  delete this;
94  return;
95  }
96 
97  if (accept_handler_)
98  {
99  void* ud = nullptr;
100  if (!accept_handler_(req, &ud))
101  {
102  adaptor_.close();
103  handler_->remove_websocket(this);
104  delete this;
105  return;
106  }
107  userdata(ud);
108  }
109 
110  // Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
111  // Sec-WebSocket-Version: 13
112  std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
113  sha1::SHA1 s;
114  s.processBytes(magic.data(), magic.size());
115  uint8_t digest[20];
116  s.getDigestBytes(digest);
117 
118  start(crow::utility::base64encode((unsigned char*)digest, 20));
119  }
120 
121  ~Connection() noexcept override
122  {
123  // Do not modify anchor_ here since writing shared_ptr is not atomic.
124  auto watch = std::weak_ptr<void>{anchor_};
125 
126  // Wait until all unhandled asynchronous operations to join.
127  // As the deletion occurs inside 'check_destroy()', which already locks
128  // anchor, use count can be 1 on valid deletion context.
129  while (watch.use_count() > 2) // 1 for 'check_destroy() routine', 1 for 'this->anchor_'
130  {
131  std::this_thread::yield();
132  }
133  }
134 
/// Wraps a callable so that it runs only while a guarded anchor object is alive.
///
/// `watch` observes the anchor. `operator()` invokes `callable` only if the
/// anchor can still be locked, and holds that lock for the duration of the call.
/// The type is an aggregate: it is built as `WeakWrappedMessage<F>{callable, anchor}`.
template<typename Callable>
struct WeakWrappedMessage
{
    Callable callable;
    std::weak_ptr<void> watch;

    void operator()()
    {
        // Silently drop the call when the anchor has already expired.
        if (auto anchor = watch.lock())
        {
            std::move(callable)();
        }
    }
};
149 
150  /// Send data through the socket.
151  template<typename CompletionHandler>
152  void dispatch(CompletionHandler&& handler)
153  {
154  asio::dispatch(adaptor_.get_io_service(),
155  WeakWrappedMessage<typename std::decay<CompletionHandler>::type>{
156  std::forward<CompletionHandler>(handler), anchor_});
157  }
158 
159  /// Send data through the socket and return immediately.
160  template<typename CompletionHandler>
161  void post(CompletionHandler&& handler)
162  {
163  asio::post(adaptor_.get_io_service(),
164  WeakWrappedMessage<typename std::decay<CompletionHandler>::type>{
165  std::forward<CompletionHandler>(handler), anchor_});
166  }
167 
168  /// Send a "Ping" message.
169 
170  ///
171  /// Usually invoked to check if the other point is still online.
172  void send_ping(std::string msg) override
173  {
174  send_data(0x9, std::move(msg));
175  }
176 
177  /// Send a "Pong" message.
178 
179  ///
180  /// Usually automatically invoked as a response to a "Ping" message.
181  void send_pong(std::string msg) override
182  {
183  send_data(0xA, std::move(msg));
184  }
185 
186  /// Send a binary encoded message.
187  void send_binary(std::string msg) override
188  {
189  send_data(0x2, std::move(msg));
190  }
191 
192  /// Send a plaintext message.
193  void send_text(std::string msg) override
194  {
195  send_data(0x1, std::move(msg));
196  }
197 
198  /// Send a close signal.
199 
200  ///
201  /// Sets a flag to destroy the object once the message is sent.
202  void close(std::string const& msg) override
203  {
204  dispatch([this, msg]() mutable {
205  has_sent_close_ = true;
206  if (has_recv_close_ && !is_close_handler_called_)
207  {
208  is_close_handler_called_ = true;
209  if (close_handler_)
210  close_handler_(*this, msg);
211  }
212  auto header = build_header(0x8, msg.size());
213  write_buffers_.emplace_back(std::move(header));
214  write_buffers_.emplace_back(msg);
215  do_write();
216  });
217  }
218 
219  std::string get_remote_ip() override
220  {
221  return adaptor_.remote_endpoint().address().to_string();
222  }
223 
224  void set_max_payload_size(uint64_t payload)
225  {
226  max_payload_bytes_ = payload;
227  }
228 
229  protected:
/// Generate the websocket headers using an opcode and the message size (in bytes).
///
/// The header always has the FIN bit set and the MASK bit clear. The length is
/// encoded as:
///  - `size < 126`: directly in the second byte (2-byte header);
///  - `size < 0x10000`: marker 126 followed by a 16-bit big-endian length (4 bytes);
///  - otherwise: marker 127 followed by a 64-bit big-endian length (10 bytes).
///
/// @param opcode frame opcode, added to the 0x80 (FIN) byte.
/// @param size   payload length in bytes.
/// @return the serialized frame header.
std::string build_header(int opcode, size_t size)
{
    std::string header;
    header.reserve(2 + 8);
    header.push_back(static_cast<char>(0x80 + opcode));

    if (size < 126)
    {
        header.push_back(static_cast<char>(size));
        return header;
    }

    const bool fits_in_16_bits = size < 0x10000;
    header.push_back(static_cast<char>(fits_in_16_bits ? 126 : 127));

    // Emit the length most-significant byte first (network byte order). Writing
    // byte by byte avoids unaligned, type-punned integer stores into the buffer
    // and does not depend on the host's endianness.
    const uint64_t length = static_cast<uint64_t>(size);
    const int length_bytes = fits_in_16_bits ? 2 : 8;
    for (int shift = (length_bytes - 1) * 8; shift >= 0; shift -= 8)
    {
        header.push_back(static_cast<char>((length >> shift) & 0xFF));
    }
    return header;
}
253 
254  /// Send the HTTP upgrade response.
255 
256  ///
257  /// Finishes the handshake process, then starts reading messages from the socket.
258  void start(std::string&& hello)
259  {
260  static const std::string header =
261  "HTTP/1.1 101 Switching Protocols\r\n"
262  "Upgrade: websocket\r\n"
263  "Connection: Upgrade\r\n"
264  "Sec-WebSocket-Accept: ";
265  write_buffers_.emplace_back(header);
266  write_buffers_.emplace_back(std::move(hello));
267  write_buffers_.emplace_back(crlf);
268  write_buffers_.emplace_back(crlf);
269  do_write();
270  if (open_handler_)
271  open_handler_(*this);
272  do_read();
273  }
274 
275  /// Read a websocket message.
276 
277  ///
278  /// Involves:<br>
279  /// Handling headers (opcodes, size).<br>
280  /// Unmasking the payload.<br>
281  /// Reading the actual payload.<br>
282  void do_read()
283  {
284  if (has_sent_close_ && has_recv_close_)
285  {
286  close_connection_ = true;
287  adaptor_.shutdown_readwrite();
288  adaptor_.close();
289  check_destroy();
290  return;
291  }
292 
293  is_reading = true;
294  switch (state_)
295  {
296  case WebSocketReadState::MiniHeader:
297  {
298  mini_header_ = 0;
299  //asio::async_read(adaptor_.socket(), asio::buffer(&mini_header_, 1),
300  adaptor_.socket().async_read_some(
301  asio::buffer(&mini_header_, 2),
302  [this](const asio::error_code& ec, std::size_t
303 #ifdef CROW_ENABLE_DEBUG
304  bytes_transferred
305 #endif
306  )
307 
308  {
309  is_reading = false;
310  mini_header_ = ntohs(mini_header_);
311 #ifdef CROW_ENABLE_DEBUG
312 
313  if (!ec && bytes_transferred != 2)
314  {
315  throw std::runtime_error("WebSocket:MiniHeader:async_read fail:asio bug?");
316  }
317 #endif
318 
319  if (!ec)
320  {
321  if ((mini_header_ & 0x80) == 0x80)
322  has_mask_ = true;
323  else //if the websocket specification is enforced and the message isn't masked, terminate the connection
324  {
325 #ifndef CROW_ENFORCE_WS_SPEC
326  has_mask_ = false;
327 #else
328  close_connection_ = true;
329  adaptor_.shutdown_readwrite();
330  adaptor_.close();
331  if (error_handler_)
332  error_handler_(*this, "Client connection not masked.");
333  check_destroy();
334 #endif
335  }
336 
337  if ((mini_header_ & 0x7f) == 127)
338  {
339  state_ = WebSocketReadState::Len64;
340  }
341  else if ((mini_header_ & 0x7f) == 126)
342  {
343  state_ = WebSocketReadState::Len16;
344  }
345  else
346  {
347  remaining_length_ = mini_header_ & 0x7f;
348  state_ = WebSocketReadState::Mask;
349  }
350  do_read();
351  }
352  else
353  {
354  close_connection_ = true;
355  adaptor_.shutdown_readwrite();
356  adaptor_.close();
357  if (error_handler_)
358  error_handler_(*this, ec.message());
359  check_destroy();
360  }
361  });
362  }
363  break;
364  case WebSocketReadState::Len16:
365  {
366  remaining_length_ = 0;
367  remaining_length16_ = 0;
368  asio::async_read(
369  adaptor_.socket(), asio::buffer(&remaining_length16_, 2),
370  [this](const asio::error_code& ec, std::size_t
371 #ifdef CROW_ENABLE_DEBUG
372  bytes_transferred
373 #endif
374  ) {
375  is_reading = false;
376  remaining_length16_ = ntohs(remaining_length16_);
377  remaining_length_ = remaining_length16_;
378 #ifdef CROW_ENABLE_DEBUG
379  if (!ec && bytes_transferred != 2)
380  {
381  throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
382  }
383 #endif
384 
385  if (!ec)
386  {
387  state_ = WebSocketReadState::Mask;
388  do_read();
389  }
390  else
391  {
392  close_connection_ = true;
393  adaptor_.shutdown_readwrite();
394  adaptor_.close();
395  if (error_handler_)
396  error_handler_(*this, ec.message());
397  check_destroy();
398  }
399  });
400  }
401  break;
402  case WebSocketReadState::Len64:
403  {
404  asio::async_read(
405  adaptor_.socket(), asio::buffer(&remaining_length_, 8),
406  [this](const asio::error_code& ec, std::size_t
407 #ifdef CROW_ENABLE_DEBUG
408  bytes_transferred
409 #endif
410  ) {
411  is_reading = false;
412  remaining_length_ = ((1 == ntohl(1)) ? (remaining_length_) : (static_cast<uint64_t>(ntohl((remaining_length_)&0xFFFFFFFF)) << 32) | ntohl((remaining_length_) >> 32));
413 #ifdef CROW_ENABLE_DEBUG
414  if (!ec && bytes_transferred != 8)
415  {
416  throw std::runtime_error("WebSocket:Len16:async_read fail:asio bug?");
417  }
418 #endif
419 
420  if (!ec)
421  {
422  state_ = WebSocketReadState::Mask;
423  do_read();
424  }
425  else
426  {
427  close_connection_ = true;
428  adaptor_.shutdown_readwrite();
429  adaptor_.close();
430  if (error_handler_)
431  error_handler_(*this, ec.message());
432  check_destroy();
433  }
434  });
435  }
436  break;
437  case WebSocketReadState::Mask:
438  if (remaining_length_ > max_payload_bytes_)
439  {
440  close_connection_ = true;
441  adaptor_.close();
442  if (error_handler_)
443  error_handler_(*this, "Message length exceeds maximum payload.");
444  check_destroy();
445  }
446  else if (has_mask_)
447  {
448  asio::async_read(
449  adaptor_.socket(), asio::buffer((char*)&mask_, 4),
450  [this](const asio::error_code& ec, std::size_t
451 #ifdef CROW_ENABLE_DEBUG
452  bytes_transferred
453 #endif
454  ) {
455  is_reading = false;
456 #ifdef CROW_ENABLE_DEBUG
457  if (!ec && bytes_transferred != 4)
458  {
459  throw std::runtime_error("WebSocket:Mask:async_read fail:asio bug?");
460  }
461 #endif
462 
463  if (!ec)
464  {
465  state_ = WebSocketReadState::Payload;
466  do_read();
467  }
468  else
469  {
470  close_connection_ = true;
471  if (error_handler_)
472  error_handler_(*this, ec.message());
473  adaptor_.shutdown_readwrite();
474  adaptor_.close();
475  check_destroy();
476  }
477  });
478  }
479  else
480  {
481  state_ = WebSocketReadState::Payload;
482  do_read();
483  }
484  break;
485  case WebSocketReadState::Payload:
486  {
487  auto to_read = static_cast<std::uint64_t>(buffer_.size());
488  if (remaining_length_ < to_read)
489  to_read = remaining_length_;
490  adaptor_.socket().async_read_some(
491  asio::buffer(buffer_, static_cast<std::size_t>(to_read)),
492  [this](const asio::error_code& ec, std::size_t bytes_transferred) {
493  is_reading = false;
494 
495  if (!ec)
496  {
497  fragment_.insert(fragment_.end(), buffer_.begin(), buffer_.begin() + bytes_transferred);
498  remaining_length_ -= bytes_transferred;
499  if (remaining_length_ == 0)
500  {
501  if (handle_fragment())
502  {
503  state_ = WebSocketReadState::MiniHeader;
504  do_read();
505  }
506  }
507  else
508  do_read();
509  }
510  else
511  {
512  close_connection_ = true;
513  if (error_handler_)
514  error_handler_(*this, ec.message());
515  adaptor_.shutdown_readwrite();
516  adaptor_.close();
517  check_destroy();
518  }
519  });
520  }
521  break;
522  }
523  }
524 
525  /// Check if the FIN bit is set.
526  bool is_FIN()
527  {
528  return mini_header_ & 0x8000;
529  }
530 
531  /// Extract the opcode from the header.
532  int opcode()
533  {
534  return (mini_header_ & 0x0f00) >> 8;
535  }
536 
537  /// Process the payload fragment.
538 
539  ///
540  /// Unmasks the fragment, checks the opcode, merges fragments into 1 message body, and calls the appropriate handler.
542  {
543  if (has_mask_)
544  {
545  for (decltype(fragment_.length()) i = 0; i < fragment_.length(); i++)
546  {
547  fragment_[i] ^= ((char*)&mask_)[i % 4];
548  }
549  }
550  switch (opcode())
551  {
552  case 0: // Continuation
553  {
554  message_ += fragment_;
555  if (is_FIN())
556  {
557  if (message_handler_)
558  message_handler_(*this, message_, is_binary_);
559  message_.clear();
560  }
561  }
562  break;
563  case 1: // Text
564  {
565  is_binary_ = false;
566  message_ += fragment_;
567  if (is_FIN())
568  {
569  if (message_handler_)
570  message_handler_(*this, message_, is_binary_);
571  message_.clear();
572  }
573  }
574  break;
575  case 2: // Binary
576  {
577  is_binary_ = true;
578  message_ += fragment_;
579  if (is_FIN())
580  {
581  if (message_handler_)
582  message_handler_(*this, message_, is_binary_);
583  message_.clear();
584  }
585  }
586  break;
587  case 0x8: // Close
588  {
589  has_recv_close_ = true;
590  if (!has_sent_close_)
591  {
592  close(fragment_);
593  }
594  else
595  {
596  adaptor_.shutdown_readwrite();
597  adaptor_.close();
598  close_connection_ = true;
599  if (!is_close_handler_called_)
600  {
601  if (close_handler_)
602  close_handler_(*this, fragment_);
603  is_close_handler_called_ = true;
604  }
605  check_destroy();
606  return false;
607  }
608  }
609  break;
610  case 0x9: // Ping
611  {
612  send_pong(fragment_);
613  }
614  break;
615  case 0xA: // Pong
616  {
617  pong_received_ = true;
618  }
619  break;
620  }
621 
622  fragment_.clear();
623  return true;
624  }
625 
626  /// Send the buffers' data through the socket.
627 
628  ///
629  /// Also destroys the object if the Close flag is set.
630  void do_write()
631  {
632  if (sending_buffers_.empty())
633  {
634  sending_buffers_.swap(write_buffers_);
635  std::vector<asio::const_buffer> buffers;
636  buffers.reserve(sending_buffers_.size());
637  for (auto& s : sending_buffers_)
638  {
639  buffers.emplace_back(asio::buffer(s));
640  }
641  auto watch = std::weak_ptr<void>{anchor_};
642  asio::async_write(
643  adaptor_.socket(), buffers,
644  [&, watch](const asio::error_code& ec, std::size_t /*bytes_transferred*/) {
645  if (!ec && !close_connection_)
646  {
647  sending_buffers_.clear();
648  if (!write_buffers_.empty())
649  do_write();
650  if (has_sent_close_)
651  close_connection_ = true;
652  }
653  else
654  {
655  auto anchor = watch.lock();
656  if (anchor == nullptr) { return; }
657 
658  sending_buffers_.clear();
659  close_connection_ = true;
660  check_destroy();
661  }
662  });
663  }
664  }
665 
666  /// Destroy the Connection.
668  {
669  //if (has_sent_close_ && has_recv_close_)
670  if (!is_close_handler_called_)
671  if (close_handler_)
672  close_handler_(*this, "uncleanly");
673  handler_->remove_websocket(this);
674  if (sending_buffers_.empty() && !is_reading)
675  delete this;
676  }
677 
678 
680  {
681  std::string payload;
682  Connection* self;
683  int opcode;
684 
685  void operator()()
686  {
687  self->send_data_impl(this);
688  }
689  };
690 
691  void send_data_impl(SendMessageType* s)
692  {
693  auto header = build_header(s->opcode, s->payload.size());
694  write_buffers_.emplace_back(std::move(header));
695  write_buffers_.emplace_back(std::move(s->payload));
696  do_write();
697  }
698 
699  void send_data(int opcode, std::string&& msg)
700  {
701  SendMessageType event_arg{
702  std::move(msg),
703  this,
704  opcode};
705 
706  post(std::move(event_arg));
707  }
708 
709  private:
710  Adaptor adaptor_;
711  Handler* handler_;
712 
713  std::vector<std::string> sending_buffers_;
714  std::vector<std::string> write_buffers_;
715 
716  std::array<char, 4096> buffer_;
717  bool is_binary_;
718  std::string message_;
719  std::string fragment_;
720  WebSocketReadState state_{WebSocketReadState::MiniHeader};
721  uint16_t remaining_length16_{0};
722  uint64_t remaining_length_{0};
723  uint64_t max_payload_bytes_{UINT64_MAX};
724  bool close_connection_{false};
725  bool is_reading{false};
726  bool has_mask_{false};
727  uint32_t mask_;
728  uint16_t mini_header_;
729  bool has_sent_close_{false};
730  bool has_recv_close_{false};
731  bool error_occurred_{false};
732  bool pong_received_{false};
733  bool is_close_handler_called_{false};
734 
735  std::shared_ptr<void> anchor_ = std::make_shared<int>(); // Value is just for placeholding
736 
737  std::function<void(crow::websocket::connection&)> open_handler_;
738  std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_;
739  std::function<void(crow::websocket::connection&, const std::string&)> close_handler_;
740  std::function<void(crow::websocket::connection&, const std::string&)> error_handler_;
741  std::function<bool(const crow::request&, void**)> accept_handler_;
742  };
743  } // namespace websocket
744 } // namespace crow
crow::websocket::Connection::send_text
void send_text(std::string msg) override
Send a plaintext message.
Definition: websocket.h:193
crow::websocket::Connection::do_read
void do_read()
Read a websocket message.
Definition: websocket.h:282
crow::websocket::Connection::SendMessageType
Definition: websocket.h:679
crow::websocket::Connection::post
void post(CompletionHandler &&handler)
Send data through the socket and return immediately.
Definition: websocket.h:161
crow::websocket::Connection::WeakWrappedMessage
Definition: websocket.h:136
crow::websocket::Connection::build_header
std::string build_header(int opcode, size_t size)
Generate the websocket headers using an opcode and the message size (in bytes).
Definition: websocket.h:231
crow::websocket::Connection::do_write
void do_write()
Send the buffers' data through the socket.
Definition: websocket.h:630
crow::websocket::Connection::handle_fragment
bool handle_fragment()
Process the payload fragment.
Definition: websocket.h:541
crow::websocket::connection
A base class for websocket connection.
Definition: websocket.h:23
crow::websocket::Connection::is_FIN
bool is_FIN()
Check if the FIN bit is set.
Definition: websocket.h:526
crow::websocket::Connection::close
void close(std::string const &msg) override
Send a close signal.
Definition: websocket.h:202
crow::request
An HTTP request.
Definition: http_request.h:27
crow::websocket::Connection::send_pong
void send_pong(std::string msg) override
Send a "Pong" message.
Definition: websocket.h:181
crow::websocket::Connection::start
void start(std::string &&hello)
Send the HTTP upgrade response.
Definition: websocket.h:258
crow::websocket::Connection::dispatch
void dispatch(CompletionHandler &&handler)
Send data through the socket.
Definition: websocket.h:152
crow::websocket::Connection::check_destroy
void check_destroy()
Destroy the Connection.
Definition: websocket.h:667
crow::websocket::Connection::send_binary
void send_binary(std::string msg) override
Send a binary encoded message.
Definition: websocket.h:187
crow::websocket::Connection::send_ping
void send_ping(std::string msg) override
Send a "Ping" message.
Definition: websocket.h:172
crow::websocket::Connection::Connection
Connection(const crow::request &req, Adaptor &&adaptor, Handler *handler, uint64_t max_payload, std::function< void(crow::websocket::connection &)> open_handler, std::function< void(crow::websocket::connection &, const std::string &, bool)> message_handler, std::function< void(crow::websocket::connection &, const std::string &)> close_handler, std::function< void(crow::websocket::connection &, const std::string &)> error_handler, std::function< bool(const crow::request &, void **)> accept_handler)
Constructor for a connection.
Definition: websocket.h:74
crow::websocket::Connection
A websocket connection.
Definition: websocket.h:66
crow::websocket::Connection::opcode
int opcode()
Extract the opcode from the header.
Definition: websocket.h:532