Crow  1.1
A C++ microframework for the web
http_server.h
1 #pragma once
2 
3 #ifdef CROW_USE_BOOST
4 #include <boost/asio.hpp>
5 #ifdef CROW_ENABLE_SSL
6 #include <boost/asio/ssl.hpp>
7 #endif
8 #else
9 #ifndef ASIO_STANDALONE
10 #define ASIO_STANDALONE
11 #endif
12 #include <asio.hpp>
13 #ifdef CROW_ENABLE_SSL
14 #include <asio/ssl.hpp>
15 #endif
16 #endif
17 
18 #include <atomic>
19 #include <chrono>
20 #include <cstdint>
21 #include <future>
22 #include <memory>
23 #include <vector>
24 
25 #include "crow/version.h"
26 #include "crow/http_connection.h"
27 #include "crow/logging.h"
28 #include "crow/task_timer.h"
29 
30 
31 namespace crow // NOTE: Already documented in "crow/app.h"
32 {
33 #ifdef CROW_USE_BOOST
34  namespace asio = boost::asio;
35  using error_code = boost::system::error_code;
36 #else
37  using error_code = asio::error_code;
38 #endif
39  using tcp = asio::ip::tcp;
40 
41  template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
42  class Server
43  {
44  public:
45  Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, uint8_t timeout = 5, typename Adaptor::context* adaptor_ctx = nullptr):
46  acceptor_(io_service_, tcp::endpoint(asio::ip::address::from_string(bindaddr), port)),
47  signals_(io_service_),
48  tick_timer_(io_service_),
49  handler_(handler),
50  concurrency_(concurrency),
51  timeout_(timeout),
52  server_name_(server_name),
53  port_(port),
54  bindaddr_(bindaddr),
55  task_queue_length_pool_(concurrency_ - 1),
56  middlewares_(middlewares),
57  adaptor_ctx_(adaptor_ctx)
58  {}
59 
60  void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
61  {
62  tick_interval_ = d;
63  tick_function_ = f;
64  }
65 
66  void on_tick()
67  {
68  tick_function_();
69  tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
70  tick_timer_.async_wait([this](const error_code& ec) {
71  if (ec)
72  return;
73  on_tick();
74  });
75  }
76 
77  void run()
78  {
79  uint16_t worker_thread_count = concurrency_ - 1;
80  for (int i = 0; i < worker_thread_count; i++)
81  io_service_pool_.emplace_back(new asio::io_service());
82  get_cached_date_str_pool_.resize(worker_thread_count);
83  task_timer_pool_.resize(worker_thread_count);
84 
85  std::vector<std::future<void>> v;
86  std::atomic<int> init_count(0);
87  for (uint16_t i = 0; i < worker_thread_count; i++)
88  v.push_back(
89  std::async(
90  std::launch::async, [this, i, &init_count] {
91  // thread local date string get function
92  auto last = std::chrono::steady_clock::now();
93 
94  std::string date_str;
95  auto update_date_str = [&] {
96  auto last_time_t = time(0);
97  tm my_tm;
98 
99 #if defined(_MSC_VER) || defined(__MINGW32__)
100  gmtime_s(&my_tm, &last_time_t);
101 #else
102  gmtime_r(&last_time_t, &my_tm);
103 #endif
104  date_str.resize(100);
105  size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
106  date_str.resize(date_str_sz);
107  };
108  update_date_str();
109  get_cached_date_str_pool_[i] = [&]() -> std::string {
110  if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
111  {
112  last = std::chrono::steady_clock::now();
113  update_date_str();
114  }
115  return date_str;
116  };
117 
118  // initializing task timers
119  detail::task_timer task_timer(*io_service_pool_[i]);
120  task_timer.set_default_timeout(timeout_);
121  task_timer_pool_[i] = &task_timer;
122  task_queue_length_pool_[i] = 0;
123 
124  init_count++;
125  while (1)
126  {
127  try
128  {
129  if (io_service_pool_[i]->run() == 0)
130  {
131  // when io_service.run returns 0, there are no more works to do.
132  break;
133  }
134  }
135  catch (std::exception& e)
136  {
137  CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
138  }
139  }
140  }));
141 
142  if (tick_function_ && tick_interval_.count() > 0)
143  {
144  tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
145  tick_timer_.async_wait(
146  [this](const error_code& ec) {
147  if (ec)
148  return;
149  on_tick();
150  });
151  }
152 
153  port_ = acceptor_.local_endpoint().port();
154  handler_->port(port_);
155 
156 
157  CROW_LOG_INFO << server_name_ << " server is running at " << (handler_->ssl_used() ? "https://" : "http://") << bindaddr_ << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
158  CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
159 
160  signals_.async_wait(
161  [&](const error_code& /*error*/, int /*signal_number*/) {
162  stop();
163  });
164 
165  while (worker_thread_count != init_count)
166  std::this_thread::yield();
167 
168  do_accept();
169 
170  std::thread(
171  [this] {
172  notify_start();
173  io_service_.run();
174  CROW_LOG_INFO << "Exiting.";
175  })
176  .join();
177  }
178 
179  void stop()
180  {
181  shutting_down_ = true; // Prevent the acceptor from taking new connections
182  for (auto& io_service : io_service_pool_)
183  {
184  if (io_service != nullptr)
185  {
186  CROW_LOG_INFO << "Closing IO service " << &io_service;
187  io_service->stop(); // Close all io_services (and HTTP connections)
188  }
189  }
190 
191  CROW_LOG_INFO << "Closing main IO service (" << &io_service_ << ')';
192  io_service_.stop(); // Close main io_service
193  }
194 
195  uint16_t port(){
196  return acceptor_.local_endpoint().port();
197  }
198 
199  /// Wait until the server has properly started
201  {
202  std::unique_lock<std::mutex> lock(start_mutex_);
203  while (!server_started_)
204  cv_started_.wait(lock);
205  }
206 
207  void signal_clear()
208  {
209  signals_.clear();
210  }
211 
212  void signal_add(int signal_number)
213  {
214  signals_.add(signal_number);
215  }
216 
217  private:
218  uint16_t pick_io_service_idx()
219  {
220  uint16_t min_queue_idx = 0;
221 
222  // TODO improve load balancing
223  // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
224  // even though the max value of this can be only uint16_t as concurrency is uint16_t.
225  for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
226  // No need to check other io_services if the current one has no tasks
227  {
228  if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
229  min_queue_idx = i;
230  }
231  return min_queue_idx;
232  }
233 
234  void do_accept()
235  {
236  if (!shutting_down_)
237  {
238  uint16_t service_idx = pick_io_service_idx();
239  asio::io_service& is = *io_service_pool_[service_idx];
240  task_queue_length_pool_[service_idx]++;
241  CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
242 
243  auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
244  is, handler_, server_name_, middlewares_,
245  get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);
246 
247  acceptor_.async_accept(
248  p->socket(),
249  [this, p, &is, service_idx](error_code ec) {
250  if (!ec)
251  {
252  is.post(
253  [p] {
254  p->start();
255  });
256  }
257  else
258  {
259  task_queue_length_pool_[service_idx]--;
260  CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
261  }
262  do_accept();
263  });
264  }
265  }
266 
267  /// Notify anything using `wait_for_start()` to proceed
268  void notify_start()
269  {
270  std::unique_lock<std::mutex> lock(start_mutex_);
271  server_started_ = true;
272  cv_started_.notify_all();
273  }
274 
275  private:
276  std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
277  asio::io_service io_service_;
278  std::vector<detail::task_timer*> task_timer_pool_;
279  std::vector<std::function<std::string()>> get_cached_date_str_pool_;
280  tcp::acceptor acceptor_;
281  bool shutting_down_ = false;
282  bool server_started_{false};
283  std::condition_variable cv_started_;
284  std::mutex start_mutex_;
285  asio::signal_set signals_;
286 
287  asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
288 
289  Handler* handler_;
290  uint16_t concurrency_{2};
291  std::uint8_t timeout_;
292  std::string server_name_;
293  uint16_t port_;
294  std::string bindaddr_;
295  std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
296 
297  std::chrono::milliseconds tick_interval_;
298  std::function<void()> tick_function_;
299 
300  std::tuple<Middlewares...>* middlewares_;
301 
302  typename Adaptor::context* adaptor_ctx_;
303  };
304 } // namespace crow
Definition: http_server.h:43
void wait_for_start()
Wait until the server has properly started.
Definition: http_server.h:200
Definition: task_timer.h:36
void set_default_timeout(uint8_t timeout)
Definition: task_timer.h:107
The main namespace of the library. This namespace defines the most important classes and functions.