Crow  1.1
A C++ microframework for the web
 
Loading...
Searching...
No Matches
http_server.h
1#pragma once
2
3#ifdef CROW_USE_BOOST
4#include <boost/asio.hpp>
5#ifdef CROW_ENABLE_SSL
6#include <boost/asio/ssl.hpp>
7#endif
8#else
9#ifndef ASIO_STANDALONE
10#define ASIO_STANDALONE
11#endif
12#include <asio.hpp>
13#ifdef CROW_ENABLE_SSL
14#include <asio/ssl.hpp>
15#endif
16#endif
17
18#include <atomic>
19#include <chrono>
20#include <cstdint>
21#include <future>
22#include <memory>
23#include <vector>
24
25#include "crow/version.h"
26#include "crow/http_connection.h"
27#include "crow/logging.h"
28#include "crow/task_timer.h"
29
30
31namespace crow // NOTE: Already documented in "crow/app.h"
32{
33#ifdef CROW_USE_BOOST
34 namespace asio = boost::asio;
35 using error_code = boost::system::error_code;
36#else
37 using error_code = asio::error_code;
38#endif
39 using tcp = asio::ip::tcp;
40
41 template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
42 class Server
43 {
44 public:
45 Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, uint8_t timeout = 5, typename Adaptor::context* adaptor_ctx = nullptr):
46 acceptor_(io_service_, tcp::endpoint(asio::ip::address::from_string(bindaddr), port)),
47 signals_(io_service_),
48 tick_timer_(io_service_),
49 handler_(handler),
50 concurrency_(concurrency),
51 timeout_(timeout),
52 server_name_(server_name),
53 port_(port),
54 bindaddr_(bindaddr),
55 task_queue_length_pool_(concurrency_ - 1),
56 middlewares_(middlewares),
57 adaptor_ctx_(adaptor_ctx)
58 {}
59
60 void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
61 {
62 tick_interval_ = d;
63 tick_function_ = f;
64 }
65
66 void on_tick()
67 {
68 tick_function_();
69 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
70 tick_timer_.async_wait([this](const error_code& ec) {
71 if (ec)
72 return;
73 on_tick();
74 });
75 }
76
77 void run()
78 {
79 uint16_t worker_thread_count = concurrency_ - 1;
80 for (int i = 0; i < worker_thread_count; i++)
81 io_service_pool_.emplace_back(new asio::io_service());
82 get_cached_date_str_pool_.resize(worker_thread_count);
83 task_timer_pool_.resize(worker_thread_count);
84
85 std::vector<std::future<void>> v;
86 std::atomic<int> init_count(0);
87 for (uint16_t i = 0; i < worker_thread_count; i++)
88 v.push_back(
89 std::async(
90 std::launch::async, [this, i, &init_count] {
91 // thread local date string get function
92 auto last = std::chrono::steady_clock::now();
93
94 std::string date_str;
95 auto update_date_str = [&] {
96 auto last_time_t = time(0);
97 tm my_tm;
98
99#if defined(_MSC_VER) || defined(__MINGW32__)
100 gmtime_s(&my_tm, &last_time_t);
101#else
102 gmtime_r(&last_time_t, &my_tm);
103#endif
104 date_str.resize(100);
105 size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
106 date_str.resize(date_str_sz);
107 };
108 update_date_str();
109 get_cached_date_str_pool_[i] = [&]() -> std::string {
110 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
111 {
112 last = std::chrono::steady_clock::now();
113 update_date_str();
114 }
115 return date_str;
116 };
117
118 // initializing task timers
119 detail::task_timer task_timer(*io_service_pool_[i]);
120 task_timer.set_default_timeout(timeout_);
121 task_timer_pool_[i] = &task_timer;
122 task_queue_length_pool_[i] = 0;
123
124 init_count++;
125 while (1)
126 {
127 try
128 {
129 if (io_service_pool_[i]->run() == 0)
130 {
131 // when io_service.run returns 0, there are no more works to do.
132 break;
133 }
134 }
135 catch (std::exception& e)
136 {
137 CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
138 }
139 }
140 }));
141
142 if (tick_function_ && tick_interval_.count() > 0)
143 {
144 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
145 tick_timer_.async_wait(
146 [this](const error_code& ec) {
147 if (ec)
148 return;
149 on_tick();
150 });
151 }
152
153 port_ = acceptor_.local_endpoint().port();
154 handler_->port(port_);
155
156
157 CROW_LOG_INFO << server_name_ << " server is running at " << (handler_->ssl_used() ? "https://" : "http://") << bindaddr_ << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
158 CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
159
160 signals_.async_wait(
161 [&](const error_code& /*error*/, int /*signal_number*/) {
162 stop();
163 });
164
165 while (worker_thread_count != init_count)
166 std::this_thread::yield();
167
168 do_accept();
169
170 std::thread(
171 [this] {
172 notify_start();
173 io_service_.run();
174 CROW_LOG_INFO << "Exiting.";
175 })
176 .join();
177 }
178
179 void stop()
180 {
181 shutting_down_ = true; // Prevent the acceptor from taking new connections
182 for (auto& io_service : io_service_pool_)
183 {
184 if (io_service != nullptr)
185 {
186 CROW_LOG_INFO << "Closing IO service " << &io_service;
187 io_service->stop(); // Close all io_services (and HTTP connections)
188 }
189 }
190
191 CROW_LOG_INFO << "Closing main IO service (" << &io_service_ << ')';
192 io_service_.stop(); // Close main io_service
193 }
194
195 /// Wait until the server has properly started
197 {
198 std::unique_lock<std::mutex> lock(start_mutex_);
199 if (!server_started_)
200 cv_started_.wait(lock);
201 }
202
203 void signal_clear()
204 {
205 signals_.clear();
206 }
207
208 void signal_add(int signal_number)
209 {
210 signals_.add(signal_number);
211 }
212
213 private:
214 uint16_t pick_io_service_idx()
215 {
216 uint16_t min_queue_idx = 0;
217
218 // TODO improve load balancing
219 // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
220 // even though the max value of this can be only uint16_t as concurrency is uint16_t.
221 for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
222 // No need to check other io_services if the current one has no tasks
223 {
224 if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
225 min_queue_idx = i;
226 }
227 return min_queue_idx;
228 }
229
230 void do_accept()
231 {
232 if (!shutting_down_)
233 {
234 uint16_t service_idx = pick_io_service_idx();
235 asio::io_service& is = *io_service_pool_[service_idx];
236 task_queue_length_pool_[service_idx]++;
237 CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
238
239 auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
240 is, handler_, server_name_, middlewares_,
241 get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);
242
243 acceptor_.async_accept(
244 p->socket(),
245 [this, p, &is, service_idx](error_code ec) {
246 if (!ec)
247 {
248 is.post(
249 [p] {
250 p->start();
251 });
252 }
253 else
254 {
255 task_queue_length_pool_[service_idx]--;
256 CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
257 }
258 do_accept();
259 });
260 }
261 }
262
263 /// Notify anything using `wait_for_start()` to proceed
264 void notify_start()
265 {
266 std::unique_lock<std::mutex> lock(start_mutex_);
267 server_started_ = true;
268 cv_started_.notify_all();
269 }
270
271 private:
272 std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
273 asio::io_service io_service_;
274 std::vector<detail::task_timer*> task_timer_pool_;
275 std::vector<std::function<std::string()>> get_cached_date_str_pool_;
276 tcp::acceptor acceptor_;
277 bool shutting_down_ = false;
278 bool server_started_{false};
279 std::condition_variable cv_started_;
280 std::mutex start_mutex_;
281 asio::signal_set signals_;
282
283 asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
284
285 Handler* handler_;
286 uint16_t concurrency_{2};
287 std::uint8_t timeout_;
288 std::string server_name_;
289 uint16_t port_;
290 std::string bindaddr_;
291 std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
292
293 std::chrono::milliseconds tick_interval_;
294 std::function<void()> tick_function_;
295
296 std::tuple<Middlewares...>* middlewares_;
297
298 typename Adaptor::context* adaptor_ctx_;
299 };
300} // namespace crow
Definition http_server.h:43
void wait_for_start()
Wait until the server has properly started.
Definition http_server.h:196
A class for scheduling functions to be called after a specific amount of ticks. A tick is equal to 1 ...
Definition task_timer.h:34
void set_default_timeout(std::uint8_t timeout)
Set the default timeout for this task_timer instance. (Default: 5)
Definition task_timer.h:98
The main namespace of the library. In this namespace is defined the most important classes and functi...