faster nonblocking io
	
		
			
	
		
	
	
		
	
		
			Some checks failed
		
		
	
	
		
			
				
	
				Docker Build & Publish / build (push) Failing after 1h24m16s
				
			
		
		
	
	
				
					
				
			
		
			Some checks failed
		
		
	
	Docker Build & Publish / build (push) Failing after 1h24m16s
				
			This commit is contained in:
		
							parent
							
								
									6c5feb8675
								
							
						
					
					
						commit
						c07f3ebf81
					
				
					 8 changed files with 66 additions and 72 deletions
				
			
		|  | @ -1,8 +1,9 @@ | |||
| FROM alpine as build-env | ||||
| 
 | ||||
| RUN apk add --no-cache build-base python3 cmake | ||||
| RUN apk add --no-cache build-base python3 cmake openssl-dev | ||||
| COPY ./src ./src | ||||
| COPY ./lib ./lib | ||||
| COPY ./tests ./tests | ||||
| COPY ./default_www ./default_www | ||||
| COPY ./build_supp ./build_supp | ||||
| COPY ./CMakeLists.txt . | ||||
|  |  | |||
|  | @ -2,7 +2,7 @@ services: | |||
|   anthracite-web: | ||||
|     build: . | ||||
|     ports: | ||||
|       - "8080:80" | ||||
|       - "8080:8080" | ||||
|     volumes: | ||||
|       - type: bind | ||||
|         source: ./default_www/docker_compose/ | ||||
|  |  | |||
|  | @ -1,4 +1,5 @@ | |||
| #include "./log.hpp" | ||||
| #include <syncstream> | ||||
| 
 | ||||
