Crow  1.1
A C++ microframework for the web
 
Loading...
Searching...
No Matches
http_server.h
1#pragma once
2
3#ifdef CROW_USE_BOOST
4#include <boost/asio.hpp>
5#ifdef CROW_ENABLE_SSL
6#include <boost/asio/ssl.hpp>
7#endif
8#else
9#ifndef ASIO_STANDALONE
10#define ASIO_STANDALONE
11#endif
12#include <asio.hpp>
13#ifdef CROW_ENABLE_SSL
14#include <asio/ssl.hpp>
15#endif
16#endif
17
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <ctime>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
25
26#include "crow/version.h"
27#include "crow/http_connection.h"
28#include "crow/logging.h"
29#include "crow/task_timer.h"
30#include "crow/socket_acceptors.h"
31
32
33namespace crow // NOTE: Already documented in "crow/app.h"
34{
35#ifdef CROW_USE_BOOST
36 namespace asio = boost::asio;
37 using error_code = boost::system::error_code;
38#else
39 using error_code = asio::error_code;
40#endif
41 using tcp = asio::ip::tcp;
42 using stream_protocol = asio::local::stream_protocol;
43
44 template<typename Handler, typename Acceptor = TCPAcceptor, typename Adaptor = SocketAdaptor, typename... Middlewares>
45 class Server
46 {
47 public:
48 Server(Handler* handler,
49 typename Acceptor::endpoint endpoint,
50 std::string server_name = std::string("Crow/") + VERSION,
51 std::tuple<Middlewares...>* middlewares = nullptr,
52 unsigned int concurrency = 1,
53 uint8_t timeout = 5,
54 typename Adaptor::context* adaptor_ctx = nullptr):
55 concurrency_(concurrency),
56 task_queue_length_pool_(concurrency_ - 1),
57 acceptor_(io_context_),
58 signals_(io_context_),
59 tick_timer_(io_context_),
60 handler_(handler),
61 timeout_(timeout),
62 server_name_(server_name),
63 middlewares_(middlewares),
64 adaptor_ctx_(adaptor_ctx)
65 {
66 if (startup_failed_) {
67 CROW_LOG_ERROR << "Startup failed; not running server.";
68 return;
69 }
70
71 error_code ec;
72
73 acceptor_.raw_acceptor().open(endpoint.protocol(), ec);
74 if (ec) {
75 CROW_LOG_ERROR << "Failed to open acceptor: " << ec.message();
76 startup_failed_ = true;
77 return;
78 }
79
80 acceptor_.raw_acceptor().set_option(Acceptor::reuse_address_option(), ec);
81 if (ec) {
82 CROW_LOG_ERROR << "Failed to set socket option: " << ec.message();
83 startup_failed_ = true;
84 return;
85 }
86
87 acceptor_.raw_acceptor().bind(endpoint, ec);
88 if (ec) {
89 CROW_LOG_ERROR << "Failed to bind to " << acceptor_.address()
90 << ":" << acceptor_.port() << " - " << ec.message();
91 startup_failed_ = true;
92 return;
93 }
94
95 acceptor_.raw_acceptor().listen(tcp::acceptor::max_listen_connections, ec);
96 if (ec) {
97 CROW_LOG_ERROR << "Failed to listen on port: " << ec.message();
98 startup_failed_ = true;
99 return;
100 }
101
102
103 }
104
105 void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
106 {
107 tick_interval_ = d;
108 tick_function_ = f;
109 }
110
111 void on_tick()
112 {
113 tick_function_();
114 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
115 tick_timer_.async_wait([this](const error_code& ec) {
116 if (ec)
117 return;
118 on_tick();
119 });
120 }
121
122 void run()
123 {
124
125 if (startup_failed_) {
126 CROW_LOG_ERROR << "Server startup failed. Aborting run().";
127 return;
128 }
129
130 uint16_t worker_thread_count = concurrency_ - 1;
131 for (int i = 0; i < worker_thread_count; i++)
132 io_context_pool_.emplace_back(new asio::io_context());
133 get_cached_date_str_pool_.resize(worker_thread_count);
134 task_timer_pool_.resize(worker_thread_count);
135
136 std::vector<std::future<void>> v;
137 std::atomic<int> init_count(0);
138 for (uint16_t i = 0; i < worker_thread_count; i++)
139 v.push_back(
140 std::async(
141 std::launch::async, [this, i, &init_count] {
142 // thread local date string get function
143 auto last = std::chrono::steady_clock::now();
144
145 std::string date_str;
146 auto update_date_str = [&] {
147 auto last_time_t = time(0);
148 tm my_tm;
149
150#if defined(_MSC_VER) || defined(__MINGW32__)
151 gmtime_s(&my_tm, &last_time_t);
152#else
153 gmtime_r(&last_time_t, &my_tm);
154#endif
155 date_str.resize(100);
156 size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
157 date_str.resize(date_str_sz);
158 };
159 update_date_str();
160 get_cached_date_str_pool_[i] = [&]() -> std::string {
161 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
162 {
163 last = std::chrono::steady_clock::now();
164 update_date_str();
165 }
166 return date_str;
167 };
168
169 // initializing task timers
170 detail::task_timer task_timer(*io_context_pool_[i]);
171 task_timer.set_default_timeout(timeout_);
172 task_timer_pool_[i] = &task_timer;
173 task_queue_length_pool_[i] = 0;
174
175 init_count++;
176 while (1)
177 {
178 try
179 {
180 if (io_context_pool_[i]->run() == 0)
181 {
182 // when io_service.run returns 0, there are no more works to do.
183 break;
184 }
185 }
186 catch (std::exception& e)
187 {
188 CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
189 }
190 }
191 }));
192
193 if (tick_function_ && tick_interval_.count() > 0)
194 {
195 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
196 tick_timer_.async_wait(
197 [this](const error_code& ec) {
198 if (ec)
199 return;
200 on_tick();
201 });
202 }
203 handler_->port(acceptor_.port());
204 handler_->address_is_bound();
205 CROW_LOG_INFO << server_name_
206 << " server is running at " << acceptor_.url_display(handler_->ssl_used())
207 << " using " << concurrency_ << " threads";
208 CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
209
210 signals_.async_wait(
211 [&](const error_code& /*error*/, int /*signal_number*/) {
212 stop();
213 });
214
215 while (worker_thread_count != init_count)
216 std::this_thread::yield();
217
218 do_accept();
219
220 std::thread(
221 [this] {
222 notify_start();
223 io_context_.run();
224 CROW_LOG_INFO << "Exiting.";
225 })
226 .join();
227 }
228
229 void stop()
230 {
231 shutting_down_ = true; // Prevent the acceptor from taking new connections
232
233 // Explicitly close the acceptor
234 // else asio will throw an exception (linux only), when trying to start server again:
235 // what(): bind: Address already in use
236 if (acceptor_.raw_acceptor().is_open())
237 {
238 CROW_LOG_INFO << "Closing acceptor. " << &acceptor_;
239 error_code ec;
240 acceptor_.raw_acceptor().close(ec);
241 if (ec)
242 {
243 CROW_LOG_WARNING << "Failed to close acceptor: " << ec.message();
244 }
245 }
246
247 for (auto& io_context : io_context_pool_)
248 {
249 if (io_context != nullptr)
250 {
251 CROW_LOG_INFO << "Closing IO service " << &io_context;
252 io_context->stop(); // Close all io_services (and HTTP connections)
253 }
254 }
255
256 CROW_LOG_INFO << "Closing main IO service (" << &io_context_ << ')';
257 io_context_.stop(); // Close main io_service
258 }
259
260
261 uint16_t port() const {
262 return acceptor_.local_endpoint().port();
263 }
264
265 /// Wait until the server has properly started or until timeout
266 std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until)
267 {
268 std::unique_lock<std::mutex> lock(start_mutex_);
269
270 std::cv_status status = std::cv_status::no_timeout;
271 while (!server_started_ && !startup_failed_ && status == std::cv_status::no_timeout)
272 status = cv_started_.wait_until(lock, wait_until);
273 return status;
274 }
275
276
277 void signal_clear()
278 {
279 signals_.clear();
280 }
281
282 void signal_add(int signal_number)
283 {
284 signals_.add(signal_number);
285 }
286
287 private:
288 size_t pick_io_context_idx()
289 {
290 size_t min_queue_idx = 0;
291
292 // TODO improve load balancing
293 // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
294 // even though the max value of this can be only uint16_t as concurrency is uint16_t.
295 for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
296 // No need to check other io_services if the current one has no tasks
297 {
298 if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
299 min_queue_idx = i;
300 }
301 return min_queue_idx;
302 }
303
304 void do_accept()
305 {
306 if (!shutting_down_)
307 {
308 size_t context_idx = pick_io_context_idx();
309 asio::io_context& ic = *io_context_pool_[context_idx];
310 auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
311 ic, handler_, server_name_, middlewares_,
312 get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
313
314 CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
315
316 acceptor_.raw_acceptor().async_accept(
317 p->socket(),
318 [this, p, &ic](error_code ec) {
319 if (!ec)
320 {
321 asio::post(ic,
322 [p] {
323 p->start();
324 });
325 }
326 do_accept();
327 });
328 }
329 }
330
331 /// Notify anything using `wait_for_start()` to proceed
332 void notify_start()
333 {
334 std::unique_lock<std::mutex> lock(start_mutex_);
335 server_started_ = true;
336 cv_started_.notify_all();
337 }
338
339 private:
340 unsigned int concurrency_{2};
341 std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
342 std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
343 asio::io_context io_context_;
344 std::vector<detail::task_timer*> task_timer_pool_;
345 std::vector<std::function<std::string()>> get_cached_date_str_pool_;
346 Acceptor acceptor_;
347 bool shutting_down_ = false;
348 bool server_started_{false};
349 bool startup_failed_ = false;
350 std::condition_variable cv_started_;
351 std::mutex start_mutex_;
352 asio::signal_set signals_;
353
354 asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
355
356 Handler* handler_;
357 std::uint8_t timeout_;
358 std::string server_name_;
359 bool use_unix_;
360
361 std::chrono::milliseconds tick_interval_;
362 std::function<void()> tick_function_;
363
364 std::tuple<Middlewares...>* middlewares_;
365
366 typename Adaptor::context* adaptor_ctx_;
367 };
368} // namespace crow
Definition http_server.h:46
std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until)
Wait until the server has properly started or until timeout.
Definition http_server.h:266
Definition task_timer.h:36
void set_default_timeout(uint8_t timeout)
Definition task_timer.h:107
The main namespace of the library. This namespace defines the library's most important classes and functions.