Crow  1.1
A C++ microframework for the web
http_server.h
1 #pragma once
2 
3 #include <chrono>
4 #ifndef ASIO_STANDALONE
5 #define ASIO_STANDALONE
6 #endif
7 #include <asio.hpp>
8 #ifdef CROW_ENABLE_SSL
9 #include <asio/ssl.hpp>
10 #endif
11 #include <cstdint>
12 #include <atomic>
13 #include <future>
14 #include <vector>
15 #include <memory>
16 
17 #include "crow/version.h"
18 #include "crow/http_connection.h"
19 #include "crow/logging.h"
20 #include "crow/task_timer.h"
21 
22 namespace crow
23 {
24  using tcp = asio::ip::tcp;
25 
26  template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
27  class Server
28  {
29  public:
30  Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, uint8_t timeout = 5, typename Adaptor::context* adaptor_ctx = nullptr):
31  acceptor_(io_service_, tcp::endpoint(asio::ip::address::from_string(bindaddr), port)),
32  signals_(io_service_),
33  tick_timer_(io_service_),
34  handler_(handler),
35  concurrency_(concurrency),
36  timeout_(timeout),
37  server_name_(server_name),
38  port_(port),
39  bindaddr_(bindaddr),
40  task_queue_length_pool_(concurrency_ - 1),
41  middlewares_(middlewares),
42  adaptor_ctx_(adaptor_ctx)
43  {}
44 
45  void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
46  {
47  tick_interval_ = d;
48  tick_function_ = f;
49  }
50 
51  void on_tick()
52  {
53  tick_function_();
54  tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
55  tick_timer_.async_wait([this](const asio::error_code& ec) {
56  if (ec)
57  return;
58  on_tick();
59  });
60  }
61 
62  void run()
63  {
64  uint16_t worker_thread_count = concurrency_ - 1;
65  for (int i = 0; i < worker_thread_count; i++)
66  io_service_pool_.emplace_back(new asio::io_service());
67  get_cached_date_str_pool_.resize(worker_thread_count);
68  task_timer_pool_.resize(worker_thread_count);
69 
70  std::vector<std::future<void>> v;
71  std::atomic<int> init_count(0);
72  for (uint16_t i = 0; i < worker_thread_count; i++)
73  v.push_back(
74  std::async(
75  std::launch::async, [this, i, &init_count] {
76  // thread local date string get function
77  auto last = std::chrono::steady_clock::now();
78 
79  std::string date_str;
80  auto update_date_str = [&] {
81  auto last_time_t = time(0);
82  tm my_tm;
83 
84 #if defined(_MSC_VER) || defined(__MINGW32__)
85  gmtime_s(&my_tm, &last_time_t);
86 #else
87  gmtime_r(&last_time_t, &my_tm);
88 #endif
89  date_str.resize(100);
90  size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
91  date_str.resize(date_str_sz);
92  };
93  update_date_str();
94  get_cached_date_str_pool_[i] = [&]() -> std::string {
95  if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
96  {
97  last = std::chrono::steady_clock::now();
98  update_date_str();
99  }
100  return date_str;
101  };
102 
103  // initializing task timers
104  detail::task_timer task_timer(*io_service_pool_[i]);
105  task_timer.set_default_timeout(timeout_);
106  task_timer_pool_[i] = &task_timer;
107  task_queue_length_pool_[i] = 0;
108 
109  init_count++;
110  while (1)
111  {
112  try
113  {
114  if (io_service_pool_[i]->run() == 0)
115  {
116  // when io_service.run returns 0, there are no more works to do.
117  break;
118  }
119  }
120  catch (std::exception& e)
121  {
122  CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
123  }
124  }
125  }));
126 
127  if (tick_function_ && tick_interval_.count() > 0)
128  {
129  tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
130  tick_timer_.async_wait(
131  [this](const asio::error_code& ec) {
132  if (ec)
133  return;
134  on_tick();
135  });
136  }
137 
138  port_ = acceptor_.local_endpoint().port();
139  handler_->port(port_);
140 
141 
142  CROW_LOG_INFO << server_name_ << " server is running at " << (handler_->ssl_used() ? "https://" : "http://") << bindaddr_ << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
143  CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
144 
145  signals_.async_wait(
146  [&](const asio::error_code& /*error*/, int /*signal_number*/) {
147  stop();
148  });
149 
150  while (worker_thread_count != init_count)
151  std::this_thread::yield();
152 
153  do_accept();
154 
155  std::thread(
156  [this] {
157  notify_start();
158  io_service_.run();
159  CROW_LOG_INFO << "Exiting.";
160  })
161  .join();
162  }
163 
164  void stop()
165  {
166  shutting_down_ = true; // Prevent the acceptor from taking new connections
167  for (auto& io_service : io_service_pool_)
168  {
169  if (io_service != nullptr)
170  {
171  CROW_LOG_INFO << "Closing IO service " << &io_service;
172  io_service->stop(); // Close all io_services (and HTTP connections)
173  }
174  }
175 
176  CROW_LOG_INFO << "Closing main IO service (" << &io_service_ << ')';
177  io_service_.stop(); // Close main io_service
178  }
179 
180  /// Wait until the server has properly started
182  {
183  std::unique_lock<std::mutex> lock(start_mutex_);
184  if (!server_started_)
185  cv_started_.wait(lock);
186  }
187 
188  void signal_clear()
189  {
190  signals_.clear();
191  }
192 
193  void signal_add(int signal_number)
194  {
195  signals_.add(signal_number);
196  }
197 
198  private:
199  uint16_t pick_io_service_idx()
200  {
201  uint16_t min_queue_idx = 0;
202 
203  // TODO improve load balancing
204  // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
205  // even though the max value of this can be only uint16_t as concurrency is uint16_t.
206  for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
207  // No need to check other io_services if the current one has no tasks
208  {
209  if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
210  min_queue_idx = i;
211  }
212  return min_queue_idx;
213  }
214 
215  void do_accept()
216  {
217  if (!shutting_down_)
218  {
219  uint16_t service_idx = pick_io_service_idx();
220  asio::io_service& is = *io_service_pool_[service_idx];
221  task_queue_length_pool_[service_idx]++;
222  CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
223 
224  auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
225  is, handler_, server_name_, middlewares_,
226  get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);
227 
228  acceptor_.async_accept(
229  p->socket(),
230  [this, p, &is, service_idx](asio::error_code ec) {
231  if (!ec)
232  {
233  is.post(
234  [p] {
235  p->start();
236  });
237  }
238  else
239  {
240  task_queue_length_pool_[service_idx]--;
241  CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
242  }
243  do_accept();
244  });
245  }
246  }
247 
248  /// Notify anything using `wait_for_start()` to proceed
249  void notify_start()
250  {
251  std::unique_lock<std::mutex> lock(start_mutex_);
252  server_started_ = true;
253  cv_started_.notify_all();
254  }
255 
256  private:
257  std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
258  asio::io_service io_service_;
259  std::vector<detail::task_timer*> task_timer_pool_;
260  std::vector<std::function<std::string()>> get_cached_date_str_pool_;
261  tcp::acceptor acceptor_;
262  bool shutting_down_ = false;
263  bool server_started_{false};
264  std::condition_variable cv_started_;
265  std::mutex start_mutex_;
266  asio::signal_set signals_;
267 
268  asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
269 
270  Handler* handler_;
271  uint16_t concurrency_{2};
272  std::uint8_t timeout_;
273  std::string server_name_;
274  uint16_t port_;
275  std::string bindaddr_;
276  std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
277 
278  std::chrono::milliseconds tick_interval_;
279  std::function<void()> tick_function_;
280 
281  std::tuple<Middlewares...>* middlewares_;
282 
283  typename Adaptor::context* adaptor_ctx_;
284  };
285 } // namespace crow
crow::Server
Definition: http_server.h:28
void wait_for_start()
Wait until the server has properly started.
Definition: http_server.h:181
A class for scheduling functions to be called after a specific amount of ticks. A tick is equal to 1 second.
Definition: task_timer.h:23
void set_default_timeout(std::uint8_t timeout)
Set the default timeout for this task_timer instance. (Default: 5)
Definition: task_timer.h:87