Crow  0.3
A C++ microframework for the web
http_server.h
1#pragma once
2
3#include <chrono>
4#include <boost/date_time/posix_time/posix_time.hpp>
5#include <boost/asio.hpp>
6#ifdef CROW_ENABLE_SSL
7#include <boost/asio/ssl.hpp>
8#endif
9#include <cstdint>
10#include <atomic>
11#include <future>
12#include <vector>
13
14#include <memory>
15
16#include "crow/http_connection.h"
17#include "crow/logging.h"
18#include "crow/dumb_timer_queue.h"
19
20namespace crow
21{
22 using namespace boost;
23 using tcp = asio::ip::tcp;
24
25 template <typename Handler, typename Adaptor = SocketAdaptor, typename ... Middlewares>
26 class Server
27 {
28 public:
29 Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = "Crow/0.3", std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, typename Adaptor::context* adaptor_ctx = nullptr)
30 : acceptor_(io_service_, tcp::endpoint(boost::asio::ip::address::from_string(bindaddr), port)),
31 signals_(io_service_),
32 tick_timer_(io_service_),
33 handler_(handler),
34 concurrency_(concurrency == 0 ? 1 : concurrency),
35 server_name_(server_name),
36 port_(port),
37 bindaddr_(bindaddr),
38 middlewares_(middlewares),
39 adaptor_ctx_(adaptor_ctx)
40 {
41 }
42
43 void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
44 {
45 tick_interval_ = d;
46 tick_function_ = f;
47 }
48
49 void on_tick()
50 {
51 tick_function_();
52 tick_timer_.expires_from_now(boost::posix_time::milliseconds(tick_interval_.count()));
53 tick_timer_.async_wait([this](const boost::system::error_code& ec)
54 {
55 if (ec)
56 return;
57 on_tick();
58 });
59 }
60
61 void run()
62 {
63 for(int i = 0; i < concurrency_; i++)
64 io_service_pool_.emplace_back(new boost::asio::io_service());
65 get_cached_date_str_pool_.resize(concurrency_);
66 timer_queue_pool_.resize(concurrency_);
67
68 std::vector<std::future<void>> v;
69 std::atomic<int> init_count(0);
70 for(uint16_t i = 0; i < concurrency_; i ++)
71 v.push_back(
72 std::async(std::launch::async, [this, i, &init_count]{
73
74 // thread local date string get function
75 auto last = std::chrono::steady_clock::now();
76
77 std::string date_str;
78 auto update_date_str = [&]
79 {
80 auto last_time_t = time(0);
81 tm my_tm;
82
83#if defined(_MSC_VER) || defined(__MINGW32__)
84 gmtime_s(&my_tm, &last_time_t);
85#else
86 gmtime_r(&last_time_t, &my_tm);
87#endif
88 date_str.resize(100);
89 size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
90 date_str.resize(date_str_sz);
91 };
92 update_date_str();
93 get_cached_date_str_pool_[i] = [&]()->std::string
94 {
95 if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
96 {
97 last = std::chrono::steady_clock::now();
98 update_date_str();
99 }
100 return date_str;
101 };
102
103 // initializing timer queue
104 detail::dumb_timer_queue timer_queue;
105 timer_queue_pool_[i] = &timer_queue;
106
107 timer_queue.set_io_service(*io_service_pool_[i]);
108 boost::asio::deadline_timer timer(*io_service_pool_[i]);
109 timer.expires_from_now(boost::posix_time::seconds(1));
110
111 std::function<void(const boost::system::error_code& ec)> handler;
112 handler = [&](const boost::system::error_code& ec){
113 if (ec)
114 return;
115 timer_queue.process();
116 timer.expires_from_now(boost::posix_time::seconds(1));
117 timer.async_wait(handler);
118 };
119 timer.async_wait(handler);
120
121 init_count ++;
122 while(1)
123 {
124 try
125 {
126 if (io_service_pool_[i]->run() == 0)
127 {
128 // when io_service.run returns 0, there are no more works to do.
129 break;
130 }
131 } catch(std::exception& e)
132 {
133 CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
134 }
135 }
136 }));
137
138 if (tick_function_ && tick_interval_.count() > 0)
139 {
140 tick_timer_.expires_from_now(boost::posix_time::milliseconds(tick_interval_.count()));
141 tick_timer_.async_wait([this](const boost::system::error_code& ec)
142 {
143 if (ec)
144 return;
145 on_tick();
146 });
147 }
148
149 CROW_LOG_INFO << server_name_ << " server is running at " << bindaddr_ <<":" << acceptor_.local_endpoint().port()
150 << " using " << concurrency_ << " threads";
151 CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
152
153 signals_.async_wait(
154 [&](const boost::system::error_code& /*error*/, int /*signal_number*/){
155 stop();
156 });
157
158 while(concurrency_ != init_count)
159 std::this_thread::yield();
160
161 do_accept();
162
163 std::thread([this]{
164 io_service_.run();
165 CROW_LOG_INFO << "Exiting.";
166 }).join();
167 }
168
169 void stop()
170 {
171 io_service_.stop();
172 for(auto& io_service:io_service_pool_)
173 io_service->stop();
174 }
175
176 void signal_clear()
177 {
178 signals_.clear();
179 }
180
181 void signal_add(int signal_number)
182 {
183 signals_.add(signal_number);
184 }
185
186 private:
187 asio::io_service& pick_io_service()
188 {
189 // TODO load balancing
190 roundrobin_index_++;
191 if (roundrobin_index_ >= io_service_pool_.size())
192 roundrobin_index_ = 0;
193 return *io_service_pool_[roundrobin_index_];
194 }
195
196 void do_accept()
197 {
198 asio::io_service& is = pick_io_service();
199 auto p = new Connection<Adaptor, Handler, Middlewares...>(
200 is, handler_, server_name_, middlewares_,
201 get_cached_date_str_pool_[roundrobin_index_], *timer_queue_pool_[roundrobin_index_],
202 adaptor_ctx_);
203 acceptor_.async_accept(p->socket(),
204 [this, p, &is](boost::system::error_code ec)
205 {
206 if (!ec)
207 {
208 is.post([p]
209 {
210 p->start();
211 });
212 }
213 else
214 {
215 delete p;
216 }
217 do_accept();
218 });
219 }
220
221 private:
222 asio::io_service io_service_;
223 std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
224 std::vector<detail::dumb_timer_queue*> timer_queue_pool_;
225 std::vector<std::function<std::string()>> get_cached_date_str_pool_;
226 tcp::acceptor acceptor_;
227 boost::asio::signal_set signals_;
228 boost::asio::deadline_timer tick_timer_;
229
230 Handler* handler_;
231 uint16_t concurrency_{1};
232 std::string server_name_;
233 uint16_t port_;
234 std::string bindaddr_;
235 unsigned int roundrobin_index_{};
236
237 std::chrono::milliseconds tick_interval_;
238 std::function<void()> tick_function_;
239
240 std::tuple<Middlewares...>* middlewares_;
241
242#ifdef CROW_ENABLE_SSL
243 bool use_ssl_{false};
244 boost::asio::ssl::context ssl_context_{boost::asio::ssl::context::sslv23};
245#endif
246 typename Adaptor::context* adaptor_ctx_;
247 };
248}
Definition: http_server.h:27
Fast timer queue for fixed tick value.
Definition: dumb_timer_queue.h:18
void process()
Process the queue: take functions out in time intervals and execute them.
Definition: dumb_timer_queue.h:46