Crow  1.1
A C++ microframework for the web
 
Loading...
Searching...
No Matches
http_server.h
1#pragma once
2
3#ifdef CROW_USE_BOOST
4#include <boost/asio.hpp>
5#ifdef CROW_ENABLE_SSL
6#include <boost/asio/ssl.hpp>
7#endif
8#else
9#ifndef ASIO_STANDALONE
10#define ASIO_STANDALONE
11#endif
12#include <asio.hpp>
13#ifdef CROW_ENABLE_SSL
14#include <asio/ssl.hpp>
15#endif
16#endif
17
18#include <atomic>
19#include <chrono>
20#include <cstdint>
21#include <future>
22#include <memory>
23#include <vector>
24
25#include "crow/version.h"
26#include "crow/http_connection.h"
27#include "crow/logging.h"
28#include "crow/task_timer.h"
29#include "crow/socket_acceptors.h"
30
31
32namespace crow // NOTE: Already documented in "crow/app.h"
33{
34#ifdef CROW_USE_BOOST
35 namespace asio = boost::asio;
36 using error_code = boost::system::error_code;
37#else
38 using error_code = asio::error_code;
39#endif
40 using tcp = asio::ip::tcp;
41 using stream_protocol = asio::local::stream_protocol;
42
43 template<typename Handler, typename Acceptor = TCPAcceptor, typename Adaptor = SocketAdaptor, typename... Middlewares>
44 class Server
45 {
46 public:
47 Server(Handler* handler,
48 typename Acceptor::endpoint endpoint,
49 std::string server_name = std::string("Crow/") + VERSION,
50 std::tuple<Middlewares...>* middlewares = nullptr,
51 unsigned int concurrency = 1,
52 uint8_t timeout = 5,
53 typename Adaptor::context* adaptor_ctx = nullptr):
54 concurrency_(concurrency),
55 task_queue_length_pool_(concurrency_ - 1),
56 acceptor_(io_context_),
57 signals_(io_context_),
58 tick_timer_(io_context_),
59 handler_(handler),
60 timeout_(timeout),
61 server_name_(server_name),
62 middlewares_(middlewares),
63 adaptor_ctx_(adaptor_ctx)
64 {
65 if (startup_failed_) {
66 CROW_LOG_ERROR << "Startup failed; not running server.";
67 return;
68 }
69
70 error_code ec;
71
72 acceptor_.raw_acceptor().open(endpoint.protocol(), ec);
73 if (ec) {
74 CROW_LOG_ERROR << "Failed to open acceptor: " << ec.message();
75 startup_failed_ = true;
76 return;
77 }
78
79 acceptor_.raw_acceptor().set_option(Acceptor::reuse_address_option(), ec);
80 if (ec) {
81 CROW_LOG_ERROR << "Failed to set socket option: " << ec.message();
82 startup_failed_ = true;
83 return;
84 }
85
86 acceptor_.raw_acceptor().bind(endpoint, ec);
87 if (ec) {
88 CROW_LOG_ERROR << "Failed to bind to " << acceptor_.address()
89 << ":" << acceptor_.port() << " - " << ec.message();
90 startup_failed_ = true;
91 return;
92 }
93
94 acceptor_.raw_acceptor().listen(tcp::acceptor::max_listen_connections, ec);
95 if (ec) {
96 CROW_LOG_ERROR << "Failed to listen on port: " << ec.message();
97 startup_failed_ = true;
98 return;
99 }
100
101
102 }
103
104 void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
105 {
106 tick_interval_ = d;
107 tick_function_ = f;
108 }
109
110 void on_tick()
111 {
112 tick_function_();
113 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
114 tick_timer_.async_wait([this](const error_code& ec) {
115 if (ec)
116 return;
117 on_tick();
118 });
119 }
120
121 void run()
122 {
123
124 if (startup_failed_) {
125 CROW_LOG_ERROR << "Server startup failed. Aborting run().";
126 return;
127 }
128
129 uint16_t worker_thread_count = concurrency_ - 1;
130 for (int i = 0; i < worker_thread_count; i++)
131 io_context_pool_.emplace_back(new asio::io_context());
132 get_cached_date_str_pool_.resize(worker_thread_count);
133 task_timer_pool_.resize(worker_thread_count);
134
135 std::vector<std::future<void>> v;
136 std::atomic<int> init_count(0);
137 for (uint16_t i = 0; i < worker_thread_count; i++)
138 v.push_back(
139 std::async(
140 std::launch::async, [this, i, &init_count] {
141 // thread local date string get function
142 auto last = std::chrono::steady_clock::now();
143
144 std::string date_str;
145 auto update_date_str = [&] {
146 auto last_time_t = time(0);
147 tm my_tm;
148
149#if defined(_MSC_VER) || defined(__MINGW32__)
150 gmtime_s(&my_tm, &last_time_t);
151#else
152 gmtime_r(&last_time_t, &my_tm);
153#endif
154 date_str.resize(100);
155 size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
156 date_str.resize(date_str_sz);
157 };
158 update_date_str();
159 get_cached_date_str_pool_[i] = [&]() -> std::string {
160 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
161 {
162 last = std::chrono::steady_clock::now();
163 update_date_str();
164 }
165 return date_str;
166 };
167
168 // initializing task timers
169 detail::task_timer task_timer(*io_context_pool_[i]);
170 task_timer.set_default_timeout(timeout_);
171 task_timer_pool_[i] = &task_timer;
172 task_queue_length_pool_[i] = 0;
173
174 init_count++;
175 while (1)
176 {
177 try
178 {
179 if (io_context_pool_[i]->run() == 0)
180 {
181 // when io_service.run returns 0, there are no more works to do.
182 break;
183 }
184 }
185 catch (std::exception& e)
186 {
187 CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
188 }
189 }
190 }));
191
192 if (tick_function_ && tick_interval_.count() > 0)
193 {
194 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
195 tick_timer_.async_wait(
196 [this](const error_code& ec) {
197 if (ec)
198 return;
199 on_tick();
200 });
201 }
202 handler_->port(acceptor_.port());
203 CROW_LOG_INFO << server_name_
204 << " server is running at " << acceptor_.url_display(handler_->ssl_used())
205 << " using " << concurrency_ << " threads";
206 CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
207
208 signals_.async_wait(
209 [&](const error_code& /*error*/, int /*signal_number*/) {
210 stop();
211 });
212
213 while (worker_thread_count != init_count)
214 std::this_thread::yield();
215
216 do_accept();
217
218 std::thread(
219 [this] {
220 notify_start();
221 io_context_.run();
222 CROW_LOG_INFO << "Exiting.";
223 })
224 .join();
225 }
226
227 void stop()
228 {
229 shutting_down_ = true; // Prevent the acceptor from taking new connections
230
231 // Explicitly close the acceptor
232 // else asio will throw an exception (linux only), when trying to start server again:
233 // what(): bind: Address already in use
234 if (acceptor_.raw_acceptor().is_open())
235 {
236 CROW_LOG_INFO << "Closing acceptor. " << &acceptor_;
237 error_code ec;
238 acceptor_.raw_acceptor().close(ec);
239 if (ec)
240 {
241 CROW_LOG_WARNING << "Failed to close acceptor: " << ec.message();
242 }
243 }
244
245 for (auto& io_context : io_context_pool_)
246 {
247 if (io_context != nullptr)
248 {
249 CROW_LOG_INFO << "Closing IO service " << &io_context;
250 io_context->stop(); // Close all io_services (and HTTP connections)
251 }
252 }
253
254 CROW_LOG_INFO << "Closing main IO service (" << &io_context_ << ')';
255 io_context_.stop(); // Close main io_service
256 }
257
258
259 uint16_t port() const {
260 return acceptor_.local_endpoint().port();
261 }
262
263 /// Wait until the server has properly started or until timeout
264 std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until)
265 {
266 std::unique_lock<std::mutex> lock(start_mutex_);
267
268 std::cv_status status = std::cv_status::no_timeout;
269 while (!server_started_ && !startup_failed_ && status == std::cv_status::no_timeout)
270 status = cv_started_.wait_until(lock, wait_until);
271 return status;
272 }
273
274
275 void signal_clear()
276 {
277 signals_.clear();
278 }
279
280 void signal_add(int signal_number)
281 {
282 signals_.add(signal_number);
283 }
284
285 private:
286 size_t pick_io_context_idx()
287 {
288 size_t min_queue_idx = 0;
289
290 // TODO improve load balancing
291 // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
292 // even though the max value of this can be only uint16_t as concurrency is uint16_t.
293 for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
294 // No need to check other io_services if the current one has no tasks
295 {
296 if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
297 min_queue_idx = i;
298 }
299 return min_queue_idx;
300 }
301
302 void do_accept()
303 {
304 if (!shutting_down_)
305 {
306 size_t context_idx = pick_io_context_idx();
307 asio::io_context& ic = *io_context_pool_[context_idx];
308 auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
309 ic, handler_, server_name_, middlewares_,
310 get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
311
312 CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
313
314 acceptor_.raw_acceptor().async_accept(
315 p->socket(),
316 [this, p, &ic, context_idx](error_code ec) {
317 if (!ec)
318 {
319 asio::post(ic,
320 [p] {
321 p->start();
322 });
323 }
324 do_accept();
325 });
326 }
327 }
328
329 /// Notify anything using `wait_for_start()` to proceed
330 void notify_start()
331 {
332 std::unique_lock<std::mutex> lock(start_mutex_);
333 server_started_ = true;
334 cv_started_.notify_all();
335 }
336
337 private:
338 unsigned int concurrency_{2};
339 std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
340 std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
341 asio::io_context io_context_;
342 std::vector<detail::task_timer*> task_timer_pool_;
343 std::vector<std::function<std::string()>> get_cached_date_str_pool_;
344 Acceptor acceptor_;
345 bool shutting_down_ = false;
346 bool server_started_{false};
347 bool startup_failed_ = false;
348 std::condition_variable cv_started_;
349 std::mutex start_mutex_;
350 asio::signal_set signals_;
351
352 asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
353
354 Handler* handler_;
355 std::uint8_t timeout_;
356 std::string server_name_;
357 bool use_unix_;
358
359 std::chrono::milliseconds tick_interval_;
360 std::function<void()> tick_function_;
361
362 std::tuple<Middlewares...>* middlewares_;
363
364 typename Adaptor::context* adaptor_ctx_;
365 };
366} // namespace crow
Definition http_server.h:45
std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until)
Wait until the server has properly started or until timeout.
Definition http_server.h:264
Definition task_timer.h:36
void set_default_timeout(uint8_t timeout)
Definition task_timer.h:107
The main namespace of the library. The most important classes and functions of the library are defined in this namespace.