49 typename Acceptor::endpoint endpoint,
50 std::string server_name = std::string(
"Crow/") + VERSION,
51 std::tuple<Middlewares...>* middlewares =
nullptr,
52 unsigned int concurrency = 1,
54 typename Adaptor::context* adaptor_ctx =
nullptr):
55 concurrency_(concurrency),
56 task_queue_length_pool_(concurrency_ - 1),
57 acceptor_(io_context_),
58 signals_(io_context_),
59 tick_timer_(io_context_),
62 server_name_(server_name),
63 middlewares_(middlewares),
64 adaptor_ctx_(adaptor_ctx)
66 if (startup_failed_) {
67 CROW_LOG_ERROR <<
"Startup failed; not running server.";
73 acceptor_.raw_acceptor().open(endpoint.protocol(), ec);
75 CROW_LOG_ERROR <<
"Failed to open acceptor: " << ec.message();
76 startup_failed_ =
true;
80 acceptor_.raw_acceptor().set_option(Acceptor::reuse_address_option(), ec);
82 CROW_LOG_ERROR <<
"Failed to set socket option: " << ec.message();
83 startup_failed_ =
true;
87 acceptor_.raw_acceptor().bind(endpoint, ec);
89 CROW_LOG_ERROR <<
"Failed to bind to " << acceptor_.address()
90 <<
":" << acceptor_.port() <<
" - " << ec.message();
91 startup_failed_ =
true;
95 acceptor_.raw_acceptor().listen(tcp::acceptor::max_listen_connections, ec);
97 CROW_LOG_ERROR <<
"Failed to listen on port: " << ec.message();
98 startup_failed_ =
true;
105 void set_tick_function(std::chrono::milliseconds d, std::function<
void()> f)
114 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
115 tick_timer_.async_wait([
this](
const error_code& ec) {
125 if (startup_failed_) {
126 CROW_LOG_ERROR <<
"Server startup failed. Aborting run().";
130 uint16_t worker_thread_count = concurrency_ - 1;
131 for (
int i = 0; i < worker_thread_count; i++)
132 io_context_pool_.emplace_back(
new asio::io_context());
133 get_cached_date_str_pool_.resize(worker_thread_count);
134 task_timer_pool_.resize(worker_thread_count);
136 std::vector<std::future<void>> v;
137 std::atomic<int> init_count(0);
138 for (uint16_t i = 0; i < worker_thread_count; i++)
141 std::launch::async, [
this, i, &init_count] {
143 auto last = std::chrono::steady_clock::now();
145 std::string date_str;
146 auto update_date_str = [&] {
147 auto last_time_t = time(0);
150#if defined(_MSC_VER) || defined(__MINGW32__)
151 gmtime_s(&my_tm, &last_time_t);
153 gmtime_r(&last_time_t, &my_tm);
155 date_str.resize(100);
156 size_t date_str_sz = strftime(&date_str[0], 99,
"%a, %d %b %Y %H:%M:%S GMT", &my_tm);
157 date_str.resize(date_str_sz);
160 get_cached_date_str_pool_[i] = [&]() -> std::string {
161 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
163 last = std::chrono::steady_clock::now();
172 task_timer_pool_[i] = &task_timer;
173 task_queue_length_pool_[i] = 0;
180 if (io_context_pool_[i]->run() == 0)
186 catch (std::exception& e)
188 CROW_LOG_ERROR <<
"Worker Crash: An uncaught exception occurred: " << e.what();
193 if (tick_function_ && tick_interval_.count() > 0)
195 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
196 tick_timer_.async_wait(
197 [
this](
const error_code& ec) {
203 handler_->port(acceptor_.port());
204 handler_->address_is_bound();
205 CROW_LOG_INFO << server_name_
206 <<
" server is running at " << acceptor_.url_display(handler_->ssl_used())
207 <<
" using " << concurrency_ <<
" threads";
208 CROW_LOG_INFO <<
"Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
211 [&](
const error_code& ,
int ) {
215 while (worker_thread_count != init_count)
216 std::this_thread::yield();
224 CROW_LOG_INFO <<
"Exiting.";
231 shutting_down_ =
true;
236 if (acceptor_.raw_acceptor().is_open())
238 CROW_LOG_INFO <<
"Closing acceptor. " << &acceptor_;
240 acceptor_.raw_acceptor().close(ec);
243 CROW_LOG_WARNING <<
"Failed to close acceptor: " << ec.message();
247 for (
auto& io_context : io_context_pool_)
249 if (io_context !=
nullptr)
251 CROW_LOG_INFO <<
"Closing IO service " << &io_context;
256 CROW_LOG_INFO <<
"Closing main IO service (" << &io_context_ <<
')';
261 uint16_t port()
const {
262 return acceptor_.local_endpoint().port();
268 std::unique_lock<std::mutex> lock(start_mutex_);
270 std::cv_status status = std::cv_status::no_timeout;
271 while (!server_started_ && !startup_failed_ && status == std::cv_status::no_timeout)
272 status = cv_started_.wait_until(lock, wait_until);
282 void signal_add(
int signal_number)
284 signals_.add(signal_number);
288 size_t pick_io_context_idx()
290 size_t min_queue_idx = 0;
295 for (
size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
298 if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
301 return min_queue_idx;
308 size_t context_idx = pick_io_context_idx();
309 asio::io_context& ic = *io_context_pool_[context_idx];
310 auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
311 ic, handler_, server_name_, middlewares_,
312 get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
314 CROW_LOG_DEBUG << &ic <<
" {" << context_idx <<
"} queue length: " << task_queue_length_pool_[context_idx];
316 acceptor_.raw_acceptor().async_accept(
318 [
this, p, &ic](error_code ec) {
334 std::unique_lock<std::mutex> lock(start_mutex_);
335 server_started_ =
true;
336 cv_started_.notify_all();
340 unsigned int concurrency_{2};
341 std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
342 std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
343 asio::io_context io_context_;
344 std::vector<detail::task_timer*> task_timer_pool_;
345 std::vector<std::function<std::string()>> get_cached_date_str_pool_;
347 bool shutting_down_ =
false;
348 bool server_started_{
false};
349 bool startup_failed_ =
false;
350 std::condition_variable cv_started_;
351 std::mutex start_mutex_;
352 asio::signal_set signals_;
354 asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
357 std::uint8_t timeout_;
358 std::string server_name_;
361 std::chrono::milliseconds tick_interval_;
362 std::function<void()> tick_function_;
364 std::tuple<Middlewares...>* middlewares_;
366 typename Adaptor::context* adaptor_ctx_;