Revision | 7771 (tree) |
---|---|
Time | 2019-06-15 15:42:23 |
Author | (del#24082) |
ポート転送でSSH通信が遅い場合において、Tera Term(TTSSH)の消費メモリが
肥大化して、アプリが落ちる問題を修正した。
チケット #39297
local channelからのパケット読み込み時、SSHサーバへの送信でremote_windowに
空きがない場合は、バッファにパケットを記録するが、フロー制御を追加して際限なく
バッファに溜め込まないようにした。
branches/portfwd_memleak ブランチからのマージ。
@@ -679,9 +679,21 @@ | ||
679 | 679 | |
680 | 680 | while (channel->local_socket != INVALID_SOCKET) { |
681 | 681 | char buf[CHANNEL_READ_BUF_SIZE]; |
682 | - int amount = recv(channel->local_socket, buf, sizeof(buf), 0); | |
682 | + int amount; | |
683 | 683 | int err; |
684 | 684 | |
685 | + // recvの一時停止中ならば、何もせずに戻る。 | |
686 | + if (SSHv2(pvar)) { | |
687 | + Channel_t* c = ssh2_local_channel_lookup(channel_num); | |
688 | + if (c->bufchain_recv_suspended) { | |
689 | + logprintf(LOG_LEVEL_NOTICE, "%s: channel=%d recv was skipped for flow control", | |
690 | + __FUNCTION__, channel_num); | |
691 | + return; | |
692 | + } | |
693 | + } | |
694 | + | |
695 | + amount = recv(channel->local_socket, buf, sizeof(buf), 0); | |
696 | + | |
685 | 697 | // Xサーバからのデータ受信があれば、ノンブロッキングモードでソケット受信を行い、 |
686 | 698 | // SSHサーバのXアプリケーションへ送信する。 |
687 | 699 | //OutputDebugPrintf("%s: recv %d\n", __FUNCTION__, amount); |
@@ -740,6 +752,63 @@ | ||
740 | 752 | } |
741 | 753 | } |
742 | 754 | |
755 | +// local connectionの受信の停止および再開の判断を行う | |
756 | +// | |
757 | +// notify: TRUE recvを再開する | |
758 | +// FALSE recvを停止する | |
759 | +// | |
760 | +// [目的] | |
761 | +// remote_windowに空きがない場合は通知オフとし、空きができた場合は | |
762 | +// 通知を再開する。 | |
763 | +// remote_windowに余裕がない状態で、local connectionからのパケットを | |
764 | +// 受信し続けると、消費メモリが肥大化する(厳密にはメモリリークではない) | |
765 | +// という問題を回避する。 | |
766 | +// | |
767 | +// (2019.6.5 yutaka) | |
768 | +void FWD_suspend_resume_local_connection(PTInstVar pvar, Channel_t* c, int notify) | |
769 | +{ | |
770 | + int channel_num; | |
771 | + FWDChannel* channel; | |
772 | + int changed = 0; | |
773 | + | |
774 | + channel_num = c->local_num; | |
775 | + channel = pvar->fwd_state.channels + channel_num; | |
776 | + | |
777 | + if (notify) { | |
778 | + // recvを再開するか判断する | |
779 | + if (c->bufchain_amount <= FWD_LOW_WATER_MARK) { | |
780 | + // 下限を下回ったので再開 | |
781 | + c->bufchain_recv_suspended = FALSE; | |
782 | + | |
783 | + // ここで再開のメッセージを飛ばす | |
784 | + PostMessage(pvar->fwd_state.accept_wnd, WM_SOCK_IO, | |
785 | + (WPARAM)channel->local_socket, | |
786 | + MAKEWPARAM(FD_READ, 0) | |
787 | + ); | |
788 | + | |
789 | + changed = 1; | |
790 | + } | |
791 | + | |
792 | + } else { | |
793 | + // recvを停止するか判断する | |
794 | + if (c->bufchain_amount >= FWD_HIGH_WATER_MARK) { | |
795 | + // 上限を超えたので停止 | |
796 | + c->bufchain_recv_suspended = TRUE; | |
797 | + changed = 1; | |
798 | + } | |
799 | + } | |
800 | + | |
801 | + logprintf(LOG_LEVEL_NOTICE, | |
802 | + "%s: Local channel#%d recv has been `%s' for flow control(buffer size %lu, recv %s).", | |
803 | + __FUNCTION__, channel_num, | |
804 | + c->bufchain_recv_suspended ? "disabled" : "enabled", | |
805 | + c->bufchain_amount, | |
806 | + changed ? "changed" : "" | |
807 | + ); | |
808 | + | |
809 | +} | |
810 | + | |
811 | + | |
743 | 812 | static LRESULT CALLBACK accept_wnd_proc(HWND wnd, UINT msg, WPARAM wParam, |
744 | 813 | LPARAM lParam) |
745 | 814 | { |
@@ -35,6 +35,11 @@ | ||
35 | 35 | #ifndef __FWD_H |
36 | 36 | #define __FWD_H |
37 | 37 | |
38 | +// ポート転送におけるフロー制御の閾値 | |
39 | +// 適用先 Channel_t.bufchain_amount | |
40 | +#define FWD_HIGH_WATER_MARK (1 * 1024 * 1024) // 1MB | |
41 | +#define FWD_LOW_WATER_MARK (0) // 0MB | |
42 | + | |
38 | 43 | #define FWD_REMOTE_CONNECTED 0x01 |
39 | 44 | #define FWD_LOCAL_CONNECTED 0x02 |
40 | 45 | #define FWD_BOTH_CONNECTED (FWD_REMOTE_CONNECTED | FWD_LOCAL_CONNECTED) |
@@ -164,5 +169,6 @@ | ||
164 | 169 | int FWD_check_local_channel_num(PTInstVar pvar, int local_num); |
165 | 170 | int FWD_agent_open(PTInstVar pvar, uint32 remote_channel_num); |
166 | 171 | BOOL FWD_agent_forward_confirm(PTInstVar pvar); |
172 | +void FWD_suspend_resume_local_connection(PTInstVar pvar, Channel_t* c, int notify); | |
167 | 173 | |
168 | 174 | #endif |
@@ -213,6 +213,8 @@ | ||
213 | 213 | c->type = type; |
214 | 214 | c->local_num = local_num; // alloc_channel()の返値を保存しておく |
215 | 215 | c->bufchain = NULL; |
216 | + c->bufchain_amount = 0; | |
217 | + c->bufchain_recv_suspended = FALSE; | |
216 | 218 | if (type == TYPE_SCP) { |
217 | 219 | c->scp.state = SCP_INIT; |
218 | 220 | c->scp.progress_window = NULL; |
@@ -231,7 +233,8 @@ | ||
231 | 233 | } |
232 | 234 | |
233 | 235 | // remote_windowの空きがない場合に、送れなかったバッファをリスト(入力順)へつないでおく。 |
234 | -static void ssh2_channel_add_bufchain(Channel_t *c, unsigned char *buf, unsigned int buflen) | |
236 | +// ここで確保したメモリは ssh2_channel_retry_send_bufchain() で解放する。 | |
237 | +static void ssh2_channel_add_bufchain(PTInstVar pvar, Channel_t *c, unsigned char *buf, unsigned int buflen) | |
235 | 238 | { |
236 | 239 | bufchain_t *p, *old; |
237 | 240 |
@@ -255,12 +258,22 @@ | ||
255 | 258 | old = old->next; |
256 | 259 | old->next = p; |
257 | 260 | } |
261 | + | |
262 | + // バッファサイズの合計を更新する(記録用) | |
263 | + c->bufchain_amount += buflen; | |
264 | + | |
265 | + // remote_windowの空きがないので、local connectionからのパケット受信の | |
266 | + // 停止指示を出す。すぐに通知が止まるわけではない。 | |
267 | + FWD_suspend_resume_local_connection(pvar, c, FALSE); | |
258 | 268 | } |
259 | 269 | |
270 | +// remote_windowの空きができたら、リストに残っているデータを順番に送る。 | |
271 | +// 送信ができたらメモリを解放する。 | |
260 | 272 | static void ssh2_channel_retry_send_bufchain(PTInstVar pvar, Channel_t *c) |
261 | 273 | { |
262 | 274 | bufchain_t *ch; |
263 | 275 | unsigned int size; |
276 | + bufchain_t* ch_origin = c->bufchain; | |
264 | 277 | |
265 | 278 | while (c->bufchain) { |
266 | 279 | // 先頭から先に送る |
@@ -279,7 +292,16 @@ | ||
279 | 292 | |
280 | 293 | buffer_free(ch->msg); |
281 | 294 | free(ch); |
295 | + | |
296 | + // バッファサイズの合計を更新する(記録用) | |
297 | + c->bufchain_amount -= size; | |
282 | 298 | } |
299 | + | |
300 | + // 元々あったリストが空になったら、 | |
301 | + // local connectionからのパケット通知を再開する。 | |
302 | + if (ch_origin && c->bufchain == NULL) { | |
303 | + FWD_suspend_resume_local_connection(pvar, c, TRUE); | |
304 | + } | |
283 | 305 | } |
284 | 306 | |
285 | 307 | // channel close時にチャネル構造体をリストへ返却する |
@@ -370,7 +392,7 @@ | ||
370 | 392 | // SSH1で管理しているchannel構造体から、SSH2向けのChannel_tへ変換する。 |
371 | 393 | // TODO: 将来的にはチャネル構造体は1つに統合する。 |
372 | 394 | // (2005.6.12 yutaka) |
373 | -static Channel_t *ssh2_local_channel_lookup(int local_num) | |
395 | +Channel_t *ssh2_local_channel_lookup(int local_num) | |
374 | 396 | { |
375 | 397 | int i; |
376 | 398 | Channel_t *c; |
@@ -3441,7 +3463,7 @@ | ||
3441 | 3463 | // これによりパケットが壊れたように見える現象が改善される。 |
3442 | 3464 | // (2012.10.14 yutaka) |
3443 | 3465 | if (retry == 0 && c->bufchain) { |
3444 | - ssh2_channel_add_bufchain(c, buf, buflen); | |
3466 | + ssh2_channel_add_bufchain(pvar, c, buf, buflen); | |
3445 | 3467 | return; |
3446 | 3468 | } |
3447 | 3469 |
@@ -3448,7 +3470,7 @@ | ||
3448 | 3470 | if ((unsigned int)buflen > c->remote_window) { |
3449 | 3471 | unsigned int offset = 0; |
3450 | 3472 | // 送れないデータはいったん保存しておく |
3451 | - ssh2_channel_add_bufchain(c, buf + offset, buflen - offset); | |
3473 | + ssh2_channel_add_bufchain(pvar, c, buf + offset, buflen - offset); | |
3452 | 3474 | buflen = offset; |
3453 | 3475 | return; |
3454 | 3476 | } |
@@ -881,6 +881,8 @@ | ||
881 | 881 | enum channel_type type; |
882 | 882 | int local_num; |
883 | 883 | bufchain_t *bufchain; |
884 | + unsigned long bufchain_amount; | |
885 | + BOOL bufchain_recv_suspended; | |
884 | 886 | scp_t scp; |
885 | 887 | buffer_t *agent_msg; |
886 | 888 | int agent_request_len; |
@@ -892,6 +894,7 @@ | ||
892 | 894 | unsigned char *begin_send_packet(PTInstVar pvar, int type, int len); |
893 | 895 | void finish_send_packet_special(PTInstVar pvar, int skip_compress); |
894 | 896 | void SSH2_send_channel_data(PTInstVar pvar, Channel_t *c, unsigned char *buf, unsigned int buflen, int retry); |
897 | +Channel_t* ssh2_local_channel_lookup(int local_num); | |
895 | 898 | |
896 | 899 | #define finish_send_packet(pvar) finish_send_packet_special((pvar), 0) |
897 | 900 | #define get_payload_uint32(pvar, offset) get_uint32_MSBfirst((pvar)->ssh_state.payload + (offset)) |