From c07f3ebf8109ec40d5ec5019ca4b54dd888aa864 Mon Sep 17 00:00:00 2001 From: Nicholas Orlowsky Date: Sun, 23 Feb 2025 17:06:32 -0500 Subject: [PATCH] faster nonblocking io --- Dockerfile | 3 +- docker-compose.yml | 2 +- lib/log/log.cpp | 5 ++- lib/socket/socket.cpp | 44 +++++++++++++++------- lib/socket/socket.hpp | 4 +- lib/thread_mgr/event_loop.cpp | 71 +++++++++++------------------------ lib/thread_mgr/event_loop.hpp | 5 +-- src/file_main.cpp | 4 +- 8 files changed, 66 insertions(+), 72 deletions(-) diff --git a/Dockerfile b/Dockerfile index c1f60f9..b7247e4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,9 @@ FROM alpine as build-env -RUN apk add --no-cache build-base python3 cmake +RUN apk add --no-cache build-base python3 cmake openssl-dev COPY ./src ./src COPY ./lib ./lib +COPY ./tests ./tests COPY ./default_www ./default_www COPY ./build_supp ./build_supp COPY ./CMakeLists.txt . diff --git a/docker-compose.yml b/docker-compose.yml index e89cc51..ff74b34 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ services: anthracite-web: build: . ports: - - "8080:80" + - "8080:8080" volumes: - type: bind source: ./default_www/docker_compose/ diff --git a/lib/log/log.cpp b/lib/log/log.cpp index 63399d0..8095903 100644 --- a/lib/log/log.cpp +++ b/lib/log/log.cpp @@ -1,4 +1,5 @@ #include "./log.hpp" +#include namespace anthracite::log { enum LOG_LEVEL Logger::_level = LOG_LEVEL_NONE; @@ -25,7 +26,9 @@ LogBuf::LogBuf(std::ostream& output_stream, const std::string& tag, enum LOG_LEV int LogBuf::sync() { if (this->_level <= logger._level) { - std::cout << "[" << this->_tag << "] " << this->str(); + char thread_name[100]; + pthread_getname_np(pthread_self(), thread_name, 100); + std::osyncstream(std::cout) << "[" << this->_tag << "] [" << syscall(SYS_gettid) << ":" << thread_name << "] "<< this->str(); std::cout.flush(); } this->str(""); diff --git a/lib/socket/socket.cpp b/lib/socket/socket.cpp index 7c748d8..0c2046d 100644 --- a/lib/socket/socket.cpp +++ b/lib/socket/socket.cpp @@ -4,18 +4,26 @@ #include #include #include +#include #include #include #include #include +#include +#include +#include "assert.h" +#include +#include + namespace anthracite::socket { const struct timeval anthracite_socket::timeout_tv = { .tv_sec = 5, .tv_usec = 0 }; -anthracite_socket::anthracite_socket(int port, int max_queue) +anthracite_socket::anthracite_socket(int port, int max_queue, bool nonblocking) : server_socket(::socket(AF_INET, SOCK_STREAM, 0)) , client_ip("") + , _nonblocking(nonblocking) { struct sockaddr_in address {}; address.sin_family = AF_INET; @@ -26,25 +34,28 @@ anthracite_socket::anthracite_socket(int port, int max_queue) setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &reuse_opt, sizeof(reuse_opt)); bind(server_socket, reinterpret_cast(&address), sizeof(address)); + if (_nonblocking) { + fcntl(server_socket, F_SETFL, O_NONBLOCK); + } + listen(server_socket, max_queue); } bool anthracite_socket::wait_for_conn() { client_ip = ""; - //struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; - //fd_set read_fd; - //FD_ZERO(&read_fd); - //FD_SET(server_socket, &read_fd); - //if (select(server_socket + 1, &read_fd, NULL, NULL, &wait_timeout)) { - client_socket = accept(server_socket, reinterpret_cast(&client_addr), &client_addr_len); + client_socket = accept(server_socket, reinterpret_cast(&client_addr), &client_addr_len); + if (client_socket > 0) { + if (_nonblocking) { + fcntl(client_socket, F_SETFL, O_NONBLOCK); + } std::array ip_str { 0 }; inet_ntop(AF_INET, &client_addr.sin_addr, ip_str.data(), INET_ADDRSTRLEN); client_ip = std::string(ip_str.data()); return true; - //} else { - // return false; - //} + } else { + return false; + } } const std::string& anthracite_socket::get_client_ip() @@ -52,8 +63,7 @@ const std::string& anthracite_socket::get_client_ip() return client_ip; } -void anthracite_socket::close_conn() -{ +void anthracite_socket::close_conn() { close(client_socket); client_socket = -1; } @@ -66,13 +76,21 @@ void anthracite_socket::send_message(std::string& msg) send(client_socket, &msg[0], msg.length(), 0); } +bool anthracite_socket::has_client() { + return client_socket > 0; +} + std::string anthracite_socket::recv_message(int buffer_size) { if (client_socket == -1) { return ""; } - setsockopt(client_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout_tv, sizeof timeout_tv); + //setsockopt(client_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout_tv, sizeof timeout_tv); + + int nodelay_opt = 1; + assert(setsockopt(client_socket, SOL_TCP, TCP_NODELAY, &nodelay_opt, sizeof(nodelay_opt)) == 0); + std::vector response(buffer_size + 1); ssize_t result = recv(client_socket, response.data(), buffer_size + 1, 0); diff --git a/lib/socket/socket.hpp b/lib/socket/socket.hpp index 5ac3562..ea8622d 100644 --- a/lib/socket/socket.hpp +++ b/lib/socket/socket.hpp @@ -14,6 +14,7 @@ namespace anthracite::socket { class anthracite_socket { protected: + bool _nonblocking; struct timeval wait_timeout = { .tv_sec = 1, .tv_usec = 0}; int server_socket; int client_socket {}; @@ -24,10 +25,11 @@ protected: static const int MAX_QUEUE_LENGTH = 100; public: - anthracite_socket(int port, int max_queue = MAX_QUEUE_LENGTH); + anthracite_socket(int port, int max_queue = MAX_QUEUE_LENGTH, bool nonblocking = false); virtual const std::string& get_client_ip() final; + virtual bool has_client(); virtual bool wait_for_conn(); virtual void close_conn(); virtual void send_message(std::string& msg); diff --git a/lib/thread_mgr/event_loop.cpp b/lib/thread_mgr/event_loop.cpp index f271138..92c5bfb 100644 --- a/lib/thread_mgr/event_loop.cpp +++ b/lib/thread_mgr/event_loop.cpp @@ -6,7 +6,9 @@ #include #include #include +#include #include +#include "signal.h" using std::chrono::duration; using std::chrono::duration_cast; @@ -50,11 +52,6 @@ bool event_loop::event_handler(socket::anthracite_socket* sock) sock->send_message(header); sock->send_message(resp->content()); - auto end = high_resolution_clock::now(); - //auto ms_int = duration_cast(end - event.timestamp()); - //log::logger.log_request_and_response(req, resp, 9);//ms_int.count()); - - resp.reset(); if (req.close_connection()) { return false; } @@ -62,26 +59,30 @@ bool event_loop::event_handler(socket::anthracite_socket* sock) return true; } -#define QATATIME (50) - void event_loop::worker_thread_loop(int threadno) { - struct epoll_event* events = new struct epoll_event[_config.max_clients()]; - int epoll_fd = _epoll_fds[threadno]; + std::stringstream ss; + ss << "worker " << threadno; + pthread_setname_np(pthread_self(), ss.str().c_str()); - std::osyncstream(log::info) << "Starting worker thread " << threadno << " on pid " << syscall(SYS_gettid) << std::endl; + struct epoll_event* events = new struct epoll_event[_config.max_clients()]; + int timeout_ms = 1000; + + if (_nonblocking) { + timeout_ms = 0; + } + + log::info << "Starting worker thread " << threadno << std::endl; while (_run) { - // Get event from queue - int ready_fds = epoll_wait(epoll_fd, events, _config.max_clients(), 1000); + int ready_fds = epoll_wait(_epoll_fd, events, _config.max_clients(), timeout_ms); if (ready_fds > 0) { - std::lock_guard lg(_event_mtx); for (int i = 0; i < ready_fds; i++) { socket::anthracite_socket* sockptr = reinterpret_cast(events[i].data.ptr); if (!event_handler(sockptr)) { - epoll_ctl(epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]); + epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]); sockptr->close_conn(); delete sockptr; } @@ -89,29 +90,7 @@ void event_loop::worker_thread_loop(int threadno) } } - std::osyncstream(log::info) << "Stopping worker thread " << threadno << std::endl; -} - -void event_loop::eventer_thread_loop() -{ - //struct epoll_event* events = new struct epoll_event[_config.max_clients()]; - //std::osyncstream(log::info) << "epoll() thread started on pid " << getpid() << std::endl; - //while (_run) { - // int ready_fds = epoll_wait(_epoll_fd, events, _config.max_clients(), 1000); - - // if (ready_fds > 0) { - // std::lock_guard lg(_event_mtx); - // for (int i = 0; i < ready_fds; i++) { - // socket::anthracite_socket* sockptr = reinterpret_cast(events[i].data.ptr); - // struct epoll_event ev; - // epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]); - // _events.push(event(sockptr, std::chrono::high_resolution_clock::now())); - // } - // _event_cv.notify_all(); - // } - //} - //delete[] events; - //std::osyncstream(log::info) << "epoll() thread exited" << std::endl; + log::info << "Stopping worker thread " << threadno << std::endl; } void event_loop::listener_thread_loop(config::http_config& http_config) @@ -129,9 +108,8 @@ void event_loop::listener_thread_loop(config::http_config& http_config) socket = new socket::anthracite_socket(http_ptr->port()); } - std::osyncstream(log::info) << "Listening for " << (is_tls ? "HTTPS" : "HTTP") << " connections on port " << http_ptr->port() << " on pid " << getpid() << std::endl; + std::osyncstream(log::info) << "Listening for " << (is_tls ? "HTTPS" : "HTTP") << " connections on port " << http_ptr->port() << std::endl; - int assign_thread = 0; while (_run) { if (socket->wait_for_conn()) { socket::anthracite_socket* client_sock; @@ -144,10 +122,9 @@ void event_loop::listener_thread_loop(config::http_config& http_config) } struct epoll_event event; - event.events = EPOLLIN; + event.events = EPOLLIN | EPOLLEXCLUSIVE;// | EPOLLET; event.data.ptr = client_sock; - epoll_ctl(_epoll_fds[assign_thread], EPOLL_CTL_ADD, client_sock->csock(), &event); - assign_thread = (assign_thread + 1) % _epoll_fds.size(); + epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, client_sock->csock(), &event); } } @@ -158,15 +135,16 @@ void event_loop::listener_thread_loop(config::http_config& http_config) void event_loop::start() { + signal(SIGPIPE, SIG_IGN); log::info << "Starting event_loop Thread Manager" << std::endl; _run = true; + _epoll_fd = epoll_create(1); std::vector listener_threads; std::vector worker_threads; for (int i = 0; i < _config.worker_threads(); i++) { - _epoll_fds.push_back(epoll_create(1)); auto thread = std::thread(&event_loop::worker_thread_loop, this, i); worker_threads.push_back(std::move(thread)); } @@ -181,11 +159,6 @@ void event_loop::start() listener_threads.push_back(std::move(thread)); } - //{ - // auto thread = std::thread(&event_loop::eventer_thread_loop, this); - // listener_threads.push_back(std::move(thread)); - //} - for (std::thread& t : worker_threads) { t.join(); } @@ -198,7 +171,5 @@ void event_loop::start() void event_loop::stop() { _run = false; - std::lock_guard lg(_event_mtx); - _event_cv.notify_all(); } } diff --git a/lib/thread_mgr/event_loop.hpp b/lib/thread_mgr/event_loop.hpp index 7f09c97..3fa8ea5 100644 --- a/lib/thread_mgr/event_loop.hpp +++ b/lib/thread_mgr/event_loop.hpp @@ -17,11 +17,10 @@ namespace anthracite::thread_mgr { std::chrono::time_point& timestamp(); }; - std::vector _epoll_fds; + int _epoll_fd; std::mutex _event_mtx; - std::condition_variable _event_cv; - std::queue _events; backends::file_backend _error_backend; + bool _nonblocking; void worker_thread_loop(int threadno); void listener_thread_loop(config::http_config& http_config); diff --git a/src/file_main.cpp b/src/file_main.cpp index 0ce2289..ad3dcdd 100644 --- a/src/file_main.cpp +++ b/src/file_main.cpp @@ -10,7 +10,7 @@ std::shared_ptr server = nullptr; extern "C" void signalHandler(int signum) { - anthracite::log::warn << "Caught signal SIG" << sigabbrev_np(signum) << ", exiting Anthracite" << std::endl; + //anthracite::log::warn << "Caught signal SIG" << sigabbrev_np(signum) << ", exiting Anthracite" << std::endl; if (server != nullptr) { server->stop(); } @@ -23,7 +23,7 @@ int main(int argc, char** argv) signal(SIGINT, signalHandler); anthracite::backends::file_backend fb("./www"); - anthracite::config::config cfg(3, 1000); + anthracite::config::config cfg(1, 10); cfg.add_http_config(anthracite::config::http_config(8080)); // cfg.add_https_config(config::https_config(8081, "", ""));