Crow  1.1
A C++ microframework for the web
 
Loading...
Searching...
No Matches
http_connection.h
1#pragma once
2
3#ifdef CROW_USE_BOOST
4#include <boost/asio.hpp>
5#else
6#ifndef ASIO_STANDALONE
7#define ASIO_STANDALONE
8#endif
9#include <asio.hpp>
10#endif
11
12#include <algorithm>
13#include <atomic>
14#include <chrono>
15#include <memory>
16#include <vector>
17
18#include "crow/http_parser_merged.h"
19#include "crow/common.h"
20#include "crow/compression.h"
21#include "crow/http_response.h"
22#include "crow/logging.h"
23#include "crow/middleware.h"
24#include "crow/middleware_context.h"
25#include "crow/parser.h"
26#include "crow/settings.h"
27#include "crow/socket_adaptors.h"
28#include "crow/task_timer.h"
29#include "crow/utility.h"
30
31namespace crow
32{
33#ifdef CROW_USE_BOOST
34 namespace asio = boost::asio;
35 using error_code = boost::system::error_code;
36#else
37 using error_code = asio::error_code;
38#endif
39 using tcp = asio::ip::tcp;
40
41#ifdef CROW_ENABLE_DEBUG
42 static std::atomic<int> connectionCount;
43#endif
44
45 /// An HTTP connection.
46 template<typename Adaptor, typename Handler, typename... Middlewares>
47 class Connection : public std::enable_shared_from_this<Connection<Adaptor, Handler, Middlewares...>>
48 {
49 friend struct crow::response;
50
51 public:
53 asio::io_context& io_context,
54 Handler* handler,
55 const std::string& server_name,
56 std::tuple<Middlewares...>* middlewares,
57 std::function<std::string()>& get_cached_date_str_f,
58 detail::task_timer& task_timer,
59 typename Adaptor::context* adaptor_ctx_,
60 std::atomic<unsigned int>& queue_length):
61 adaptor_(io_context, adaptor_ctx_),
62 handler_(handler),
63 parser_(this),
64 req_(parser_.req),
65 server_name_(server_name),
66 middlewares_(middlewares),
67 get_cached_date_str(get_cached_date_str_f),
68 task_timer_(task_timer),
69 res_stream_threshold_(handler->stream_threshold()),
70 queue_length_(queue_length)
71 {
72 queue_length_++;
73#ifdef CROW_ENABLE_DEBUG
74 connectionCount++;
75 CROW_LOG_DEBUG << "Connection (" << this << ") allocated, total: " << connectionCount;
76#endif
77 }
78
80 {
81 queue_length_--;
82#ifdef CROW_ENABLE_DEBUG
83 connectionCount--;
84 CROW_LOG_DEBUG << "Connection (" << this << ") freed, total: " << connectionCount;
85#endif
86 }
87
88 /// The TCP socket on top of which the connection is established.
89 decltype(std::declval<Adaptor>().raw_socket())& socket()
90 {
91 return adaptor_.raw_socket();
92 }
93
94 void start()
95 {
96 auto self = this->shared_from_this();
97 adaptor_.start([self](const error_code& ec) {
98 if (!ec)
99 {
100 self->start_deadline();
101 self->parser_.clear();
102
103 self->do_read();
104 }
105 else
106 {
107 CROW_LOG_ERROR << "Could not start adaptor: " << ec.message();
108 }
109 });
110 }
111
112 void handle_url()
113 {
114 routing_handle_result_ = handler_->handle_initial(req_, res);
115 // if no route is found for the request method, return the response without parsing or processing anything further.
116 if (!routing_handle_result_->rule_index && !routing_handle_result_->catch_all && (req_.method != HTTPMethod::Options || routing_handle_result_->method == HTTPMethod::InternalMethodCount))
117 {
118 parser_.done();
119 need_to_call_after_handlers_ = true;
121 }
122 }
123
124 void handle_header()
125 {
126 // HTTP 1.1 Expect: 100-continue
127 if (req_.http_ver_major == 1 && req_.http_ver_minor == 1 && get_header_value(req_.headers, "expect") == "100-continue")
128 {
129 continue_requested = true;
130 buffers_.clear();
131 static const std::string expect_100_continue = "HTTP/1.1 100 Continue\r\n\r\n";
132 buffers_.emplace_back(expect_100_continue.data(), expect_100_continue.size());
133 error_code ec = do_write_sync(buffers_);
134 if (ec)
135 {
136 CROW_LOG_ERROR << ec << " buffer write error happened while handling sending continuation buffer header";
137 }
138 }
139 if (!routing_handle_result_->rule_index && !routing_handle_result_->catch_all && req_.method == HTTPMethod::Options)
140 {
141 parser_.done();
142 need_to_call_after_handlers_ = true;
144 }
145 }
146
147 void handle()
148 {
149 // TODO(EDev): cancel_deadline_timer should be looked into, it might be a good idea to add it to handle_url() and then restart the timer once everything passes
150 cancel_deadline_timer();
151 bool is_invalid_request = false;
152 add_keep_alive_ = false;
153
154 // Create context
155 ctx_ = detail::context<Middlewares...>();
156 req_.middleware_context = static_cast<void*>(&ctx_);
157 req_.middleware_container = static_cast<void*>(middlewares_);
158 req_.io_context = &adaptor_.get_io_context();
159 req_.remote_ip_address = adaptor_.address();
160 add_keep_alive_ = req_.keep_alive;
161 close_connection_ = req_.close_connection;
162
163 if (req_.check_version(1, 1)) // HTTP/1.1
164 {
165 if (!req_.headers.count("host"))
166 {
167 is_invalid_request = true;
168 res = response(400);
169 }
170 else if (req_.upgrade && req_.method != HTTPMethod::Options)
171 {
172 // h2 or h2c headers
173 if (req_.get_header_value("upgrade").find("h2")==0)
174 {
175 // TODO(ipkn): HTTP/2
176 // currently, ignore upgrade header
177 }
178 else
179 {
180
181 detail::middleware_call_helper<detail::middleware_call_criteria_only_global,
182 0, decltype(ctx_), decltype(*middlewares_)>({}, *middlewares_, req_, res, ctx_);
183 close_connection_ = true;
184 handler_->handle_upgrade(req_, res, std::move(adaptor_));
185 return;
186 }
187 }
188 }
189
190 CROW_LOG_INFO << "Request: " << utility::lexical_cast<std::string>(adaptor_.remote_endpoint()) << " " << this << " HTTP/" << (char)(req_.http_ver_major + '0') << "." << (char)(req_.http_ver_minor + '0') << ' ' << method_name(req_.method) << " " << req_.url;
191
192
193 need_to_call_after_handlers_ = false;
194 if (!is_invalid_request)
195 {
196 res.complete_request_handler_ = nullptr;
197 auto self = this->shared_from_this();
198 res.is_alive_helper_ = [self]() -> bool {
199 return self->adaptor_.is_open();
200 };
201
202 detail::middleware_call_helper<detail::middleware_call_criteria_only_global,
203 0, decltype(ctx_), decltype(*middlewares_)>({}, *middlewares_, req_, res, ctx_);
204
205 if (!res.completed_)
206 {
207 res.complete_request_handler_ = [self] {
208 self->complete_request();
209 };
210 need_to_call_after_handlers_ = true;
211 handler_->handle(req_, res, routing_handle_result_);
212 if (add_keep_alive_)
213 res.set_header("connection", "Keep-Alive");
214 }
215 else
216 {
218 }
219 }
220 else
221 {
223 }
224 }
225
226 /// Call the after handle middleware and send the write the response to the connection.
228 {
229 CROW_LOG_INFO << "Response: " << this << ' ' << req_.raw_url << ' ' << res.code << ' ' << close_connection_;
230 res.is_alive_helper_ = nullptr;
231
232 if (need_to_call_after_handlers_)
233 {
234 need_to_call_after_handlers_ = false;
235
236 // call all after_handler of middlewares
237 detail::after_handlers_call_helper<
239 (static_cast<int>(sizeof...(Middlewares)) - 1),
240 decltype(ctx_),
241 decltype(*middlewares_)>({}, *middlewares_, ctx_, req_, res);
242 }
243#ifdef CROW_ENABLE_COMPRESSION
244 if (!res.body.empty() && handler_->compression_used())
245 {
246 std::string accept_encoding = req_.get_header_value("Accept-Encoding");
247 if (!accept_encoding.empty() && res.compressed)
248 {
249 switch (handler_->compression_algorithm())
250 {
251 case compression::DEFLATE:
252 if (accept_encoding.find("deflate") != std::string::npos)
253 {
254 res.body = compression::compress_string(res.body, compression::algorithm::DEFLATE);
255 res.set_header("Content-Encoding", "deflate");
256 }
257 break;
258 case compression::GZIP:
259 if (accept_encoding.find("gzip") != std::string::npos)
260 {
261 res.body = compression::compress_string(res.body, compression::algorithm::GZIP);
262 res.set_header("Content-Encoding", "gzip");
263 }
264 break;
265 default:
266 break;
267 }
268 }
269 }
270#endif
271
272 prepare_buffers();
273
274 if (res.is_static_type())
275 {
276 do_write_static();
277 }
278 else
279 {
280 do_write_general();
281 }
282 }
283
284 private:
285 void prepare_buffers()
286 {
287 res.complete_request_handler_ = nullptr;
288 res.is_alive_helper_ = nullptr;
289
290 if (!adaptor_.is_open())
291 {
292 //CROW_LOG_DEBUG << this << " delete (socket is closed) " << is_reading << ' ' << is_writing;
293 //delete this;
294 return;
295 }
296 res.write_header_into_buffer(buffers_, content_length_, add_keep_alive_, server_name_);
297 }
298
299 void do_write_static()
300 {
301 asio::write(adaptor_.socket(), buffers_);
302
303 if (res.file_info.statResult == 0)
304 {
305 std::ifstream is(res.file_info.path.c_str(), std::ios::in | std::ios::binary);
306 std::vector<asio::const_buffer> buffers{1};
307 char buf[16384];
308 is.read(buf, sizeof(buf));
309 while (is.gcount() > 0)
310 {
311 buffers[0] = asio::buffer(buf, is.gcount());
312 error_code ec = do_write_sync(buffers);
313 if (ec) {
314 CROW_LOG_ERROR << ec << " - buffer write error happened while sending content of file "
315 << res.file_info.path << ". Writing stopped premature.";
316 break;
317 }
318 is.read(buf, sizeof(buf));
319 }
320 }
321 if (close_connection_)
322 {
323 adaptor_.shutdown_readwrite();
324 adaptor_.close();
325 CROW_LOG_DEBUG << this << " from write (static)";
326 }
327
328 res.end();
329 res.clear();
330 buffers_.clear();
331 parser_.clear();
332 }
333
334 void do_write_general()
335 {
336 error_code ec;
337 if (res.body.length() < res_stream_threshold_)
338 {
339 res_body_copy_.swap(res.body);
340 buffers_.emplace_back(res_body_copy_.data(), res_body_copy_.size());
341
342 ec = do_write_sync(buffers_);
343 if (ec) {
344 CROW_LOG_ERROR << ec << " - buffer write error happened while sending response. Writing stopped premature.";
345 }
346 if (need_to_start_read_after_complete_)
347 {
348 need_to_start_read_after_complete_ = false;
349 start_deadline();
350 do_read();
351 }
352 }
353 else
354 {
355 asio::write(adaptor_.socket(), buffers_,ec); // Write the response start / headers
356 if (ec) {
357 CROW_LOG_ERROR << ec << "- buffer write error happened while sending response start / headers. Writing stopped premature.";
358 }
359 cancel_deadline_timer();
360 if (res.body.length() > 0)
361 {
362 std::vector<asio::const_buffer> buffers{1};
363 const uint8_t* data = reinterpret_cast<const uint8_t*>(res.body.data());
364 size_t length = res.body.length();
365 for (size_t transferred = 0; transferred < length;)
366 {
367 size_t to_transfer = CROW_MIN(16384UL, length - transferred);
368 buffers[0] = asio::const_buffer(data + transferred, to_transfer);
369 ec = do_write_sync(buffers);
370 if (ec) {
371 CROW_LOG_ERROR << ec << " - " << transferred << " - buffer write error happened while sending response. Writing stopped premature.";
372 break;
373 }
374 transferred += to_transfer;
375 }
376 }
377 if (close_connection_)
378 {
379 adaptor_.shutdown_readwrite();
380 adaptor_.close();
381 CROW_LOG_DEBUG << this << " from write (res_stream)";
382 }
383
384 res.end();
385 res.clear();
386 buffers_.clear();
387 parser_.clear();
388 }
389 }
390
391 void do_read()
392 {
393 auto self = this->shared_from_this();
394 adaptor_.socket().async_read_some(
395 asio::buffer(buffer_),
396 [self](const error_code& ec, std::size_t bytes_transferred) {
397 bool error_while_reading = true;
398 if (!ec)
399 {
400 bool ret = self->parser_.feed(self->buffer_.data(), bytes_transferred);
401 if (ret && self->adaptor_.is_open())
402 {
403 error_while_reading = false;
404 }
405 }
406
407 if (error_while_reading)
408 {
409 self->cancel_deadline_timer();
410 self->parser_.done();
411 self->adaptor_.shutdown_read();
412 self->adaptor_.close();
413 CROW_LOG_DEBUG << self << " from read(1) with description: \"" << http_errno_description(static_cast<http_errno>(self->parser_.http_errno)) << '\"';
414 }
415 else if (self->close_connection_)
416 {
417 self->cancel_deadline_timer();
418 self->parser_.done();
419 // adaptor will close after write
420 }
421 else if (!self->need_to_call_after_handlers_)
422 {
423 self->start_deadline();
424 self->do_read();
425 }
426 else
427 {
428 // res will be completed later by user
429 self->need_to_start_read_after_complete_ = true;
430 }
431 });
432 }
433
434 void do_write()
435 {
436 auto self = this->shared_from_this();
437 asio::async_write(
438 adaptor_.socket(), buffers_,
439 [self](const error_code& ec, std::size_t /*bytes_transferred*/) {
440 self->res.clear();
441 self->res_body_copy_.clear();
442 if (!self->continue_requested)
443 {
444 self->parser_.clear();
445 }
446 else
447 {
448 self->continue_requested = false;
449 }
450
451 if (!ec)
452 {
453 if (self->close_connection_)
454 {
455 self->adaptor_.shutdown_write();
456 self->adaptor_.close();
457 CROW_LOG_DEBUG << self << " from write(1)";
458 }
459 }
460 else
461 {
462 CROW_LOG_DEBUG << self << " from write(2)";
463 }
464 });
465 }
466
467 inline error_code do_write_sync(std::vector<asio::const_buffer>& buffers)
468 {
469 error_code ec;
470 asio::write(adaptor_.socket(), buffers, ec);
471 if (ec)
472 {
473 // CROW_LOG_ERROR << ec << " - happened while sending buffers";
474 CROW_LOG_DEBUG << this << " from write (sync)(2)";
475 }
476
477 this->res.clear();
478 this->res_body_copy_.clear();
479 if (this->continue_requested)
480 {
481 this->continue_requested = false;
482 }
483 else
484 {
485 this->parser_.clear();
486 }
487
488 return ec;
489 }
490
491 void cancel_deadline_timer()
492 {
493 CROW_LOG_DEBUG << this << " timer cancelled: " << &task_timer_ << ' ' << task_id_;
494 task_timer_.cancel(task_id_);
495 }
496
497 void start_deadline(/*int timeout = 5*/)
498 {
499 cancel_deadline_timer();
500
501 auto self = this->shared_from_this();
502 task_id_ = task_timer_.schedule([self] {
503 if (!self->adaptor_.is_open())
504 {
505 return;
506 }
507 self->adaptor_.shutdown_readwrite();
508 self->adaptor_.close();
509 });
510 CROW_LOG_DEBUG << this << " timer added: " << &task_timer_ << ' ' << task_id_;
511 }
512
513 private:
514 Adaptor adaptor_;
515 Handler* handler_;
516
517 std::array<char, 4096> buffer_;
518
519 HTTPParser<Connection> parser_;
520 std::unique_ptr<routing_handle_result> routing_handle_result_;
521 request& req_;
522 response res;
523
524 bool close_connection_ = false;
525
526 const std::string& server_name_;
527 std::vector<asio::const_buffer> buffers_;
528
529 std::string content_length_;
530 std::string date_str_;
531 std::string res_body_copy_;
532
533 detail::task_timer::identifier_type task_id_{};
534
535 bool continue_requested{};
536 bool need_to_call_after_handlers_{};
537 bool need_to_start_read_after_complete_{};
538 bool add_keep_alive_{};
539
540 std::tuple<Middlewares...>* middlewares_;
541 detail::context<Middlewares...> ctx_;
542
543 std::function<std::string()>& get_cached_date_str;
544 detail::task_timer& task_timer_;
545
546 size_t res_stream_threshold_;
547
548 std::atomic<unsigned int>& queue_length_;
549 };
550
551} // namespace crow
An HTTP connection.
Definition http_connection.h:48
decltype(std::declval< Adaptor >().raw_socket()) & socket()
The TCP socket on top of which the connection is established.
Definition http_connection.h:89
void complete_request()
Call the after handle middleware and send the write the response to the connection.
Definition http_connection.h:227
Definition task_timer.h:36
The main namespace of the library. In this namespace is defined the most important classes and functi...
const std::string & get_header_value(const ci_map &headers, const std::string &key)
Find and return the value associated with the key. (returns an empty string if nothing is found)
Definition http_request.h:33
bool close_connection
Whether or not the server should shut down the TCP connection once a response is sent.
Definition http_request.h:57
bool keep_alive
Whether or not the server should send a connection: Keep-Alive header to the client.
Definition http_request.h:56
std::string url
The endpoint without any parameters.
Definition http_request.h:50
std::string remote_ip_address
The IP address from which the request was sent.
Definition http_request.h:54
bool upgrade
Whether or noth the server should change the HTTP connection to a different connection.
Definition http_request.h:58
HTTP response.
Definition http_response.h:40