| namespace anthracite::log { | ||||
| enum LOG_LEVEL Logger::_level = LOG_LEVEL_NONE; | ||||
|  | @ -25,7 +26,9 @@ LogBuf::LogBuf(std::ostream& output_stream, const std::string& tag, enum LOG_LEV | |||
| int LogBuf::sync() | ||||
| { | ||||
|     if (this->_level <= logger._level) { | ||||
|         std::cout << "[" << this->_tag << "] " << this->str(); | ||||
|         char thread_name[100]; | ||||
|         pthread_getname_np(pthread_self(), thread_name, 100); | ||||
|         std::osyncstream(std::cout) << "[" << this->_tag << "] [" << syscall(SYS_gettid) <<  ":" << thread_name << "] "<< this->str(); | ||||
|         std::cout.flush(); | ||||
|     } | ||||
|     this->str(""); | ||||
|  |  | |||
|  | @ -4,18 +4,26 @@ | |||
| #include <malloc.h> | ||||
| #include <netinet/in.h> | ||||
| #include <string> | ||||
| #include <sys/epoll.h> | ||||
| #include <sys/socket.h> | ||||
| #include <sys/time.h> | ||||
| #include <unistd.h> | ||||
| #include <vector> | ||||
| #include <unistd.h> | ||||
| #include <fcntl.h> | ||||
| #include "assert.h" | ||||
| #include <netinet/in.h> | ||||
| #include <netinet/tcp.h> | ||||
| 
 | ||||
| 
 | ||||
| namespace anthracite::socket { | ||||
| 
 | ||||
| const struct timeval anthracite_socket::timeout_tv = { .tv_sec = 5, .tv_usec = 0 }; | ||||
| 
 | ||||
| anthracite_socket::anthracite_socket(int port, int max_queue) | ||||
| anthracite_socket::anthracite_socket(int port, int max_queue, bool nonblocking) | ||||
|     : server_socket(::socket(AF_INET, SOCK_STREAM, 0)) | ||||
|     , client_ip("") | ||||
|     , _nonblocking(nonblocking) | ||||
| { | ||||
|     struct sockaddr_in address {}; | ||||
|     address.sin_family = AF_INET; | ||||
|  | @ -26,25 +34,28 @@ anthracite_socket::anthracite_socket(int port, int max_queue) | |||
|     setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &reuse_opt, sizeof(reuse_opt)); | ||||
|     bind(server_socket, reinterpret_cast<struct sockaddr*>(&address), sizeof(address)); | ||||
| 
 | ||||
|     if (_nonblocking) { | ||||
|         fcntl(server_socket, F_SETFL, O_NONBLOCK); | ||||
|     } | ||||
| 
 | ||||
|     listen(server_socket, max_queue); | ||||
| } | ||||
| 
 | ||||
| bool anthracite_socket::wait_for_conn() | ||||
| { | ||||
|     client_ip = ""; | ||||
|     //struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
 | ||||
|     //fd_set read_fd;
 | ||||
|     //FD_ZERO(&read_fd);
 | ||||
|     //FD_SET(server_socket, &read_fd);
 | ||||
|     //if (select(server_socket + 1, &read_fd, NULL, NULL, &wait_timeout)) {
 | ||||
|         client_socket = accept(server_socket, reinterpret_cast<struct sockaddr*>(&client_addr), &client_addr_len); | ||||
|     client_socket = accept(server_socket, reinterpret_cast<struct sockaddr*>(&client_addr), &client_addr_len); | ||||
|     if (client_socket > 0) { | ||||
|         if (_nonblocking) { | ||||
|             fcntl(client_socket, F_SETFL, O_NONBLOCK); | ||||
|         } | ||||
|         std::array<char, INET_ADDRSTRLEN> ip_str { 0 }; | ||||
|         inet_ntop(AF_INET, &client_addr.sin_addr, ip_str.data(), INET_ADDRSTRLEN); | ||||
|         client_ip = std::string(ip_str.data()); | ||||
|         return true; | ||||
|     //} else {
 | ||||
|     //    return false;
 | ||||
|     //}
 | ||||
|     } else { | ||||
|         return false; | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| const std::string& anthracite_socket::get_client_ip() | ||||
|  | @ -52,8 +63,7 @@ const std::string& anthracite_socket::get_client_ip() | |||
|     return client_ip; | ||||
| } | ||||
| 
 | ||||
| void anthracite_socket::close_conn() | ||||
| { | ||||
| void anthracite_socket::close_conn() { | ||||
|     close(client_socket); | ||||
|     client_socket = -1; | ||||
| } | ||||
|  | @ -66,13 +76,21 @@ void anthracite_socket::send_message(std::string& msg) | |||
|     send(client_socket, &msg[0], msg.length(), 0); | ||||
| } | ||||
| 
 | ||||
| bool anthracite_socket::has_client() { | ||||
|     return client_socket > 0; | ||||
| } | ||||
| 
 | ||||
| std::string anthracite_socket::recv_message(int buffer_size) | ||||
| { | ||||
|     if (client_socket == -1) { | ||||
|         return ""; | ||||
|     } | ||||
| 
 | ||||
|     setsockopt(client_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout_tv, sizeof timeout_tv); | ||||
|     //setsockopt(client_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout_tv, sizeof timeout_tv);
 | ||||
| 
 | ||||
|     int nodelay_opt = 1; | ||||
|     assert(setsockopt(client_socket, SOL_TCP, TCP_NODELAY, &nodelay_opt, sizeof(nodelay_opt)) == 0); | ||||
| 
 | ||||
|     std::vector<char> response(buffer_size + 1); | ||||
|     ssize_t result = recv(client_socket, response.data(), buffer_size + 1, 0); | ||||
| 
 | ||||
|  |  | |||
|  | @ -14,6 +14,7 @@ namespace anthracite::socket { | |||
| class anthracite_socket { | ||||
| 
 | ||||
| protected: | ||||
|     bool _nonblocking; | ||||
|     struct timeval wait_timeout = { .tv_sec = 1, .tv_usec = 0}; | ||||
|     int server_socket; | ||||
|     int client_socket {}; | ||||
|  | @ -24,10 +25,11 @@ protected: | |||
|     static const int MAX_QUEUE_LENGTH = 100; | ||||
| 
 | ||||
| public: | ||||
|     anthracite_socket(int port, int max_queue = MAX_QUEUE_LENGTH); | ||||
|     anthracite_socket(int port, int max_queue = MAX_QUEUE_LENGTH, bool nonblocking = false); | ||||
| 
 | ||||
|     virtual const std::string& get_client_ip() final; | ||||
| 
 | ||||
|     virtual bool has_client(); | ||||
|     virtual bool wait_for_conn(); | ||||
|     virtual void close_conn(); | ||||
|     virtual void send_message(std::string& msg); | ||||
|  |  | |||
|  | @ -6,7 +6,9 @@ | |||
| #include <chrono> | ||||
| #include <mutex> | ||||
| #include <pthread.h> | ||||
| #include <sstream> | ||||
| #include <syncstream> | ||||
| #include "signal.h" | ||||
| 
 | ||||
| using std::chrono::duration; | ||||
| using std::chrono::duration_cast; | ||||
|  | @ -50,11 +52,6 @@ bool event_loop::event_handler(socket::anthracite_socket* sock) | |||
|     sock->send_message(header); | ||||
|     sock->send_message(resp->content()); | ||||
| 
 | ||||
|     auto end = high_resolution_clock::now(); | ||||
|     //auto ms_int = duration_cast<std::chrono::microseconds>(end - event.timestamp());
 | ||||
|     //log::logger.log_request_and_response(req, resp, 9);//ms_int.count());
 | ||||
| 
 | ||||
|     resp.reset(); | ||||
|     if (req.close_connection()) { | ||||
|         return false; | ||||
|     } | ||||
|  | @ -62,26 +59,30 @@ bool event_loop::event_handler(socket::anthracite_socket* sock) | |||
|     return true; | ||||
| } | ||||
| 
 | ||||
| #define QATATIME (50) | ||||
| 
 | ||||
| void event_loop::worker_thread_loop(int threadno) | ||||
| { | ||||
|     struct epoll_event* events = new struct epoll_event[_config.max_clients()]; | ||||
|     int epoll_fd = _epoll_fds[threadno]; | ||||
|     std::stringstream ss; | ||||
|     ss << "worker " << threadno; | ||||
|     pthread_setname_np(pthread_self(), ss.str().c_str()); | ||||
| 
 | ||||
|     std::osyncstream(log::info) << "Starting worker thread " << threadno << " on pid " << syscall(SYS_gettid) << std::endl; | ||||
|     struct epoll_event* events = new struct epoll_event[_config.max_clients()]; | ||||
|     int timeout_ms = 1000; | ||||
| 
 | ||||
|     if (_nonblocking) { | ||||
|         timeout_ms = 0; | ||||
|     } | ||||
| 
 | ||||
|     log::info << "Starting worker thread " << threadno << std::endl; | ||||
| 
 | ||||
|     while (_run) { | ||||
|         // Get event from queue
 | ||||
|         int ready_fds = epoll_wait(epoll_fd, events, _config.max_clients(), 1000); | ||||
|         int ready_fds = epoll_wait(_epoll_fd, events, _config.max_clients(), timeout_ms); | ||||
| 
 | ||||
|         if (ready_fds > 0) { | ||||
|             std::lock_guard<std::mutex> lg(_event_mtx); | ||||
|             for (int i = 0; i < ready_fds; i++) { | ||||
|                 socket::anthracite_socket* sockptr = reinterpret_cast<socket::anthracite_socket*>(events[i].data.ptr); | ||||
| 
 | ||||
|                 if (!event_handler(sockptr)) { | ||||
|                     epoll_ctl(epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]); | ||||
|                     epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]); | ||||
|                     sockptr->close_conn(); | ||||
|                     delete sockptr; | ||||
|                 } | ||||
|  | @ -89,29 +90,7 @@ void event_loop::worker_thread_loop(int threadno) | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     std::osyncstream(log::info) << "Stopping worker thread " << threadno << std::endl; | ||||
| } | ||||
| 
 | ||||
| void event_loop::eventer_thread_loop() | ||||
| { | ||||
|     //struct epoll_event* events = new struct epoll_event[_config.max_clients()];
 | ||||
|     //std::osyncstream(log::info) << "epoll() thread started on pid " << getpid() << std::endl;
 | ||||
|     //while (_run) {
 | ||||
|     //    int ready_fds = epoll_wait(_epoll_fd, events, _config.max_clients(), 1000);
 | ||||
| 
 | ||||
|     //    if (ready_fds > 0) {
 | ||||
|     //        std::lock_guard<std::mutex> lg(_event_mtx);
 | ||||
|     //        for (int i = 0; i < ready_fds; i++) {
 | ||||
|     //            socket::anthracite_socket* sockptr = reinterpret_cast<socket::anthracite_socket*>(events[i].data.ptr);
 | ||||
|     //            struct epoll_event ev;
 | ||||
|     //            epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, sockptr->csock(), &events[i]);
 | ||||
|     //            _events.push(event(sockptr, std::chrono::high_resolution_clock::now()));
 | ||||
|     //        }
 | ||||
|     //        _event_cv.notify_all();
 | ||||
|     //    }
 | ||||
|     //}
 | ||||
|     //delete[] events;
 | ||||
|     //std::osyncstream(log::info) << "epoll() thread exited" << std::endl;
 | ||||
|     log::info << "Stopping worker thread " << threadno << std::endl; | ||||
| } | ||||
| 
 | ||||
| void event_loop::listener_thread_loop(config::http_config& http_config) | ||||
|  | @ -129,9 +108,8 @@ void event_loop::listener_thread_loop(config::http_config& http_config) | |||
|         socket = new socket::anthracite_socket(http_ptr->port()); | ||||
|     } | ||||
| 
 | ||||
|     std::osyncstream(log::info) << "Listening for " << (is_tls ? "HTTPS" : "HTTP") << " connections on port " << http_ptr->port() << " on pid " << getpid() << std::endl; | ||||
|     std::osyncstream(log::info) << "Listening for " << (is_tls ? "HTTPS" : "HTTP") << " connections on port " << http_ptr->port() << std::endl; | ||||
| 
 | ||||
|     int assign_thread = 0; | ||||
|     while (_run) { | ||||
|         if (socket->wait_for_conn()) { | ||||
|             socket::anthracite_socket* client_sock; | ||||
|  | @ -144,10 +122,9 @@ void event_loop::listener_thread_loop(config::http_config& http_config) | |||
|             } | ||||
| 
 | ||||
|             struct epoll_event event; | ||||
|             event.events = EPOLLIN; | ||||
|             event.events = EPOLLIN | EPOLLEXCLUSIVE;// | EPOLLET;
 | ||||
|             event.data.ptr = client_sock; | ||||
|             epoll_ctl(_epoll_fds[assign_thread], EPOLL_CTL_ADD, client_sock->csock(), &event); | ||||
|             assign_thread = (assign_thread + 1) % _epoll_fds.size(); | ||||
|             epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, client_sock->csock(), &event); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|  | @ -158,15 +135,16 @@ void event_loop::listener_thread_loop(config::http_config& http_config) | |||
| 
 | ||||
| void event_loop::start() | ||||
| { | ||||
|     signal(SIGPIPE, SIG_IGN); | ||||
|     log::info << "Starting event_loop Thread Manager" << std::endl; | ||||
| 
 | ||||
|     _run = true; | ||||
|     _epoll_fd = epoll_create(1); | ||||
| 
 | ||||
|     std::vector<std::thread> listener_threads; | ||||
|     std::vector<std::thread> worker_threads; | ||||
| 
 | ||||
|     for (int i = 0; i < _config.worker_threads(); i++) { | ||||
|         _epoll_fds.push_back(epoll_create(1)); | ||||
|         auto thread = std::thread(&event_loop::worker_thread_loop, this, i); | ||||
|         worker_threads.push_back(std::move(thread)); | ||||
|     } | ||||
|  | @ -181,11 +159,6 @@ void event_loop::start() | |||
|         listener_threads.push_back(std::move(thread)); | ||||
|     } | ||||
| 
 | ||||
|     //{
 | ||||
|     //    auto thread = std::thread(&event_loop::eventer_thread_loop, this);
 | ||||
|     //    listener_threads.push_back(std::move(thread));
 | ||||
|     //}
 | ||||
| 
 | ||||
|     for (std::thread& t : worker_threads) { | ||||
|         t.join(); | ||||
|     } | ||||
|  | @ -198,7 +171,5 @@ void event_loop::start() | |||
| void event_loop::stop() | ||||
| { | ||||
|     _run = false; | ||||
|     std::lock_guard<std::mutex> lg(_event_mtx); | ||||
|     _event_cv.notify_all(); | ||||
| } | ||||
| } | ||||
|  |  | |||
|  | @ -17,11 +17,10 @@ namespace anthracite::thread_mgr { | |||
|                     std::chrono::time_point<std::chrono::high_resolution_clock>& timestamp();     | ||||
|             }; | ||||
| 
 | ||||
|             std::vector<int> _epoll_fds; | ||||
|             int _epoll_fd; | ||||
|             std::mutex _event_mtx; | ||||
|             std::condition_variable _event_cv; | ||||
|             std::queue<event> _events; | ||||
|             backends::file_backend _error_backend; | ||||
|             bool _nonblocking; | ||||
| 
 | ||||
|             void worker_thread_loop(int threadno); | ||||
|             void listener_thread_loop(config::http_config& http_config); | ||||
|  |  | |||
|  | @ -10,7 +10,7 @@ std::shared_ptr<anthracite::thread_mgr::thread_mgr> server = nullptr; | |||
| 
 | ||||
| extern "C" void signalHandler(int signum) | ||||
| { | ||||
|     anthracite::log::warn << "Caught signal SIG" << sigabbrev_np(signum) << ", exiting Anthracite" << std::endl; | ||||
|     //anthracite::log::warn << "Caught signal SIG" << sigabbrev_np(signum) << ", exiting Anthracite" << std::endl;
 | ||||
|     if (server != nullptr) { | ||||
|         server->stop(); | ||||
|     } | ||||
|  | @ -23,7 +23,7 @@ int main(int argc, char** argv) | |||
|     signal(SIGINT, signalHandler); | ||||
| 
 | ||||
|     anthracite::backends::file_backend fb("./www"); | ||||
|     anthracite::config::config cfg(3, 1000); | ||||
|     anthracite::config::config cfg(1, 10); | ||||
|     cfg.add_http_config(anthracite::config::http_config(8080)); | ||||
|     // cfg.add_https_config(config::https_config(8081, "", ""));
 | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue