4 #ifndef ASIO_STANDALONE
5 #define ASIO_STANDALONE
9 #include <asio/ssl.hpp>
17 #include "crow/version.h"
18 #include "crow/http_connection.h"
19 #include "crow/logging.h"
20 #include "crow/task_timer.h"
24 using tcp = asio::ip::tcp;
// Template header of the server class (the class declaration itself, listing line 28, is not part of this extract).
//   Handler     - type of the handler object; the server receives it as a raw pointer.
//   Adaptor     - socket adaptor type, defaults to SocketAdaptor; its nested `context` type is used below.
//   Middlewares - pack of middleware types, held together in a std::tuple.
26 template<
typename Handler,
typename Adaptor = SocketAdaptor,
typename... Middlewares>
// Constructor.
//   handler      - handler object (received as a raw pointer).
//   bindaddr     - textual IP address to bind to; parsed with asio::ip::address::from_string.
//   port         - TCP port used for the acceptor endpoint.
//   server_name  - defaults to "Crow/" followed by VERSION.
//   middlewares  - pointer to the middleware tuple (default nullptr).
//   concurrency  - thread count (default 1); concurrency - 1 sizes the per-worker queue-length pool.
//   timeout      - default 5. NOTE(review): unit is not visible here, presumably seconds -- confirm.
//   adaptor_ctx  - pointer to the adaptor context (default nullptr).
// NOTE: listing lines 34, 36, 38 and 39 of the initializer list are missing from this extract.
30 Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string(
"Crow/") + VERSION, std::tuple<Middlewares...>* middlewares =
nullptr, uint16_t concurrency = 1, uint8_t timeout = 5,
typename Adaptor::context* adaptor_ctx =
nullptr):
// The acceptor, signal set and tick timer are all bound to the main io_service_.
31 acceptor_(io_service_, tcp::endpoint(asio::ip::address::from_string(bindaddr), port)),
32 signals_(io_service_),
33 tick_timer_(io_service_),
35 concurrency_(concurrency),
37 server_name_(server_name),
// One queue-length counter per worker. This reads concurrency_, which is safe because
// concurrency_ is declared before task_queue_length_pool_ in the class.
// NOTE(review): concurrency == 0 makes `concurrency_ - 1` negative after integer promotion -- confirm callers never pass 0.
40 task_queue_length_pool_(concurrency_ - 1),
41 middlewares_(middlewares),
42 adaptor_ctx_(adaptor_ctx)
// Tick-function setter taking an interval `d` and a callback `f`.
// NOTE(review): the body (listing lines 46-53) is not part of this extract; presumably it stores the
// arguments in tick_interval_ / tick_function_ -- confirm against the full header.
45 void set_tick_function(std::chrono::milliseconds d, std::function<
void()> f)
// Fragment from a later point in the listing: arm the tick timer to expire tick_interval_ milliseconds
// from now and wait on it asynchronously. The handler receives the asio error code; its body is not shown.
54 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
55 tick_timer_.async_wait([
this](
const asio::error_code& ec) {
// Number of worker io_services: one fewer than concurrency_.
64 uint16_t worker_thread_count = concurrency_ - 1;
// Create one io_service per worker; the pool owns them through std::unique_ptr.
// NOTE(review): the index is a signed int compared against a uint16_t (the later loop uses uint16_t),
// and the raw `new` is only wrapped once emplace_back succeeds -- consider tidying both.
65 for (
int i = 0; i < worker_thread_count; i++)
66 io_service_pool_.emplace_back(
new asio::io_service());
// Per-worker slots for the cached-date provider and the task timer pointer; filled in by each worker.
67 get_cached_date_str_pool_.resize(worker_thread_count);
68 task_timer_pool_.resize(worker_thread_count);
// Futures for the worker tasks, plus a counter that is later compared against worker_thread_count
// to detect that every worker has finished initializing.
70 std::vector<std::future<void>> v;
71 std::atomic<int> init_count(0);
// One asynchronous task per worker (the call that receives std::launch::async, listing lines 73-74,
// is not part of this extract). The lambda captures the worker index by value and init_count by reference.
72 for (uint16_t i = 0; i < worker_thread_count; i++)
75 std::launch::async, [
this, i, &init_count] {
// Time of the most recent date-string refresh for this worker.
77 auto last = std::chrono::steady_clock::now();
// Helper that renders the current UTC time into date_str (declared on a line not shown here).
80 auto update_date_str = [&] {
81 auto last_time_t = time(0);
// MSVC / MinGW use gmtime_s (destination first); other platforms use gmtime_r (source first).
84 #if defined(_MSC_VER) || defined(__MINGW32__)
85 gmtime_s(&my_tm, &last_time_t);
87 gmtime_r(&last_time_t, &my_tm);
// Format directly into the string's buffer (at most 99 bytes), then shrink to the length actually written.
// Resulting shape: "Tue, 15 Nov 1994 08:12:31 GMT".
90 size_t date_str_sz = strftime(&date_str[0], 99,
"%a, %d %b %Y %H:%M:%S GMT", &my_tm);
91 date_str.resize(date_str_sz);
// Date provider for worker i: once at least one second has elapsed on the steady clock, `last` is
// advanced (presumably together with a call to update_date_str -- the intervening lines are not shown).
94 get_cached_date_str_pool_[i] = [&]() -> std::string {
95 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
97 last = std::chrono::steady_clock::now();
// Publish the address of this worker's task timer and reset its queue-length counter.
// NOTE(review): task_timer's declaration is not visible; presumably it is local to this lambda, so the
// stored pointer would only be valid while the worker runs -- confirm.
106 task_timer_pool_[i] = &task_timer;
107 task_queue_length_pool_[i] = 0;
// Run this worker's io_service; the branch taken when run() returns 0 is not shown in this extract.
114 if (io_service_pool_[i]->run() == 0)
// std::exception instances escaping the worker are reported through the error log.
120 catch (std::exception& e)
122 CROW_LOG_ERROR <<
"Worker Crash: An uncaught exception occurred: " << e.what();
// Arm the tick timer only when a callback has been set and the interval is positive.
127 if (tick_function_ && tick_interval_.count() > 0)
129 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
// Asynchronous wait; the handler body (after the opening brace) is not part of this extract.
130 tick_timer_.async_wait(
131 [
this](
const asio::error_code& ec) {
// Ask the acceptor which port it is actually bound to, remember it and pass it on to the handler.
138 port_ = acceptor_.local_endpoint().port();
139 handler_->port(port_);
// Startup banner: server name, URL scheme (https when the handler reports SSL), bind address,
// bound port and the configured thread count.
142 CROW_LOG_INFO << server_name_ <<
" server is running at " << (handler_->ssl_used() ?
"https://" :
"http://") << bindaddr_ <<
":" << acceptor_.local_endpoint().port() <<
" using " << concurrency_ <<
" threads";
// Hint for silencing Info-level output.
143 CROW_LOG_INFO <<
"Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
// Handler taking an asio error code and an int, both unnamed and therefore unused.
// NOTE(review): presumably this is the callback given to signals_.async_wait (the call itself and the
// handler body are not part of this extract) -- confirm.
146 [&](
const asio::error_code& ,
int ) {
// Spin, yielding the CPU each iteration, until init_count equals the number of workers.
150 while (worker_thread_count != init_count)
151 std::this_thread::yield();
// Logged later in the listing (lines 152-158 are not shown).
159 CROW_LOG_INFO <<
"Exiting.";
// Shutdown fragment (the enclosing function header is not part of this extract).
// Flag the server as shutting down before touching the io_services.
166 shutting_down_ =
true;
// Walk the worker pool, skipping empty slots.
167 for (
auto& io_service : io_service_pool_)
169 if (io_service !=
nullptr)
// Note: `&io_service` is the address of the unique_ptr slot in the pool, not of the io_service it owns.
// The statement that actually stops the service is not visible here.
171 CROW_LOG_INFO <<
"Closing IO service " << &io_service;
// Same log for the main io_service_; the stop call itself is not shown.
176 CROW_LOG_INFO <<
"Closing main IO service (" << &io_service_ <<
')';
// Block the caller until server_started_ has been set (it is set under start_mutex_ and announced
// through cv_started_ elsewhere in this class).
// The flag is re-checked in a loop because std::condition_variable::wait may wake up spuriously;
// a single `if` would let the caller continue before the server has actually started.
183 std::unique_lock<std::mutex> lock(start_mutex_);
184 while (!server_started_)
185 cv_started_.wait(lock);
// Adds `signal_number` to the server's asio::signal_set (signals_).
193 void signal_add(
int signal_number)
195 signals_.add(signal_number);
// Returns the index of a worker chosen by comparing the per-worker queue-length counters.
199 uint16_t pick_io_service_idx()
// Worker 0 is the initial candidate.
201 uint16_t min_queue_idx = 0;
// Scan the remaining workers, stopping early once the current candidate's counter is 0
// (the counters are unsigned, so nothing can be smaller).
206 for (
size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
// NOTE(review): the statement executed when a smaller counter is found (listing line 210) is missing
// from this extract; presumably it assigns i to min_queue_idx -- confirm.
209 if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
212 return min_queue_idx;
// Accept-path fragment (the enclosing function header is not part of this extract).
// Pick a worker, take a reference to its io_service and count the pending connection against it.
// NOTE(review): io_service_pool_ holds concurrency_ - 1 entries, so with the constructor's default
// concurrency of 1 the pool is empty and index 0 would be out of range -- presumably callers always
// configure at least 2; confirm.
219 uint16_t service_idx = pick_io_service_idx();
220 asio::io_service& is = *io_service_pool_[service_idx];
221 task_queue_length_pool_[service_idx]++;
222 CROW_LOG_DEBUG << &is <<
" {" << service_idx <<
"} queue length: " << task_queue_length_pool_[service_idx];
// Build the connection on the chosen worker, handing it that worker's date provider, task timer and
// queue-length counter along with the shared handler, server name, middlewares and adaptor context.
224 auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
225 is, handler_, server_name_, middlewares_,
226 get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);
// Asynchronous accept (the first argument, listing line 229, is not shown). The handler captures the
// connection by value, so the shared_ptr copy keeps it alive until the handler runs.
228 acceptor_.async_accept(
230 [
this, p, &is, service_idx](asio::error_code ec) {
// Decrement the worker's counter again and log the new length; the condition guarding this
// (listing lines 231-239) is not visible here.
240 task_queue_length_pool_[service_idx]--;
241 CROW_LOG_DEBUG << &is <<
" {" << service_idx <<
"} queue length: " << task_queue_length_pool_[service_idx];
// Start-notification fragment (the enclosing function header is not part of this extract).
// Set the started flag while holding start_mutex_, then wake every thread waiting on cv_started_.
251 std::unique_lock<std::mutex> lock(start_mutex_);
252 server_started_ =
true;
253 cv_started_.notify_all();
// Worker io_services, owned by the pool (one per worker).
257 std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
// Main io_service. It is declared before acceptor_, signals_ and tick_timer_, which are constructed
// from it in the constructor's initializer list.
258 asio::io_service io_service_;
// Non-owning pointers to each worker's task timer, indexed by worker.
259 std::vector<detail::task_timer*> task_timer_pool_;
// Per-worker callables returning the cached date string.
260 std::vector<std::function<std::string()>> get_cached_date_str_pool_;
// Listening socket, bound in the constructor.
261 tcp::acceptor acceptor_;
// Set to true by the shutdown path.
262 bool shutting_down_ =
false;
// Start flag; written under start_mutex_ and signalled through cv_started_.
263 bool server_started_{
false};
264 std::condition_variable cv_started_;
265 std::mutex start_mutex_;
// Signals registered through signal_add().
266 asio::signal_set signals_;
// Timer driving the tick function; based on std::chrono::high_resolution_clock.
268 asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
// In-class default is 2, but the constructor overwrites it with its `concurrency` argument.
271 uint16_t concurrency_{2};
// NOTE(review): the initializer for timeout_ is not visible in this extract; presumably set from the
// constructor's `timeout` argument -- confirm.
272 std::uint8_t timeout_;
273 std::string server_name_;
// Bind address as text; used in the startup log line.
275 std::string bindaddr_;
// One atomic counter per worker, sized once in the constructor (concurrency_ - 1 entries).
276 std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
// Tick configuration: the timer is armed only when tick_function_ is set and tick_interval_ is positive.
278 std::chrono::milliseconds tick_interval_;
279 std::function<void()> tick_function_;
// Pointers supplied by the caller via the constructor; both may be nullptr (the constructor's defaults).
281 std::tuple<Middlewares...>* middlewares_;
283 typename Adaptor::context* adaptor_ctx_;
Definition: http_server.h:28
void wait_for_start()
Wait until the server has properly started.
Definition: http_server.h:181
A class for scheduling functions to be called after a specific number of ticks. A tick is equal to 1 ...
Definition: task_timer.h:23
void set_default_timeout(std::uint8_t timeout)
Set the default timeout for this task_timer instance. (Default: 5)
Definition: task_timer.h:87