Crow  1.1
A C++ microframework for the web
 
Loading...
Searching...
No Matches
http_server.h
1#pragma once
2
3#ifdef CROW_USE_BOOST
4#include <boost/asio.hpp>
5#ifdef CROW_ENABLE_SSL
6#include <boost/asio/ssl.hpp>
7#endif
8#else
9#ifndef ASIO_STANDALONE
10#define ASIO_STANDALONE
11#endif
12#include <asio.hpp>
13#ifdef CROW_ENABLE_SSL
14#include <asio/ssl.hpp>
15#endif
16#endif
17
18#include <atomic>
19#include <chrono>
20#include <cstdint>
21#include <future>
22#include <memory>
23#include <vector>
24
25#include "crow/version.h"
26#include "crow/http_connection.h"
27#include "crow/logging.h"
28#include "crow/task_timer.h"
29
30
31namespace crow // NOTE: Already documented in "crow/app.h"
32{
33#ifdef CROW_USE_BOOST
34 namespace asio = boost::asio;
35 using error_code = boost::system::error_code;
36#else
37 using error_code = asio::error_code;
38#endif
39 using tcp = asio::ip::tcp;
40
41 template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
42 class Server
43 {
44 public:
45 Server(Handler* handler,
46 const tcp::endpoint& endpoint,
47 std::string server_name = std::string("Crow/") + VERSION,
48 std::tuple<Middlewares...>* middlewares = nullptr,
49 uint16_t concurrency = 1,
50 uint8_t timeout = 5,
51 typename Adaptor::context* adaptor_ctx = nullptr):
52 acceptor_(io_context_,endpoint),
53 signals_(io_context_),
54 tick_timer_(io_context_),
55 handler_(handler),
56 concurrency_(concurrency),
57 timeout_(timeout),
58 server_name_(server_name),
59 task_queue_length_pool_(concurrency_ - 1),
60 middlewares_(middlewares),
61 adaptor_ctx_(adaptor_ctx)
62 {}
63
64 void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
65 {
66 tick_interval_ = d;
67 tick_function_ = f;
68 }
69
70 void on_tick()
71 {
72 tick_function_();
73 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
74 tick_timer_.async_wait([this](const error_code& ec) {
75 if (ec)
76 return;
77 on_tick();
78 });
79 }
80
81 void run()
82 {
83 uint16_t worker_thread_count = concurrency_ - 1;
84 for (int i = 0; i < worker_thread_count; i++)
85 io_context_pool_.emplace_back(new asio::io_context());
86 get_cached_date_str_pool_.resize(worker_thread_count);
87 task_timer_pool_.resize(worker_thread_count);
88
89 std::vector<std::future<void>> v;
90 std::atomic<int> init_count(0);
91 for (uint16_t i = 0; i < worker_thread_count; i++)
92 v.push_back(
93 std::async(
94 std::launch::async, [this, i, &init_count] {
95 // thread local date string get function
96 auto last = std::chrono::steady_clock::now();
97
98 std::string date_str;
99 auto update_date_str = [&] {
100 auto last_time_t = time(0);
101 tm my_tm;
102
103#if defined(_MSC_VER) || defined(__MINGW32__)
104 gmtime_s(&my_tm, &last_time_t);
105#else
106 gmtime_r(&last_time_t, &my_tm);
107#endif
108 date_str.resize(100);
109 size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
110 date_str.resize(date_str_sz);
111 };
112 update_date_str();
113 get_cached_date_str_pool_[i] = [&]() -> std::string {
114 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
115 {
116 last = std::chrono::steady_clock::now();
117 update_date_str();
118 }
119 return date_str;
120 };
121
122 // initializing task timers
123 detail::task_timer task_timer(*io_context_pool_[i]);
124 task_timer.set_default_timeout(timeout_);
125 task_timer_pool_[i] = &task_timer;
126 task_queue_length_pool_[i] = 0;
127
128 init_count++;
129 while (1)
130 {
131 try
132 {
133 if (io_context_pool_[i]->run() == 0)
134 {
135 // when io_service.run returns 0, there are no more works to do.
136 break;
137 }
138 }
139 catch (std::exception& e)
140 {
141 CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
142 }
143 }
144 }));
145
146 if (tick_function_ && tick_interval_.count() > 0)
147 {
148 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
149 tick_timer_.async_wait(
150 [this](const error_code& ec) {
151 if (ec)
152 return;
153 on_tick();
154 });
155 }
156
157 handler_->port(acceptor_.local_endpoint().port());
158
159
160 CROW_LOG_INFO << server_name_
161 << " server is running at " << (handler_->ssl_used() ? "https://" : "http://")
162 << acceptor_.local_endpoint().address() << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
163 CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
164
165 signals_.async_wait(
166 [&](const error_code& /*error*/, int /*signal_number*/) {
167 stop();
168 });
169
170 while (worker_thread_count != init_count)
171 std::this_thread::yield();
172
173 do_accept();
174
175 std::thread(
176 [this] {
177 notify_start();
178 io_context_.run();
179 CROW_LOG_INFO << "Exiting.";
180 })
181 .join();
182 }
183
184 void stop()
185 {
186 shutting_down_ = true; // Prevent the acceptor from taking new connections
187 for (auto& io_context : io_context_pool_)
188 {
189 if (io_context != nullptr)
190 {
191 CROW_LOG_INFO << "Closing IO service " << &io_context;
192 io_context->stop(); // Close all io_services (and HTTP connections)
193 }
194 }
195
196 CROW_LOG_INFO << "Closing main IO service (" << &io_context_ << ')';
197 io_context_.stop(); // Close main io_service
198 }
199
200 uint16_t port() const {
201 return acceptor_.local_endpoint().port();
202 }
203
204 /// Wait until the server has properly started or until timeout
205 std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until)
206 {
207 std::unique_lock<std::mutex> lock(start_mutex_);
208
209 std::cv_status status = std::cv_status::no_timeout;
210 while (!server_started_ && ( status==std::cv_status::no_timeout ))
211 status = cv_started_.wait_until(lock,wait_until);
212 return status;
213 }
214
215 void signal_clear()
216 {
217 signals_.clear();
218 }
219
220 void signal_add(int signal_number)
221 {
222 signals_.add(signal_number);
223 }
224
225 private:
226 uint16_t pick_io_context_idx()
227 {
228 uint16_t min_queue_idx = 0;
229
230 // TODO improve load balancing
231 // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
232 // even though the max value of this can be only uint16_t as concurrency is uint16_t.
233 for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
234 // No need to check other io_services if the current one has no tasks
235 {
236 if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
237 min_queue_idx = i;
238 }
239 return min_queue_idx;
240 }
241
242 void do_accept()
243 {
244 if (!shutting_down_)
245 {
246 uint16_t context_idx = pick_io_context_idx();
247 asio::io_context& ic = *io_context_pool_[context_idx];
248 task_queue_length_pool_[context_idx]++;
249 CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
250
251 auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
252 ic, handler_, server_name_, middlewares_,
253 get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
254
255 acceptor_.async_accept(
256 p->socket(),
257 [this, p, &ic, context_idx](error_code ec) {
258 if (!ec)
259 {
260 asio::post(ic,
261 [p] {
262 p->start();
263 });
264 }
265 else
266 {
267 task_queue_length_pool_[context_idx]--;
268 CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
269 }
270 do_accept();
271 });
272 }
273 }
274
275 /// Notify anything using `wait_for_start()` to proceed
276 void notify_start()
277 {
278 std::unique_lock<std::mutex> lock(start_mutex_);
279 server_started_ = true;
280 cv_started_.notify_all();
281 }
282
283 private:
284 std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
285 asio::io_context io_context_;
286 std::vector<detail::task_timer*> task_timer_pool_;
287 std::vector<std::function<std::string()>> get_cached_date_str_pool_;
288 tcp::acceptor acceptor_;
289 bool shutting_down_ = false;
290 bool server_started_{false};
291 std::condition_variable cv_started_;
292 std::mutex start_mutex_;
293 asio::signal_set signals_;
294
295 asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
296
297 Handler* handler_;
298 uint16_t concurrency_{2};
299 std::uint8_t timeout_;
300 std::string server_name_;
301 std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
302
303 std::chrono::milliseconds tick_interval_;
304 std::function<void()> tick_function_;
305
306 std::tuple<Middlewares...>* middlewares_;
307
308 typename Adaptor::context* adaptor_ctx_;
309 };
310} // namespace crow
Definition http_server.h:43
std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until)
Wait until the server has properly started or until timeout.
Definition http_server.h:205
Definition task_timer.h:36
void set_default_timeout(uint8_t timeout)
Definition task_timer.h:107
The main namespace of the library. The most important classes and functions are defined in this namespace.