4 #include <boost/asio.hpp>
6 #include <boost/asio/ssl.hpp>
9 #ifndef ASIO_STANDALONE
10 #define ASIO_STANDALONE
13 #ifdef CROW_ENABLE_SSL
14 #include <asio/ssl.hpp>
25 #include "crow/version.h"
26 #include "crow/http_connection.h"
27 #include "crow/logging.h"
28 #include "crow/task_timer.h"
// Short aliases used by the rest of this listing.
// Two `using error_code` declarations appear: one for boost::system::error_code
// and one for asio::error_code.
// NOTE(review): these presumably live in alternative preprocessor branches
// (Boost.Asio vs. standalone Asio) whose #if/#else lines are not visible here — confirm.
34 namespace asio = boost::asio;
35 using error_code = boost::system::error_code;
37 using error_code = asio::error_code;
// `tcp` abbreviates asio::ip::tcp; it is used below for the endpoint and the acceptor.
39 using tcp = asio::ip::tcp;
// Template parameters of the server:
//   Handler     - type of the object pointed to by `handler`.
//   Adaptor     - defaults to SocketAdaptor; supplies the nested `context` type.
//   Middlewares - parameter pack forwarded into std::tuple<Middlewares...>.
// NOTE(review): the class declaration line itself is not visible in this listing.
41 template<
typename Handler,
typename Adaptor = SocketAdaptor,
typename... Middlewares>
// Constructor.
//   handler      - handler object (its member initializer is not visible in this listing).
//   bindaddr     - textual address, parsed with asio::ip::address::from_string.
//   port         - port used for the acceptor's endpoint.
//   server_name  - defaults to "Crow/" + VERSION.
//   middlewares  - optional pointer to the middleware tuple (default nullptr).
//   concurrency  - default 1; also determines the size of task_queue_length_pool_.
//   timeout      - default 5 (its member initializer is not visible in this listing).
//   adaptor_ctx  - optional adaptor context pointer (default nullptr).
45 Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string(
"Crow/") + VERSION, std::tuple<Middlewares...>* middlewares =
nullptr, uint16_t concurrency = 1, uint8_t timeout = 5,
typename Adaptor::context* adaptor_ctx =
nullptr):
// The acceptor, the signal set and the tick timer are all constructed on the main io_service_.
46 acceptor_(io_service_, tcp::endpoint(asio::ip::address::from_string(bindaddr), port)),
47 signals_(io_service_),
48 tick_timer_(io_service_),
50 concurrency_(concurrency),
52 server_name_(server_name),
// One queue-length counter per worker (concurrency_ - 1 entries). This reads the member
// concurrency_, which is declared before task_queue_length_pool_ in the member list.
// NOTE(review): with concurrency == 0 the expression is -1 before conversion to the
// vector's size type; presumably callers guarantee concurrency >= 1 — confirm.
55 task_queue_length_pool_(concurrency_ - 1),
56 middlewares_(middlewares),
57 adaptor_ctx_(adaptor_ctx)
// set_tick_function(d, f): takes an interval `d` in milliseconds and a callback `f`.
// NOTE(review): the body is not visible in this listing; presumably it stores the
// arguments in tick_interval_ / tick_function_ — confirm.
60 void set_tick_function(std::chrono::milliseconds d, std::function<
void()> f)
// Arms tick_timer_ to expire tick_interval_ from now and starts an asynchronous wait
// whose completion handler captures `this` and receives the wait's error code.
// NOTE(review): the enclosing function header and the handler body are not visible here.
69 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
70 tick_timer_.async_wait([
this](
const error_code& ec) {
// Worker start-up sequence (fragment; the enclosing function header and several lines,
// including braces, are not visible in this listing).
// The number of workers is concurrency_ - 1; one io_service is created per worker and the
// per-worker date-string and task-timer pools are sized to match.
79 uint16_t worker_thread_count = concurrency_ - 1;
80 for (
int i = 0; i < worker_thread_count; i++)
81 io_service_pool_.emplace_back(
new asio::io_service());
82 get_cached_date_str_pool_.resize(worker_thread_count);
83 task_timer_pool_.resize(worker_thread_count);
// `v` collects the futures of the asynchronously launched workers; `init_count` is an
// atomic counter that the wait loop further down compares against worker_thread_count.
// NOTE(review): the line that increments init_count is not visible here.
85 std::vector<std::future<void>> v;
86 std::atomic<int> init_count(0);
87 for (uint16_t i = 0; i < worker_thread_count; i++)
// Each worker is launched with std::launch::async; the lambda captures the worker index
// by value and init_count by reference.
90 std::launch::async, [
this, i, &init_count] {
// `last` remembers when the cached date string was last refreshed.
92 auto last = std::chrono::steady_clock::now();
// update_date_str: converts the current time to a broken-down UTC time (gmtime_s on
// MSVC/MinGW, gmtime_r otherwise) and formats it as "%a, %d %b %Y %H:%M:%S GMT".
// NOTE(review): the declarations of my_tm / date_str and the #else / #endif lines are
// not visible in this listing.
95 auto update_date_str = [&] {
96 auto last_time_t = time(0);
99 #if defined(_MSC_VER) || defined(__MINGW32__)
100 gmtime_s(&my_tm, &last_time_t);
102 gmtime_r(&last_time_t, &my_tm);
// The buffer is grown to 100 chars, strftime writes at most 99, then the string is
// shrunk to the number of characters strftime reported.
104 date_str.resize(100);
105 size_t date_str_sz = strftime(&date_str[0], 99,
"%a, %d %b %Y %H:%M:%S GMT", &my_tm);
106 date_str.resize(date_str_sz);
// Per-worker date getter: when at least one second has elapsed since `last`, `last` is reset.
// NOTE(review): presumably the date string is regenerated at that point and then
// returned — those lines are not visible. The lambda captures by reference, so it is
// only valid while this worker's locals are alive — confirm lifetime with callers.
109 get_cached_date_str_pool_[i] = [&]() -> std::string {
110 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
112 last = std::chrono::steady_clock::now();
// Publishes the address of this worker's task_timer (declaration not visible here) and
// resets its queue-length counter.
121 task_timer_pool_[i] = &task_timer;
122 task_queue_length_pool_[i] = 0;
// Runs this worker's io_service and tests whether run() returned 0.
// NOTE(review): the body of this branch is not visible in this listing.
129 if (io_service_pool_[i]->run() == 0)
// An exception derived from std::exception that reaches this point is logged as a worker crash.
135 catch (std::exception& e)
137 CROW_LOG_ERROR <<
"Worker Crash: An uncaught exception occurred: " << e.what();
// The tick timer is armed only when a tick function is set and the interval is positive.
142 if (tick_function_ && tick_interval_.count() > 0)
144 tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
145 tick_timer_.async_wait(
146 [
this](
const error_code& ec) {
// Records the port reported by the acceptor's local endpoint and passes it to the handler.
153 port_ = acceptor_.local_endpoint().port();
154 handler_->port(port_);
// Start-up banner: server name, scheme (https when handler_->ssl_used() is true), bind
// address, port and thread count, followed by a hint on how to hide Info logs.
157 CROW_LOG_INFO << server_name_ <<
" server is running at " << (handler_->ssl_used() ?
"https://" :
"http://") << bindaddr_ <<
":" << acceptor_.local_endpoint().port() <<
" using " << concurrency_ <<
" threads";
158 CROW_LOG_INFO <<
"Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
// Callback taking an error code and an int, both unnamed.
// NOTE(review): presumably the completion handler for a wait on signals_ — the call it
// is passed to and its body are not visible here.
161 [&](
const error_code& ,
int ) {
// Spins, yielding the thread, until init_count equals the number of workers.
165 while (worker_thread_count != init_count)
166 std::this_thread::yield();
174 CROW_LOG_INFO <<
"Exiting.";
// Shutdown sequence (fragment; the enclosing function header is not visible in this listing).
// Marks the server as shutting down, then walks the worker io_service pool, skipping null
// entries, and finally logs for the main io_service.
// NOTE(review): the calls that actually stop each io_service are not visible here.
181 shutting_down_ =
true;
182 for (
auto& io_service : io_service_pool_)
184 if (io_service !=
nullptr)
// NOTE(review): `io_service` is a reference to a std::unique_ptr element, so
// `&io_service` logs the address of the unique_ptr slot, not of the io_service object.
186 CROW_LOG_INFO <<
"Closing IO service " << &io_service;
191 CROW_LOG_INFO <<
"Closing main IO service (" << &io_service_ <<
')';
// Returns the port reported by the acceptor's local endpoint.
// NOTE(review): the enclosing function header is not visible in this listing.
196 return acceptor_.local_endpoint().port();
// Blocks the caller until server_started_ becomes true. The flag is checked under
// start_mutex_, and the wait is in a loop so the flag is re-checked after every wake-up.
202 std::unique_lock<std::mutex> lock(start_mutex_);
203 while (!server_started_)
204 cv_started_.wait(lock);
// Adds `signal_number` to the server's signal set (signals_).
212 void signal_add(
int signal_number)
214 signals_.add(signal_number);
// Chooses the index of a worker io_service based on the per-worker queue lengths.
// Starts from index 0 and scans the remaining entries; the scan stops early as soon as
// the current candidate's queue length is 0.
// Returns the selected index.
218 uint16_t pick_io_service_idx()
220 uint16_t min_queue_idx = 0;
225 for (
size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
// NOTE(review): the statement guarded by this comparison is not visible in this listing;
// presumably it makes `i` the new candidate — confirm.
228 if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
231 return min_queue_idx;
// Accept sequence (fragment; the enclosing function header is not visible in this listing).
// Picks a worker io_service, increments its queue-length counter and logs the new length.
238 uint16_t service_idx = pick_io_service_idx();
239 asio::io_service& is = *io_service_pool_[service_idx];
240 task_queue_length_pool_[service_idx]++;
241 CROW_LOG_DEBUG << &is <<
" {" << service_idx <<
"} queue length: " << task_queue_length_pool_[service_idx];
// Creates the Connection on the chosen io_service, handing it that worker's date-string
// getter, task timer and queue-length counter.
243 auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
244 is, handler_, server_name_, middlewares_,
245 get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);
// Starts an asynchronous accept; the completion handler keeps the connection alive by
// capturing the shared_ptr `p` by value.
// NOTE(review): the remaining async_accept arguments and most of the handler body are not visible.
247 acceptor_.async_accept(
249 [
this, p, &is, service_idx](error_code ec) {
// Decrements the worker's queue-length counter and logs it.
// NOTE(review): the condition under which this runs is not visible here.
259 task_queue_length_pool_[service_idx]--;
260 CROW_LOG_DEBUG << &is <<
" {" << service_idx <<
"} queue length: " << task_queue_length_pool_[service_idx];
// Sets server_started_ under start_mutex_ and wakes every thread waiting on cv_started_.
// NOTE(review): the enclosing function header is not visible in this listing.
270 std::unique_lock<std::mutex> lock(start_mutex_);
271 server_started_ =
true;
272 cv_started_.notify_all();
// Data members (fragment; some members used elsewhere in this listing, such as handler_
// and port_, are not visible here).
// One owned io_service per worker, plus the main io_service_ used by the acceptor,
// the signal set and the tick timer.
276 std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
277 asio::io_service io_service_;
// Per-worker pointers to task timers and per-worker date-string getters, indexed by worker.
278 std::vector<detail::task_timer*> task_timer_pool_;
279 std::vector<std::function<std::string()>> get_cached_date_str_pool_;
280 tcp::acceptor acceptor_;
281 bool shutting_down_ =
false;
// Start-up handshake state: the flag, plus the condition variable and mutex guarding it.
282 bool server_started_{
false};
283 std::condition_variable cv_started_;
284 std::mutex start_mutex_;
285 asio::signal_set signals_;
// Timer that drives the periodic tick function.
287 asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
// In-class default of 2; the constructor initializes this member from its `concurrency` argument.
290 uint16_t concurrency_{2};
291 std::uint8_t timeout_;
292 std::string server_name_;
294 std::string bindaddr_;
// Per-worker count of queued tasks; sized once in the constructor (concurrency_ - 1 entries).
295 std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
// Tick interval and callback; the timer is armed only when the callback is set and the
// interval is positive.
297 std::chrono::milliseconds tick_interval_;
298 std::function<void()> tick_function_;
300 std::tuple<Middlewares...>* middlewares_;
302 typename Adaptor::context* adaptor_ctx_;
Definition: http_server.h:43
void wait_for_start()
Wait until the server has properly started.
Definition: http_server.h:200
Definition: task_timer.h:36
void set_default_timeout(uint8_t timeout)
Definition: task_timer.h:107
The main namespace of the library. In this namespace are defined the most important classes and functions.