FLVなど sext styp のついた配信のリレーができない問題を修正するパッチ
Revision | 754f64d48512602cca64ba9a11c75bba1c548eee (tree) |
---|---|
Time | 2008-01-22 20:18:29 |
Author | eru <eru01@user...> |
Commiter | eru |
バグ修正コード(VPdiff_27_20080120)マージ
バグ修正コード(VPdiff_27_20080120)をマージ
@@ -608,6 +608,10 @@ void PeercastSource::stream(Channel *ch) | ||
608 | 608 | bool next_yp = false; |
609 | 609 | bool tracker_check = (ch->trackerHit.host.ip != 0); |
610 | 610 | int connFailCnt = 0; |
611 | + int keepDownstreamTime = 7; | |
612 | + | |
613 | + if (isIndexTxt(&ch->info)) | |
614 | + keepDownstreamTime = 30; | |
611 | 615 | |
612 | 616 | ch->lastStopTime = 0; |
613 | 617 | ch->bumped = false; |
@@ -720,7 +724,9 @@ void PeercastSource::stream(Channel *ch) | ||
720 | 724 | |
721 | 725 | chanMgr->hitlistlock.off(); |
722 | 726 | |
723 | - if (servMgr->keepDownstreams && ch->lastStopTime && ch->lastStopTime < sys->getTime() - 7) { | |
727 | + if (servMgr->keepDownstreams && ch->lastStopTime | |
728 | + && ch->lastStopTime < sys->getTime() - keepDownstreamTime) | |
729 | + { | |
724 | 730 | ch->lastStopTime = 0; |
725 | 731 | LOG_DEBUG("------------ disconnect all downstreams"); |
726 | 732 | ChanPacket pack; |
@@ -902,7 +908,7 @@ yp0: | ||
902 | 908 | ch->trackerHit.lastContact = ctime - 30 + (rand() % 30); |
903 | 909 | |
904 | 910 | // broadcast source host |
905 | - if (!error && ch->sourceHost.host.ip) { // if closed normally | |
911 | + if (!got503 && !error && ch->sourceHost.host.ip) { // if closed normally | |
906 | 912 | ChanPacket pack; |
907 | 913 | MemoryStream mem(pack.data,sizeof(pack.data)); |
908 | 914 | AtomStream atom(mem); |
@@ -916,7 +922,7 @@ yp0: | ||
916 | 922 | } |
917 | 923 | |
918 | 924 | // broadcast quit to any connected downstream servents |
919 | - if (!servMgr->keepDownstreams || (ch->sourceHost.tracker && !got503) || !error) { | |
925 | + if (!servMgr->keepDownstreams || !got503 && (ch->sourceHost.tracker || !error)) { | |
920 | 926 | ChanPacket pack; |
921 | 927 | MemoryStream mem(pack.data,sizeof(pack.data)); |
922 | 928 | AtomStream atom(mem); |
@@ -1385,6 +1391,17 @@ bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack) | ||
1385 | 1391 | { |
1386 | 1392 | unsigned int ctime = sys->getTime(); |
1387 | 1393 | |
1394 | + if ((ch->isPlaying() == isPlaying)){ | |
1395 | + if ((ctime-lastUpdate) < 10){ | |
1396 | + return false; | |
1397 | + } | |
1398 | + | |
1399 | + if ((ctime-lastCheckTime) < 5){ | |
1400 | + return false; | |
1401 | + } | |
1402 | + lastCheckTime = ctime; | |
1403 | + } | |
1404 | + | |
1388 | 1405 | ChanHitList *chl = chanMgr->findHitListByID(ch->info.id); |
1389 | 1406 | |
1390 | 1407 | if (!chl) |
@@ -1419,17 +1436,6 @@ bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack) | ||
1419 | 1436 | int newLocalListeners = ch->localListeners(); |
1420 | 1437 | int newLocalRelays = ch->localRelays(); |
1421 | 1438 | |
1422 | - if ((ch->isPlaying() == isPlaying)){ | |
1423 | - if ((ctime-lastUpdate) < 10){ | |
1424 | - return false; | |
1425 | - } | |
1426 | - | |
1427 | - if ((ctime-lastCheckTime) < 10){ | |
1428 | - return false; | |
1429 | - } | |
1430 | - lastCheckTime = ctime; | |
1431 | - } | |
1432 | - | |
1433 | 1439 | unsigned int oldp = ch->rawData.getOldestPos(); |
1434 | 1440 | unsigned int newp = ch->rawData.getLatestPos(); |
1435 | 1441 |
@@ -1611,9 +1617,11 @@ int Channel::readStream(Stream &in,ChannelStream *source) | ||
1611 | 1617 | setStatus(Channel::S_RECEIVING); |
1612 | 1618 | bumped = false; |
1613 | 1619 | } |
1614 | - source->updateStatus(this); | |
1620 | + //source->updateStatus(this); | |
1615 | 1621 | } |
1616 | 1622 | } |
1623 | + if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0) | |
1624 | + source->updateStatus(this); | |
1617 | 1625 | |
1618 | 1626 | unsigned int t = sys->getTime(); |
1619 | 1627 | if (t != ptime) { |
@@ -1886,9 +1894,11 @@ bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack) | ||
1886 | 1894 | lock.on(); |
1887 | 1895 | |
1888 | 1896 | unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait |
1889 | - unsigned int fpos = getStreamPos(firstPos); | |
1890 | - unsigned int lpos = getStreamPos(lastPos); | |
1891 | - if (spos < fpos && (fpos < lpos || spos > lpos + bound)) | |
1897 | + unsigned int fpos = getFirstDataPos(); | |
1898 | + unsigned int lpos = getLatestPos(); | |
1899 | + if ((spos < fpos && fpos <= lpos && spos != getStreamPosEnd(lastPos)) // --s-----f---l-- | |
1900 | + || (spos < fpos && lpos < fpos && spos > lpos + bound) // -l-------s--f-- | |
1901 | + || (spos > lpos && lpos >= fpos && spos - lpos > bound)) // --f---l------s- | |
1892 | 1902 | spos = fpos; |
1893 | 1903 | |
1894 | 1904 |
@@ -1917,6 +1927,18 @@ unsigned int ChanPacketBuffer::getLatestPos() | ||
1917 | 1927 | |
1918 | 1928 | } |
1919 | 1929 | // ----------------------------------- |
1930 | +unsigned int ChanPacketBuffer::getFirstDataPos() | |
1931 | +{ | |
1932 | + if (!writePos) | |
1933 | + return 0; | |
1934 | + for(unsigned int i=firstPos; i<=lastPos; i++) | |
1935 | + { | |
1936 | + if (packets[i%MAX_PACKETS].type == ChanPacket::T_DATA) | |
1937 | + return packets[i%MAX_PACKETS].pos; | |
1938 | + } | |
1939 | + return 0; | |
1940 | +} | |
1941 | +// ----------------------------------- | |
1920 | 1942 | unsigned int ChanPacketBuffer::getOldestPos() |
1921 | 1943 | { |
1922 | 1944 | if (!writePos) |
@@ -3897,6 +3919,15 @@ int ChanHitList::pickSourceHits(ChanHitSearch &chs) | ||
3897 | 3919 | return 0; |
3898 | 3920 | } |
3899 | 3921 | |
3922 | +// ----------------------------------- | |
3923 | +unsigned int ChanHitList::getSeq() | |
3924 | +{ | |
3925 | + unsigned int seq; | |
3926 | + seqLock.on(); | |
3927 | + seq = riSequence = (riSequence + 1) & 0xffffff; | |
3928 | + seqLock.off(); | |
3929 | + return seq; | |
3930 | +} | |
3900 | 3931 | |
3901 | 3932 | // ----------------------------------- |
3902 | 3933 | const char *ChanInfo::getTypeStr(TYPE t) |
@@ -4703,40 +4734,18 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4703 | 4734 | int index = 0; |
4704 | 4735 | int prob; |
4705 | 4736 | int rnd; |
4706 | - static int base = 0x400; | |
4737 | + int base = 0x400; | |
4707 | 4738 | ChanHit tmpHit[MAX_RESULTS]; |
4708 | - static WLock seqLock; | |
4709 | - static unsigned int riSequence = 0; | |
4710 | 4739 | |
4711 | 4740 | //srand(seed); |
4712 | 4741 | //seed += 11; |
4713 | 4742 | |
4714 | - unsigned int seq; | |
4715 | - seqLock.on(); | |
4716 | - seq = riSequence++; | |
4717 | - riSequence &= 0xffffff; | |
4718 | - seqLock.off(); | |
4719 | - | |
4720 | - Servent *s = servMgr->servents; | |
4721 | - while (s) { | |
4722 | - if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY | |
4723 | - && s->chanID.isSame(chl->info.id)) { | |
4724 | - int i = index % MAX_RESULTS; | |
4725 | - if (index < MAX_RESULTS | |
4726 | - || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) { | |
4727 | - s->serventHit.lastSendSeq = seq; | |
4728 | - tmpHit[i] = s->serventHit; | |
4729 | - tmpHit[i].host = s->serventHit.rhost[0]; | |
4730 | - index++; | |
4731 | - } | |
4732 | - } | |
4733 | - s = s->next; | |
4734 | - } | |
4743 | + unsigned int seq = chl->getSeq(); | |
4735 | 4744 | |
4736 | 4745 | ChanHit *hit = chl->hit; |
4737 | 4746 | |
4738 | 4747 | while(hit){ |
4739 | - if (hit->host.ip && !hit->dead){ | |
4748 | + if (hit->rhost[0].ip && !hit->dead) { | |
4740 | 4749 | if ( |
4741 | 4750 | (!exID.isSame(hit->sessionID)) |
4742 | 4751 | // && (hit->relay) |
@@ -4764,7 +4773,6 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4764 | 4773 | //rnd = (float)rand() / (float)RAND_MAX; |
4765 | 4774 | rnd = rand() % base; |
4766 | 4775 | if (hit->numHops == 1){ |
4767 | -#if 0 | |
4768 | 4776 | if (tmpHit[index % MAX_RESULTS].numHops == 1){ |
4769 | 4777 | if (rnd < prob){ |
4770 | 4778 | tmpHit[index % MAX_RESULTS] = *hit; |
@@ -4776,9 +4784,8 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4776 | 4784 | tmpHit[index % MAX_RESULTS].host = hit->rhost[0]; |
4777 | 4785 | index++; |
4778 | 4786 | } |
4779 | -#endif | |
4780 | 4787 | } else { |
4781 | - if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){ | |
4788 | + if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){ | |
4782 | 4789 | tmpHit[index % MAX_RESULTS] = *hit; |
4783 | 4790 | tmpHit[index % MAX_RESULTS].host = hit->rhost[0]; |
4784 | 4791 | index++; |
@@ -4822,15 +4829,9 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4822 | 4829 | best[use[i]] = tmpHit[i]; |
4823 | 4830 | }*/ |
4824 | 4831 | |
4825 | - int use[MAX_RESULTS]; | |
4826 | - int i; | |
4827 | - for (i = 0; i < cnt; i++) { | |
4828 | - use[i] = (i + seq) % cnt; | |
4829 | - } | |
4830 | - | |
4831 | - for (i = 0; i < cnt; i++){ | |
4832 | + for (int i = 0; i < cnt; i++){ | |
4832 | 4833 | // LOG_DEBUG("%d", use[i]); |
4833 | - best[use[i]] = tmpHit[i]; | |
4834 | + best[(i + seq) % cnt] = tmpHit[i]; | |
4834 | 4835 | } |
4835 | 4836 | // for (i = 0; i < cnt; i++){ |
4836 | 4837 | // char tmp[50]; |
@@ -249,13 +249,16 @@ public: | ||
249 | 249 | int getTotalRelays(); |
250 | 250 | int getTotalFirewalled(); |
251 | 251 | |
252 | + unsigned int getSeq(); | |
253 | + | |
252 | 254 | bool used; |
253 | 255 | ChanInfo info; |
254 | 256 | ChanHit *hit; |
255 | 257 | unsigned int lastHitTime; |
256 | 258 | ChanHitList *next; |
257 | 259 | |
258 | - | |
260 | + WLock seqLock; | |
261 | + unsigned int riSequence; | |
259 | 262 | }; |
260 | 263 | // ---------------------------------- |
261 | 264 | class ChanHitSearch |
@@ -187,6 +187,7 @@ public: | ||
187 | 187 | |
188 | 188 | int numPending() {return writePos-readPos;} |
189 | 189 | |
190 | + unsigned int getFirstDataPos(); | |
190 | 191 | unsigned int getLatestPos(); |
191 | 192 | unsigned int getOldestPos(); |
192 | 193 | unsigned int findOldestPos(unsigned int); |
@@ -547,7 +547,7 @@ void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, C | ||
547 | 547 | if (sv && sv->getHost().ip == hit.host.ip){ |
548 | 548 | // LOG_DEBUG("set servent's waitPort = %d", hit.host.port); |
549 | 549 | sv->waitPort = hit.host.port; |
550 | - hit.lastSendSeq = sv->serventHit.lastSendSeq; | |
550 | + //hit.lastSendSeq = sv->serventHit.lastSendSeq; | |
551 | 551 | sv->serventHit = hit; |
552 | 552 | } |
553 | 553 | } |
@@ -761,7 +761,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs) | ||
761 | 761 | ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip |
762 | 762 | && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port) |
763 | 763 | || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip)) |
764 | - || chanMgr->findParentHit(hit))) | |
764 | + || (hit.numHops != 1 && chanMgr->findParentHit(hit)))) | |
765 | 765 | { |
766 | 766 | int oldPos = pmem.pos; |
767 | 767 | hit.writeAtoms(patom, hit.chanID); |
@@ -878,20 +878,24 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
878 | 878 | } |
879 | 879 | |
880 | 880 | chanID = chanInfo.id; |
881 | - serventHit.rhost[0].ip = getHost().ip; | |
882 | - serventHit.rhost[0].port = listenPort; | |
883 | - serventHit.host = serventHit.rhost[0]; | |
881 | + serventHit.host.ip = getHost().ip; | |
882 | + serventHit.host.port = listenPort; | |
883 | + if (serventHit.host.globalIP()) | |
884 | + serventHit.rhost[0] = serventHit.host; | |
885 | + else | |
886 | + serventHit.rhost[1] = serventHit.host; | |
884 | 887 | serventHit.chanID = chanID; |
885 | 888 | |
886 | 889 | canStreamLock.on(); |
887 | 890 | chanReady = canStream(ch); |
888 | - if (/*0 && */!chanReady) | |
891 | + if (0 && !chanReady && ch->isPlaying()) | |
889 | 892 | { |
890 | - if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0) | |
893 | + if (ch->info.getUptime() > 60 | |
894 | + && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0) | |
891 | 895 | { |
892 | 896 | sourceHit = &ch->sourceHost; // send source host info |
893 | 897 | |
894 | - if (listenPort && ch->info.getUptime() > 60) // if stable | |
898 | + if (listenPort) | |
895 | 899 | { |
896 | 900 | // connect "this" host later |
897 | 901 | chanMgr->addHit(serventHit); |
@@ -1054,11 +1058,18 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
1054 | 1058 | |
1055 | 1059 | int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE; |
1056 | 1060 | |
1061 | + if (sourceHit) { | |
1062 | + sourceHit->writeAtoms(atom2,chanInfo.id); | |
1063 | + char tmp[50]; | |
1064 | + sourceHit->host.toStr(tmp); | |
1065 | + LOG_DEBUG("relay info(sourceHit): %s", tmp); | |
1066 | + } | |
1067 | + | |
1057 | 1068 | chanMgr->hitlistlock.on(); |
1058 | 1069 | |
1059 | 1070 | chl = chanMgr->findHitList(chanInfo); |
1060 | 1071 | |
1061 | - if (chl) | |
1072 | + if (chl && !sourceHit) | |
1062 | 1073 | { |
1063 | 1074 | ChanHit best; |
1064 | 1075 |
@@ -1117,25 +1128,18 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
1117 | 1128 | cnt++; |
1118 | 1129 | } |
1119 | 1130 | |
1120 | - if (sourceHit) { | |
1121 | - char tmp[50]; | |
1122 | - sourceHit->writeAtoms(atom2, chanInfo.id); | |
1123 | - sourceHit->host.toStr(tmp); | |
1124 | - LOG_DEBUG("relay info(sourceHit): %s", tmp); | |
1125 | - best.host.ip = sourceHit->host.ip; | |
1126 | - } | |
1127 | - | |
1128 | 1131 | if (!best.host.ip){ |
1129 | 1132 | char tmp[50]; |
1130 | 1133 | // chanMgr->hitlistlock.on(); |
1131 | - int cnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl); | |
1134 | + int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl); | |
1132 | 1135 | // chanMgr->hitlistlock.off(); |
1133 | - for (int i = 0; i < cnt; i++){ | |
1136 | + for (int i = 0; i < rhcnt; i++){ | |
1134 | 1137 | chs.best[i].writeAtoms(atom2, chanInfo.id); |
1135 | 1138 | chs.best[i].host.toStr(tmp); |
1136 | 1139 | LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops); |
1137 | 1140 | best.host.ip = chs.best[i].host.ip; |
1138 | 1141 | } |
1142 | + cnt += rhcnt; | |
1139 | 1143 | } |
1140 | 1144 | |
1141 | 1145 | if (cnt) |
@@ -1352,6 +1356,13 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
1352 | 1356 | { |
1353 | 1357 | handshakeIncomingPCP(atom,rhost,remoteID,agent); |
1354 | 1358 | atom.writeInt(PCP_OK,0); |
1359 | + if (rhost.globalIP()) | |
1360 | + serventHit.rhost[0] = rhost; | |
1361 | + else | |
1362 | + serventHit.rhost[1] = rhost; | |
1363 | + serventHit.sessionID = remoteID; | |
1364 | + serventHit.numHops = 1; | |
1365 | + chanMgr->addHit(serventHit); | |
1355 | 1366 | } |
1356 | 1367 | |
1357 | 1368 | } |
@@ -2762,7 +2762,7 @@ int ServMgr::kickUnrelayableHost(GnuID &chid, ChanHit &sendhit) | ||
2762 | 2762 | Host h = s->getHost(); |
2763 | 2763 | |
2764 | 2764 | ChanHit hit = s->serventHit; |
2765 | - if (!hit.relay && hit.numRelays == 0) | |
2765 | + if (!hit.relay && hit.numRelays == 0 || hit.firewalled) | |
2766 | 2766 | { |
2767 | 2767 | char hostName[256]; |
2768 | 2768 | h.toStr(hostName); |
@@ -45,8 +45,8 @@ extern int version_ex; // PP | ||
45 | 45 | //#define VERSION_EX 1 |
46 | 46 | static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only |
47 | 47 | static const int PCP_CLIENT_VERSION_EX_NUMBER = 27; |
48 | -static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-2)"; | |
49 | -static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-2)"; | |
48 | +static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-3)"; | |
49 | +static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-3)"; | |
50 | 50 | #endif |
51 | 51 | |
52 | 52 | // ------------------------------------------------ |
@@ -608,6 +608,10 @@ void PeercastSource::stream(Channel *ch) | ||
608 | 608 | bool next_yp = false; |
609 | 609 | bool tracker_check = (ch->trackerHit.host.ip != 0); |
610 | 610 | int connFailCnt = 0; |
611 | + int keepDownstreamTime = 7; | |
612 | + | |
613 | + if (isIndexTxt(&ch->info)) | |
614 | + keepDownstreamTime = 30; | |
611 | 615 | |
612 | 616 | ch->lastStopTime = 0; |
613 | 617 | ch->bumped = false; |
@@ -720,7 +724,9 @@ void PeercastSource::stream(Channel *ch) | ||
720 | 724 | |
721 | 725 | chanMgr->hitlistlock.off(); |
722 | 726 | |
723 | - if (servMgr->keepDownstreams && ch->lastStopTime && ch->lastStopTime < sys->getTime() - 7) { | |
727 | + if (servMgr->keepDownstreams && ch->lastStopTime | |
728 | + && ch->lastStopTime < sys->getTime() - keepDownstreamTime) | |
729 | + { | |
724 | 730 | ch->lastStopTime = 0; |
725 | 731 | LOG_DEBUG("------------ disconnect all downstreams"); |
726 | 732 | ChanPacket pack; |
@@ -902,7 +908,7 @@ yp0: | ||
902 | 908 | ch->trackerHit.lastContact = ctime - 30 + (rand() % 30); |
903 | 909 | |
904 | 910 | // broadcast source host |
905 | - if (!error && ch->sourceHost.host.ip) { // if closed normally | |
911 | + if (!got503 && !error && ch->sourceHost.host.ip) { // if closed normally | |
906 | 912 | ChanPacket pack; |
907 | 913 | MemoryStream mem(pack.data,sizeof(pack.data)); |
908 | 914 | AtomStream atom(mem); |
@@ -916,7 +922,7 @@ yp0: | ||
916 | 922 | } |
917 | 923 | |
918 | 924 | // broadcast quit to any connected downstream servents |
919 | - if (!servMgr->keepDownstreams || (ch->sourceHost.tracker && !got503) || !error) { | |
925 | + if (!servMgr->keepDownstreams || !got503 && (ch->sourceHost.tracker || !error)) { | |
920 | 926 | ChanPacket pack; |
921 | 927 | MemoryStream mem(pack.data,sizeof(pack.data)); |
922 | 928 | AtomStream atom(mem); |
@@ -1385,6 +1391,17 @@ bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack) | ||
1385 | 1391 | { |
1386 | 1392 | unsigned int ctime = sys->getTime(); |
1387 | 1393 | |
1394 | + if ((ch->isPlaying() == isPlaying)){ | |
1395 | + if ((ctime-lastUpdate) < 10){ | |
1396 | + return false; | |
1397 | + } | |
1398 | + | |
1399 | + if ((ctime-lastCheckTime) < 5){ | |
1400 | + return false; | |
1401 | + } | |
1402 | + lastCheckTime = ctime; | |
1403 | + } | |
1404 | + | |
1388 | 1405 | ChanHitList *chl = chanMgr->findHitListByID(ch->info.id); |
1389 | 1406 | |
1390 | 1407 | if (!chl) |
@@ -1419,17 +1436,6 @@ bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack) | ||
1419 | 1436 | int newLocalListeners = ch->localListeners(); |
1420 | 1437 | int newLocalRelays = ch->localRelays(); |
1421 | 1438 | |
1422 | - if ((ch->isPlaying() == isPlaying)){ | |
1423 | - if ((ctime-lastUpdate) < 10){ | |
1424 | - return false; | |
1425 | - } | |
1426 | - | |
1427 | - if ((ctime-lastCheckTime) < 10){ | |
1428 | - return false; | |
1429 | - } | |
1430 | - lastCheckTime = ctime; | |
1431 | - } | |
1432 | - | |
1433 | 1439 | unsigned int oldp = ch->rawData.getOldestPos(); |
1434 | 1440 | unsigned int newp = ch->rawData.getLatestPos(); |
1435 | 1441 |
@@ -1611,9 +1617,11 @@ int Channel::readStream(Stream &in,ChannelStream *source) | ||
1611 | 1617 | setStatus(Channel::S_RECEIVING); |
1612 | 1618 | bumped = false; |
1613 | 1619 | } |
1614 | - source->updateStatus(this); | |
1620 | + //source->updateStatus(this); | |
1615 | 1621 | } |
1616 | 1622 | } |
1623 | + if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0) | |
1624 | + source->updateStatus(this); | |
1617 | 1625 | |
1618 | 1626 | unsigned int t = sys->getTime(); |
1619 | 1627 | if (t != ptime) { |
@@ -1886,9 +1894,11 @@ bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack) | ||
1886 | 1894 | lock.on(); |
1887 | 1895 | |
1888 | 1896 | unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait |
1889 | - unsigned int fpos = getStreamPos(firstPos); | |
1890 | - unsigned int lpos = getStreamPos(lastPos); | |
1891 | - if (spos < fpos && (fpos < lpos || spos > lpos + bound)) | |
1897 | + unsigned int fpos = getFirstDataPos(); | |
1898 | + unsigned int lpos = getLatestPos(); | |
1899 | + if ((spos < fpos && fpos <= lpos && spos != getStreamPosEnd(lastPos)) // --s-----f---l-- | |
1900 | + || (spos < fpos && lpos < fpos && spos > lpos + bound) // -l-------s--f-- | |
1901 | + || (spos > lpos && lpos >= fpos && spos - lpos > bound)) // --f---l------s- | |
1892 | 1902 | spos = fpos; |
1893 | 1903 | |
1894 | 1904 |
@@ -1917,6 +1927,18 @@ unsigned int ChanPacketBuffer::getLatestPos() | ||
1917 | 1927 | |
1918 | 1928 | } |
1919 | 1929 | // ----------------------------------- |
1930 | +unsigned int ChanPacketBuffer::getFirstDataPos() | |
1931 | +{ | |
1932 | + if (!writePos) | |
1933 | + return 0; | |
1934 | + for(unsigned int i=firstPos; i<=lastPos; i++) | |
1935 | + { | |
1936 | + if (packets[i%MAX_PACKETS].type == ChanPacket::T_DATA) | |
1937 | + return packets[i%MAX_PACKETS].pos; | |
1938 | + } | |
1939 | + return 0; | |
1940 | +} | |
1941 | +// ----------------------------------- | |
1920 | 1942 | unsigned int ChanPacketBuffer::getOldestPos() |
1921 | 1943 | { |
1922 | 1944 | if (!writePos) |
@@ -3897,6 +3919,15 @@ int ChanHitList::pickSourceHits(ChanHitSearch &chs) | ||
3897 | 3919 | return 0; |
3898 | 3920 | } |
3899 | 3921 | |
3922 | +// ----------------------------------- | |
3923 | +unsigned int ChanHitList::getSeq() | |
3924 | +{ | |
3925 | + unsigned int seq; | |
3926 | + seqLock.on(); | |
3927 | + seq = riSequence = (riSequence + 1) & 0xffffff; | |
3928 | + seqLock.off(); | |
3929 | + return seq; | |
3930 | +} | |
3900 | 3931 | |
3901 | 3932 | // ----------------------------------- |
3902 | 3933 | const char *ChanInfo::getTypeStr(TYPE t) |
@@ -4703,40 +4734,18 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4703 | 4734 | int index = 0; |
4704 | 4735 | int prob; |
4705 | 4736 | int rnd; |
4706 | - static int base = 0x400; | |
4737 | + int base = 0x400; | |
4707 | 4738 | ChanHit tmpHit[MAX_RESULTS]; |
4708 | - static WLock seqLock; | |
4709 | - static unsigned int riSequence = 0; | |
4710 | 4739 | |
4711 | 4740 | //srand(seed); |
4712 | 4741 | //seed += 11; |
4713 | 4742 | |
4714 | - unsigned int seq; | |
4715 | - seqLock.on(); | |
4716 | - seq = riSequence++; | |
4717 | - riSequence &= 0xffffff; | |
4718 | - seqLock.off(); | |
4719 | - | |
4720 | - Servent *s = servMgr->servents; | |
4721 | - while (s) { | |
4722 | - if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY | |
4723 | - && s->chanID.isSame(chl->info.id)) { | |
4724 | - int i = index % MAX_RESULTS; | |
4725 | - if (index < MAX_RESULTS | |
4726 | - || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) { | |
4727 | - s->serventHit.lastSendSeq = seq; | |
4728 | - tmpHit[i] = s->serventHit; | |
4729 | - tmpHit[i].host = s->serventHit.rhost[0]; | |
4730 | - index++; | |
4731 | - } | |
4732 | - } | |
4733 | - s = s->next; | |
4734 | - } | |
4743 | + unsigned int seq = chl->getSeq(); | |
4735 | 4744 | |
4736 | 4745 | ChanHit *hit = chl->hit; |
4737 | 4746 | |
4738 | 4747 | while(hit){ |
4739 | - if (hit->host.ip && !hit->dead){ | |
4748 | + if (hit->rhost[0].ip && !hit->dead) { | |
4740 | 4749 | if ( |
4741 | 4750 | (!exID.isSame(hit->sessionID)) |
4742 | 4751 | // && (hit->relay) |
@@ -4764,7 +4773,6 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4764 | 4773 | //rnd = (float)rand() / (float)RAND_MAX; |
4765 | 4774 | rnd = rand() % base; |
4766 | 4775 | if (hit->numHops == 1){ |
4767 | -#if 0 | |
4768 | 4776 | if (tmpHit[index % MAX_RESULTS].numHops == 1){ |
4769 | 4777 | if (rnd < prob){ |
4770 | 4778 | tmpHit[index % MAX_RESULTS] = *hit; |
@@ -4776,9 +4784,8 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4776 | 4784 | tmpHit[index % MAX_RESULTS].host = hit->rhost[0]; |
4777 | 4785 | index++; |
4778 | 4786 | } |
4779 | -#endif | |
4780 | 4787 | } else { |
4781 | - if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){ | |
4788 | + if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){ | |
4782 | 4789 | tmpHit[index % MAX_RESULTS] = *hit; |
4783 | 4790 | tmpHit[index % MAX_RESULTS].host = hit->rhost[0]; |
4784 | 4791 | index++; |
@@ -4822,15 +4829,9 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4822 | 4829 | best[use[i]] = tmpHit[i]; |
4823 | 4830 | }*/ |
4824 | 4831 | |
4825 | - int use[MAX_RESULTS]; | |
4826 | - int i; | |
4827 | - for (i = 0; i < cnt; i++) { | |
4828 | - use[i] = (i + seq) % cnt; | |
4829 | - } | |
4830 | - | |
4831 | - for (i = 0; i < cnt; i++){ | |
4832 | + for (int i = 0; i < cnt; i++){ | |
4832 | 4833 | // LOG_DEBUG("%d", use[i]); |
4833 | - best[use[i]] = tmpHit[i]; | |
4834 | + best[(i + seq) % cnt] = tmpHit[i]; | |
4834 | 4835 | } |
4835 | 4836 | // for (i = 0; i < cnt; i++){ |
4836 | 4837 | // char tmp[50]; |
@@ -249,13 +249,16 @@ public: | ||
249 | 249 | int getTotalRelays(); |
250 | 250 | int getTotalFirewalled(); |
251 | 251 | |
252 | + unsigned int getSeq(); | |
253 | + | |
252 | 254 | bool used; |
253 | 255 | ChanInfo info; |
254 | 256 | ChanHit *hit; |
255 | 257 | unsigned int lastHitTime; |
256 | 258 | ChanHitList *next; |
257 | 259 | |
258 | - | |
260 | + WLock seqLock; | |
261 | + unsigned int riSequence; | |
259 | 262 | }; |
260 | 263 | // ---------------------------------- |
261 | 264 | class ChanHitSearch |
@@ -187,6 +187,7 @@ public: | ||
187 | 187 | |
188 | 188 | int numPending() {return writePos-readPos;} |
189 | 189 | |
190 | + unsigned int getFirstDataPos(); | |
190 | 191 | unsigned int getLatestPos(); |
191 | 192 | unsigned int getOldestPos(); |
192 | 193 | unsigned int findOldestPos(unsigned int); |
@@ -547,7 +547,7 @@ void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, C | ||
547 | 547 | if (sv && sv->getHost().ip == hit.host.ip){ |
548 | 548 | // LOG_DEBUG("set servent's waitPort = %d", hit.host.port); |
549 | 549 | sv->waitPort = hit.host.port; |
550 | - hit.lastSendSeq = sv->serventHit.lastSendSeq; | |
550 | + //hit.lastSendSeq = sv->serventHit.lastSendSeq; | |
551 | 551 | sv->serventHit = hit; |
552 | 552 | } |
553 | 553 | } |
@@ -761,7 +761,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs) | ||
761 | 761 | ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip |
762 | 762 | && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port) |
763 | 763 | || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip)) |
764 | - || chanMgr->findParentHit(hit))) | |
764 | + || (hit.numHops != 1 && chanMgr->findParentHit(hit)))) | |
765 | 765 | { |
766 | 766 | int oldPos = pmem.pos; |
767 | 767 | hit.writeAtoms(patom, hit.chanID); |
@@ -878,20 +878,24 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
878 | 878 | } |
879 | 879 | |
880 | 880 | chanID = chanInfo.id; |
881 | - serventHit.rhost[0].ip = getHost().ip; | |
882 | - serventHit.rhost[0].port = listenPort; | |
883 | - serventHit.host = serventHit.rhost[0]; | |
881 | + serventHit.host.ip = getHost().ip; | |
882 | + serventHit.host.port = listenPort; | |
883 | + if (serventHit.host.globalIP()) | |
884 | + serventHit.rhost[0] = serventHit.host; | |
885 | + else | |
886 | + serventHit.rhost[1] = serventHit.host; | |
884 | 887 | serventHit.chanID = chanID; |
885 | 888 | |
886 | 889 | canStreamLock.on(); |
887 | 890 | chanReady = canStream(ch); |
888 | - if (/*0 && */!chanReady) | |
891 | + if (0 && !chanReady && ch->isPlaying()) | |
889 | 892 | { |
890 | - if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0) | |
893 | + if (ch->info.getUptime() > 60 | |
894 | + && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0) | |
891 | 895 | { |
892 | 896 | sourceHit = &ch->sourceHost; // send source host info |
893 | 897 | |
894 | - if (listenPort && ch->info.getUptime() > 60) // if stable | |
898 | + if (listenPort) | |
895 | 899 | { |
896 | 900 | // connect "this" host later |
897 | 901 | chanMgr->addHit(serventHit); |
@@ -1054,11 +1058,18 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
1054 | 1058 | |
1055 | 1059 | int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE; |
1056 | 1060 | |
1061 | + if (sourceHit) { | |
1062 | + sourceHit->writeAtoms(atom2,chanInfo.id); | |
1063 | + char tmp[50]; | |
1064 | + sourceHit->host.toStr(tmp); | |
1065 | + LOG_DEBUG("relay info(sourceHit): %s", tmp); | |
1066 | + } | |
1067 | + | |
1057 | 1068 | chanMgr->hitlistlock.on(); |
1058 | 1069 | |
1059 | 1070 | chl = chanMgr->findHitList(chanInfo); |
1060 | 1071 | |
1061 | - if (chl) | |
1072 | + if (chl && !sourceHit) | |
1062 | 1073 | { |
1063 | 1074 | ChanHit best; |
1064 | 1075 |
@@ -1117,25 +1128,18 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
1117 | 1128 | cnt++; |
1118 | 1129 | } |
1119 | 1130 | |
1120 | - if (sourceHit) { | |
1121 | - char tmp[50]; | |
1122 | - sourceHit->writeAtoms(atom2, chanInfo.id); | |
1123 | - sourceHit->host.toStr(tmp); | |
1124 | - LOG_DEBUG("relay info(sourceHit): %s", tmp); | |
1125 | - best.host.ip = sourceHit->host.ip; | |
1126 | - } | |
1127 | - | |
1128 | 1131 | if (!best.host.ip){ |
1129 | 1132 | char tmp[50]; |
1130 | 1133 | // chanMgr->hitlistlock.on(); |
1131 | - int cnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl); | |
1134 | + int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl); | |
1132 | 1135 | // chanMgr->hitlistlock.off(); |
1133 | - for (int i = 0; i < cnt; i++){ | |
1136 | + for (int i = 0; i < rhcnt; i++){ | |
1134 | 1137 | chs.best[i].writeAtoms(atom2, chanInfo.id); |
1135 | 1138 | chs.best[i].host.toStr(tmp); |
1136 | 1139 | LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops); |
1137 | 1140 | best.host.ip = chs.best[i].host.ip; |
1138 | 1141 | } |
1142 | + cnt += rhcnt; | |
1139 | 1143 | } |
1140 | 1144 | |
1141 | 1145 | if (cnt) |
@@ -1352,6 +1356,13 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
1352 | 1356 | { |
1353 | 1357 | handshakeIncomingPCP(atom,rhost,remoteID,agent); |
1354 | 1358 | atom.writeInt(PCP_OK,0); |
1359 | + if (rhost.globalIP()) | |
1360 | + serventHit.rhost[0] = rhost; | |
1361 | + else | |
1362 | + serventHit.rhost[1] = rhost; | |
1363 | + serventHit.sessionID = remoteID; | |
1364 | + serventHit.numHops = 1; | |
1365 | + chanMgr->addHit(serventHit); | |
1355 | 1366 | } |
1356 | 1367 | |
1357 | 1368 | } |
@@ -2762,7 +2762,7 @@ int ServMgr::kickUnrelayableHost(GnuID &chid, ChanHit &sendhit) | ||
2762 | 2762 | Host h = s->getHost(); |
2763 | 2763 | |
2764 | 2764 | ChanHit hit = s->serventHit; |
2765 | - if (!hit.relay && hit.numRelays == 0) | |
2765 | + if (!hit.relay && hit.numRelays == 0 || hit.firewalled) | |
2766 | 2766 | { |
2767 | 2767 | char hostName[256]; |
2768 | 2768 | h.toStr(hostName); |
@@ -45,8 +45,8 @@ extern int version_ex; // PP | ||
45 | 45 | //#define VERSION_EX 1 |
46 | 46 | static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only |
47 | 47 | static const int PCP_CLIENT_VERSION_EX_NUMBER = 27; |
48 | -static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-2)"; | |
49 | -static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-2)"; | |
48 | +static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-3)"; | |
49 | +static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-3)"; | |
50 | 50 | #endif |
51 | 51 | |
52 | 52 | // ------------------------------------------------ |