Server application performing load balancing and monitoring of nodejs applications
Revision | 8f6f3db608c8ec9d37b4cd147cd9e93ea2b95e05 (tree) |
---|---|
Time | 2020-12-21 23:13:38 |
Author | Vladimir Markin <v.o.markin221@gmai...> |
Commiter | Vladimir Markin |
Added processing of requests in memory leak protection mode
@@ -32,7 +32,6 @@ | ||
32 | 32 | |
33 | 33 | |
34 | 34 | namespace { |
35 | - | |
36 | 35 | std::filesystem::path get_temporary_path(){ |
37 | 36 | std::filesystem::path path = std::filesystem::temp_directory_path(); |
38 | 37 | std::stringstream ss; |
@@ -283,30 +282,8 @@ | ||
283 | 282 | |
284 | 283 | first_transaction = false; |
285 | 284 | |
286 | - // reading the residual message | |
287 | - full_pack = request_header_info.content_length + request_header_info.header_end + 4; | |
288 | - is_big_data = full_pack >= 1000000 || true; | |
289 | - if (!is_big_data) { | |
290 | - if (first_buffer_offset < full_pack) | |
291 | - residual_buffer.resize(full_pack - first_buffer_offset); | |
292 | - while (rec_total_size < full_pack) { | |
293 | - rec_total_size += a_socket->recv(&residual_buffer[rec_total_size - first_buffer_offset], full_pack - rec_total_size < MAX_PACK ? full_pack - rec_total_size : MAX_PACK); | |
294 | - if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - start_request).count() > _config.maxRequestTimeout) | |
295 | - throw fcf::except::max_request_timeout_fi(a_socket->get_address(), a_socket->get_port(), request_header_info.host.c_str(), request_header_info.path.c_str()); | |
296 | - } | |
297 | - } else if (rec_total_size < full_pack){ | |
298 | - file_buffer.reset(new file_buffer_t()); | |
299 | - while (rec_total_size < full_pack) { | |
300 | - size_t size = a_socket->recv(&big_data_buffer[0], full_pack - rec_total_size < MAX_PACK ? full_pack - rec_total_size : MAX_PACK); | |
301 | - rec_total_size += size; | |
302 | - file_buffer->write(&big_data_buffer[0], size); | |
303 | - if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - start_request).count() > _config.maxRequestTimeout) | |
304 | - throw fcf::except::max_request_timeout_fi(a_socket->get_address(), a_socket->get_port(), request_header_info.host.c_str(), request_header_info.path.c_str()); | |
305 | - } | |
306 | - } | |
307 | - | |
308 | 285 | // find handler configuration |
309 | - // width 0 - host and path | |
286 | + // width 0 - host and path | |
310 | 287 | // 1 - host and path (partially) (the 'path_cmp_weigth' variable is taken into account) |
311 | 288 | // 2 - host and path (none) |
312 | 289 | // 3 - path |
@@ -327,7 +304,7 @@ | ||
327 | 304 | bool cur_path_match = !cur_path_empty ? cmp_path_template(hconf.path.c_str(), request_header_info.host.c_str(), &cur_path_iseq, &cur_path_cmp_width) |
328 | 305 | : true; |
329 | 306 | |
330 | - if (!cur_host_empty && cur_host_iseq && | |
307 | + if (!cur_host_empty && cur_host_iseq && | |
331 | 308 | !cur_path_empty && cur_path_iseq) { |
332 | 309 | new_handler_index = hi; |
333 | 310 | width = 0; |
@@ -336,13 +313,13 @@ | ||
336 | 313 | } else if (width >= 1 && |
337 | 314 | !cur_host_empty && cur_host_iseq && |
338 | 315 | !cur_path_empty && cur_path_match && |
339 | - (width != 1 || path_cmp_weigth > cur_path_cmp_width)){ | |
316 | + (width != 1 || path_cmp_weigth > cur_path_cmp_width)) { | |
340 | 317 | new_handler_index = hi; |
341 | 318 | width = 1; |
342 | 319 | path_cmp_weigth = cur_path_cmp_width; |
343 | 320 | } else if (width > 2 && |
344 | 321 | !cur_host_empty && cur_host_iseq && |
345 | - cur_path_empty){ | |
322 | + cur_path_empty) { | |
346 | 323 | new_handler_index = hi; |
347 | 324 | width = 2; |
348 | 325 | path_cmp_weigth = 0; |
@@ -370,6 +347,32 @@ | ||
370 | 347 | if (new_handler_index == -1) |
371 | 348 | throw std::runtime_error(std::string(std::string("Missing request handler for host '") + request_header_info.host + "' with path '" + request_header_info.path + "'").c_str()); |
372 | 349 | |
350 | + // the definition of a method processing | |
351 | + _subprocesses[new_handler_index]->request_memory_leak_protection(request_header_info.path, request_header_info.context); | |
352 | + | |
353 | + | |
354 | + // reading the residual message | |
355 | + full_pack = request_header_info.content_length + request_header_info.header_end + 4; | |
356 | + is_big_data = full_pack >= 1000000 || true; | |
357 | + if (!is_big_data) { | |
358 | + if (first_buffer_offset < full_pack) | |
359 | + residual_buffer.resize(full_pack - first_buffer_offset); | |
360 | + while (rec_total_size < full_pack) { | |
361 | + rec_total_size += a_socket->recv(&residual_buffer[rec_total_size - first_buffer_offset], full_pack - rec_total_size < MAX_PACK ? full_pack - rec_total_size : MAX_PACK); | |
362 | + if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - start_request).count() > _config.maxRequestTimeout) | |
363 | + throw fcf::except::max_request_timeout_fi(a_socket->get_address(), a_socket->get_port(), request_header_info.host.c_str(), request_header_info.path.c_str()); | |
364 | + } | |
365 | + } else if (rec_total_size < full_pack) { | |
366 | + file_buffer.reset(new file_buffer_t()); | |
367 | + while (rec_total_size < full_pack) { | |
368 | + size_t size = a_socket->recv(&big_data_buffer[0], full_pack - rec_total_size < MAX_PACK ? full_pack - rec_total_size : MAX_PACK); | |
369 | + rec_total_size += size; | |
370 | + file_buffer->write(&big_data_buffer[0], size); | |
371 | + if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - start_request).count() > _config.maxRequestTimeout) | |
372 | + throw fcf::except::max_request_timeout_fi(a_socket->get_address(), a_socket->get_port(), request_header_info.host.c_str(), request_header_info.path.c_str()); | |
373 | + } | |
374 | + } | |
375 | + | |
373 | 376 | // reconnect to process |
374 | 377 | handler_configuration_t& handler_config(_config.handlers[new_handler_index]); |
375 | 378 |
@@ -397,7 +400,7 @@ | ||
397 | 400 | // send residual message to process |
398 | 401 | offset = 0; |
399 | 402 | if (!is_big_data) { |
400 | - //residual_buffer. | |
403 | + //residual_buffer | |
401 | 404 | while((offset + first_buffer_offset) < full_pack) { |
402 | 405 | offset += process_socket->send(&residual_buffer[offset], (full_pack - first_buffer_offset) - offset); |
403 | 406 | if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - start_request).count() > _config.maxRequestTimeout) |
@@ -530,6 +533,7 @@ | ||
530 | 533 | |
531 | 534 | if (process_index != -1 && !completed) |
532 | 535 | _subprocesses[handler_index]->release_process(process_index); |
536 | + | |
533 | 537 | } |
534 | 538 | |
535 | 539 | template<typename TimePoint> |
@@ -44,6 +44,17 @@ | ||
44 | 44 | std::string content_length = get_http_param(a_data, a_dst->header_end, "content-length:"); |
45 | 45 | a_dst->content_length = content_length.empty() ? 0 : std::stoull(content_length); |
46 | 46 | |
47 | + std::string cookie = get_http_param(a_data, a_dst->header_end, "cookie:"); | |
48 | + size_t sbeg = cookie.find("___fcf___context="); | |
49 | + if (sbeg != std::string::npos) { | |
50 | + sbeg += 17; | |
51 | + size_t send = cookie.find(";", sbeg); | |
52 | + a_dst->context = cookie.substr(sbeg, send != std::string::npos ? send - sbeg : cookie.length() - sbeg); | |
53 | + } | |
54 | + | |
55 | + if (a_dst->context.empty()) | |
56 | + a_dst->context = get_http_param(a_data, a_dst->header_end, "fcf-context:"); | |
57 | + | |
47 | 58 | a_dst->host = get_http_param(a_data, a_dst->header_end, "host:"); |
48 | 59 | return true; |
49 | 60 | } |
@@ -12,6 +12,8 @@ | ||
12 | 12 | std::string path; |
13 | 13 | std::string host; |
14 | 14 | std::string connection; |
15 | + std::string context; | |
16 | + | |
15 | 17 | }; |
16 | 18 | |
17 | 19 | std::string get_http_param(const char* a_data, int a_end, const char* a_param); |
@@ -15,15 +15,41 @@ | ||
15 | 15 | #include "subprocess.hpp" |
16 | 16 | |
17 | 17 | namespace application { |
18 | + | |
19 | + enum cntrl_package_type_e { | |
20 | + CPM_EVENT, | |
21 | + CPM_HANDLER_TYPE, | |
22 | + }; | |
23 | + | |
18 | 24 | namespace { |
19 | - inline std::string to_ctrl_package(boost::property_tree::ptree& a_ptree){ | |
20 | - std::string data; | |
21 | - std::stringstream ss; | |
22 | - boost::property_tree::write_json(ss, a_ptree, false); | |
23 | - data += std::to_string(ss.str().length()); | |
24 | - data += "\n"; | |
25 | - data += ss.str(); | |
26 | - return data; | |
25 | + inline std::string to_ctrl_package(boost::property_tree::ptree& a_ptree, cntrl_package_type_e a_event_type = CPM_EVENT){ | |
26 | + switch (a_event_type) { | |
27 | + case CPM_EVENT: { | |
28 | + boost::property_tree::ptree root; | |
29 | + root.put("type", "event"); | |
30 | + root.add_child("data", a_ptree); | |
31 | + std::string data; | |
32 | + std::stringstream ss; | |
33 | + boost::property_tree::write_json(ss, root, false); | |
34 | + data += std::to_string(ss.str().length()); | |
35 | + data += "\n"; | |
36 | + data += ss.str(); | |
37 | + return data; | |
38 | + } | |
39 | + case CPM_HANDLER_TYPE: { | |
40 | + boost::property_tree::ptree root; | |
41 | + root.put("type", "check_memory_leak"); | |
42 | + root.add_child("data", a_ptree); | |
43 | + std::string data; | |
44 | + std::stringstream ss; | |
45 | + boost::property_tree::write_json(ss, root, false); | |
46 | + data += std::to_string(ss.str().length()); | |
47 | + data += "\n"; | |
48 | + data += ss.str(); | |
49 | + return data; | |
50 | + } | |
51 | + } | |
52 | + return std::string(); | |
27 | 53 | } |
28 | 54 | } // non namespace |
29 | 55 |
@@ -111,6 +137,29 @@ | ||
111 | 137 | _restart_flag = true; |
112 | 138 | } |
113 | 139 | |
140 | +bool subprocesses_t::request_memory_leak_protection(std::string a_path, std::string a_context) { | |
141 | + bool result = true; | |
142 | + int process_index = get_process(); | |
143 | + try{ | |
144 | + boost::property_tree::ptree ptree; | |
145 | + ptree.put("path", a_path); | |
146 | + ptree.put("context", a_context); | |
147 | + std::string data(to_ctrl_package(ptree, CPM_HANDLER_TYPE)); | |
148 | + fcf::net::socket_sptr_t socket(fcf::net::create_socket("127.0.0.1", _processes[process_index].control_port)); | |
149 | + socket->set_timeout(5); | |
150 | + socket->connect(); | |
151 | + socket->send_all(&data[0], (int)data.length()); | |
152 | + char buffer[16]; | |
153 | + buffer[15] = 0; | |
154 | + socket->recv(&buffer[0], sizeof(buffer)); | |
155 | + result = std::strcmp(buffer, "true") == 0; | |
156 | + release_process(process_index); | |
157 | + } catch(std::exception){ | |
158 | + release_process(process_index); | |
159 | + } | |
160 | + return result; | |
161 | +} | |
162 | + | |
114 | 163 | void subprocesses_t::_stop_process(int a_index, bool a_hide_errors){ // no exceptions |
115 | 164 | if (!_processes[a_index].pid) |
116 | 165 | return; |
@@ -81,6 +81,7 @@ | ||
81 | 81 | void release_process(int a_index); |
82 | 82 | bool is_active(int a_procNum); |
83 | 83 | void restart(); |
84 | + bool request_memory_leak_protection(std::string a_path, std::string a_context); | |
84 | 85 | private: |
85 | 86 | void _stop_process(int a_index, bool a_hide_errors); // no exceptions |
86 | 87 | void _restart_process(int a_index, bool a_restart); |