From 6c5feb867515900ac83bd46c4bad9c3e4692530e Mon Sep 17 00:00:00 2001 From: Nicholas Orlowsky Date: Sat, 22 Feb 2025 00:52:30 -0500 Subject: [PATCH] epoll per thd --- lib/socket/socket.cpp | 16 ++--- lib/thread_mgr/event_loop.cpp | 113 +++++++++++++++------------------- lib/thread_mgr/event_loop.hpp | 4 +- src/file_main.cpp | 2 +- 4 files changed, 61 insertions(+), 74 deletions(-) diff --git a/lib/socket/socket.cpp b/lib/socket/socket.cpp index f8ec4ae..7c748d8 100644 --- a/lib/socket/socket.cpp +++ b/lib/socket/socket.cpp @@ -32,19 +32,19 @@ anthracite_socket::anthracite_socket(int port, int max_queue) bool anthracite_socket::wait_for_conn() { client_ip = ""; - struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; - fd_set read_fd; - FD_ZERO(&read_fd); - FD_SET(server_socket, &read_fd); - if (select(server_socket + 1, &read_fd, NULL, NULL, &wait_timeout)) { + //struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; + //fd_set read_fd; + //FD_ZERO(&read_fd); + //FD_SET(server_socket, &read_fd); + //if (select(server_socket + 1, &read_fd, NULL, NULL, &wait_timeout)) { client_socket = accept(server_socket, reinterpret_cast(&client_addr), &client_addr_len); std::array ip_str { 0 }; inet_ntop(AF_INET, &client_addr.sin_addr, ip_str.data(), INET_ADDRSTRLEN); client_ip = std::string(ip_str.data()); return true; - } else { - return false; - } + //} else { + // return false; + //} } const std::string& anthracite_socket::get_client_ip() diff --git a/lib/thread_mgr/event_loop.cpp b/lib/thread_mgr/event_loop.cpp index 1cf4d2a..f271138 100644 --- a/lib/thread_mgr/event_loop.cpp +++ b/lib/thread_mgr/event_loop.cpp @@ -5,6 +5,7 @@ #include "sys/epoll.h" #include #include +#include #include using std::chrono::duration; @@ -35,23 +36,23 @@ event_loop::event_loop(backends::backend& backend, config::config& config) { } -bool event_loop::event_handler(event& event) +bool event_loop::event_handler(socket::anthracite_socket* sock) { - std::string raw_request = event.socket()->recv_message(http::HEADER_BYTES); + std::string raw_request = sock->recv_message(http::HEADER_BYTES); if (raw_request == "") { return false; } - http::request req(raw_request, event.socket()->get_client_ip()); + http::request req(raw_request, sock->get_client_ip()); std::unique_ptr resp = req.is_supported_version() ? _backend.handle_request(req) : _error_backend.handle_error(http::status_codes::HTTP_VERSION_NOT_SUPPORTED); std::string header = resp->header_to_string(); - event.socket()->send_message(header); - event.socket()->send_message(resp->content()); + sock->send_message(header); + sock->send_message(resp->content()); auto end = high_resolution_clock::now(); - auto ms_int = duration_cast(end - event.timestamp()); - log::logger.log_request_and_response(req, resp, ms_int.count()); + //auto ms_int = duration_cast(end - event.timestamp()); + //log::logger.log_request_and_response(req, resp, 9);//ms_int.count()); resp.reset(); if (req.close_connection()) { @@ -61,46 +62,30 @@ bool event_loop::event_handler(event& event) return true; } +#define QATATIME (50) + void event_loop::worker_thread_loop(int threadno) { - unsigned char buf[sizeof(class event)]; + struct epoll_event* events = new struct epoll_event[_config.max_clients()]; + int epoll_fd = _epoll_fds[threadno]; + + std::osyncstream(log::info) << "Starting worker thread " << threadno << " on pid " << syscall(SYS_gettid) << std::endl; - std::osyncstream(log::info) << "Starting worker thread " << threadno << std::endl; while (_run) { // Get event from queue - std::unique_lock lock(_event_mtx); + int ready_fds = epoll_wait(epoll_fd, events, _config.max_clients(), 1000); - event* ev = nullptr; + if (ready_fds > 0) { + std::lock_guard lg(_event_mtx); + for (int i = 0; i < ready_fds; i++) { + socket::anthracite_socket* sockptr = reinterpret_cast(events[i].data.ptr); - if (_events.size() > 0) { - if (!lock.owns_lock()) { - lock.lock(); + if (!event_handler(sockptr)) { + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]); + sockptr->close_conn(); + delete sockptr; + } } - assert(lock.owns_lock()); - ev = new (buf) event(_events.front()); - _events.pop(); - lock.unlock(); - } else { - _event_cv.wait(lock, [this] { return this->_events.size() > 0 || !_run; }); - - if (!_run) { - break; - } - - assert(lock.owns_lock()); - ev = new (buf) event(_events.front()); - _events.pop(); - lock.unlock(); - } - - if (event_handler(*ev)) { - struct epoll_event event; - event.events = EPOLLIN; - event.data.ptr = ev->socket(); - epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, ev->socket()->csock(), &event); - } else { - ev->socket()->close_conn(); - delete ev->socket(); } } @@ -109,24 +94,24 @@ void event_loop::worker_thread_loop(int threadno) void event_loop::eventer_thread_loop() { - struct epoll_event* events = new struct epoll_event[_config.max_clients()]; - std::osyncstream(log::info) << "epoll() thread started" << std::endl; - while (_run) { - int ready_fds = epoll_wait(_epoll_fd, events, _config.max_clients(), 1000); + //struct epoll_event* events = new struct epoll_event[_config.max_clients()]; + //std::osyncstream(log::info) << "epoll() thread started on pid " << getpid() << std::endl; + //while (_run) { + // int ready_fds = epoll_wait(_epoll_fd, events, _config.max_clients(), 1000); - if (ready_fds > 0) { - std::lock_guard lg(_event_mtx); - for (int i = 0; i < ready_fds; i++) { - socket::anthracite_socket* sockptr = reinterpret_cast(events[i].data.ptr); - struct epoll_event ev; - epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]); - _events.push(event(sockptr, std::chrono::high_resolution_clock::now())); - } - _event_cv.notify_one(); - } - } - delete[] events; - std::osyncstream(log::info) << "epoll() thread exited" << std::endl; + // if (ready_fds > 0) { + // std::lock_guard lg(_event_mtx); + // for (int i = 0; i < ready_fds; i++) { + // socket::anthracite_socket* sockptr = reinterpret_cast(events[i].data.ptr); + // struct epoll_event ev; + // epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]); + // _events.push(event(sockptr, std::chrono::high_resolution_clock::now())); + // } + // _event_cv.notify_all(); + // } + //} + //delete[] events; + //std::osyncstream(log::info) << "epoll() thread exited" << std::endl; } void event_loop::listener_thread_loop(config::http_config& http_config) @@ -144,8 +129,9 @@ void event_loop::listener_thread_loop(config::http_config& http_config) socket = new socket::anthracite_socket(http_ptr->port()); } - std::osyncstream(log::info) << "Listening for " << (is_tls ? "HTTPS" : "HTTP") << " connections on port " << http_ptr->port() << std::endl; + std::osyncstream(log::info) << "Listening for " << (is_tls ? "HTTPS" : "HTTP") << " connections on port " << http_ptr->port() << " on pid " << getpid() << std::endl; + int assign_thread = 0; while (_run) { if (socket->wait_for_conn()) { socket::anthracite_socket* client_sock; @@ -160,7 +146,8 @@ void event_loop::listener_thread_loop(config::http_config& http_config) struct epoll_event event; event.events = EPOLLIN; event.data.ptr = client_sock; - epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, client_sock->csock(), &event); + epoll_ctl(_epoll_fds[assign_thread], EPOLL_CTL_ADD, client_sock->csock(), &event); + assign_thread = (assign_thread + 1) % _epoll_fds.size(); } } @@ -173,13 +160,13 @@ void event_loop::start() { log::info << "Starting event_loop Thread Manager" << std::endl; - _epoll_fd = epoll_create(1); _run = true; std::vector listener_threads; std::vector worker_threads; for (int i = 0; i < _config.worker_threads(); i++) { + _epoll_fds.push_back(epoll_create(1)); auto thread = std::thread(&event_loop::worker_thread_loop, this, i); worker_threads.push_back(std::move(thread)); } @@ -194,10 +181,10 @@ void event_loop::start() listener_threads.push_back(std::move(thread)); } - { - auto thread = std::thread(&event_loop::eventer_thread_loop, this); - listener_threads.push_back(std::move(thread)); - } + //{ + // auto thread = std::thread(&event_loop::eventer_thread_loop, this); + // listener_threads.push_back(std::move(thread)); + //} for (std::thread& t : worker_threads) { t.join(); diff --git a/lib/thread_mgr/event_loop.hpp b/lib/thread_mgr/event_loop.hpp index d5ed923..7f09c97 100644 --- a/lib/thread_mgr/event_loop.hpp +++ b/lib/thread_mgr/event_loop.hpp @@ -17,7 +17,7 @@ namespace anthracite::thread_mgr { std::chrono::time_point& timestamp(); }; - int _epoll_fd; + std::vector _epoll_fds; std::mutex _event_mtx; std::condition_variable _event_cv; std::queue _events; @@ -26,7 +26,7 @@ namespace anthracite::thread_mgr { void worker_thread_loop(int threadno); void listener_thread_loop(config::http_config& http_config); void eventer_thread_loop(); - bool event_handler(event& ev); + bool event_handler(socket::anthracite_socket*); public: event_loop(backends::backend& backend, config::config& config); diff --git a/src/file_main.cpp b/src/file_main.cpp index 027bafd..0ce2289 100644 --- a/src/file_main.cpp +++ b/src/file_main.cpp @@ -23,7 +23,7 @@ int main(int argc, char** argv) signal(SIGINT, signalHandler); anthracite::backends::file_backend fb("./www"); - anthracite::config::config cfg(5, 10000); + anthracite::config::config cfg(3, 1000); cfg.add_http_config(anthracite::config::http_config(8080)); // cfg.add_https_config(config::https_config(8081, "", ""));