/* * Copyright (c) 1997 Regents of the University of California. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. All advertising materials mentioning features or use of this software * must display the following acknowledgement: * This product includes software developed by the MASH Research * Group at the University of California Berkeley. * 4. Neither the name of the University nor of the Research Group may be * used to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #ifndef lint static char rcsid[] = "@(#) $Header: /afs/cs/project/cmcl-ns2/NetBSD1.2.1/ns-allinone/ns-2/RCS/csfq.cc,v 1.7 1998/09/12 19:39:27 istoica Exp istoica $ (ANS)"; #endif #include #include #include "random.h" #include "queue.h" #define MAXFLOWS 200 #ifndef TRUE #define TRUE 1 #define FALSE 0 #endif #define KALPHA 29 // 0.99^29 = 0.75 //#define CSFQ_LOG //#define PENALTY_BOX /* use penalty box in conjucntion with CSFQ */ #ifdef PENALTY_BOX #define MONITOR_TABLE_SIZE 10 #define PUNISH_TABLE_SIZE 10 #define DROPPED_ARRAY_SIZE 100 #define PUNISH_THRESH 1.3 /* * when the flow's rate exceeds * PUNISH_THRESH times the fair rate * the flow is punished */ #define GOOD_THRESH 0.7 /* * when the punished flow's rate is * GOOD_THRESH times smaller than the * link's fair rate the flow * is no longer punished */ #define BETA 0.98 /* * (1 - BETA) = precentage by which * link's fair rate is decreased when a * forced drop occurs */ typedef struct identHash { int valid_; double prevTime_; double estNormRate_; } IdentHashTable; #endif //#define SRCID /* identify flow bassed of source id rather than flow id */ //#define CSFQ_RLMEX /* used for RLM experiments only */ /* the following three functions are in util.cc */ void initRand(unsigned seed); float funifRand(float a, float b); void panic(char *s); double min(double x, double y) { return (x < y ? x : y); } class CSFQ : public Queue { public: CSFQ(); virtual int command(int argc, const char*const* argv); Packet *deque(void); void enque(Packet *pkt); protected: double estRate(int flowid, int pktSize, double arvTime); void estAlpha(int pktSize, double pktLabel, double arvTime, int enqueue); void logArvPacket(int flowId, double ctime, int pktSize); void logDptPacket(int flowId, double ctime, int pktSize); void logDroppedPacket(int flowId, double ctime); void logDroppedPacket1(int flowId, double ctime); void logEstAlpha(double ctime); void logEstLabel(int flowId, double ctime, double label); #ifdef SRCID int getId(int src); #endif #ifdef PENALTY_BOX int isInHash(int idx, IdentHashTable *hashTable, int size); int insertInHash(int idx, double flowRate, IdentHashTable *hashTable, int size); void deleteFromHash(int idx, IdentHashTable *hashTable, int size); double updateNormRate(int idx, double pktLabel, IdentHashTable *hashTable, int size); int recordDroppedPacket(int flowId); void choseToMonitor(); void punishFlow(int flowId); #endif struct flowState { double weight_; /* the weight of the flow */ double k_; /* averaging interval for rate estimation */ double estRate_; /* current flow's estimated rate */ double prevTime_; /* internal statistics */ int size_; /* keep track of packets that arrive at the same time */ int numArv_; /* number of arrival packets */ int numDpt_; /* number of dropped packets */ int numDroped_; /* number of droped packets */ } fs_[MAXFLOWS]; int id_; /* queue & link identifier */ #ifdef SRCID int hashid_[MAXFLOWS]; #endif int CSFQ::monitor(int flowId, double pktLabel); void CSFQ::dropPkt(int flowId, Packet *pkt); #ifdef PENALTY_BOX IdentHashTable monitorTable_[MONITOR_TABLE_SIZE]; IdentHashTable punishTable_[PUNISH_TABLE_SIZE]; int droppedArray_[DROPPED_ARRAY_SIZE]; int numDroppedPkts_; double kDropped_; #endif PacketQueue q_; /* packet queue */ int qsize_; /* maximum queue size (in bits) */ int qsizeThresh_; /* threshold (in bits); if queue occupancy does * not exceed qsizeThresh_ the queue is * assumed to be uncongested */ int qsizeCrt_; /* current queue size (in bits) */ int maxflow_; /* maximum number of flows */ int edge_; /* queue type: 1 if belongs to an edge node; 0 otherwise*/ double rate_; /* link rate (in bps) */ double lastArv_; /* the arival time of the last packet (in sec) */ double kLink_; /* avg. interval for computing rateTotal (usec)*/ double alpha_; /* output link fair rate */ double tmpAlpha_; /* * used to compute the largest label of a packet, i.e., * the largest flow rate seen during an * interval of length kLink_ */ int kalpha_; /* maximum number of times the fair rate * (alpha_) can be decreased when the queue * overflows, during a time interval of length kLink_ */ double rateAlpha_; /* aggreagte forwarded rate corresponding to crt. alpha_ */ double rateTotal_; /* aggregate arrival rate */ double lArv_; /* used to store the start of an interval of * length kLink_ */ int congested_; /* specify whether the link is congested or not*/ int pktSize_; /* * the total number of bits enqueued between * two consecutive rate estimations. * usualy, this represent the size of one packet; * however, if more packets are received at the same time * this will represent the cumulative size of all * pacekets received simulatneously */ int pktSizeE_; /* * same as above, but for the total number of bytes that * are enqueued between consecutive rate estimations */ }; static class CSFQClass : public TclClass { public: CSFQClass() : TclClass("Queue/CSFQ") {} TclObject* create(int argc, const char*const* argv) { return (new CSFQ); } } class_csfq; CSFQ::CSFQ() { for (int i = 0; i < MAXFLOWS; ++i) { fs_[i].weight_ = 1.0; fs_[i].k_ = 100000.0; fs_[i].numArv_ = fs_[i].numDpt_ = fs_[i].numDroped_ = 0; fs_[i].prevTime_ = 0.0; fs_[i].estRate_ = 0.0; fs_[i].size_ = 0; #ifdef SRCID hashid_[i] = 0; #endif } maxflow_ = -1; qsizeCrt_ = 0; alpha_ = tmpAlpha_ = 0.; kalpha_ = KALPHA; #ifdef PENALTY_BOX for (int i = 0; i < MONITOR_TABLE_SIZE; i++) { monitorTable_[i].valid_ = 0; monitorTable_[i].prevTime_ = monitorTable_[i].estNormRate_ = 0.; } for (int i = 0; i < PUNISH_TABLE_SIZE; i++) { punishTable_[i].valid_ = 0; punishTable_[i].prevTime_ = monitorTable_[i].estNormRate_ = 0.; } numDroppedPkts_ = 0; #endif lastArv_ = rateAlpha_ = rateTotal_ = 0.; lArv_ = 0.; pktSize_ = pktSizeE_ = 0; congested_ = 1; #ifdef PENALTY_BOX bind("kLink_", &kDropped_); #endif bind("kLink_", &kLink_); edge_ = 1; bind("edge_", &edge_); bind("qsize_", &qsize_); bind("qsizeThresh_", &qsizeThresh_); bind("rate_", &rate_); bind("id_", &id_); } int CSFQ::command(int argc, const char*const* argv) { if (argc == 5) { /* flowid, weight_, k_ */ if (strcmp(argv[1], "init-flow") == 0) { int flowId = atoi(argv[2]); if (flowId >= MAXFLOWS) { fprintf(stderr, "CSFQ: Flow id=%d too large; it should be < %d!\n", flowId, MAXFLOWS); abort(); } fs_[flowId].weight_ = (double)atoi(argv[3]); fs_[flowId].k_ = (double)atoi(argv[4]); return (TCL_OK); } } else if ((argc == 7) && (strcmp(argv[1], "init-link") == 0)) { /* edge_, kLink_, qsize_, qsizeThresh_ */ edge_ = atoi(argv[2]); if (edge_ != 0 && edge_ != 1) { fprintf(stderr, "CSFQ: Invalide link type !\n"); abort(); } kLink_ = (double)atoi(argv[3]); #ifdef PENALTY_BOX kDropped_ = kLink_; #endif qsize_ = atoi(argv[4]); qsizeThresh_ = atoi(argv[5]); rate_ = atof(argv[6]); return (TCL_OK); } return (Queue::command(argc, argv)); } /* Receive a new packet. The flow rate is estimated, and the link's * fair rate (alpha_) is computed. If the packet's label is larger * than alpha_ then a dropping probability is computed, and the * newly-arriving packet is dropped with that probability. The packet * is also dropped if the maximum queue size is exceeded. */ void CSFQ::enque(Packet* pkt) { double now = Scheduler::instance().clock(); hdr_cmn* hdr = hdr_cmn::access(pkt); hdr_ip* hip = hdr_ip::access(pkt); /* (hdr_ip*)pkt->access(off_ip_); */ int pktSize = hdr->size() << 3; /* length of the packet in bits */ double pktLabel = hdr->label(); #ifdef SRCID int flowId = getId(hip->src()); #else int flowId = hip->flowid(); #endif /* * if this queue belongs to an edge node, estimate session rate * and label the packet */ if (edge_ == 1) { #ifdef CSFQ_RLMEX hdr->setlabel(estRate(1, pktSize, now)/fs_[1].weight_); #else hdr->setlabel(estRate(flowId, pktSize, now)/fs_[flowId].weight_); #endif pktLabel = hdr->label(); } #ifdef PENALTY_BOX /* * if the flow is monitored and its rate (i.e., the packet label) * exceeds PUNISH_THRESH times link's fair rate, then punish the * flow if the flow is punished but its rate is no larger than * GOOD_THRESH times link's fair rate, then remove flow from penalty * box. */ if (monitor(flowId, pktLabel)) { drop(pkt); return; } #endif #ifdef CSFQ_LOG logEstLabel(flowId, now, pktLabel); #endif #ifdef CSFQ_LOG logEstAlpha(now); logArvPacket(flowId, now, pktSize); #endif /* * check whether alpha_ has been initialized * alpha_ is set to 0. whenever qusizeCrt_ < qsizeThreshold_ * (see estAlpha method) */ if (alpha_ != 0) { if (alpha_/pktLabel < funifRand(0., 1.)) { /* drop packet prbabilistically */ dropPkt(flowId, pkt); /* * estimate fair rate (alpha_); call with last parameter 0 as * the packet is probabilistically dropped */ estAlpha(pktSize, pktLabel, now, 0); #ifdef CSFQ_LOG logDroppedPacket(flowId, now); #endif return; } /* forward the packet and relabel it */ if (alpha_/pktLabel < 1.0) hdr->setlabel(alpha_); } /* if buffer full, drop the packet */ if (qsizeCrt_ + pktSize > qsize_) { // forced drop dropPkt(flowId, pkt); #ifdef CSFQ_LOG logDroppedPacket1(flowId, now); #endif /* * call estAlpha with 1, as this packet would have been enqued * by CSFQ */ estAlpha(pktSize, pktLabel, now, 1); /* * decrease alpha_; the number of times alpha_ is decreased * During an interval of length kLink_ is bounded by kalpha_. * This is to avoid overcorrection. */ if (kalpha_-- >= 0) alpha_ *= 0.99; return; } /* enqueue the packet and estimate alpha_ */ qsizeCrt_ += pktSize; q_.enque(pkt); estAlpha(pktSize, pktLabel, now, 1); } Packet* CSFQ::deque() { Packet *pkt = q_.deque(); hdr_ip* hip; hdr_cmn* hdr; int pktSize; if (pkt) { hip = hdr_ip::access(pkt); hdr = hdr_cmn::access(pkt); pktSize = hdr->size() << 3; qsizeCrt_ -= pktSize; #ifdef CSFQ_LOG #ifdef SRCID logDptPacket(getId(hip->src()), Scheduler::instance().clock(), pktSize); #else logDptPacket(hip->flowid(), Scheduler::instance().clock(), pktSize); #endif #endif } return (pkt); } /* compute estimated flow rate by using exponentially averaging */ double CSFQ::estRate(int flowid, int pktSize, double arvTime) { double d = (arvTime - fs_[flowid].prevTime_)*1000000; double k = fs_[flowid].k_; if (d == 0.0) { fs_[flowid].size_ += pktSize; if (fs_[flowid].estRate_) return fs_[flowid].estRate_; else /* this is the first packet; just initialize the rate */ return (fs_[flowid].estRate_ = rateAlpha_/2); } else { pktSize += fs_[flowid].size_; fs_[flowid].size_ = 0; } fs_[flowid].prevTime_ = arvTime; fs_[flowid].estRate_ = (1. - exp(-d/k))*(double)pktSize/d + exp(-d/k)*fs_[flowid].estRate_; return fs_[flowid].estRate_; } /* estimate the link's alpha parameter */ void CSFQ::estAlpha(int pktSize, double pktLabel, double arvTime, int enqueue) { float d = (arvTime - lastArv_)*1000000., w, rate = rate_/1000000.; double k = kLink_/1000000.; // set lastArv_ to the arrival time of the first packet if (lastArv_ == 0.) lastArv_ = arvTime; // account for packets received simultaneously pktSize_ += pktSize; if (enqueue) pktSizeE_ += pktSize; if (arvTime == lastArv_) return; // estimate the aggreagte arrival rate (rateTotal_) and the // aggregate forwarded (rateAlpha_) rates w = exp(-d/kLink_); rateAlpha_ = (1 - w)*(double)pktSizeE_/d + w*rateAlpha_; rateTotal_ = (1 - w)*(double)pktSize_/d + w*rateTotal_; lastArv_ = arvTime; pktSize_ = pktSizeE_ = 0; // compute the initial value of alpha_ if (alpha_ == 0.) { if (qsizeCrt_ < qsizeThresh_) { if (tmpAlpha_ < pktLabel) tmpAlpha_ = pktLabel; return; } if (alpha_ < tmpAlpha_) alpha_ = tmpAlpha_; if (alpha_ == 0.) alpha_ = rate / 2.; // arbitrary initialization tmpAlpha_ = 0.; } // update alpha_ if (rate <= rateTotal_) { // link congested if (!congested_) { congested_ = 1; lArv_ = arvTime; kalpha_ = KALPHA; } else { if (arvTime < lArv_ + k) return; lArv_ = arvTime; alpha_ *= rate/rateAlpha_; if (rate < alpha_) alpha_ = rate; } } else { // (rate < rateTotal_) => link uncongested if (congested_) { congested_ = 0; lArv_ = arvTime; tmpAlpha_ = 0; } else { if (arvTime < lArv_ + k) { if (tmpAlpha_ < pktLabel) tmpAlpha_ = pktLabel; } else { alpha_ = tmpAlpha_; lArv_ = arvTime; if (qsizeCrt_ < qsizeThresh_) alpha_ = 0.; else tmpAlpha_ = 0.; } } } #ifdef CSFQ_LOG printf("|%d %f %f %f\n", id_, arvTime, rateAlpha_, rateTotal_); #endif } #ifdef SRCID /* get identifier from source address */ int CSFQ::getId(int src) { int i = src % MAXFLOWS; if (!hashid_[i]) { /* first packet of this flow */ fs_[i].weight_ = fs_[0].weight_; fs_[i].k_ = fs_[0].k_; hashid_[i] = src; } else if (hashid_[i] != src) panic1("Source addresses colision!\n"); return i; } #endif #ifdef PENALTY_BOX int CSFQ::isInHash(int idx, IdentHashTable *hashTable, int size) { return (hashTable[idx % size].valid_); } int CSFQ::insertInHash(int idx, double flowNormRate, IdentHashTable *hashTable, int size) { double now = Scheduler::instance().clock(); int i = idx % size; if (!hashTable[i].valid_) { hashTable[i].valid_ = TRUE; hashTable[i].estNormRate_ = flowNormRate; hashTable[i].prevTime_ = now; return TRUE; } else /* hash colision */ return FALSE; } void CSFQ::deleteFromHash(int idx, IdentHashTable *hashTable, int size) { hashTable[idx % size].valid_ = FALSE; } double CSFQ::updateNormRate(int idx, double pktLabel, IdentHashTable *hashTable, int size) { int i = idx % size; double u = pktLabel/alpha_; if (!hashTable[i].valid_) /* this is just a double-check */ return 0.; hashTable[i].estNormRate_ = (1 - BETA)*u + BETA*hashTable[i].estNormRate_; return hashTable[i].estNormRate_; } int CSFQ::recordDroppedPacket(int flowId) { droppedArray_[numDroppedPkts_] = flowId; numDroppedPkts_++; if (numDroppedPkts_ == DROPPED_ARRAY_SIZE) return TRUE; else return FALSE; } void CSFQ::choseToMonitor() { /* do a sort of last received packets */ /* * since usually the array size is no larger * than 100, for simplicity we do buble sort ;-) */ int i, j, count = 0, temp; int ids[DROPPED_ARRAY_SIZE]; int cnt[DROPPED_ARRAY_SIZE]; for (i = 0; i < DROPPED_ARRAY_SIZE - 1; i++) { for (j = i; j < DROPPED_ARRAY_SIZE - 1; j++) { if (droppedArray_[i] < droppedArray_[j]) { temp = droppedArray_[j]; droppedArray_[j] = droppedArray_[i]; droppedArray_[i] = temp; } } } /* now count the number of dropped packets for every flow */ for (i = 0; i < DROPPED_ARRAY_SIZE - 1; ) { ids[count] = droppedArray_[i]; cnt[count] = 1; count++; for (j = i; j < DROPPED_ARRAY_SIZE - 1; j++) { if (droppedArray_[i] == droppedArray_[j]) cnt[count - 1]++; else break; } i = j; } /* * now decide what to monitor: monitor flows that have a higher * dropping rate than the average */ for (i = count - 1; i >= 0; i--) { if (isInHash(ids[i], monitorTable_, MONITOR_TABLE_SIZE) || isInHash(ids[i], punishTable_, PUNISH_TABLE_SIZE)) continue; if (cnt[i] >= DROPPED_ARRAY_SIZE/count || !monitorTable_[ids[i] % MONITOR_TABLE_SIZE].valid_) { deleteFromHash(ids[i], monitorTable_, MONITOR_TABLE_SIZE); insertInHash(ids[i], 1., monitorTable_, MONITOR_TABLE_SIZE); } } } void CSFQ::punishFlow(int flowId) { double rate = monitorTable_[flowId % MONITOR_TABLE_SIZE].estNormRate_; deleteFromHash(flowId, monitorTable_, MONITOR_TABLE_SIZE); insertInHash(flowId, rate, punishTable_, PUNISH_TABLE_SIZE); } int CSFQ::monitor(int flowId, double pktLabel) { if (isInHash(flowId, punishTable_, PUNISH_TABLE_SIZE)) { double rate = updateNormRate(flowId, pktLabel, punishTable_, PUNISH_TABLE_SIZE); if (rate < GOOD_THRESH) punishTable_[flowId % PUNISH_TABLE_SIZE].valid_ = 0; return TRUE; } if (isInHash(flowId, monitorTable_, MONITOR_TABLE_SIZE)) { double rate = updateNormRate(flowId, pktLabel, monitorTable_, MONITOR_TABLE_SIZE); if (rate > PUNISH_THRESH) { punishFlow(flowId); } } return FALSE; } #endif void CSFQ::dropPkt(int flowId, Packet *pkt) { #ifdef PENALTY_BOX if (recordDroppedPacket(flowId)) { choseToMonitor(); numDroppedPkts_ = 0; } #endif drop(pkt); } void CSFQ::logDroppedPacket(int flowId, double ctime) { printf("@%d %d %f %d\n", flowId, id_, ctime, fs_[flowId].numDroped_++); } void CSFQ::logDroppedPacket1(int flowId, double ctime) { printf("1@%d %d %f %d\n", flowId, id_, ctime, fs_[flowId].numDroped_++); } void CSFQ::logArvPacket(int flowId, double ctime, int pktSize) { printf(">%d %d %f %d %d %d\n", flowId, id_, ctime, fs_[flowId].numArv_++, pktSize/8, qsizeCrt_); } void CSFQ::logDptPacket(int flowId, double ctime, int pktSize) { printf("<%d %d %f %d %d %d\n", flowId, id_, ctime, fs_[flowId].numDpt_++, pktSize/8, qsizeCrt_); } void CSFQ::logEstAlpha(double ctime) { printf("#%d %f %f %d\n", id_, ctime, alpha_, qsizeCrt_/8); } void CSFQ::logEstLabel(int flowId, double ctime, double label) { printf("=%d %d %f %f\n", flowId, id_, ctime, label); }