From 409024e04adb6459d1baf0ab54ecf28569b07a4f Mon Sep 17 00:00:00 2001 From: Nicholas Orlowsky Date: Fri, 21 Feb 2025 18:24:28 -0500 Subject: [PATCH] polished up event loop changes --- CHANGELOG.md | 3 +- CMakeLists.txt | 3 - README.md | 5 +- lib/anthracite.cpp | 44 ----- lib/anthracite.hpp | 6 - lib/config/config.hpp | 11 +- lib/http/request.cpp | 145 +++++++-------- lib/log/log.cpp | 5 + lib/log/log.hpp | 8 +- lib/socket/openssl_socket.cpp | 22 ++- lib/socket/socket.cpp | 6 +- lib/socket/socket.hpp | 2 + lib/thread_mgr/event_loop.cpp | 329 +++++++++++++++++++--------------- lib/thread_mgr/event_loop.hpp | 7 +- src/api_main.cpp | 112 ------------ src/file_main.cpp | 34 +++- tests/speed_tests.cpp | 31 ++-- tests/unit_tests.cpp | 9 +- 18 files changed, 354 insertions(+), 428 deletions(-) delete mode 100644 lib/anthracite.cpp delete mode 100644 lib/anthracite.hpp delete mode 100644 src/api_main.cpp diff --git a/CHANGELOG.md b/CHANGELOG.md index b26e941..c3e9edb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # 0.3.0 - SSL support via OpenSSL -- Switched from thread per connection to event driven threading model +- Added "Thread Manager" class to allow for multiple (or custom) threading models (process per thread, event loop) +- Defaul threading model is now event loop - Rewrote request parser for readability and speed - Added improved logging with different log levels - Separated anthracite into libanthracite and anthracite-bin to allow for other projects to implement anthracite (example in ./src/api_main.cpp) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2449bbd..a1fbdf8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,9 +39,6 @@ target_link_libraries(anthracite-bin anthracite) add_dependencies(anthracite-bin build-supplemental) add_dependencies(anthracite-bin anthracite) -add_executable(anthracite-api-bin src/api_main.cpp) -target_link_libraries(anthracite-api-bin anthracite) - include(FetchContent) FetchContent_Declare( googletest diff --git a/README.md b/README.md index 0352b11..629ba55 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # Anthracite -A simple web server written in C++. Supports HTTP 1.0 & 1.1. + +Anthracite is an extensible, low-dependency, fast web server. ## Developing @@ -16,7 +17,7 @@ Create a `build/` directory, run `cmake ..`, and then `make` to build. - [x] HTTP/1.1 - [x] Enhance logging - [x] Create library that can be used to implement custom backends (i.e. webapi, fileserver, etc) -- [ ] Faster parsing +- [x] Faster parsing - [ ] HTTP/2 - [ ] Improve benchmarking infrastructure - [ ] Fix glaring security issues diff --git a/lib/anthracite.cpp b/lib/anthracite.cpp deleted file mode 100644 index 450deaf..0000000 --- a/lib/anthracite.cpp +++ /dev/null @@ -1,44 +0,0 @@ -#include "./anthracite.hpp" -#include "./log/log.hpp" -#include -#include -#include -#include -#include "./config/config.hpp" -#include "./thread_mgr/event_loop.hpp" -#include - -using namespace anthracite; - -void log_request_and_response(http::request& req, std::unique_ptr& resp, uint32_t micros); - -constexpr int default_port = 80; -constexpr int max_worker_threads = 128; - -thread_mgr::event_loop* elp = nullptr; - - -extern "C" void signalHandler(int signum) { - log::warn << "Caught signal #" << signum << ", exiting Anthracite" << std::endl; - elp->stop(); -} - - -int anthracite_main(backends::backend& be, config::config& config) -{ - signal(SIGTERM, signalHandler); - signal(SIGSEGV, signalHandler); - signal(SIGINT, signalHandler); - signal(SIGABRT, signalHandler); - - log::logger.initialize(log::LOG_LEVEL_INFO); - thread_mgr::event_loop el(be, config); - elp = ⪙ - el.start(); - return 0; -} - -void log_request_and_response(http::request& req, std::unique_ptr& resp, uint32_t micros) -{ - log::info << "[" << resp->status_code() << " " + http::status_map.find(resp->status_code())->second + "] " + req.client_ip() + " " + http::reverse_method_map.find(req.get_method())->second + " " + req.path() << " in " << micros << " usecs" << std::endl; -} diff --git a/lib/anthracite.hpp b/lib/anthracite.hpp deleted file mode 100644 index f342344..0000000 --- a/lib/anthracite.hpp +++ /dev/null @@ -1,6 +0,0 @@ -#include "backends/backend.hpp" -#include "config/config.hpp" - -using namespace anthracite; - -int anthracite_main(backends::backend& be, config::config& cfg); diff --git a/lib/config/config.hpp b/lib/config/config.hpp index 3fcfbd3..f353a21 100644 --- a/lib/config/config.hpp +++ b/lib/config/config.hpp @@ -23,12 +23,13 @@ namespace anthracite::config { }; class config { - uint16_t _worker_threads; + int _worker_threads; + int _max_clients; std::optional _http_config; std::optional _https_config; public: - config(uint16_t worker_threads) : _worker_threads(worker_threads) { + config(int worker_threads, int max_clients) : _worker_threads(worker_threads), _max_clients(max_clients) { } void add_http_config(http_config config) { @@ -39,9 +40,13 @@ namespace anthracite::config { _https_config = config; } - uint16_t worker_threads() { + int worker_threads() { return _worker_threads; } + + int max_clients() { + return _max_clients; + } std::optional& http_cfg() { return _http_config; diff --git a/lib/http/request.cpp b/lib/http/request.cpp index 75d90df..ca4b648 100644 --- a/lib/http/request.cpp +++ b/lib/http/request.cpp @@ -1,87 +1,91 @@ #include "request.hpp" #include "../log/log.hpp" #include "constants.hpp" -#include #include +#include #include namespace anthracite::http { -void request::parse_header(std::string& raw_line) { +void request::parse_header(std::string& raw_line) +{ auto delim_pos = raw_line.find_first_of(':'); - auto value_pos = raw_line.find_first_not_of(' ', delim_pos+1); + auto value_pos = raw_line.find_first_not_of(' ', delim_pos + 1); - std::string header_name = raw_line.substr(0,delim_pos); + std::string header_name = raw_line.substr(0, delim_pos); std::string header_val = raw_line.substr(value_pos); _headers[header_name] = header_val; } -void request::parse_query_param(std::string& raw_param) { +void request::parse_query_param(std::string& raw_param) +{ auto delim_pos = raw_param.find_first_of('='); - auto value_pos = delim_pos+1; + auto value_pos = delim_pos + 1; - std::string query_name = raw_param.substr(0,delim_pos); + std::string query_name = raw_param.substr(0, delim_pos); std::string query_val = raw_param.substr(value_pos); _query_params[query_name] = query_val; } -void request::parse_path(char* raw_path) { +void request::parse_path(char* raw_path) +{ char* saveptr = nullptr; char* tok = strtok_r(raw_path, "?", &saveptr); - if (tok){ + if (tok) { _path = tok; } tok = strtok_r(nullptr, "?", &saveptr); - while(tok) { + while (tok) { std::string rtok(tok); parse_query_param(rtok); tok = strtok_r(nullptr, "?", &saveptr); } } -void request::parse_request_line(char* raw_line) { - request_line_parser_state state = METHOD; +void request::parse_request_line(char* raw_line) +{ + request_line_parser_state state = METHOD; - char* saveptr = nullptr; - char* tok = strtok_r(raw_line, " \r", &saveptr); + char* saveptr = nullptr; + char* tok = strtok_r(raw_line, " \r", &saveptr); - while(tok){ - switch(state) { - case METHOD: { - auto search = method_map.find(tok); - if (search != method_map.end()) { - _method = search->second; - } else { - _method = method::UNKNOWN; - } - - state = PATH; - break; - }; - - case PATH: { - std::string str_tok(tok); - parse_path(tok); - state = VERSION; - break; - }; - - case VERSION: { - auto search = version_map.find(tok); - if (search != version_map.end()) { - _http_version = search->second; - } else { - _http_version = version::HTTP_1_0; - } - return; - }; + while (tok) { + switch (state) { + case METHOD: { + auto search = method_map.find(tok); + if (search != method_map.end()) { + _method = search->second; + } else { + _method = method::UNKNOWN; } - tok = strtok_r(nullptr, " \r", &saveptr); + + state = PATH; + break; + }; + + case PATH: { + std::string str_tok(tok); + parse_path(tok); + state = VERSION; + break; + }; + + case VERSION: { + auto search = version_map.find(tok); + if (search != version_map.end()) { + _http_version = search->second; + } else { + _http_version = version::HTTP_1_0; + } + return; + }; } + tok = strtok_r(nullptr, " \r", &saveptr); + } } request::request(std::string& raw_data, const std::string& client_ip) @@ -94,26 +98,27 @@ request::request(std::string& raw_data, const std::string& client_ip) char* saveptr = nullptr; char* tok = strtok_r(raw_data.data(), "\r\n", &saveptr); - while(tok && state != BODY_CONTENT){ - switch(state) { - case REQUEST_LINE: { - parse_request_line(tok); - state = HEADERS; - tok = strtok_r(nullptr, "\n", &saveptr); - break; - }; - case HEADERS: { - if (tok[0] == '\r') { - state = BODY_CONTENT; - } else { - std::string rtok(tok); - rtok.pop_back(); - parse_header(rtok); + while (tok && state != BODY_CONTENT) { + switch (state) { + case REQUEST_LINE: { + parse_request_line(tok); + state = HEADERS; + tok = strtok_r(nullptr, "\n", &saveptr); + break; + }; + case HEADERS: { + if (tok[0] == '\r') { + state = BODY_CONTENT; + } else { + std::string rtok(tok); + rtok.pop_back(); + parse_header(rtok); tok = strtok_r(nullptr, "\n", &saveptr); - } - break; - }; - case BODY_CONTENT: break; + } + break; + }; + case BODY_CONTENT: + break; } } @@ -121,9 +126,9 @@ request::request(std::string& raw_data, const std::string& client_ip) if (tok) { _body_content = std::string(tok); } - //if (getline(line_stream, line, '\0')) { - // _body_content = line; - //} + // if (getline(line_stream, line, '\0')) { + // _body_content = line; + // } } std::string request::path() { return _path; } @@ -147,11 +152,11 @@ bool request::close_connection() const auto& header = _headers.find("Connection"); const bool found = header != _headers.end(); - if (found && header->second == "keep-alive") { - return false; + if (found && header->second == "close") { + return true; } - return true; + return false; } std::string request::to_string() diff --git a/lib/log/log.cpp b/lib/log/log.cpp index 4c59e8b..63399d0 100644 --- a/lib/log/log.cpp +++ b/lib/log/log.cpp @@ -10,6 +10,11 @@ void Logger::initialize(enum LOG_LEVEL level) _level = level; } +void Logger::log_request_and_response(http::request& req, std::unique_ptr& resp, uint32_t micros) +{ + log::info << "[" << resp->status_code() << " " + http::status_map.find(resp->status_code())->second + "] " + req.client_ip() + " " + http::reverse_method_map.find(req.get_method())->second + " " + req.path() << " in " << micros << " usecs" << std::endl; +} + LogBuf::LogBuf(std::ostream& output_stream, const std::string& tag, enum LOG_LEVEL level) : _output_stream(output_stream) , _tag(tag) diff --git a/lib/log/log.hpp b/lib/log/log.hpp index ba1bb05..6a99749 100644 --- a/lib/log/log.hpp +++ b/lib/log/log.hpp @@ -3,6 +3,10 @@ #include #include #include +#include +#include +#include "../http/request.hpp" +#include "../http/response.hpp" namespace anthracite::log { enum LOG_LEVEL { @@ -21,9 +25,9 @@ namespace anthracite::log { Logger(); void initialize(enum LOG_LEVEL level); + void log_request_and_response(http::request& req, std::unique_ptr& resp, uint32_t micros); }; - class LogBuf : public std::stringbuf { std::string _tag; @@ -50,4 +54,6 @@ namespace anthracite::log { static class LogBuf debugBuf{std::cout, "DEBG", LOG_LEVEL_DEBUG}; static std::ostream debug(&debugBuf); + + }; diff --git a/lib/socket/openssl_socket.cpp b/lib/socket/openssl_socket.cpp index eddf655..9bb4639 100644 --- a/lib/socket/openssl_socket.cpp +++ b/lib/socket/openssl_socket.cpp @@ -1,18 +1,18 @@ -#include -#include -#include #include "./openssl_socket.hpp" +#include "../log/log.hpp" #include #include +#include +#include #include #include +#include +#include #include #include #include #include #include -#include -#include "../log/log.hpp" namespace anthracite::socket { @@ -21,7 +21,7 @@ SSL_CTX* openssl_socket::_context = nullptr; openssl_socket::openssl_socket(int port, int max_queue) : anthracite_socket(port, max_queue) { - const SSL_METHOD *method = TLS_server_method(); + const SSL_METHOD* method = TLS_server_method(); if (_context == nullptr) { _context = SSL_CTX_new(method); @@ -37,7 +37,7 @@ openssl_socket::openssl_socket(int port, int max_queue) throw std::exception(); } - if (SSL_CTX_use_PrivateKey_file(_context, "key.pem", SSL_FILETYPE_PEM) <= 0 ) { + if (SSL_CTX_use_PrivateKey_file(_context, "key.pem", SSL_FILETYPE_PEM) <= 0) { log::err << "Unable to open key.pem" << std::endl; throw std::exception(); } @@ -48,11 +48,11 @@ openssl_socket::~openssl_socket() = default; bool openssl_socket::wait_for_conn() { client_ip = ""; - struct timeval tv = {.tv_sec = 1, .tv_usec = 0}; + struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; fd_set read_fd; FD_ZERO(&read_fd); FD_SET(server_socket, &read_fd); - if (select(server_socket+1, &read_fd, NULL, NULL, &wait_timeout)) { + if (select(server_socket + 1, &read_fd, NULL, NULL, &wait_timeout)) { client_socket = accept(server_socket, reinterpret_cast(&client_addr), &client_addr_len); std::array ip_str { 0 }; inet_ntop(AF_INET, &client_addr.sin_addr, ip_str.data(), INET_ADDRSTRLEN); @@ -70,8 +70,6 @@ bool openssl_socket::wait_for_conn() } else { return false; } - - } void openssl_socket::close_conn() @@ -98,7 +96,7 @@ std::string openssl_socket::recv_message(int buffer_size) setsockopt(client_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout_tv, sizeof timeout_tv); std::vector response(buffer_size + 1); - ssize_t result = SSL_read(_ssl, response.data(), buffer_size+1); + ssize_t result = SSL_read(_ssl, response.data(), buffer_size + 1); if (result < 1) { return ""; diff --git a/lib/socket/socket.cpp b/lib/socket/socket.cpp index 5a54ac1..f8ec4ae 100644 --- a/lib/socket/socket.cpp +++ b/lib/socket/socket.cpp @@ -11,7 +11,7 @@ namespace anthracite::socket { -const struct timeval anthracite_socket::timeout_tv = { .tv_sec = 5, .tv_usec = 0}; +const struct timeval anthracite_socket::timeout_tv = { .tv_sec = 5, .tv_usec = 0 }; anthracite_socket::anthracite_socket(int port, int max_queue) : server_socket(::socket(AF_INET, SOCK_STREAM, 0)) @@ -32,11 +32,11 @@ anthracite_socket::anthracite_socket(int port, int max_queue) bool anthracite_socket::wait_for_conn() { client_ip = ""; - struct timeval tv = {.tv_sec = 1, .tv_usec = 0}; + struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; fd_set read_fd; FD_ZERO(&read_fd); FD_SET(server_socket, &read_fd); - if (select(server_socket+1, &read_fd, NULL, NULL, &wait_timeout)) { + if (select(server_socket + 1, &read_fd, NULL, NULL, &wait_timeout)) { client_socket = accept(server_socket, reinterpret_cast(&client_addr), &client_addr_len); std::array ip_str { 0 }; inet_ntop(AF_INET, &client_addr.sin_addr, ip_str.data(), INET_ADDRSTRLEN); diff --git a/lib/socket/socket.hpp b/lib/socket/socket.hpp index 5df0fde..5ac3562 100644 --- a/lib/socket/socket.hpp +++ b/lib/socket/socket.hpp @@ -32,6 +32,8 @@ public: virtual void close_conn(); virtual void send_message(std::string& msg); virtual std::string recv_message(int buffer_size); + + int csock() { return client_socket; } }; }; diff --git a/lib/thread_mgr/event_loop.cpp b/lib/thread_mgr/event_loop.cpp index 1a9c55f..1cf4d2a 100644 --- a/lib/thread_mgr/event_loop.cpp +++ b/lib/thread_mgr/event_loop.cpp @@ -1,176 +1,217 @@ #include "./event_loop.hpp" #include "../log/log.hpp" #include "../socket/openssl_socket.hpp" -#include +#include "assert.h" +#include "sys/epoll.h" #include +#include #include -using std::chrono::high_resolution_clock; -using std::chrono::duration_cast; using std::chrono::duration; +using std::chrono::duration_cast; +using std::chrono::high_resolution_clock; using std::chrono::milliseconds; namespace anthracite::thread_mgr { - event_loop::event::event(socket::anthracite_socket* socket) : - _socket(socket) {} +event_loop::event::event(socket::anthracite_socket* socket, std::chrono::time_point timestamp) + : _socket(socket) + , _ts(timestamp) +{ +} - socket::anthracite_socket* event_loop::event::socket() { - return _socket; +socket::anthracite_socket* event_loop::event::socket() +{ + return _socket; +} + +std::chrono::time_point& event_loop::event::timestamp() +{ + return _ts; +} + +event_loop::event_loop(backends::backend& backend, config::config& config) + : thread_mgr(backend, config) + , _error_backend("./www") +{ +} + +bool event_loop::event_handler(event& event) +{ + std::string raw_request = event.socket()->recv_message(http::HEADER_BYTES); + + if (raw_request == "") { + return false; } - event_loop::event_loop(backends::backend& backend, config::config& config) : thread_mgr(backend, config), _error_backend("./www") {} + http::request req(raw_request, event.socket()->get_client_ip()); + std::unique_ptr resp = req.is_supported_version() ? _backend.handle_request(req) : _error_backend.handle_error(http::status_codes::HTTP_VERSION_NOT_SUPPORTED); + std::string header = resp->header_to_string(); + event.socket()->send_message(header); + event.socket()->send_message(resp->content()); - bool event_loop::event_handler(event& event) { - std::string raw_request = event.socket()->recv_message(http::HEADER_BYTES); - - // We're doing the start here even though it would ideally be done - // before the first line since if we leave the connection open for - // HTTP 1.1, we can spend a bit of time waiting - auto start = high_resolution_clock::now(); - - if (raw_request == "") { - event.socket()->close_conn(); - delete event.socket(); - return false; - } - - http::request req(raw_request, event.socket()->get_client_ip()); - std::unique_ptr resp = req.is_supported_version() ? _backend.handle_request(req) : _error_backend.handle_error(http::status_codes::HTTP_VERSION_NOT_SUPPORTED); - std::string header = resp->header_to_string(); - event.socket()->send_message(header); - event.socket()->send_message(resp->content()); - - auto end = high_resolution_clock::now(); - auto ms_int = duration_cast(end-start); - //log_request_and_response(req, resp , ms_int.count()); - - resp.reset(); - if (req.close_connection()) { - event.socket()->close_conn(); - delete event.socket(); - return false; - } + auto end = high_resolution_clock::now(); + auto ms_int = duration_cast(end - event.timestamp()); + log::logger.log_request_and_response(req, resp, ms_int.count()); - return true; + resp.reset(); + if (req.close_connection()) { + return false; } - void event_loop::worker_thread_loop(int threadno) { - unsigned char buf[sizeof(class event)]; + return true; +} - std::osyncstream(log::info) << "Starting worker thread " << threadno << std::endl; - while(_run) { - // Get event from queue - std::unique_lock lock(_event_mtx); +void event_loop::worker_thread_loop(int threadno) +{ + unsigned char buf[sizeof(class event)]; + std::osyncstream(log::info) << "Starting worker thread " << threadno << std::endl; + while (_run) { + // Get event from queue + std::unique_lock lock(_event_mtx); - event* ev = nullptr; + event* ev = nullptr; - if (_events.size() > 0) { - ev = new (buf) event(_events.back()); - _events.pop(); - lock.unlock(); - } else { - _event_cv.wait(lock, [this]{ return this->_events.size() > 0 || !_run; }); - - if (!_run) { - break; - } - - ev = new (buf) event(_events.back()); - _events.pop(); - lock.unlock(); + if (_events.size() > 0) { + if (!lock.owns_lock()) { + lock.lock(); } - - - // process - bool requeue = event_handler(*ev); - - // if necessary, requeue - if (requeue) { - { - std::lock_guard lg(_event_mtx); - _events.push(*ev); - } - _event_cv.notify_one(); - } - } - - std::osyncstream(log::info) << "Stopping worker thread " << threadno << std::endl; - } - - void event_loop::listener_thread_loop(config::http_config& http_config) { - socket::anthracite_socket* socket; - - config::http_config* http_ptr = &http_config; - config::https_config* https_ptr = dynamic_cast(http_ptr); - - bool is_tls = https_ptr != nullptr; - - if (is_tls){ - socket = new socket::openssl_socket(https_ptr->port()); + assert(lock.owns_lock()); + ev = new (buf) event(_events.front()); + _events.pop(); + lock.unlock(); } else { - socket = new socket::anthracite_socket(http_ptr->port()); - } + _event_cv.wait(lock, [this] { return this->_events.size() > 0 || !_run; }); - std::osyncstream(log::info) << "Listening for " << (is_tls ? "HTTPS" : "HTTP" ) << " connections on port " << http_ptr->port() << std::endl; - - while (_run) { - if(socket->wait_for_conn()) { - socket::anthracite_socket* client_sock; - - if (is_tls){ - socket::openssl_socket* ssl_sock = dynamic_cast(socket); - client_sock = new socket::openssl_socket(*ssl_sock); - } else { - client_sock = new socket::anthracite_socket(*socket); - } - - std::lock_guard lg(_event_mtx); - _events.push(event(client_sock)); - _event_cv.notify_one(); + if (!_run) { + break; } + + assert(lock.owns_lock()); + ev = new (buf) event(_events.front()); + _events.pop(); + lock.unlock(); } - std::osyncstream(log::info) << "Stopping listening for " << (is_tls ? "HTTPS" : "HTTP") << " connections on port " << http_ptr->port() << std::endl; - - delete socket; - } - - void event_loop::start() { - log::info << "Starting event_loop Thread Manager" << std::endl; - - _run = true; - - std::vector listener_threads; - std::vector worker_threads; - - for(int i = 0; i < _config.worker_threads(); i++) { - auto thread = std::thread(&event_loop::worker_thread_loop, this, i); - worker_threads.push_back(std::move(thread)); - } - - if (_config.http_cfg().has_value()) { - auto thread = std::thread(&event_loop::listener_thread_loop, this, std::ref(_config.http_cfg().value())); - listener_threads.push_back(std::move(thread)); - } - - if (_config.https_cfg().has_value()) { - auto thread = std::thread(&event_loop::listener_thread_loop, this, std::ref(_config.https_cfg().value())); - listener_threads.push_back(std::move(thread)); - } - - for(std::thread& t : worker_threads) { - t.join(); - } - - for(std::thread& t : listener_threads) { - t.join(); + if (event_handler(*ev)) { + struct epoll_event event; + event.events = EPOLLIN; + event.data.ptr = ev->socket(); + epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, ev->socket()->csock(), &event); + } else { + ev->socket()->close_conn(); + delete ev->socket(); } } - void event_loop::stop() { - _run = false; - std::lock_guard lg(_event_mtx); - _event_cv.notify_all(); + std::osyncstream(log::info) << "Stopping worker thread " << threadno << std::endl; +} + +void event_loop::eventer_thread_loop() +{ + struct epoll_event* events = new struct epoll_event[_config.max_clients()]; + std::osyncstream(log::info) << "epoll() thread started" << std::endl; + while (_run) { + int ready_fds = epoll_wait(_epoll_fd, events, _config.max_clients(), 1000); + + if (ready_fds > 0) { + std::lock_guard lg(_event_mtx); + for (int i = 0; i < ready_fds; i++) { + socket::anthracite_socket* sockptr = reinterpret_cast(events[i].data.ptr); + struct epoll_event ev; + epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]); + _events.push(event(sockptr, std::chrono::high_resolution_clock::now())); + } + _event_cv.notify_one(); + } + } + delete[] events; + std::osyncstream(log::info) << "epoll() thread exited" << std::endl; +} + +void event_loop::listener_thread_loop(config::http_config& http_config) +{ + socket::anthracite_socket* socket; + + config::http_config* http_ptr = &http_config; + config::https_config* https_ptr = dynamic_cast(http_ptr); + + bool is_tls = https_ptr != nullptr; + + if (is_tls) { + socket = new socket::openssl_socket(https_ptr->port()); + } else { + socket = new socket::anthracite_socket(http_ptr->port()); + } + + std::osyncstream(log::info) << "Listening for " << (is_tls ? "HTTPS" : "HTTP") << " connections on port " << http_ptr->port() << std::endl; + + while (_run) { + if (socket->wait_for_conn()) { + socket::anthracite_socket* client_sock; + + if (is_tls) { + socket::openssl_socket* ssl_sock = dynamic_cast(socket); + client_sock = new socket::openssl_socket(*ssl_sock); + } else { + client_sock = new socket::anthracite_socket(*socket); + } + + struct epoll_event event; + event.events = EPOLLIN; + event.data.ptr = client_sock; + epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, client_sock->csock(), &event); + } + } + + std::osyncstream(log::info) << "Stopping listening for " << (is_tls ? "HTTPS" : "HTTP") << " connections on port " << http_ptr->port() << std::endl; + + delete socket; +} + +void event_loop::start() +{ + log::info << "Starting event_loop Thread Manager" << std::endl; + + _epoll_fd = epoll_create(1); + _run = true; + + std::vector listener_threads; + std::vector worker_threads; + + for (int i = 0; i < _config.worker_threads(); i++) { + auto thread = std::thread(&event_loop::worker_thread_loop, this, i); + worker_threads.push_back(std::move(thread)); + } + + if (_config.http_cfg().has_value()) { + auto thread = std::thread(&event_loop::listener_thread_loop, this, std::ref(_config.http_cfg().value())); + listener_threads.push_back(std::move(thread)); + } + + if (_config.https_cfg().has_value()) { + auto thread = std::thread(&event_loop::listener_thread_loop, this, std::ref(_config.https_cfg().value())); + listener_threads.push_back(std::move(thread)); + } + + { + auto thread = std::thread(&event_loop::eventer_thread_loop, this); + listener_threads.push_back(std::move(thread)); + } + + for (std::thread& t : worker_threads) { + t.join(); + } + + for (std::thread& t : listener_threads) { + t.join(); } } + +void event_loop::stop() +{ + _run = false; + std::lock_guard lg(_event_mtx); + _event_cv.notify_all(); +} +} diff --git a/lib/thread_mgr/event_loop.hpp b/lib/thread_mgr/event_loop.hpp index 917af6b..d5ed923 100644 --- a/lib/thread_mgr/event_loop.hpp +++ b/lib/thread_mgr/event_loop.hpp @@ -1,6 +1,7 @@ #include "./thread_mgr.hpp" #include "../socket/socket.hpp" #include "../backends/file_backend.hpp" +#include #include #include #include @@ -9,11 +10,14 @@ namespace anthracite::thread_mgr { class event_loop : public virtual thread_mgr { class event { socket::anthracite_socket* _socket; + std::chrono::time_point _ts; public: - event(socket::anthracite_socket* socket); + event(socket::anthracite_socket* socket, std::chrono::time_point timestamp); socket::anthracite_socket* socket(); + std::chrono::time_point& timestamp(); }; + int _epoll_fd; std::mutex _event_mtx; std::condition_variable _event_cv; std::queue _events; @@ -21,6 +25,7 @@ namespace anthracite::thread_mgr { void worker_thread_loop(int threadno); void listener_thread_loop(config::http_config& http_config); + void eventer_thread_loop(); bool event_handler(event& ev); public: diff --git a/src/api_main.cpp b/src/api_main.cpp deleted file mode 100644 index 54d7b3b..0000000 --- a/src/api_main.cpp +++ /dev/null @@ -1,112 +0,0 @@ -#include "../lib/anthracite.hpp" -#include "../lib/backends/backend.hpp" -#include "../lib/http/constants.hpp" -#include -#include -#include -#include -#include -#include -#include - -using namespace anthracite; - -using CallbackType = std::unique_ptr (*)(http::request&); -class api_backend : public backends::backend { - - class RouteNode { - public: - std::optional callback; - - RouteNode() - : callback(std::nullopt) - { - } - std::unordered_map routes; - }; - - RouteNode root; - - std::unique_ptr default_route(http::request& req) - { - std::unique_ptr resp = std::make_unique(); - - resp->add_body("Not Found"); - resp->add_header(http::header("Content-Type", "application/json")); - resp->add_status(http::status_codes::NOT_FOUND); - - return resp; - } - - std::unique_ptr find_handler(http::request& req) - { - std::string filename = req.path().substr(1); - std::vector result; - std::stringstream ss(filename); - std::string item; - - RouteNode* cur = &root; - while (getline(ss, item, '/')) { - if (cur->routes.find(item) == cur->routes.end()) { - if (cur->routes.find("*") == cur->routes.end()) { - break; - } else { - cur = &cur->routes["*"]; - } - } else { - cur = &cur->routes[item]; - } - } - - if (cur->callback.has_value()) { - return cur->callback.value()(req); - } else { - return default_route(req); - } - } - - std::unique_ptr handle_request(http::request& req) override - { - return find_handler(req); - } - -public: - api_backend() - { - root.routes = std::unordered_map(); - } - - void register_endpoint(std::string pathspec, CallbackType callback) - { - std::vector result; - std::stringstream ss(pathspec); - std::string item; - - RouteNode* cur = &root; - while (getline(ss, item, '/')) { - cur->routes[item] = RouteNode {}; - cur = &cur->routes[item]; - } - - cur->callback = callback; - } -}; - -std::unique_ptr handle_request(http::request& req) -{ - std::unique_ptr resp = std::make_unique(); - - resp->add_body(R"({"user": "endpoint"}")"); - resp->add_header(http::header("Content-Type", "application/json")); - resp->add_status(http::status_codes::OK); - - return resp; -} - -int main(int argc, char** argv) -{ - auto args = std::span(argv, size_t(argc)); - api_backend ab; - ab.register_endpoint("users/*", handle_request); - //anthracite_main(argc, argv, ab); -} diff --git a/src/file_main.cpp b/src/file_main.cpp index 7a723a9..027bafd 100644 --- a/src/file_main.cpp +++ b/src/file_main.cpp @@ -1,15 +1,35 @@ -#include "../lib/anthracite.hpp" #include "../lib/backends/file_backend.hpp" #include "../lib/config/config.hpp" +#include "../lib/log/log.hpp" +#include "../lib/thread_mgr/event_loop.hpp" +#include "signal.h" +#include "string.h" +#include -using namespace anthracite; +std::shared_ptr server = nullptr; + +extern "C" void signalHandler(int signum) +{ + anthracite::log::warn << "Caught signal SIG" << sigabbrev_np(signum) << ", exiting Anthracite" << std::endl; + if (server != nullptr) { + server->stop(); + } +} int main(int argc, char** argv) { - backends::file_backend fb("./www"); - config::config cfg(5); - cfg.add_http_config(config::http_config(8080)); - cfg.add_https_config(config::https_config(8081, "", "")); + anthracite::log::logger.initialize(anthracite::log::LOG_LEVEL_INFO); + anthracite::log::info << "Starting Anthracite, a higher performance web server" << std::endl; + signal(SIGINT, signalHandler); - anthracite_main(fb, cfg); + anthracite::backends::file_backend fb("./www"); + anthracite::config::config cfg(5, 10000); + cfg.add_http_config(anthracite::config::http_config(8080)); + // cfg.add_https_config(config::https_config(8081, "", "")); + + server = std::make_shared(fb, cfg); + + server->start(); + + anthracite::log::info << "Stopping Anthracite, a higher performance web server" << std::endl; } diff --git a/tests/speed_tests.cpp b/tests/speed_tests.cpp index 9f6835f..e9da069 100644 --- a/tests/speed_tests.cpp +++ b/tests/speed_tests.cpp @@ -1,21 +1,21 @@ -#include -#include -#include #include "../lib/http/request.hpp" +#include +#include +#include #ifdef SPEEDTEST_COMPARE_BOOST #include #endif - -using std::chrono::high_resolution_clock; -using std::chrono::duration_cast; using std::chrono::duration; +using std::chrono::duration_cast; +using std::chrono::high_resolution_clock; using std::chrono::milliseconds; constexpr uint32_t num_requests = 10000000; -TEST(speed_tests, request_parse) { +TEST(speed_tests, request_parse) +{ std::ifstream t("./test_files/test_request.http"); std::stringstream buffer; buffer << t.rdbuf(); @@ -23,24 +23,25 @@ TEST(speed_tests, request_parse) { auto start = high_resolution_clock::now(); - for(int i = 0; i < num_requests; ++i) { - volatile anthracite::http::request req (raw_req, "0.0.0.0"); + for (int i = 0; i < num_requests; ++i) { + volatile anthracite::http::request req(raw_req, "0.0.0.0"); } auto end = high_resolution_clock::now(); - auto ms_int = duration_cast(end-start); + auto ms_int = duration_cast(end - start); double m_rps = ((1000.0 / ms_int.count()) * num_requests) / 1000000; - std::cout << "Parsed " << (num_requests/1000000) << " Million requests in " << ms_int << " ms"; + std::cout << "Parsed " << (num_requests / 1000000) << " Million requests in " << ms_int << " ms"; std::cout << " at " << m_rps << " Million RPS " << std::endl; ASSERT_LT(ms_int.count(), 2000); } #ifdef SPEEDTEST_COMPARE_BOOST -TEST(speed_tests, boost) { +TEST(speed_tests, boost) +{ std::ifstream t("./test_files/test_request.http"); std::stringstream buffer; buffer << t.rdbuf(); @@ -48,7 +49,7 @@ TEST(speed_tests, boost) { auto start = high_resolution_clock::now(); - for(int i = 0; i < num_requests; ++i) { + for (int i = 0; i < num_requests; ++i) { boost::system::error_code ec; boost::beast::http::request_parser p; p.put(boost::asio::buffer(raw_req), ec); @@ -56,11 +57,11 @@ TEST(speed_tests, boost) { } auto end = high_resolution_clock::now(); - auto ms_int = duration_cast(end-start); + auto ms_int = duration_cast(end - start); double m_rps = ((1000.0 / ms_int.count()) * num_requests) / 1000000; - std::cout << "Parsed " << (num_requests/1000000) << " Million requests in " << ms_int << " ms"; + std::cout << "Parsed " << (num_requests / 1000000) << " Million requests in " << ms_int << " ms"; std::cout << " at " << m_rps << " Million RPS " << std::endl; } #endif diff --git a/tests/unit_tests.cpp b/tests/unit_tests.cpp index c7e3117..e152679 100644 --- a/tests/unit_tests.cpp +++ b/tests/unit_tests.cpp @@ -1,9 +1,10 @@ -#include -#include #include "../lib/http/request.hpp" #include +#include +#include -TEST(unit_tests, single_request_parse) { +TEST(unit_tests, single_request_parse) +{ std::ifstream t("./test_files/test_request.http"); std::stringstream buffer; buffer << t.rdbuf(); @@ -11,7 +12,7 @@ TEST(unit_tests, single_request_parse) { std::string raw_req = buffer.str(); std::string expected = buffer.str(); - anthracite::http::request req (raw_req, "0.0.0.0"); + anthracite::http::request req(raw_req, "0.0.0.0"); ASSERT_EQ(expected, req.to_string()); }