Crow  1.1
A C++ microframework for the web
 
Loading...
Searching...
No Matches
http_server.h
1#pragma once
2
3#ifdef CROW_USE_BOOST
4#include <boost/asio.hpp>
5#ifdef CROW_ENABLE_SSL
6#include <boost/asio/ssl.hpp>
7#endif
8#else
9#ifndef ASIO_STANDALONE
10#define ASIO_STANDALONE
11#endif
12#include <asio.hpp>
13#ifdef CROW_ENABLE_SSL
14#include <asio/ssl.hpp>
15#endif
16#endif
17
18#include <atomic>
19#include <chrono>
20#include <cstdint>
21#include <future>
22#include <memory>
23#include <vector>
24
25#include "crow/version.h"
26#include "crow/http_connection.h"
27#include "crow/logging.h"
28#include "crow/task_timer.h"
29
30
31namespace crow // NOTE: Already documented in "crow/app.h"
32{
33#ifdef CROW_USE_BOOST
34 namespace asio = boost::asio;
35 using error_code = boost::system::error_code;
36#else
37 using error_code = asio::error_code;
38#endif
39 using tcp = asio::ip::tcp;
40
41 template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
42 class Server
43 {
44 public:
45 Server(Handler* handler,
46 const tcp::endpoint& endpoint,
47 std::string server_name = std::string("Crow/") + VERSION,
48 std::tuple<Middlewares...>* middlewares = nullptr,
49 uint16_t concurrency = 1,
50 uint8_t timeout = 5,
51 typename Adaptor::context* adaptor_ctx = nullptr):
52 acceptor_(io_context_),
53 signals_(io_context_),
54 tick_timer_(io_context_),
55 handler_(handler),
56 concurrency_(concurrency),
57 timeout_(timeout),
58 server_name_(server_name),
59 task_queue_length_pool_(concurrency_ - 1),
60 middlewares_(middlewares),
61 adaptor_ctx_(adaptor_ctx)
62 {
63 if (startup_failed_) {
64 CROW_LOG_ERROR << "Startup failed; not running server.";
65 return;
66 }
67
68 error_code ec;
69
70 acceptor_.open(endpoint.protocol(), ec);
71 if (ec) {
72 CROW_LOG_ERROR << "Failed to open acceptor: " << ec.message();
73 startup_failed_ = true;
74 return;
75 }
76
77 acceptor_.set_option(tcp::acceptor::reuse_address(true), ec);
78 if (ec) {
79 CROW_LOG_ERROR << "Failed to set socket option: " << ec.message();
80 startup_failed_ = true;
81 return;
82 }
83
84 acceptor_.bind(endpoint, ec);
85 if (ec) {
86 CROW_LOG_ERROR << "Failed to bind to " << endpoint.address().to_string()
87 << ":" << endpoint.port() << " - " << ec.message();
88 startup_failed_ = true;
89 return;
90 }
91
92 acceptor_.listen(tcp::acceptor::max_listen_connections, ec);
93 if (ec) {
94 CROW_LOG_ERROR << "Failed to listen on port: " << ec.message();
95 startup_failed_ = true;
96 return;
97 }
98
99
100 }
101
102 void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
103 {
104 tick_interval_ = d;
105 tick_function_ = f;
106 }
107
108 void on_tick()
109 {
110 tick_function_();
111 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
112 tick_timer_.async_wait([this](const error_code& ec) {
113 if (ec)
114 return;
115 on_tick();
116 });
117 }
118
119 void run()
120 {
121
122 if (startup_failed_) {
123 CROW_LOG_ERROR << "Server startup failed. Aborting run().";
124 return;
125 }
126
127 uint16_t worker_thread_count = concurrency_ - 1;
128 for (int i = 0; i < worker_thread_count; i++)
129 io_context_pool_.emplace_back(new asio::io_context());
130 get_cached_date_str_pool_.resize(worker_thread_count);
131 task_timer_pool_.resize(worker_thread_count);
132
133 std::vector<std::future<void>> v;
134 std::atomic<int> init_count(0);
135 for (uint16_t i = 0; i < worker_thread_count; i++)
136 v.push_back(
137 std::async(
138 std::launch::async, [this, i, &init_count] {
139 // thread local date string get function
140 auto last = std::chrono::steady_clock::now();
141
142 std::string date_str;
143 auto update_date_str = [&] {
144 auto last_time_t = time(0);
145 tm my_tm;
146
147#if defined(_MSC_VER) || defined(__MINGW32__)
148 gmtime_s(&my_tm, &last_time_t);
149#else
150 gmtime_r(&last_time_t, &my_tm);
151#endif
152 date_str.resize(100);
153 size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
154 date_str.resize(date_str_sz);
155 };
156 update_date_str();
157 get_cached_date_str_pool_[i] = [&]() -> std::string {
158 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
159 {
160 last = std::chrono::steady_clock::now();
161 update_date_str();
162 }
163 return date_str;
164 };
165
166 // initializing task timers
167 detail::task_timer task_timer(*io_context_pool_[i]);
168 task_timer.set_default_timeout(timeout_);
169 task_timer_pool_[i] = &task_timer;
170 task_queue_length_pool_[i] = 0;
171
172 init_count++;
173 while (1)
174 {
175 try
176 {
177 if (io_context_pool_[i]->run() == 0)
178 {
179 // when io_service.run returns 0, there are no more works to do.
180 break;
181 }
182 }
183 catch (std::exception& e)
184 {
185 CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
186 }
187 }
188 }));
189
190 if (tick_function_ && tick_interval_.count() > 0)
191 {
192 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
193 tick_timer_.async_wait(
194 [this](const error_code& ec) {
195 if (ec)
196 return;
197 on_tick();
198 });
199 }
200
201 handler_->port(acceptor_.local_endpoint().port());
202
203
204 CROW_LOG_INFO << server_name_
205 << " server is running at " << (handler_->ssl_used() ? "https://" : "http://")
206 << acceptor_.local_endpoint().address() << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
207 CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
208
209 signals_.async_wait(
210 [&](const error_code& /*error*/, int /*signal_number*/) {
211 stop();
212 });
213
214 while (worker_thread_count != init_count)
215 std::this_thread::yield();
216
217 do_accept();
218
219 std::thread(
220 [this] {
221 notify_start();
222 io_context_.run();
223 CROW_LOG_INFO << "Exiting.";
224 })
225 .join();
226 }
227
228 void stop()
229 {
230 shutting_down_ = true; // Prevent the acceptor from taking new connections
231
232 // Explicitly close the acceptor
233 // else asio will throw an exception (linux only), when trying to start server again:
234 // what(): bind: Address already in use
235 if (acceptor_.is_open())
236 {
237 CROW_LOG_INFO << "Closing acceptor. " << &acceptor_;
238 acceptor_.close();
239 }
240
241 for (auto& io_context : io_context_pool_)
242 {
243 if (io_context != nullptr)
244 {
245 CROW_LOG_INFO << "Closing IO service " << &io_context;
246 io_context->stop(); // Close all io_services (and HTTP connections)
247 }
248 }
249
250 CROW_LOG_INFO << "Closing main IO service (" << &io_context_ << ')';
251 io_context_.stop(); // Close main io_service
252 }
253
254 uint16_t port() const {
255 return acceptor_.local_endpoint().port();
256 }
257
258 /// Wait until the server has properly started or until timeout
259 std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until)
260 {
261 std::unique_lock<std::mutex> lock(start_mutex_);
262
263 std::cv_status status = std::cv_status::no_timeout;
264 while (!server_started_ && !startup_failed_ && status == std::cv_status::no_timeout)
265 status = cv_started_.wait_until(lock, wait_until);
266 return status;
267 }
268
269
270 void signal_clear()
271 {
272 signals_.clear();
273 }
274
275 void signal_add(int signal_number)
276 {
277 signals_.add(signal_number);
278 }
279
280 private:
281 uint16_t pick_io_context_idx()
282 {
283 uint16_t min_queue_idx = 0;
284
285 // TODO improve load balancing
286 // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
287 // even though the max value of this can be only uint16_t as concurrency is uint16_t.
288 for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
289 // No need to check other io_services if the current one has no tasks
290 {
291 if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
292 min_queue_idx = i;
293 }
294 return min_queue_idx;
295 }
296
297 void do_accept()
298 {
299 if (!shutting_down_)
300 {
301 uint16_t context_idx = pick_io_context_idx();
302 asio::io_context& ic = *io_context_pool_[context_idx];
303 task_queue_length_pool_[context_idx]++;
304 CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
305
306 auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
307 ic, handler_, server_name_, middlewares_,
308 get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
309
310 acceptor_.async_accept(
311 p->socket(),
312 [this, p, &ic, context_idx](error_code ec) {
313 if (!ec)
314 {
315 asio::post(ic,
316 [p] {
317 p->start();
318 });
319 }
320 else
321 {
322 task_queue_length_pool_[context_idx]--;
323 CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
324 }
325 do_accept();
326 });
327 }
328 }
329
330 /// Notify anything using `wait_for_start()` to proceed
331 void notify_start()
332 {
333 std::unique_lock<std::mutex> lock(start_mutex_);
334 server_started_ = true;
335 cv_started_.notify_all();
336 }
337
338 private:
339 std::vector<std::unique_ptr<asio::io_context>> io_context_pool_; ///< One io_context per worker thread; filled by run().
340 asio::io_context io_context_; ///< Main io_context used by the acceptor, the signal set and the tick timer.
341 std::vector<detail::task_timer*> task_timer_pool_; ///< Per-worker pointers to task timers that live on the worker threads' stacks (set in run()).
342 std::vector<std::function<std::string()>> get_cached_date_str_pool_; ///< Per-worker functions returning a cached date string (set in run()).
343 tcp::acceptor acceptor_; ///< Listening socket, set up by the constructor and closed by stop().
344 bool shutting_down_ = false; ///< Set by stop(), checked by do_accept(). NOTE(review): plain bool; confirm whether both always run on the same thread.
345 bool server_started_{false}; ///< Set by notify_start() while holding start_mutex_.
346 bool startup_failed_ = false; ///< Set by the constructor when opening, binding or listening fails.
347 std::condition_variable cv_started_; ///< Signalled by notify_start(); waited on by wait_for_start().
348 std::mutex start_mutex_; ///< Locked by notify_start() and wait_for_start().
349 asio::signal_set signals_; ///< Signals that trigger stop(); see signal_add() and signal_clear().
350
351 asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_; ///< Timer that drives on_tick().
352
353 Handler* handler_; ///< Application handler passed to the constructor; handed to every connection.
354 uint16_t concurrency_{2}; ///< Total thread count; run() starts `concurrency_ - 1` worker threads.
355 std::uint8_t timeout_; ///< Default timeout forwarded to each worker's task timer.
356 std::string server_name_; ///< Server name handed to every connection and printed at startup.
357 std::vector<std::atomic<unsigned int>> task_queue_length_pool_; ///< Per-worker counters used by pick_io_context_idx(); adjusted in do_accept() and handed to each connection.
358
359 std::chrono::milliseconds tick_interval_; ///< Interval between tick callbacks; see set_tick_function().
360 std::function<void()> tick_function_; ///< Tick callback; see set_tick_function().
361
362 std::tuple<Middlewares...>* middlewares_; ///< Middleware tuple passed to the constructor; handed to every connection.
363
364 typename Adaptor::context* adaptor_ctx_; ///< Adaptor context passed to the constructor; handed to every connection.
365 };
366} // namespace crow
Definition http_server.h:43
std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until)
Wait until the server has properly started or until timeout.
Definition http_server.h:259
Definition task_timer.h:36
void set_default_timeout(uint8_t timeout)
Definition task_timer.h:107
The main namespace of the library. The most important classes and functions are defined in this namespace.