46 const tcp::endpoint& endpoint,
47 std::string server_name = std::string(
"Crow/") + VERSION,
48 std::tuple<Middlewares...>* middlewares =
nullptr,
49 uint16_t concurrency = 1,
51 typename Adaptor::context* adaptor_ctx =
nullptr):
52 acceptor_(io_context_),
53 signals_(io_context_),
54 tick_timer_(io_context_),
56 concurrency_(concurrency),
58 server_name_(server_name),
59 task_queue_length_pool_(concurrency_ - 1),
60 middlewares_(middlewares),
61 adaptor_ctx_(adaptor_ctx)
63 if (startup_failed_) {
64 CROW_LOG_ERROR <<
"Startup failed; not running server.";
70 acceptor_.open(endpoint.protocol(), ec);
72 CROW_LOG_ERROR <<
"Failed to open acceptor: " << ec.message();
73 startup_failed_ =
true;
77 acceptor_.set_option(tcp::acceptor::reuse_address(
true), ec);
79 CROW_LOG_ERROR <<
"Failed to set socket option: " << ec.message();
80 startup_failed_ =
true;
84 acceptor_.bind(endpoint, ec);
86 CROW_LOG_ERROR <<
"Failed to bind to " << endpoint.address().to_string()
87 <<
":" << endpoint.port() <<
" - " << ec.message();
88 startup_failed_ =
true;
92 acceptor_.listen(tcp::acceptor::max_listen_connections, ec);
94 CROW_LOG_ERROR <<
"Failed to listen on port: " << ec.message();
95 startup_failed_ =
true;
// Declaration of set_tick_function: takes an interval `d` and a callable `f`.
// NOTE(review): the function body is not part of this excerpt; presumably it
// stores the arguments into tick_interval_ / tick_function_ -- confirm against
// the full file.
102 void set_tick_function(std::chrono::milliseconds d, std::function<
void()> f)
// Re-arms tick_timer_ to expire tick_interval_ milliseconds from now and
// registers an asynchronous wait whose completion handler captures `this`
// and receives the wait's error_code.
// NOTE(review): the enclosing function header and the handler body are not
// visible in this excerpt.
111 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
112 tick_timer_.async_wait([
this](
const error_code& ec) {
// Fragment of the server's run routine (the log text below refers to run()).
// NOTE(review): this excerpt omits lines (function header, braces, returns and
// some statements); the comments describe only what is visible.
//
// Guard: a failed startup is reported before any worker resources are created.
122 if (startup_failed_) {
123 CROW_LOG_ERROR <<
"Server startup failed. Aborting run().";
// Number of workers is concurrency_ - 1. One io_context is allocated per worker
// and the per-worker date-getter and task-timer tables are sized to match.
// NOTE(review): the result is stored in a uint16_t, so concurrency_ == 0 would
// wrap to 65535 here -- confirm callers never pass 0.
127 uint16_t worker_thread_count = concurrency_ - 1;
128 for (
int i = 0; i < worker_thread_count; i++)
129 io_context_pool_.emplace_back(
new asio::io_context());
130 get_cached_date_str_pool_.resize(worker_thread_count);
131 task_timer_pool_.resize(worker_thread_count);
// `v` holds std::future<void> objects; `init_count` is an atomic counter that
// is compared against worker_thread_count further down.
133 std::vector<std::future<void>> v;
134 std::atomic<int> init_count(0);
// One asynchronously launched task per worker index `i`; the task captures
// `this`, `i` by value and `init_count` by reference.
135 for (uint16_t i = 0; i < worker_thread_count; i++)
138 std::launch::async, [
this, i, &init_count] {
// Timestamp consulted by the cached-date getter below.
140 auto last = std::chrono::steady_clock::now();
// Worker-local buffer holding the formatted date string.
142 std::string date_str;
// Helper that reformats the current time into date_str.
143 auto update_date_str = [&] {
144 auto last_time_t = time(0);
// gmtime_s is used on MSVC / MinGW; gmtime_r is the other variant.
// NOTE(review): the declaration of my_tm and the #else / #endif lines are not
// visible in this excerpt.
147#if defined(_MSC_VER) || defined(__MINGW32__)
148 gmtime_s(&my_tm, &last_time_t);
150 gmtime_r(&last_time_t, &my_tm);
// Grow the buffer to 100 bytes, let strftime write at most 99 bytes of the
// "%a, %d %b %Y %H:%M:%S GMT" form, then shrink to the length strftime returned.
152 date_str.resize(100);
153 size_t date_str_sz = strftime(&date_str[0], 99,
"%a, %d %b %Y %H:%M:%S GMT", &my_tm);
154 date_str.resize(date_str_sz);
// Getter stored in this worker's slot. It checks whether at least one second
// has passed since `last` and, if so, resets `last`.
// NOTE(review): the getter captures this task's locals by reference ([&]), so it
// is only valid while this worker task is still running. The refresh call and
// the return statement are not visible in this excerpt.
157 get_cached_date_str_pool_[i] = [&]() -> std::string {
158 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
160 last = std::chrono::steady_clock::now();
// Publish the address of `task_timer` in this worker's slot and zero the
// worker's queue-length counter.
// NOTE(review): the declaration of `task_timer` is not visible in this excerpt.
169 task_timer_pool_[i] = &task_timer;
170 task_queue_length_pool_[i] = 0;
// Run this worker's io_context; a return value of 0 is tested for
// (the branch body is not visible in this excerpt).
177 if (io_context_pool_[i]->run() == 0)
// A std::exception handler logs the exception message as a worker crash.
183 catch (std::exception& e)
185 CROW_LOG_ERROR <<
"Worker Crash: An uncaught exception occurred: " << e.what();
// Only when a tick function is set and the interval is positive: arm
// tick_timer_ and register an asynchronous wait handler capturing `this`.
190 if (tick_function_ && tick_interval_.count() > 0)
192 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
193 tick_timer_.async_wait(
194 [
this](
const error_code& ec) {
// Pass the acceptor's local port to handler_.
201 handler_->port(acceptor_.local_endpoint().port());
// Startup banner: server name, scheme (chosen by handler_->ssl_used()),
// local address and port, and the concurrency_ value.
204 CROW_LOG_INFO << server_name_
205 <<
" server is running at " << (handler_->ssl_used() ?
"https://" :
"http://")
206 << acceptor_.local_endpoint().address() <<
":" << acceptor_.local_endpoint().port() <<
" using " << concurrency_ <<
" threads";
207 CROW_LOG_INFO <<
"Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
// Handler lambda taking an error_code and an int, both unnamed.
// NOTE(review): the call this lambda is passed to and its body are not visible
// here -- presumably the wait on signals_; confirm against the full file.
210 [&](
const error_code& ,
int ) {
// Yield repeatedly until init_count equals worker_thread_count.
214 while (worker_thread_count != init_count)
215 std::this_thread::yield();
// NOTE(review): the statements surrounding this log line are not visible.
223 CROW_LOG_INFO <<
"Exiting.";
// Fragment of the shutdown routine (function header not visible in this
// excerpt). Sets shutting_down_, then logs the closing of the acceptor (when it
// is open), of every non-null entry of io_context_pool_, and of the main
// io_context_.
// NOTE(review): the actual close()/stop() calls are not visible in this excerpt.
230 shutting_down_ =
true;
235 if (acceptor_.is_open())
237 CROW_LOG_INFO <<
"Closing acceptor. " << &acceptor_;
241 for (
auto& io_context : io_context_pool_)
243 if (io_context !=
nullptr)
// NOTE(review): `io_context` is a reference to a pool element (a
// std::unique_ptr<asio::io_context>), so `&io_context` logs the address of the
// smart pointer rather than of the io_context it owns.
245 CROW_LOG_INFO <<
"Closing IO service " << &io_context;
250 CROW_LOG_INFO <<
"Closing main IO service (" << &io_context_ <<
')';
// Returns the port number of the acceptor's local endpoint.
254 uint16_t port()
const {
255 return acceptor_.local_endpoint().port();
// Fragment of the wait-for-startup routine (function header not visible in this
// excerpt). While holding start_mutex_, waits on cv_started_ until
// server_started_ or startup_failed_ is set, or until wait_until() reports a
// status other than no_timeout.
// NOTE(review): `wait_until` (the deadline) is presumably a parameter of the
// enclosing function, and the return statement is not visible -- confirm.
261 std::unique_lock<std::mutex> lock(start_mutex_);
263 std::cv_status status = std::cv_status::no_timeout;
264 while (!server_started_ && !startup_failed_ && status == std::cv_status::no_timeout)
265 status = cv_started_.wait_until(lock, wait_until);
// Adds `signal_number` to the signals_ signal set.
275 void signal_add(
int signal_number)
277 signals_.add(signal_number);
// Chooses an index into the per-worker pools based on task_queue_length_pool_
// and returns it as min_queue_idx.
// The scan starts at index 1 with index 0 as the initial candidate, and stops
// as soon as the current candidate's queue length is 0.
// With an empty pool the loop body never runs and 0 is returned.
// NOTE(review): the statement executed when a shorter queue is found is not
// visible in this excerpt -- presumably `min_queue_idx = i;`.
281 uint16_t pick_io_context_idx()
283 uint16_t min_queue_idx = 0;
288 for (
size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
291 if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
294 return min_queue_idx;
// Fragment of the accept routine (function header not visible in this excerpt).
// Picks a worker slot, increments that slot's queue-length counter, builds a
// Connection bound to the slot's io_context, and starts an asynchronous accept.
// NOTE(review): io_context_pool_ is filled with concurrency_ - 1 entries, so a
// concurrency of 1 leaves it empty and slot 0 would be out of range here --
// confirm callers guarantee at least one worker.
301 uint16_t context_idx = pick_io_context_idx();
302 asio::io_context& ic = *io_context_pool_[context_idx];
303 task_queue_length_pool_[context_idx]++;
304 CROW_LOG_DEBUG << &ic <<
" {" << context_idx <<
"} queue length: " << task_queue_length_pool_[context_idx];
// The connection receives the slot's cached-date getter, task timer and
// queue-length counter along with the handler, server name, middlewares and
// adaptor context.
306 auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
307 ic, handler_, server_name_, middlewares_,
308 get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
// Accept completion handler; it keeps the connection alive by capturing the
// shared_ptr `p` by value.
// NOTE(review): the remaining async_accept arguments and most of the handler
// body are not visible in this excerpt.
310 acceptor_.async_accept(
312 [
this, p, &ic, context_idx](error_code ec) {
// The slot's queue-length counter is decremented again and logged.
// NOTE(review): the condition under which this runs is not visible.
322 task_queue_length_pool_[context_idx]--;
323 CROW_LOG_DEBUG << &ic <<
" {" << context_idx <<
"} queue length: " << task_queue_length_pool_[context_idx];
// Fragment of the start-notification routine (function header not visible in
// this excerpt). Under start_mutex_, marks the server as started and wakes
// every thread waiting on cv_started_.
333 std::unique_lock<std::mutex> lock(start_mutex_);
334 server_started_ =
true;
335 cv_started_.notify_all();
// Data members.
// NOTE(review): this excerpt skips some original lines, so further members may
// exist between the ones shown (handler_ is used elsewhere but not declared
// here).

// One owned io_context per worker; filled by the run routine.
339 std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
// Main io_context; acceptor_, signals_ and tick_timer_ are constructed on it,
// so it has to be declared before them.
340 asio::io_context io_context_;
// Per-worker task_timer pointers; entries are set from the address of a
// `task_timer` object inside each worker task.
341 std::vector<detail::task_timer*> task_timer_pool_;
// Per-worker getters returning a date string.
342 std::vector<std::function<std::string()>> get_cached_date_str_pool_;
// Listening acceptor.
343 tcp::acceptor acceptor_;
// Set to true by the shutdown routine.
344 bool shutting_down_ =
false;
// Set to true under start_mutex_ by the start-notification routine.
345 bool server_started_{
false};
// Set to true when opening, configuring, binding or listening on the acceptor
// fails; checked by the run and wait-for-start routines.
// NOTE(review): these three flags are plain bools; if they are read and written
// from different threads without a lock, they would need synchronization --
// confirm.
346 bool startup_failed_ =
false;
// Condition variable and mutex used together to signal and await startup.
347 std::condition_variable cv_started_;
348 std::mutex start_mutex_;
// Signal set; signal_add() registers signal numbers on it.
349 asio::signal_set signals_;
// Timer used for the periodic tick; based on high_resolution_clock.
351 asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
// Configured concurrency; defaults to 2 here and is overwritten from the
// constructor argument. The worker count is concurrency_ - 1.
354 uint16_t concurrency_{2};
// NOTE(review): no default member initializer, and its initialization is not
// visible in this excerpt -- confirm the constructor sets it.
355 std::uint8_t timeout_;
// Server name; appears in the startup log line and is handed to each Connection.
356 std::string server_name_;
// Per-worker atomic queue-length counters, sized concurrency_ - 1 by the
// constructor.
357 std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
// Tick period and callback; the tick timer is only armed when the callback is
// set and the interval is positive.
359 std::chrono::milliseconds tick_interval_;
360 std::function<void()> tick_function_;
// Non-owning pointer to the middleware tuple (may be nullptr by default).
362 std::tuple<Middlewares...>* middlewares_;
// Adaptor context pointer passed to each Connection (nullptr by default).
364 typename Adaptor::context* adaptor_ctx_;