Crow  1.1
A C++ microframework for the web
http_server.h
1 #pragma once
2 
3 #ifdef CROW_USE_BOOST
4 #include <boost/asio.hpp>
5 #ifdef CROW_ENABLE_SSL
6 #include <boost/asio/ssl.hpp>
7 #endif
8 #else
9 #ifndef ASIO_STANDALONE
10 #define ASIO_STANDALONE
11 #endif
12 #include <asio.hpp>
13 #ifdef CROW_ENABLE_SSL
14 #include <asio/ssl.hpp>
15 #endif
16 #endif
17 
18 #include <atomic>
19 #include <chrono>
20 #include <cstdint>
21 #include <future>
22 #include <memory>
23 #include <vector>
24 
25 #include "crow/version.h"
26 #include "crow/http_connection.h"
27 #include "crow/logging.h"
28 #include "crow/task_timer.h"
29 
30 
31 namespace crow // NOTE: Already documented in "crow/app.h"
32 {
33 #ifdef CROW_USE_BOOST
34  namespace asio = boost::asio;
35  using error_code = boost::system::error_code;
36 #else
37  using error_code = asio::error_code;
38 #endif
39  using tcp = asio::ip::tcp;
40 
41  template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
42  class Server
43  {
44  public:
45  Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, uint8_t timeout = 5, typename Adaptor::context* adaptor_ctx = nullptr):
46  acceptor_(io_service_, tcp::endpoint(asio::ip::address::from_string(bindaddr), port)),
47  signals_(io_service_),
48  tick_timer_(io_service_),
49  handler_(handler),
50  concurrency_(concurrency),
51  timeout_(timeout),
52  server_name_(server_name),
53  port_(port),
54  bindaddr_(bindaddr),
55  task_queue_length_pool_(concurrency_ - 1),
56  middlewares_(middlewares),
57  adaptor_ctx_(adaptor_ctx)
58  {}
59 
60  void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
61  {
62  tick_interval_ = d;
63  tick_function_ = f;
64  }
65 
66  void on_tick()
67  {
68  tick_function_();
69  tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
70  tick_timer_.async_wait([this](const error_code& ec) {
71  if (ec)
72  return;
73  on_tick();
74  });
75  }
76 
77  void run()
78  {
79  uint16_t worker_thread_count = concurrency_ - 1;
80  for (int i = 0; i < worker_thread_count; i++)
81  io_service_pool_.emplace_back(new asio::io_service());
82  get_cached_date_str_pool_.resize(worker_thread_count);
83  task_timer_pool_.resize(worker_thread_count);
84 
85  std::vector<std::future<void>> v;
86  std::atomic<int> init_count(0);
87  for (uint16_t i = 0; i < worker_thread_count; i++)
88  v.push_back(
89  std::async(
90  std::launch::async, [this, i, &init_count] {
91  // thread local date string get function
92  auto last = std::chrono::steady_clock::now();
93 
94  std::string date_str;
95  auto update_date_str = [&] {
96  auto last_time_t = time(0);
97  tm my_tm;
98 
99 #if defined(_MSC_VER) || defined(__MINGW32__)
100  gmtime_s(&my_tm, &last_time_t);
101 #else
102  gmtime_r(&last_time_t, &my_tm);
103 #endif
104  date_str.resize(100);
105  size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
106  date_str.resize(date_str_sz);
107  };
108  update_date_str();
109  get_cached_date_str_pool_[i] = [&]() -> std::string {
110  if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
111  {
112  last = std::chrono::steady_clock::now();
113  update_date_str();
114  }
115  return date_str;
116  };
117 
118  // initializing task timers
119  detail::task_timer task_timer(*io_service_pool_[i]);
120  task_timer.set_default_timeout(timeout_);
121  task_timer_pool_[i] = &task_timer;
122  task_queue_length_pool_[i] = 0;
123 
124  init_count++;
125  while (1)
126  {
127  try
128  {
129  if (io_service_pool_[i]->run() == 0)
130  {
131  // when io_service.run returns 0, there are no more works to do.
132  break;
133  }
134  }
135  catch (std::exception& e)
136  {
137  CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
138  }
139  }
140  }));
141 
142  if (tick_function_ && tick_interval_.count() > 0)
143  {
144  tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
145  tick_timer_.async_wait(
146  [this](const error_code& ec) {
147  if (ec)
148  return;
149  on_tick();
150  });
151  }
152 
153  port_ = acceptor_.local_endpoint().port();
154  handler_->port(port_);
155 
156 
157  CROW_LOG_INFO << server_name_ << " server is running at " << (handler_->ssl_used() ? "https://" : "http://") << bindaddr_ << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
158  CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
159 
160  signals_.async_wait(
161  [&](const error_code& /*error*/, int /*signal_number*/) {
162  stop();
163  });
164 
165  while (worker_thread_count != init_count)
166  std::this_thread::yield();
167 
168  do_accept();
169 
170  std::thread(
171  [this] {
172  notify_start();
173  io_service_.run();
174  CROW_LOG_INFO << "Exiting.";
175  })
176  .join();
177  }
178 
179  void stop()
180  {
181  shutting_down_ = true; // Prevent the acceptor from taking new connections
182  for (auto& io_service : io_service_pool_)
183  {
184  if (io_service != nullptr)
185  {
186  CROW_LOG_INFO << "Closing IO service " << &io_service;
187  io_service->stop(); // Close all io_services (and HTTP connections)
188  }
189  }
190 
191  CROW_LOG_INFO << "Closing main IO service (" << &io_service_ << ')';
192  io_service_.stop(); // Close main io_service
193  }
194 
195  /// Wait until the server has properly started
197  {
198  std::unique_lock<std::mutex> lock(start_mutex_);
199  if (!server_started_)
200  cv_started_.wait(lock);
201  }
202 
203  void signal_clear()
204  {
205  signals_.clear();
206  }
207 
208  void signal_add(int signal_number)
209  {
210  signals_.add(signal_number);
211  }
212 
213  private:
214  uint16_t pick_io_service_idx()
215  {
216  uint16_t min_queue_idx = 0;
217 
218  // TODO improve load balancing
219  // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
220  // even though the max value of this can be only uint16_t as concurrency is uint16_t.
221  for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
222  // No need to check other io_services if the current one has no tasks
223  {
224  if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
225  min_queue_idx = i;
226  }
227  return min_queue_idx;
228  }
229 
230  void do_accept()
231  {
232  if (!shutting_down_)
233  {
234  uint16_t service_idx = pick_io_service_idx();
235  asio::io_service& is = *io_service_pool_[service_idx];
236  task_queue_length_pool_[service_idx]++;
237  CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
238 
239  auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
240  is, handler_, server_name_, middlewares_,
241  get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);
242 
243  acceptor_.async_accept(
244  p->socket(),
245  [this, p, &is, service_idx](error_code ec) {
246  if (!ec)
247  {
248  is.post(
249  [p] {
250  p->start();
251  });
252  }
253  else
254  {
255  task_queue_length_pool_[service_idx]--;
256  CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
257  }
258  do_accept();
259  });
260  }
261  }
262 
263  /// Notify anything using `wait_for_start()` to proceed
264  void notify_start()
265  {
266  std::unique_lock<std::mutex> lock(start_mutex_);
267  server_started_ = true;
268  cv_started_.notify_all();
269  }
270 
271  private:
272  std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
273  asio::io_service io_service_;
274  std::vector<detail::task_timer*> task_timer_pool_;
275  std::vector<std::function<std::string()>> get_cached_date_str_pool_;
276  tcp::acceptor acceptor_;
277  bool shutting_down_ = false;
278  bool server_started_{false};
279  std::condition_variable cv_started_;
280  std::mutex start_mutex_;
281  asio::signal_set signals_;
282 
283  asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
284 
285  Handler* handler_;
286  uint16_t concurrency_{2};
287  std::uint8_t timeout_;
288  std::string server_name_;
289  uint16_t port_;
290  std::string bindaddr_;
291  std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
292 
293  std::chrono::milliseconds tick_interval_;
294  std::function<void()> tick_function_;
295 
296  std::tuple<Middlewares...>* middlewares_;
297 
298  typename Adaptor::context* adaptor_ctx_;
299  };
300 } // namespace crow
Definition: http_server.h:43
void wait_for_start()
Wait until the server has properly started.
Definition: http_server.h:196
A class for scheduling functions to be called after a specific amount of ticks. A tick is equal to 1 ...
Definition: task_timer.h:34
void set_default_timeout(std::uint8_t timeout)
Set the default timeout for this task_timer instance. (Default: 5)
Definition: task_timer.h:98
The main namespace of the library. In this namespace is defined the most important classes and functi...