Crow  0.3
A C++ microframework for the web
http_server.h
1 #pragma once
2 
3 #include <chrono>
4 #include <boost/date_time/posix_time/posix_time.hpp>
5 #include <boost/asio.hpp>
6 #ifdef CROW_ENABLE_SSL
7 #include <boost/asio/ssl.hpp>
8 #endif
9 #include <cstdint>
10 #include <atomic>
11 #include <future>
12 #include <vector>
13 
14 #include <memory>
15 
16 #include "crow/version.h"
17 #include "crow/http_connection.h"
18 #include "crow/logging.h"
19 #include "crow/dumb_timer_queue.h"
20 
21 namespace crow
22 {
23  using namespace boost;
24  using tcp = asio::ip::tcp;
25 
26  template <typename Handler, typename Adaptor = SocketAdaptor, typename ... Middlewares>
27  class Server
28  {
29  public:
30  Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, typename Adaptor::context* adaptor_ctx = nullptr)
31  : acceptor_(io_service_, tcp::endpoint(boost::asio::ip::address::from_string(bindaddr), port)),
32  signals_(io_service_, SIGINT, SIGTERM),
33  tick_timer_(io_service_),
34  handler_(handler),
35  concurrency_(concurrency == 0 ? 1 : concurrency),
36  server_name_(server_name),
37  port_(port),
38  bindaddr_(bindaddr),
39  middlewares_(middlewares),
40  adaptor_ctx_(adaptor_ctx)
41  {
42  }
43 
44  void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
45  {
46  tick_interval_ = d;
47  tick_function_ = f;
48  }
49 
50  void on_tick()
51  {
52  tick_function_();
53  tick_timer_.expires_from_now(boost::posix_time::milliseconds(tick_interval_.count()));
54  tick_timer_.async_wait([this](const boost::system::error_code& ec)
55  {
56  if (ec)
57  return;
58  on_tick();
59  });
60  }
61 
62  void run()
63  {
64  for(int i = 0; i < concurrency_; i++)
65  io_service_pool_.emplace_back(new boost::asio::io_service());
66  get_cached_date_str_pool_.resize(concurrency_);
67  timer_queue_pool_.resize(concurrency_);
68 
69  std::vector<std::future<void>> v;
70  std::atomic<int> init_count(0);
71  for(uint16_t i = 0; i < concurrency_; i ++)
72  v.push_back(
73  std::async(std::launch::async, [this, i, &init_count]{
74 
75  // thread local date string get function
76  auto last = std::chrono::steady_clock::now();
77 
78  std::string date_str;
79  auto update_date_str = [&]
80  {
81  auto last_time_t = time(0);
82  tm my_tm;
83 
84 #if defined(_MSC_VER) || defined(__MINGW32__)
85  gmtime_s(&my_tm, &last_time_t);
86 #else
87  gmtime_r(&last_time_t, &my_tm);
88 #endif
89  date_str.resize(100);
90  size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
91  date_str.resize(date_str_sz);
92  };
93  update_date_str();
94  get_cached_date_str_pool_[i] = [&]()->std::string
95  {
96  if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
97  {
98  last = std::chrono::steady_clock::now();
99  update_date_str();
100  }
101  return date_str;
102  };
103 
104  // initializing timer queue
105  detail::dumb_timer_queue timer_queue;
106  timer_queue_pool_[i] = &timer_queue;
107 
108  timer_queue.set_io_service(*io_service_pool_[i]);
109  boost::asio::deadline_timer timer(*io_service_pool_[i]);
110  timer.expires_from_now(boost::posix_time::seconds(1));
111 
112  std::function<void(const boost::system::error_code& ec)> handler;
113  handler = [&](const boost::system::error_code& ec){
114  if (ec)
115  return;
116  timer_queue.process();
117  timer.expires_from_now(boost::posix_time::seconds(1));
118  timer.async_wait(handler);
119  };
120  timer.async_wait(handler);
121 
122  init_count ++;
123  while(1)
124  {
125  try
126  {
127  if (io_service_pool_[i]->run() == 0)
128  {
129  // when io_service.run returns 0, there are no more works to do.
130  break;
131  }
132  } catch(std::exception& e)
133  {
134  CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
135  }
136  }
137  }));
138 
139  if (tick_function_ && tick_interval_.count() > 0)
140  {
141  tick_timer_.expires_from_now(boost::posix_time::milliseconds(tick_interval_.count()));
142  tick_timer_.async_wait([this](const boost::system::error_code& ec)
143  {
144  if (ec)
145  return;
146  on_tick();
147  });
148  }
149 
150  CROW_LOG_INFO << server_name_ << " server is running at " << bindaddr_ <<":" << acceptor_.local_endpoint().port()
151  << " using " << concurrency_ << " threads";
152  CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
153 
154  signals_.async_wait(
155  [&](const boost::system::error_code& /*error*/, int /*signal_number*/){
156  stop();
157  });
158 
159  while(concurrency_ != init_count)
160  std::this_thread::yield();
161 
162  do_accept();
163 
164  std::thread([this]{
165  io_service_.run();
166  CROW_LOG_INFO << "Exiting.";
167  }).join();
168  }
169 
170  void stop()
171  {
172  io_service_.stop();
173  for(auto& io_service:io_service_pool_)
174  io_service->stop();
175  }
176 
177  void signal_clear()
178  {
179  signals_.clear();
180  }
181 
182  void signal_add(int signal_number)
183  {
184  signals_.add(signal_number);
185  }
186 
187  private:
188  asio::io_service& pick_io_service()
189  {
190  // TODO load balancing
191  roundrobin_index_++;
192  if (roundrobin_index_ >= io_service_pool_.size())
193  roundrobin_index_ = 0;
194  return *io_service_pool_[roundrobin_index_];
195  }
196 
197  void do_accept()
198  {
199  asio::io_service& is = pick_io_service();
200  auto p = new Connection<Adaptor, Handler, Middlewares...>(
201  is, handler_, server_name_, middlewares_,
202  get_cached_date_str_pool_[roundrobin_index_], *timer_queue_pool_[roundrobin_index_],
203  adaptor_ctx_);
204  acceptor_.async_accept(p->socket(),
205  [this, p, &is](boost::system::error_code ec)
206  {
207  if (!ec)
208  {
209  is.post([p]
210  {
211  p->start();
212  });
213  }
214  else
215  {
216  delete p;
217  }
218  do_accept();
219  });
220  }
221 
222  private:
223  asio::io_service io_service_;
224  std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
225  std::vector<detail::dumb_timer_queue*> timer_queue_pool_;
226  std::vector<std::function<std::string()>> get_cached_date_str_pool_;
227  tcp::acceptor acceptor_;
228  boost::asio::signal_set signals_;
229  boost::asio::deadline_timer tick_timer_;
230 
231  Handler* handler_;
232  uint16_t concurrency_{1};
233  std::string server_name_;
234  uint16_t port_;
235  std::string bindaddr_;
236  unsigned int roundrobin_index_{};
237 
238  std::chrono::milliseconds tick_interval_;
239  std::function<void()> tick_function_;
240 
241  std::tuple<Middlewares...>* middlewares_;
242 
243 #ifdef CROW_ENABLE_SSL
244  bool use_ssl_{false};
245  boost::asio::ssl::context ssl_context_{boost::asio::ssl::context::sslv23};
246 #endif
247  typename Adaptor::context* adaptor_ctx_;
248  };
249 }
crow::detail::dumb_timer_queue
Fast timer queue for fixed tick value.
Definition: dumb_timer_queue.h:17
crow::detail::dumb_timer_queue::process
void process()
Process the queue: take functions out in time intervals and execute them.
Definition: dumb_timer_queue.h:46
crow::Server
Definition: http_server.h:27