48 typename Acceptor::endpoint endpoint,
49 std::string server_name = std::string(
"Crow/") + VERSION,
50 std::tuple<Middlewares...>* middlewares =
nullptr,
51 unsigned int concurrency = 1,
53 typename Adaptor::context* adaptor_ctx =
nullptr):
54 concurrency_(concurrency),
55 task_queue_length_pool_(concurrency_ - 1),
56 acceptor_(io_context_),
57 signals_(io_context_),
58 tick_timer_(io_context_),
61 server_name_(server_name),
62 middlewares_(middlewares),
63 adaptor_ctx_(adaptor_ctx)
65 if (startup_failed_) {
66 CROW_LOG_ERROR <<
"Startup failed; not running server.";
72 acceptor_.raw_acceptor().open(endpoint.protocol(), ec);
74 CROW_LOG_ERROR <<
"Failed to open acceptor: " << ec.message();
75 startup_failed_ =
true;
79 acceptor_.raw_acceptor().set_option(Acceptor::reuse_address_option(), ec);
81 CROW_LOG_ERROR <<
"Failed to set socket option: " << ec.message();
82 startup_failed_ =
true;
86 acceptor_.raw_acceptor().bind(endpoint, ec);
88 CROW_LOG_ERROR <<
"Failed to bind to " << acceptor_.address()
89 <<
":" << acceptor_.port() <<
" - " << ec.message();
90 startup_failed_ =
true;
94 acceptor_.raw_acceptor().listen(tcp::acceptor::max_listen_connections, ec);
96 CROW_LOG_ERROR <<
"Failed to listen on port: " << ec.message();
97 startup_failed_ =
true;
104 void set_tick_function(std::chrono::milliseconds d, std::function<
void()> f)
113 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
114 tick_timer_.async_wait([
this](
const error_code& ec) {
124 if (startup_failed_) {
125 CROW_LOG_ERROR <<
"Server startup failed. Aborting run().";
129 uint16_t worker_thread_count = concurrency_ - 1;
130 for (
int i = 0; i < worker_thread_count; i++)
131 io_context_pool_.emplace_back(
new asio::io_context());
132 get_cached_date_str_pool_.resize(worker_thread_count);
133 task_timer_pool_.resize(worker_thread_count);
135 std::vector<std::future<void>> v;
136 std::atomic<int> init_count(0);
137 for (uint16_t i = 0; i < worker_thread_count; i++)
140 std::launch::async, [
this, i, &init_count] {
142 auto last = std::chrono::steady_clock::now();
144 std::string date_str;
145 auto update_date_str = [&] {
146 auto last_time_t = time(0);
149#if defined(_MSC_VER) || defined(__MINGW32__)
150 gmtime_s(&my_tm, &last_time_t);
152 gmtime_r(&last_time_t, &my_tm);
154 date_str.resize(100);
155 size_t date_str_sz = strftime(&date_str[0], 99,
"%a, %d %b %Y %H:%M:%S GMT", &my_tm);
156 date_str.resize(date_str_sz);
159 get_cached_date_str_pool_[i] = [&]() -> std::string {
160 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
162 last = std::chrono::steady_clock::now();
171 task_timer_pool_[i] = &task_timer;
172 task_queue_length_pool_[i] = 0;
179 if (io_context_pool_[i]->run() == 0)
185 catch (std::exception& e)
187 CROW_LOG_ERROR <<
"Worker Crash: An uncaught exception occurred: " << e.what();
192 if (tick_function_ && tick_interval_.count() > 0)
194 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
195 tick_timer_.async_wait(
196 [
this](
const error_code& ec) {
202 handler_->port(acceptor_.port());
203 CROW_LOG_INFO << server_name_
204 <<
" server is running at " << acceptor_.url_display(handler_->ssl_used())
205 <<
" using " << concurrency_ <<
" threads";
206 CROW_LOG_INFO <<
"Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
209 [&](
const error_code& ,
int ) {
213 while (worker_thread_count != init_count)
214 std::this_thread::yield();
222 CROW_LOG_INFO <<
"Exiting.";
229 shutting_down_ =
true;
234 if (acceptor_.raw_acceptor().is_open())
236 CROW_LOG_INFO <<
"Closing acceptor. " << &acceptor_;
238 acceptor_.raw_acceptor().close(ec);
241 CROW_LOG_WARNING <<
"Failed to close acceptor: " << ec.message();
245 for (
auto& io_context : io_context_pool_)
247 if (io_context !=
nullptr)
249 CROW_LOG_INFO <<
"Closing IO service " << &io_context;
254 CROW_LOG_INFO <<
"Closing main IO service (" << &io_context_ <<
')';
259 uint16_t port()
const {
260 return acceptor_.local_endpoint().port();
266 std::unique_lock<std::mutex> lock(start_mutex_);
268 std::cv_status status = std::cv_status::no_timeout;
269 while (!server_started_ && !startup_failed_ && status == std::cv_status::no_timeout)
270 status = cv_started_.wait_until(lock, wait_until);
280 void signal_add(
int signal_number)
282 signals_.add(signal_number);
286 size_t pick_io_context_idx()
288 size_t min_queue_idx = 0;
293 for (
size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
296 if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
299 return min_queue_idx;
306 size_t context_idx = pick_io_context_idx();
307 asio::io_context& ic = *io_context_pool_[context_idx];
308 auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
309 ic, handler_, server_name_, middlewares_,
310 get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
312 CROW_LOG_DEBUG << &ic <<
" {" << context_idx <<
"} queue length: " << task_queue_length_pool_[context_idx];
314 acceptor_.raw_acceptor().async_accept(
316 [
this, p, &ic, context_idx](error_code ec) {
332 std::unique_lock<std::mutex> lock(start_mutex_);
333 server_started_ =
true;
334 cv_started_.notify_all();
338 unsigned int concurrency_{2};
339 std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
340 std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
341 asio::io_context io_context_;
342 std::vector<detail::task_timer*> task_timer_pool_;
343 std::vector<std::function<std::string()>> get_cached_date_str_pool_;
345 bool shutting_down_ =
false;
346 bool server_started_{
false};
347 bool startup_failed_ =
false;
348 std::condition_variable cv_started_;
349 std::mutex start_mutex_;
350 asio::signal_set signals_;
352 asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
355 std::uint8_t timeout_;
356 std::string server_name_;
359 std::chrono::milliseconds tick_interval_;
360 std::function<void()> tick_function_;
362 std::tuple<Middlewares...>* middlewares_;
364 typename Adaptor::context* adaptor_ctx_;