00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include "private.h"
00039 #include <ccrtp/oqueue.h>
00040
00041 #ifdef CCXX_NAMESPACES
00042 namespace ost {
00043 #endif
00044
00045 const size_t OutgoingDataQueueBase::defaultMaxSendSegmentSize = 65536;
00046
00047 OutgoingDataQueueBase::OutgoingDataQueueBase()
00048 {
00049
00050 setMaxSendSegmentSize(getDefaultMaxSendSegmentSize());
00051 }
00052
00053 DestinationListHandler::DestinationListHandler() :
00054 destList(), destinationLock()
00055 {
00056 }
00057
00058 DestinationListHandler::~DestinationListHandler()
00059 {
00060 TransportAddress* tmp = NULL;
00061 writeLockDestinationList();
00062 for (std::list<TransportAddress*>::iterator i = destList.begin();
00063 destList.end() != i; i++) {
00064 tmp = *i;
00065 #ifdef CCXX_EXCEPTIONS
00066 try {
00067 #endif
00068 delete tmp;
00069 #ifdef CCXX_EXCEPTIONS
00070 } catch (...) {}
00071 #endif
00072 }
00073 unlockDestinationList();
00074 }
00075
00076 bool
00077 DestinationListHandler::addDestinationToList(const InetAddress& ia,
00078 tpport_t data, tpport_t control)
00079 {
00080 TransportAddress* addr = new TransportAddress(ia,data,control);
00081 writeLockDestinationList();
00082 destList.push_back(addr);
00083 unlockDestinationList();
00084 return true;
00085 }
00086
00087 bool
00088 DestinationListHandler::removeDestinationFromList(const InetAddress& ia,
00089 tpport_t dataPort,
00090 tpport_t controlPort)
00091 {
00092 bool result = false;
00093 writeLockDestinationList();
00094 TransportAddress* tmp;
00095 for (std::list<TransportAddress*>::iterator i = destList.begin();
00096 destList.end() != i && !result; ) {
00097 tmp = *i;
00098 if ( ia == tmp->getNetworkAddress() &&
00099 dataPort == tmp->getDataTransportPort() &&
00100 controlPort == tmp->getControlTransportPort() ) {
00101
00102 result = true;
00103 destList.erase(i);
00104 delete tmp;
00105 } else{
00106 i++;
00107 }
00108 }
00109 unlockDestinationList();
00110 return result;
00111 }
00112
00113 #ifdef CCXX_IPV6
00114
00115 DestinationListHandlerIPV6::DestinationListHandlerIPV6() :
00116 destListIPV6(), destinationLock()
00117 {
00118 }
00119
00120 DestinationListHandlerIPV6::~DestinationListHandlerIPV6()
00121 {
00122 TransportAddressIPV6* tmp = NULL;
00123 writeLockDestinationListIPV6();
00124 for (std::list<TransportAddressIPV6*>::iterator i = destListIPV6.begin();
00125 destListIPV6.end() != i; i++) {
00126 tmp = *i;
00127 #ifdef CCXX_EXCEPTIONS
00128 try {
00129 #endif
00130 delete tmp;
00131 #ifdef CCXX_EXCEPTIONS
00132 } catch (...) {}
00133 #endif
00134 }
00135 unlockDestinationListIPV6();
00136 }
00137
00138 bool
00139 DestinationListHandlerIPV6::addDestinationToListIPV6(const IPV6Address& ia,
00140 tpport_t data, tpport_t control)
00141 {
00142 TransportAddressIPV6* addr = new TransportAddressIPV6(ia,data,control);
00143 writeLockDestinationListIPV6();
00144 destListIPV6.push_back(addr);
00145 unlockDestinationListIPV6();
00146 return true;
00147 }
00148
00149 bool
00150 DestinationListHandlerIPV6::removeDestinationFromListIPV6(const IPV6Address& ia,
00151 tpport_t dataPort,
00152 tpport_t controlPort)
00153 {
00154 bool result = false;
00155 writeLockDestinationListIPV6();
00156 TransportAddressIPV6* tmp;
00157 for (std::list<TransportAddressIPV6*>::iterator i = destListIPV6.begin();
00158 destListIPV6.end() != i && !result; ) {
00159 tmp = *i;
00160 if ( ia == tmp->getNetworkAddress() &&
00161 dataPort == tmp->getDataTransportPort() &&
00162 controlPort == tmp->getControlTransportPort() ) {
00163
00164 result = true;
00165 destListIPV6.erase(i);
00166 delete tmp;
00167 } else {
00168 i++;
00169 }
00170 }
00171 unlockDestinationListIPV6();
00172 return result;
00173 }
00174
00175
00176 #endif
00177
00179 const microtimeout_t OutgoingDataQueue::defaultSchedulingTimeout = 8000;
00181 const microtimeout_t OutgoingDataQueue::defaultExpireTimeout = 40000;
00182
00183 OutgoingDataQueue::OutgoingDataQueue():
00184 OutgoingDataQueueBase(),
00185 #ifdef CCXX_IPV6
00186 DestinationListHandlerIPV6(),
00187 #endif
00188 DestinationListHandler(),
00189 sendLock(),
00190 sendFirst(NULL), sendLast(NULL)
00191 {
00192 setInitialTimestamp(random32());
00193 setSchedulingTimeout(getDefaultSchedulingTimeout());
00194 setExpireTimeout(getDefaultExpireTimeout());
00195
00196 sendInfo.packetCount = 0;
00197 sendInfo.octetCount = 0;
00198 sendInfo.sendSeq = random16();
00199 sendInfo.sendCC = 0;
00200 sendInfo.paddinglen = 0;
00201 sendInfo.marked = false;
00202 sendInfo.complete = true;
00203
00204 sendInfo.sendSources[0] = getLocalSSRC();
00205
00206 sendInfo.overflowTime.tv_sec = getInitialTime().tv_sec;
00207 sendInfo.overflowTime.tv_usec = getInitialTime().tv_usec;
00208 }
00209
00210 void
00211 OutgoingDataQueue::purgeOutgoingQueue()
00212 {
00213 OutgoingRTPPktLink* sendnext;
00214
00215
00216 sendLock.writeLock();
00217 while ( sendFirst )
00218 {
00219 sendnext = sendFirst->getNext();
00220 delete sendFirst;
00221 sendFirst = sendnext;
00222 }
00223 sendLast = NULL;
00224 sendLock.unlock();
00225 }
00226
00227 bool
00228 OutgoingDataQueue::addDestination(const InetHostAddress& ia,
00229 tpport_t dataPort,
00230 tpport_t controlPort)
00231 {
00232 if ( 0 == controlPort )
00233 controlPort = dataPort + 1;
00234 bool result = addDestinationToList(ia,dataPort,controlPort);
00235 if ( result && isSingleDestination() ) {
00236 setDataPeer(ia,dataPort);
00237 setControlPeer(ia,controlPort);
00238 }
00239 return result;
00240 }
00241
00242 bool
00243 OutgoingDataQueue::addDestination(const InetMcastAddress& ia,
00244 tpport_t dataPort,
00245 tpport_t controlPort)
00246 {
00247 if ( 0 == controlPort )
00248 controlPort = dataPort + 1;
00249 bool result = addDestinationToList(ia,dataPort,controlPort);
00250 if ( result && isSingleDestination() ) {
00251 setDataPeer(ia,dataPort);
00252 setControlPeer(ia,controlPort);
00253 }
00254 return result;
00255 }
00256
00257 bool
00258 OutgoingDataQueue::forgetDestination(const InetHostAddress& ia,
00259 tpport_t dataPort,
00260 tpport_t controlPort)
00261 {
00262 if ( 0 == controlPort )
00263 controlPort = dataPort + 1;
00264 return DestinationListHandler::
00265 removeDestinationFromList(ia,dataPort,controlPort);
00266 }
00267
00268 bool
00269 OutgoingDataQueue::forgetDestination(const InetMcastAddress& ia,
00270 tpport_t dataPort,
00271 tpport_t controlPort)
00272 {
00273 if ( 0 == controlPort )
00274 controlPort = dataPort + 1;
00275 return DestinationListHandler::
00276 removeDestinationFromList(ia,dataPort,controlPort);
00277 }
00278
00279 #ifdef CCXX_IPV6
00280 bool
00281 OutgoingDataQueue::addDestination(const IPV6Address& ia,
00282 tpport_t dataPort,
00283 tpport_t controlPort)
00284 {
00285 if ( 0 == controlPort )
00286 controlPort = dataPort + 1;
00287 bool result = addDestinationToListIPV6(ia,dataPort,controlPort);
00288 if ( result && isSingleDestinationIPV6() ) {
00289 setDataPeerIPV6(ia,dataPort);
00290 setControlPeerIPV6(ia,controlPort);
00291 }
00292 return result;
00293 }
00294
00295 bool
00296 OutgoingDataQueue::forgetDestination(const IPV6Address& ia,
00297 tpport_t dataPort,
00298 tpport_t controlPort)
00299 {
00300 if ( 0 == controlPort )
00301 controlPort = dataPort + 1;
00302 return DestinationListHandlerIPV6::
00303 removeDestinationFromListIPV6(ia,dataPort,controlPort);
00304 }
00305
00306 #endif
00307
00308 bool
00309 OutgoingDataQueue::isSending(void) const
00310 {
00311 if(sendFirst)
00312 return true;
00313
00314 return false;
00315 }
00316
00317 microtimeout_t
00318 OutgoingDataQueue::getSchedulingTimeout(void)
00319 {
00320 struct timeval send, now;
00321 uint32 rate;
00322 uint32 rem;
00323
00324 for(;;)
00325 {
00326
00327
00328 if( !sendFirst )
00329 return schedulingTimeout;
00330
00331 uint32 stamp = sendFirst->getPacket()->getTimestamp();
00332 stamp -= getInitialTimestamp();
00333 rate = getCurrentRTPClockRate();
00334
00335
00336
00337
00338
00339 send.tv_sec = stamp / rate;
00340 rem = stamp % rate;
00341 send.tv_usec = (1000ul*rem) / (rate/1000ul);
00342
00343
00344
00345
00346 timeradd(&send,&(sendInfo.overflowTime),&send);
00347 gettimeofday(&now, NULL);
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362 if ( now.tv_sec - send.tv_sec > 5000){
00363 timeval overflow;
00364 overflow.tv_sec =(~static_cast<uint32>(0)) / rate;
00365 overflow.tv_usec = (~static_cast<uint32>(0)) % rate *
00366 1000000ul / rate;
00367 do {
00368 timeradd(&send,&overflow,&send);
00369 timeradd(&(sendInfo.overflowTime),&overflow,
00370 &(sendInfo.overflowTime));
00371 } while ( now.tv_sec - send.tv_sec > 5000 );
00372 }
00373
00374
00375
00376
00377 if ( send.tv_sec - now.tv_sec > 20000 ){
00378 timeval overflow;
00379 overflow.tv_sec = (~static_cast<uint32>(0)) / rate;
00380 overflow.tv_usec = (~static_cast<uint32>(0)) % rate *
00381 1000000ul / rate;
00382 timersub(&send,&overflow,&send);
00383 }
00384
00385
00386 if ( send.tv_sec - now.tv_sec > 3600 ){
00387 return 3600000000ul;
00388 }
00389 int32 diff =
00390 ((send.tv_sec - now.tv_sec) * 1000000ul) +
00391 send.tv_usec - now.tv_usec;
00392
00393 if ( diff >= 0 ){
00394 return static_cast<microtimeout_t>(diff);
00395 }
00396
00397
00398 if ( (diff < 0) &&
00399 static_cast<microtimeout_t>(-diff) <= getExpireTimeout() ){
00400 return 0;
00401 }
00402
00403
00404 sendLock.writeLock();
00405 OutgoingRTPPktLink* packet = sendFirst;
00406 sendFirst = sendFirst->getNext();
00407 onExpireSend(*(packet->getPacket()));
00408 delete packet;
00409 if ( sendFirst )
00410 sendFirst->setPrev(NULL);
00411 else
00412 sendLast = NULL;
00413 sendLock.unlock();
00414 }
00415 I( false );
00416 return 0;
00417 }
00418
00419 void
00420 OutgoingDataQueue::putData(uint32 stamp, const unsigned char *data,
00421 size_t datalen)
00422 {
00423 if ( !data || !datalen )
00424 return;
00425
00426 size_t step = 0, offset = 0;
00427 while ( offset < datalen ) {
00428
00429
00430 size_t remainder = datalen - offset;
00431 step = ( remainder > getMaxSendSegmentSize() ) ?
00432 getMaxSendSegmentSize() : remainder;
00433
00434 CryptoContext* pcc = getOutQueueCryptoContext(getLocalSSRC());
00435
00436 OutgoingRTPPkt* packet;
00437 if ( sendInfo.sendCC )
00438 packet = new OutgoingRTPPkt(sendInfo.sendSources,15,data + offset,step,
00439 sendInfo.paddinglen, pcc);
00440 else
00441 packet = new OutgoingRTPPkt(data + offset,step,sendInfo.paddinglen, pcc);
00442
00443 packet->setPayloadType(getCurrentPayloadType());
00444 packet->setSeqNum(sendInfo.sendSeq++);
00445 packet->setTimestamp(stamp + getInitialTimestamp());
00446
00447 packet->setSSRCNetwork(getLocalSSRCNetwork());
00448 if ( (0 == offset) && getMark() ) {
00449 packet->setMarker(true);
00450 setMark(false);
00451 } else {
00452 packet->setMarker(false);
00453 }
00454 if (pcc != NULL) {
00455 packet->protect(getLocalSSRC(), pcc);
00456 }
00457
00458 sendLock.writeLock();
00459 OutgoingRTPPktLink *link =
00460 new OutgoingRTPPktLink(packet,sendLast,NULL);
00461 if (sendLast)
00462 sendLast->setNext(link);
00463 else
00464 sendFirst = link;
00465 sendLast = link;
00466 sendLock.unlock();
00467
00468 offset += step;
00469 }
00470 }
00471
00472 void
00473 OutgoingDataQueue::sendImmediate(uint32 stamp, const unsigned char *data,
00474 size_t datalen)
00475 {
00476 if ( !data || !datalen )
00477 return;
00478
00479 size_t step = 0, offset = 0;
00480 while ( offset < datalen ) {
00481
00482
00483 size_t remainder = datalen - offset;
00484 step = ( remainder > getMaxSendSegmentSize() ) ?
00485 getMaxSendSegmentSize() : remainder;
00486
00487 CryptoContext* pcc = getOutQueueCryptoContext(getLocalSSRC());
00488
00489 OutgoingRTPPkt* packet;
00490 if ( sendInfo.sendCC )
00491 packet = new OutgoingRTPPkt(sendInfo.sendSources,15,data + offset,step,
00492 sendInfo.paddinglen, pcc);
00493 else
00494 packet = new OutgoingRTPPkt(data + offset,step,sendInfo.paddinglen, pcc);
00495
00496
00497 packet->setPayloadType(getCurrentPayloadType());
00498 packet->setSeqNum(sendInfo.sendSeq++);
00499 packet->setTimestamp(stamp + getInitialTimestamp());
00500 packet->setSSRCNetwork(getLocalSSRCNetwork());
00501 if ( (0 == offset) && getMark() ) {
00502 packet->setMarker(true);
00503 setMark(false);
00504 } else {
00505 packet->setMarker(false);
00506 }
00507 if (pcc != NULL) {
00508 packet->protect(getLocalSSRC(), pcc);
00509 }
00510 dispatchImmediate(packet);
00511 delete packet;
00512 offset += step;
00513 }
00514 }
00515
00516 void OutgoingDataQueue::dispatchImmediate(OutgoingRTPPkt *packet)
00517 {
00518 lockDestinationList();
00519 if ( isSingleDestination() ) {
00520 TransportAddress* tmp = destList.front();
00521
00522 setDataPeer(tmp->getNetworkAddress(),
00523 tmp->getDataTransportPort());
00524
00525 sendData(packet->getRawPacket(),
00526 packet->getRawPacketSizeSrtp());
00527 } else {
00528
00529 for (std::list<TransportAddress*>::iterator i =
00530 destList.begin(); destList.end() != i; i++) {
00531 TransportAddress* dest = *i;
00532 setDataPeer(dest->getNetworkAddress(),
00533 dest->getDataTransportPort());
00534 sendData(packet->getRawPacket(),
00535 packet->getRawPacketSizeSrtp());
00536 }
00537 }
00538 unlockDestinationList();
00539
00540 #ifdef CCXX_IPV6
00541 lockDestinationListIPV6();
00542 if ( isSingleDestinationIPV6() ) {
00543 TransportAddressIPV6* tmp6 = destListIPV6.front();
00544
00545 setDataPeerIPV6(tmp6->getNetworkAddress(),
00546 tmp6->getDataTransportPort());
00547
00548 sendDataIPV6(packet->getRawPacket(),
00549 packet->getRawPacketSizeSrtp());
00550 } else {
00551
00552 for (std::list<TransportAddressIPV6*>::iterator i6 =
00553 destListIPV6.begin(); destListIPV6.end() != i6; i6++) {
00554 TransportAddressIPV6* dest6 = *i6;
00555 setDataPeerIPV6(dest6->getNetworkAddress(),
00556 dest6->getDataTransportPort());
00557 sendDataIPV6(packet->getRawPacket(),
00558 packet->getRawPacketSizeSrtp());
00559 }
00560 }
00561 unlockDestinationListIPV6();
00562 #endif
00563 }
00564
00565 size_t
00566 OutgoingDataQueue::dispatchDataPacket(void)
00567 {
00568 sendLock.writeLock();
00569 OutgoingRTPPktLink* packetLink = sendFirst;
00570
00571 if ( !packetLink ){
00572 sendLock.unlock();
00573 return 0;
00574 }
00575
00576 OutgoingRTPPkt* packet = packetLink->getPacket();
00577 uint32 rtn = packet->getPayloadSize();
00578 dispatchImmediate(packet);
00579
00580
00581
00582 sendFirst = sendFirst->getNext();
00583 if ( sendFirst ) {
00584 sendFirst->setPrev(NULL);
00585 } else {
00586 sendLast = NULL;
00587 }
00588
00589 sendInfo.packetCount++;
00590 sendInfo.octetCount += packet->getPayloadSize();
00591 delete packetLink;
00592
00593 sendLock.unlock();
00594 return rtn;
00595 }
00596
00597 size_t
00598 OutgoingDataQueue::setPartial(uint32 stamp, unsigned char *data,
00599 size_t offset, size_t max)
00600 {
00601 sendLock.writeLock();
00602 OutgoingRTPPktLink* packetLink = sendFirst;
00603 while ( packetLink )
00604 {
00605 uint32 pstamp = packetLink->getPacket()->getTimestamp();
00606 if ( pstamp > stamp )
00607 packetLink = NULL;
00608 if ( pstamp >= stamp )
00609 break;
00610
00611 packetLink = packetLink->getNext();
00612 }
00613 if ( !packetLink )
00614 {
00615 sendLock.unlock();
00616 return 0;
00617 }
00618
00619 OutgoingRTPPkt* packet = packetLink->getPacket();
00620 if ( offset >= packet->getPayloadSize() )
00621 return 0;
00622
00623 if ( max > packet->getPayloadSize() - offset )
00624 max = packet->getPayloadSize() - offset;
00625
00626 memcpy((unsigned char*)(packet->getPayload()) + offset,
00627 data, max);
00628 sendLock.unlock();
00629 return max;
00630 }
00631
00632 void
00633 OutgoingDataQueue::setOutQueueCryptoContext(CryptoContext* cc)
00634 {
00635 std::list<CryptoContext *>::iterator i;
00636
00637 MutexLock lock(cryptoMutex);
00638
00639
00640 for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){
00641 if( (*i)->getSsrc() == cc->getSsrc() ) {
00642 CryptoContext* tmp = *i;
00643 cryptoContexts.erase(i);
00644 delete tmp;
00645 break;
00646 }
00647 }
00648 cryptoContexts.push_back(cc);
00649 }
00650
00651 void
00652 OutgoingDataQueue::removeOutQueueCryptoContext(CryptoContext* cc)
00653 {
00654 std::list<CryptoContext *>::iterator i;
00655
00656 MutexLock lock(cryptoMutex);
00657 if (cc == NULL) {
00658 for (i = cryptoContexts.begin(); i != cryptoContexts.end(); ) {
00659 CryptoContext* tmp = *i;
00660 i = cryptoContexts.erase(i);
00661 delete tmp;
00662 }
00663 }
00664 else {
00665 for( i = cryptoContexts.begin(); i != cryptoContexts.end(); i++ ){
00666 if( (*i)->getSsrc() == cc->getSsrc() ) {
00667 CryptoContext* tmp = *i;
00668 cryptoContexts.erase(i);
00669 delete tmp;
00670 return;
00671 }
00672 }
00673 }
00674 }
00675
00676 CryptoContext*
00677 OutgoingDataQueue::getOutQueueCryptoContext(uint32 ssrc)
00678 {
00679 std::list<CryptoContext *>::iterator i;
00680
00681 MutexLock lock(cryptoMutex);
00682 for( i = cryptoContexts.begin(); i != cryptoContexts.end(); i++ ){
00683 if( (*i)->getSsrc() == ssrc) {
00684 return (*i);
00685 }
00686 }
00687 return NULL;
00688 }
00689
00690 #ifdef CCXX_NAMESPACES
00691 }
00692 #endif
00693