diff --git a/Dockerfile b/Dockerfile index b7247e4..c1f60f9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,8 @@ FROM alpine as build-env -RUN apk add --no-cache build-base python3 cmake openssl-dev +RUN apk add --no-cache build-base python3 cmake COPY ./src ./src COPY ./lib ./lib -COPY ./tests ./tests COPY ./default_www ./default_www COPY ./build_supp ./build_supp COPY ./CMakeLists.txt . diff --git a/docker-compose.yml b/docker-compose.yml index ff74b34..e89cc51 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ services: anthracite-web: build: . ports: - - "8080:8080" + - "8080:80" volumes: - type: bind source: ./default_www/docker_compose/ diff --git a/lib/log/log.cpp b/lib/log/log.cpp index 8095903..63399d0 100644 --- a/lib/log/log.cpp +++ b/lib/log/log.cpp @@ -1,5 +1,4 @@ #include "./log.hpp" -#include namespace anthracite::log { enum LOG_LEVEL Logger::_level = LOG_LEVEL_NONE; @@ -26,9 +25,7 @@ LogBuf::LogBuf(std::ostream& output_stream, const std::string& tag, enum LOG_LEV int LogBuf::sync() { if (this->_level <= logger._level) { - char thread_name[100]; - pthread_getname_np(pthread_self(), thread_name, 100); - std::osyncstream(std::cout) << "[" << this->_tag << "] [" << syscall(SYS_gettid) << ":" << thread_name << "] "<< this->str(); + std::cout << "[" << this->_tag << "] " << this->str(); std::cout.flush(); } this->str(""); diff --git a/lib/socket/socket.cpp b/lib/socket/socket.cpp index 0c2046d..f8ec4ae 100644 --- a/lib/socket/socket.cpp +++ b/lib/socket/socket.cpp @@ -4,26 +4,18 @@ #include #include #include -#include #include #include #include #include -#include -#include -#include "assert.h" -#include -#include - namespace anthracite::socket { const struct timeval anthracite_socket::timeout_tv = { .tv_sec = 5, .tv_usec = 0 }; -anthracite_socket::anthracite_socket(int port, int max_queue, bool nonblocking) +anthracite_socket::anthracite_socket(int port, int max_queue) : server_socket(::socket(AF_INET, SOCK_STREAM, 0)) , client_ip("") - , _nonblocking(nonblocking) { struct sockaddr_in address {}; address.sin_family = AF_INET; @@ -34,21 +26,18 @@ anthracite_socket::anthracite_socket(int port, int max_queue, bool nonblocking) setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &reuse_opt, sizeof(reuse_opt)); bind(server_socket, reinterpret_cast(&address), sizeof(address)); - if (_nonblocking) { - fcntl(server_socket, F_SETFL, O_NONBLOCK); - } - listen(server_socket, max_queue); } bool anthracite_socket::wait_for_conn() { client_ip = ""; - client_socket = accept(server_socket, reinterpret_cast(&client_addr), &client_addr_len); - if (client_socket > 0) { - if (_nonblocking) { - fcntl(client_socket, F_SETFL, O_NONBLOCK); - } + struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; + fd_set read_fd; + FD_ZERO(&read_fd); + FD_SET(server_socket, &read_fd); + if (select(server_socket + 1, &read_fd, NULL, NULL, &wait_timeout)) { + client_socket = accept(server_socket, reinterpret_cast(&client_addr), &client_addr_len); std::array ip_str { 0 }; inet_ntop(AF_INET, &client_addr.sin_addr, ip_str.data(), INET_ADDRSTRLEN); client_ip = std::string(ip_str.data()); @@ -63,7 +52,8 @@ const std::string& anthracite_socket::get_client_ip() return client_ip; } -void anthracite_socket::close_conn() { +void anthracite_socket::close_conn() +{ close(client_socket); client_socket = -1; } @@ -76,21 +66,13 @@ void anthracite_socket::send_message(std::string& msg) send(client_socket, &msg[0], msg.length(), 0); } -bool anthracite_socket::has_client() { - return client_socket > 0; -} - std::string anthracite_socket::recv_message(int buffer_size) { if (client_socket == -1) { return ""; } - //setsockopt(client_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout_tv, sizeof timeout_tv); - - int nodelay_opt = 1; - assert(setsockopt(client_socket, SOL_TCP, TCP_NODELAY, &nodelay_opt, sizeof(nodelay_opt)) == 0); - + setsockopt(client_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout_tv, sizeof timeout_tv); std::vector response(buffer_size + 1); ssize_t result = recv(client_socket, response.data(), buffer_size + 1, 0); diff --git a/lib/socket/socket.hpp b/lib/socket/socket.hpp index ea8622d..5ac3562 100644 --- a/lib/socket/socket.hpp +++ b/lib/socket/socket.hpp @@ -14,7 +14,6 @@ namespace anthracite::socket { class anthracite_socket { protected: - bool _nonblocking; struct timeval wait_timeout = { .tv_sec = 1, .tv_usec = 0}; int server_socket; int client_socket {}; @@ -25,11 +24,10 @@ protected: static const int MAX_QUEUE_LENGTH = 100; public: - anthracite_socket(int port, int max_queue = MAX_QUEUE_LENGTH, bool nonblocking = false); + anthracite_socket(int port, int max_queue = MAX_QUEUE_LENGTH); virtual const std::string& get_client_ip() final; - virtual bool has_client(); virtual bool wait_for_conn(); virtual void close_conn(); virtual void send_message(std::string& msg); diff --git a/lib/thread_mgr/event_loop.cpp b/lib/thread_mgr/event_loop.cpp index 92c5bfb..1cf4d2a 100644 --- a/lib/thread_mgr/event_loop.cpp +++ b/lib/thread_mgr/event_loop.cpp @@ -5,10 +5,7 @@ #include "sys/epoll.h" #include #include -#include -#include #include -#include "signal.h" using std::chrono::duration; using std::chrono::duration_cast; @@ -38,20 +35,25 @@ event_loop::event_loop(backends::backend& backend, config::config& config) { } -bool event_loop::event_handler(socket::anthracite_socket* sock) +bool event_loop::event_handler(event& event) { - std::string raw_request = sock->recv_message(http::HEADER_BYTES); + std::string raw_request = event.socket()->recv_message(http::HEADER_BYTES); if (raw_request == "") { return false; } - http::request req(raw_request, sock->get_client_ip()); + http::request req(raw_request, event.socket()->get_client_ip()); std::unique_ptr resp = req.is_supported_version() ? _backend.handle_request(req) : _error_backend.handle_error(http::status_codes::HTTP_VERSION_NOT_SUPPORTED); std::string header = resp->header_to_string(); - sock->send_message(header); - sock->send_message(resp->content()); + event.socket()->send_message(header); + event.socket()->send_message(resp->content()); + auto end = high_resolution_clock::now(); + auto ms_int = duration_cast(end - event.timestamp()); + log::logger.log_request_and_response(req, resp, ms_int.count()); + + resp.reset(); if (req.close_connection()) { return false; } @@ -61,36 +63,70 @@ bool event_loop::event_handler(socket::anthracite_socket* sock) void event_loop::worker_thread_loop(int threadno) { - std::stringstream ss; - ss << "worker " << threadno; - pthread_setname_np(pthread_self(), ss.str().c_str()); - - struct epoll_event* events = new struct epoll_event[_config.max_clients()]; - int timeout_ms = 1000; - - if (_nonblocking) { - timeout_ms = 0; - } - - log::info << "Starting worker thread " << threadno << std::endl; + unsigned char buf[sizeof(class event)]; + std::osyncstream(log::info) << "Starting worker thread " << threadno << std::endl; while (_run) { - int ready_fds = epoll_wait(_epoll_fd, events, _config.max_clients(), timeout_ms); + // Get event from queue + std::unique_lock lock(_event_mtx); - if (ready_fds > 0) { - for (int i = 0; i < ready_fds; i++) { - socket::anthracite_socket* sockptr = reinterpret_cast(events[i].data.ptr); + event* ev = nullptr; - if (!event_handler(sockptr)) { - epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]); - sockptr->close_conn(); - delete sockptr; - } + if (_events.size() > 0) { + if (!lock.owns_lock()) { + lock.lock(); } + assert(lock.owns_lock()); + ev = new (buf) event(_events.front()); + _events.pop(); + lock.unlock(); + } else { + _event_cv.wait(lock, [this] { return this->_events.size() > 0 || !_run; }); + + if (!_run) { + break; + } + + assert(lock.owns_lock()); + ev = new (buf) event(_events.front()); + _events.pop(); + lock.unlock(); + } + + if (event_handler(*ev)) { + struct epoll_event event; + event.events = EPOLLIN; + event.data.ptr = ev->socket(); + epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, ev->socket()->csock(), &event); + } else { + ev->socket()->close_conn(); + delete ev->socket(); } } - log::info << "Stopping worker thread " << threadno << std::endl; + std::osyncstream(log::info) << "Stopping worker thread " << threadno << std::endl; +} + +void event_loop::eventer_thread_loop() +{ + struct epoll_event* events = new struct epoll_event[_config.max_clients()]; + std::osyncstream(log::info) << "epoll() thread started" << std::endl; + while (_run) { + int ready_fds = epoll_wait(_epoll_fd, events, _config.max_clients(), 1000); + + if (ready_fds > 0) { + std::lock_guard lg(_event_mtx); + for (int i = 0; i < ready_fds; i++) { + socket::anthracite_socket* sockptr = reinterpret_cast(events[i].data.ptr); + struct epoll_event ev; + epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]); + _events.push(event(sockptr, std::chrono::high_resolution_clock::now())); + } + _event_cv.notify_one(); + } + } + delete[] events; + std::osyncstream(log::info) << "epoll() thread exited" << std::endl; } void event_loop::listener_thread_loop(config::http_config& http_config) @@ -122,7 +158,7 @@ void event_loop::listener_thread_loop(config::http_config& http_config) } struct epoll_event event; - event.events = EPOLLIN | EPOLLEXCLUSIVE;// | EPOLLET; + event.events = EPOLLIN; event.data.ptr = client_sock; epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, client_sock->csock(), &event); } @@ -135,11 +171,10 @@ void event_loop::listener_thread_loop(config::http_config& http_config) void event_loop::start() { - signal(SIGPIPE, SIG_IGN); log::info << "Starting event_loop Thread Manager" << std::endl; - _run = true; _epoll_fd = epoll_create(1); + _run = true; std::vector listener_threads; std::vector worker_threads; @@ -159,6 +194,11 @@ void event_loop::start() listener_threads.push_back(std::move(thread)); } + { + auto thread = std::thread(&event_loop::eventer_thread_loop, this); + listener_threads.push_back(std::move(thread)); + } + for (std::thread& t : worker_threads) { t.join(); } @@ -171,5 +211,7 @@ void event_loop::start() void event_loop::stop() { _run = false; + std::lock_guard lg(_event_mtx); + _event_cv.notify_all(); } } diff --git a/lib/thread_mgr/event_loop.hpp b/lib/thread_mgr/event_loop.hpp index 3fa8ea5..d5ed923 100644 --- a/lib/thread_mgr/event_loop.hpp +++ b/lib/thread_mgr/event_loop.hpp @@ -19,13 +19,14 @@ namespace anthracite::thread_mgr { int _epoll_fd; std::mutex _event_mtx; + std::condition_variable _event_cv; + std::queue _events; backends::file_backend _error_backend; - bool _nonblocking; void worker_thread_loop(int threadno); void listener_thread_loop(config::http_config& http_config); void eventer_thread_loop(); - bool event_handler(socket::anthracite_socket*); + bool event_handler(event& ev); public: event_loop(backends::backend& backend, config::config& config); diff --git a/src/file_main.cpp b/src/file_main.cpp index ad3dcdd..027bafd 100644 --- a/src/file_main.cpp +++ b/src/file_main.cpp @@ -10,7 +10,7 @@ std::shared_ptr server = nullptr; extern "C" void signalHandler(int signum) { - //anthracite::log::warn << "Caught signal SIG" << sigabbrev_np(signum) << ", exiting Anthracite" << std::endl; + anthracite::log::warn << "Caught signal SIG" << sigabbrev_np(signum) << ", exiting Anthracite" << std::endl; if (server != nullptr) { server->stop(); } @@ -23,7 +23,7 @@ int main(int argc, char** argv) signal(SIGINT, signalHandler); anthracite::backends::file_backend fb("./www"); - anthracite::config::config cfg(1, 10); + anthracite::config::config cfg(5, 10000); cfg.add_http_config(anthracite::config::http_config(8080)); // cfg.add_https_config(config::https_config(8081, "", ""));