46 const tcp::endpoint& endpoint,
47 std::string server_name = std::string(
"Crow/") + VERSION,
48 std::tuple<Middlewares...>* middlewares =
nullptr,
49 uint16_t concurrency = 1,
51 typename Adaptor::context* adaptor_ctx =
nullptr):
52 acceptor_(io_context_,endpoint),
53 signals_(io_context_),
54 tick_timer_(io_context_),
56 concurrency_(concurrency),
58 server_name_(server_name),
59 task_queue_length_pool_(concurrency_ - 1),
60 middlewares_(middlewares),
61 adaptor_ctx_(adaptor_ctx)
// Configures the periodic tick: `d` is the interval in milliseconds and `f` is the
// callback to invoke.
// NOTE(review): the body is not visible in this excerpt; presumably it stores the
// arguments in tick_interval_ and tick_function_ — confirm against the full file.
64 void set_tick_function(std::chrono::milliseconds d, std::function<
void()> f)
// Re-arm the tick timer so that it expires tick_interval_ milliseconds from now.
73 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
// Wait asynchronously; the completion handler captures `this` and receives the
// wait's error_code.
// NOTE(review): the handler body is not visible in this excerpt.
74 tick_timer_.async_wait([
this](
const error_code& ec) {
// Server start-up sequence: creates the per-worker resources, launches the worker
// tasks, arms the tick timer, logs the listening address and waits for the workers.
// NOTE(review): several lines of this routine (braces, some statements) are not
// visible in this excerpt; the comments below describe only what is shown.

// One fewer worker than the configured concurrency.
// NOTE(review): presumably the remaining thread drives the main io_context_ — confirm.
83 uint16_t worker_thread_count = concurrency_ - 1;
// Allocate one io_context per worker; ownership is taken by the unique_ptr elements
// of io_context_pool_.
// NOTE(review): this loop indexes with `int` while the launch loop below uses
// `uint16_t` for the same bound.
84 for (
int i = 0; i < worker_thread_count; i++)
85 io_context_pool_.emplace_back(
new asio::io_context());
// Size the per-worker lookup tables so that slot i belongs to worker i.
86 get_cached_date_str_pool_.resize(worker_thread_count);
87 task_timer_pool_.resize(worker_thread_count);
// Futures of the asynchronously launched worker tasks.
89 std::vector<std::future<void>> v;
// Shared counter compared against worker_thread_count further down.
// NOTE(review): presumably each worker increments it once its set-up is done; the
// increment is not visible in this excerpt.
90 std::atomic<int> init_count(0);
91 for (uint16_t i = 0; i < worker_thread_count; i++)
// Worker task, launched with std::launch::async; captures the worker index by value
// and the shared counter by reference.
94 std::launch::async, [
this, i, &init_count] {
// Time of the most recent date-string refresh for this worker.
96 auto last = std::chrono::steady_clock::now();
// Helper that re-formats the current wall-clock time into date_str.
99 auto update_date_str = [&] {
100 auto last_time_t = time(0);
// Convert to broken-down UTC time using the platform's re-entrant variant.
// NOTE(review): gmtime_r is presumably under the matching #else, which is not
// visible in this excerpt.
103#if defined(_MSC_VER) || defined(__MINGW32__)
104 gmtime_s(&my_tm, &last_time_t);
106 gmtime_r(&last_time_t, &my_tm);
// Grow the buffer to 100 characters, let strftime write at most 99 bytes, then
// shrink the string to the number of characters strftime reports.
108 date_str.resize(100);
109 size_t date_str_sz = strftime(&date_str[0], 99,
"%a, %d %b %Y %H:%M:%S GMT", &my_tm);
110 date_str.resize(date_str_sz);
// Per-worker date-string getter: once at least one second has elapsed since `last`,
// the timestamp is reset.
// NOTE(review): presumably update_date_str is called at that point too; the call is
// not visible in this excerpt.
113 get_cached_date_str_pool_[i] = [&]() -> std::string {
114 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
116 last = std::chrono::steady_clock::now();
// Publish this worker's task timer by address and reset its queue-length counter.
// NOTE(review): `task_timer` is declared on a line not visible in this excerpt.
125 task_timer_pool_[i] = &task_timer;
126 task_queue_length_pool_[i] = 0;
// Run this worker's io_context; a return value of 0 is handled specially.
// NOTE(review): the guarded branch is not visible in this excerpt.
133 if (io_context_pool_[i]->run() == 0)
// Exceptions derived from std::exception that escape the worker are logged.
139 catch (std::exception& e)
141 CROW_LOG_ERROR <<
"Worker Crash: An uncaught exception occurred: " << e.what();
// Arm the tick timer only when a callback is set and the interval is positive.
146 if (tick_function_ && tick_interval_.count() > 0)
148 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
149 tick_timer_.async_wait(
150 [
this](
const error_code& ec) {
// Report the port the acceptor is bound to back to the handler.
157 handler_->port(acceptor_.local_endpoint().port());
// Start-up banner: scheme (https when the handler reports SSL), address, port and
// the configured concurrency.
160 CROW_LOG_INFO << server_name_
161 <<
" server is running at " << (handler_->ssl_used() ?
"https://" :
"http://")
162 << acceptor_.local_endpoint().address() <<
":" << acceptor_.local_endpoint().port() <<
" using " << concurrency_ <<
" threads";
163 CROW_LOG_INFO <<
"Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
// Callback taking an error_code and an int, both unused.
// NOTE(review): presumably the signal handler registered on signals_ — confirm.
166 [&](
const error_code& ,
int ) {
// Yield repeatedly until init_count equals the number of workers.
170 while (worker_thread_count != init_count)
171 std::this_thread::yield();
179 CROW_LOG_INFO <<
"Exiting.";
// Shutdown sequence: sets the shutting-down flag, then walks the worker io_contexts
// and finally the main one, logging each.
// NOTE(review): the calls that actually stop the io_contexts are not visible in this
// excerpt.
186 shutting_down_ =
true;
187 for (
auto& io_context : io_context_pool_)
// Skip pool slots whose unique_ptr is empty.
189 if (io_context !=
nullptr)
// NOTE(review): `&io_context` is the address of the unique_ptr slot in the pool, not
// of the io_context it points to — confirm this is the value intended for the log.
191 CROW_LOG_INFO <<
"Closing IO service " << &io_context;
196 CROW_LOG_INFO <<
"Closing main IO service (" << &io_context_ <<
')';
// Returns the port number of the acceptor's local endpoint.
200 uint16_t port()
const {
201 return acceptor_.local_endpoint().port();
// Blocks on cv_started_ until server_started_ becomes true or the deadline passes.
// NOTE(review): `wait_until` is presumably the deadline parameter of the enclosing
// function, whose header is not visible in this excerpt.
207 std::unique_lock<std::mutex> lock(start_mutex_);
209 std::cv_status status = std::cv_status::no_timeout;
// Loop guards against spurious wake-ups: keep waiting while the server has not
// started and the last wait did not time out.
210 while (!server_started_ && ( status==std::cv_status::no_timeout ))
211 status = cv_started_.wait_until(lock,wait_until);
// Adds `signal_number` to the signal set held in signals_.
220 void signal_add(
int signal_number)
222 signals_.add(signal_number);
// Selects an index into task_queue_length_pool_ by comparing queue lengths, starting
// from index 0 as the initial candidate.
226 uint16_t pick_io_context_idx()
228 uint16_t min_queue_idx = 0;
// Scan from index 1; the loop ends early as soon as the current candidate's queue
// length is 0, since nothing can be shorter than an empty queue.
233 for (
size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
// NOTE(review): the statement guarded by this comparison is not visible in this
// excerpt; presumably it assigns i to min_queue_idx — confirm.
236 if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
239 return min_queue_idx;
// Accept path: chooses a worker io_context, builds a Connection bound to that
// worker's resources and starts an asynchronous accept for it.
// NOTE(review): the enclosing function header and parts of the accept handler are
// not visible in this excerpt.
246 uint16_t context_idx = pick_io_context_idx();
247 asio::io_context& ic = *io_context_pool_[context_idx];
// Count the pending connection against the chosen worker before it is accepted.
248 task_queue_length_pool_[context_idx]++;
249 CROW_LOG_DEBUG << &ic <<
" {" << context_idx <<
"} queue length: " << task_queue_length_pool_[context_idx];
// The connection receives the chosen worker's io_context, date-string getter, task
// timer and queue-length counter, plus the server-wide handler, name, middlewares
// and adaptor context.
251 auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
252 ic, handler_, server_name_, middlewares_,
253 get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
255 acceptor_.async_accept(
// Handler captures the shared connection pointer by value and `ic` by reference
// (it refers to a pool element, not to a local object).
257 [
this, p, &ic, context_idx](error_code ec) {
// Undo the queue-length increment made above.
// NOTE(review): the condition under which this runs is not visible in this excerpt.
267 task_queue_length_pool_[context_idx]--;
268 CROW_LOG_DEBUG << &ic <<
" {" << context_idx <<
"} queue length: " << task_queue_length_pool_[context_idx];
// Marks the server as started while holding start_mutex_, then wakes every thread
// waiting on cv_started_.
278 std::unique_lock<std::mutex> lock(start_mutex_);
279 server_started_ =
true;
280 cv_started_.notify_all();
// Owned io_contexts, one per pool slot.
284 std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
// Main io_context. It is declared before acceptor_, signals_ and tick_timer_, which
// the constructor initializes from it; members are initialized in declaration order,
// so this ordering must be kept.
285 asio::io_context io_context_;
// Raw pointers to task timers, indexed like io_context_pool_.
// NOTE(review): presumably non-owning — confirm where the timers are declared.
286 std::vector<detail::task_timer*> task_timer_pool_;
// Per-slot callables that return a date string.
287 std::vector<std::function<std::string()>> get_cached_date_str_pool_;
// Acceptor constructed on io_context_ with the endpoint given to the constructor.
288 tcp::acceptor acceptor_;
289 bool shutting_down_ =
false;
// Start flag; read and written under start_mutex_ and signalled through cv_started_.
290 bool server_started_{
false};
291 std::condition_variable cv_started_;
292 std::mutex start_mutex_;
// Signal set constructed on io_context_.
293 asio::signal_set signals_;
// Timer on io_context_ used for the periodic tick; based on high_resolution_clock.
295 asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
// In-class default of 2; the constructor's initializer list overrides it. Declared
// before task_queue_length_pool_, whose size the constructor computes from it.
298 uint16_t concurrency_{2};
// NOTE(review): units and usage are not visible in this excerpt.
299 std::uint8_t timeout_;
300 std::string server_name_;
// One atomic counter per pool slot; the constructor sizes it to concurrency_ - 1.
301 std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
// Tick period and callback; the timer is armed only when both are set (callback
// non-empty, interval > 0).
303 std::chrono::milliseconds tick_interval_;
304 std::function<void()> tick_function_;
// Pointers supplied by the caller through the constructor; both default to nullptr.
306 std::tuple<Middlewares...>* middlewares_;
308 typename Adaptor::context* adaptor_ctx_;