patch-2.4.20 linux-2.4.20/net/sunrpc/xprt.c


diff -urN linux-2.4.19/net/sunrpc/xprt.c linux-2.4.20/net/sunrpc/xprt.c
@@ -67,8 +67,6 @@
 
 #include <asm/uaccess.h>
 
-extern spinlock_t rpc_queue_lock;
-
 /*
  * Local variables
  */
@@ -78,6 +76,8 @@
 # define RPCDBG_FACILITY	RPCDBG_XPRT
 #endif
 
+#define XPRT_MAX_BACKOFF	(8)
+
 /*
  * Local functions
  */
@@ -88,6 +88,7 @@
 static void	xprt_reconn_status(struct rpc_task *task);
 static struct socket *xprt_create_socket(int, struct rpc_timeout *);
 static int	xprt_bind_socket(struct rpc_xprt *, struct socket *);
+static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
 
 #ifdef RPC_DEBUG_DATA
 /*
@@ -130,75 +131,74 @@
 }
 
 /*
- *	Adjust the iovec to move on 'n' bytes
- */
- 
-extern inline void
-xprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount)
-{
-	struct iovec *iv=msg->msg_iov;
-	int i;
-	
-	/*
-	 *	Eat any sent iovecs
-	 */
-	while (iv->iov_len <= amount) {
-		amount -= iv->iov_len;
-		iv++;
-		msg->msg_iovlen--;
-	}
-
-	/*
-	 *	And chew down the partial one
-	 */
-	niv[0].iov_len = iv->iov_len-amount;
-	niv[0].iov_base =((unsigned char *)iv->iov_base)+amount;
-	iv++;
-
-	/*
-	 *	And copy any others
-	 */
-	for(i = 1; i < msg->msg_iovlen; i++)
-		niv[i]=*iv++;
-
-	msg->msg_iov=niv;
-}
-
-/*
  * Serialize write access to sockets, in order to prevent different
  * requests from interfering with each other.
  * Also prevents TCP socket reconnections from colliding with writes.
  */
 static int
-xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-	int retval;
-	spin_lock_bh(&xprt->sock_lock);
-	if (!xprt->snd_task)
-		xprt->snd_task = task;
-	else if (xprt->snd_task != task) {
-		dprintk("RPC: %4d TCP write queue full (task %d)\n",
-			task->tk_pid, xprt->snd_task->tk_pid);
+	if (!xprt->snd_task) {
+		if (xprt->nocong || __xprt_get_cong(xprt, task))
+			xprt->snd_task = task;
+	}
+	if (xprt->snd_task != task) {
+		dprintk("RPC: %4d TCP write queue full\n", task->tk_pid);
 		task->tk_timeout = 0;
 		task->tk_status = -EAGAIN;
-		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+		if (task->tk_rqstp && task->tk_rqstp->rq_nresend)
+			rpc_sleep_on(&xprt->resend, task, NULL, NULL);
+		else
+			rpc_sleep_on(&xprt->sending, task, NULL, NULL);
 	}
-	retval = xprt->snd_task == task;
+	return xprt->snd_task == task;
+}
+
+static inline int
+xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	int retval;
+	spin_lock_bh(&xprt->sock_lock);
+	retval = __xprt_lock_write(xprt, task);
 	spin_unlock_bh(&xprt->sock_lock);
 	return retval;
 }
 
+static void
+__xprt_lock_write_next(struct rpc_xprt *xprt)
+{
+	struct rpc_task *task;
+
+	if (xprt->snd_task)
+		return;
+	task = rpc_wake_up_next(&xprt->resend);
+	if (!task) {
+		if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
+			return;
+		task = rpc_wake_up_next(&xprt->sending);
+		if (!task)
+			return;
+	}
+	if (xprt->nocong || __xprt_get_cong(xprt, task))
+		xprt->snd_task = task;
+}
+
 /*
  * Releases the socket for use by other requests.
  */
 static void
+__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	if (xprt->snd_task == task)
+		xprt->snd_task = NULL;
+	__xprt_lock_write_next(xprt);
+}
+
+static inline void
 xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	spin_lock_bh(&xprt->sock_lock);
-	if (xprt->snd_task == task) {
-		xprt->snd_task = NULL;
-		rpc_wake_up_next(&xprt->sending);
-	}
+	__xprt_release_write(xprt, task);
 	spin_unlock_bh(&xprt->sock_lock);
 }
 
@@ -210,13 +210,11 @@
 {
 	struct socket	*sock = xprt->sock;
 	struct msghdr	msg;
+	struct xdr_buf	*xdr = &req->rq_snd_buf;
+	struct iovec	niv[MAX_IOVEC];
+	unsigned int	niov, slen, skip;
 	mm_segment_t	oldfs;
 	int		result;
-	int		slen = req->rq_slen - req->rq_bytes_sent;
-	struct iovec	niv[MAX_IOVEC];
-
-	if (slen <= 0)
-		return 0;
 
 	if (!sock)
 		return -ENOTCONN;
@@ -225,22 +223,26 @@
 				req->rq_svec->iov_base,
 				req->rq_svec->iov_len);
 
+	/* Dont repeat bytes */
+	skip = req->rq_bytes_sent;
+	slen = xdr->len - skip;
+	niov = xdr_kmap(niv, xdr, skip);
+
 	msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
-	msg.msg_iov	= req->rq_svec;
-	msg.msg_iovlen	= req->rq_snr;
+	msg.msg_iov	= niv;
+	msg.msg_iovlen	= niov;
 	msg.msg_name	= (struct sockaddr *) &xprt->addr;
 	msg.msg_namelen = sizeof(xprt->addr);
 	msg.msg_control = NULL;
 	msg.msg_controllen = 0;
 
-	/* Dont repeat bytes */
-	if (req->rq_bytes_sent)
-		xprt_move_iov(&msg, niv, req->rq_bytes_sent);
-
 	oldfs = get_fs(); set_fs(get_ds());
+	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
 	result = sock_sendmsg(sock, &msg, slen);
 	set_fs(oldfs);
 
+	xdr_kunmap(xdr, skip);
+
 	dprintk("RPC:      xprt_sendmsg(%d) = %d\n", slen, result);
 
 	if (result >= 0)
@@ -251,10 +253,7 @@
 		/* When the server has died, an ICMP port unreachable message
 		 * prompts ECONNREFUSED.
 		 */
-		break;
 	case -EAGAIN:
-		if (test_bit(SOCK_NOSPACE, &sock->flags))
-			result = -ENOMEM;
 		break;
 	case -ENOTCONN:
 	case -EPIPE:
@@ -269,6 +268,40 @@
 }
 
 /*
+ * Van Jacobson congestion avoidance. Check if the congestion window
+ * overflowed. Put the task to sleep if this is the case.
+ */
+static int
+__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
+
+	if (req->rq_cong)
+		return 1;
+	dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
+			task->tk_pid, xprt->cong, xprt->cwnd);
+	if (RPCXPRT_CONGESTED(xprt))
+		return 0;
+	req->rq_cong = 1;
+	xprt->cong += RPC_CWNDSCALE;
+	return 1;
+}
+
+/*
+ * Adjust the congestion window, and wake up the next task
+ * that has been sleeping due to congestion
+ */
+static void
+__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+	if (!req->rq_cong)
+		return;
+	req->rq_cong = 0;
+	xprt->cong -= RPC_CWNDSCALE;
+	__xprt_lock_write_next(xprt);
+}
+
+/*
  * Adjust RPC congestion window
  * We use a time-smoothed congestion estimator to avoid heavy oscillation.
  */
@@ -277,40 +310,22 @@
 {
 	unsigned long	cwnd;
 
-	if (xprt->nocong)
-		return;
-	/*
-	 * Note: we're in a BH context
-	 */
-	spin_lock(&xprt->xprt_lock);
 	cwnd = xprt->cwnd;
-	if (result >= 0) {
-		if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
-			goto out;
+	if (result >= 0 && cwnd <= xprt->cong) {
 		/* The (cwnd >> 1) term makes sure
 		 * the result gets rounded properly. */
 		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
 		if (cwnd > RPC_MAXCWND)
 			cwnd = RPC_MAXCWND;
-		else
-			pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
-		xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
-		dprintk("RPC:      cong %08lx, cwnd was %08lx, now %08lx, "
-			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
-			(xprt->congtime-jiffies)*1000/HZ);
+		__xprt_lock_write_next(xprt);
 	} else if (result == -ETIMEDOUT) {
-		if ((cwnd >>= 1) < RPC_CWNDSCALE)
+		cwnd >>= 1;
+		if (cwnd < RPC_CWNDSCALE)
 			cwnd = RPC_CWNDSCALE;
-		xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
-		dprintk("RPC:      cong %ld, cwnd was %ld, now %ld, "
-			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
-			(xprt->congtime-jiffies)*1000/HZ);
-		pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
 	}
-
+	dprintk("RPC:      cong %ld, cwnd was %ld, now %ld\n",
+			xprt->cong, xprt->cwnd, cwnd);
 	xprt->cwnd = cwnd;
- out:
-	spin_unlock(&xprt->xprt_lock);
 }
 
 /*
@@ -370,7 +385,7 @@
 
 	sock_release(sock);
 	/*
-	 *	TCP doesnt require the rpciod now - other things may
+	 *	TCP doesn't require the rpciod now - other things may
 	 *	but rpciod handles that not us.
 	 */
 	if(xprt->stream)
@@ -460,9 +475,9 @@
 		if (inet->state != TCP_ESTABLISHED) {
 			task->tk_timeout = xprt->timeout.to_maxval;
 			/* if the socket is already closing, delay 5 secs */
-			if ((1<<inet->state) & ~(TCP_SYN_SENT|TCP_SYN_RECV))
+			if ((1<<inet->state) & ~(TCPF_SYN_SENT|TCPF_SYN_RECV))
 				task->tk_timeout = 5*HZ;
-			rpc_sleep_on(&xprt->sending, task, xprt_reconn_status, NULL);
+			rpc_sleep_on(&xprt->pending, task, xprt_reconn_status, NULL);
 			release_sock(inet);
 			return;
 		}
@@ -498,30 +513,16 @@
 static inline struct rpc_rqst *
 xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
 {
-	struct rpc_task	*head, *task;
-	struct rpc_rqst	*req;
-	int		safe = 0;
+	struct list_head *pos;
+	struct rpc_rqst	*req = NULL;
 
-	spin_lock_bh(&rpc_queue_lock);
-	if ((head = xprt->pending.task) != NULL) {
-		task = head;
-		do {
-			if ((req = task->tk_rqstp) && req->rq_xid == xid)
-				goto out;
-			task = task->tk_next;
-			if (++safe > 100) {
-				printk("xprt_lookup_rqst: loop in Q!\n");
-				goto out_bad;
-			}
-		} while (task != head);
+	list_for_each(pos, &xprt->recv) {
+		struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
+		if (entry->rq_xid == xid) {
+			req = entry;
+			break;
+		}
 	}
-	dprintk("RPC:      unknown XID %08x in reply.\n", xid);
- out_bad:
-	req = NULL;
- out:
-	if (req && !__rpc_lock_task(req->rq_task))
-		req = NULL;
-	spin_unlock_bh(&rpc_queue_lock);
 	return req;
 }
 
@@ -529,13 +530,23 @@
  * Complete reply received.
  * The TCP code relies on us to remove the request from xprt->pending.
  */
-static inline void
+static void
 xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
 {
 	struct rpc_task	*task = req->rq_task;
+	struct rpc_clnt *clnt = task->tk_client;
 
 	/* Adjust congestion window */
-	xprt_adjust_cwnd(xprt, copied);
+	if (!xprt->nocong) {
+		xprt_adjust_cwnd(xprt, copied);
+		__xprt_put_cong(xprt, req);
+	       	if (!req->rq_nresend) {
+			int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
+			if (timer)
+				rpc_update_rtt(&clnt->cl_rtt, timer, (long)jiffies - req->rq_xtime);
+		}
+		rpc_clear_timeo(&clnt->cl_rtt);
+	}
 
 #ifdef RPC_PROFILE
 	/* Profile only reads for now */
@@ -557,66 +568,68 @@
 #endif
 
 	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
-	task->tk_status = copied;
-	req->rq_received = 1;
+	req->rq_received = copied;
+	list_del_init(&req->rq_list);
 
 	/* ... and wake up the process. */
 	rpc_wake_up_task(task);
 	return;
 }
 
+static size_t
+skb_read_bits(skb_reader_t *desc, void *to, size_t len)
+{
+	if (len > desc->count)
+		len = desc->count;
+	skb_copy_bits(desc->skb, desc->offset, to, len);
+	desc->count -= len;
+	desc->offset += len;
+	return len;
+}
+
+static size_t
+skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
+{
+	unsigned int csum2, pos;
+
+	if (len > desc->count)
+		len = desc->count;
+	pos = desc->offset;
+	csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
+	desc->csum = csum_block_add(desc->csum, csum2, pos);
+	desc->count -= len;
+	desc->offset += len;
+	return len;
+}
+
 /*
  * We have set things up such that we perform the checksum of the UDP
  * packet in parallel with the copies into the RPC client iovec.  -DaveM
  */
-static int csum_partial_copy_to_page_cache(struct iovec *iov,
-					   struct sk_buff *skb,
-					   int copied)
-{
-	int offset = sizeof(struct udphdr);
-	__u8 *cur_ptr = iov->iov_base;
-	__kernel_size_t cur_len = iov->iov_len;
-	unsigned int csum = skb->csum;
-	int need_csum = (skb->ip_summed != CHECKSUM_UNNECESSARY);
-	int slack = skb->len - copied - sizeof(struct udphdr);
-
-	if (need_csum)
-		csum = csum_partial(skb->data, sizeof(struct udphdr), csum);
-	while (copied > 0) {
-		if (cur_len) {
-			int to_move = cur_len;
-			if (to_move > copied)
-				to_move = copied;
-			if (need_csum) {
-				unsigned int csum2;
-
-				csum2 = skb_copy_and_csum_bits(skb, offset,
-							       cur_ptr,
-							       to_move, 0);
-				csum = csum_block_add(csum, csum2, offset);
-			} else
-				skb_copy_bits(skb, offset, cur_ptr, to_move);
-			offset += to_move;
-			copied -= to_move;
-			cur_ptr += to_move;
-			cur_len -= to_move;
-		}
-		if (cur_len <= 0) {
-			iov++;
-			cur_len = iov->iov_len;
-			cur_ptr = iov->iov_base;
-		}
-	}
-	if (need_csum) {
-		if (slack > 0) {
-			unsigned int csum2;
+static int
+csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
+{
+	skb_reader_t desc;
 
-			csum2 = skb_checksum(skb, offset, slack, 0);
-			csum = csum_block_add(csum, csum2, offset);
-		}
-		if ((unsigned short)csum_fold(csum))
-			return -1;
+	desc.skb = skb;
+	desc.offset = sizeof(struct udphdr);
+	desc.count = skb->len - desc.offset;
+
+	if (skb->ip_summed == CHECKSUM_UNNECESSARY)
+		goto no_checksum;
+
+	desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
+	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
+	if (desc.offset != skb->len) {
+		unsigned int csum2;
+		csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
+		desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
 	}
+	if ((unsigned short)csum_fold(desc.csum))
+		return -1;
+	return 0;
+no_checksum:
+	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
 	return 0;
 }
 
@@ -654,9 +667,10 @@
 	}
 
 	/* Look up and lock the request corresponding to the given XID */
+	spin_lock(&xprt->sock_lock);
 	rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
 	if (!rovr)
-		goto dropit;
+		goto out_unlock;
 	task = rovr->rq_task;
 
 	dprintk("RPC: %4d received reply\n", task->tk_pid);
@@ -667,7 +681,7 @@
 		copied = repsize;
 
 	/* Suck it into the iovec, verify checksum if not done by hw. */
-	if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
+	if (csum_partial_copy_to_xdr(&rovr->rq_rcv_buf, skb))
 		goto out_unlock;
 
 	/* Something worked... */
@@ -676,8 +690,7 @@
 	xprt_complete_rqst(xprt, rovr, copied);
 
  out_unlock:
-	rpc_unlock_task(task);
-
+	spin_unlock(&xprt->sock_lock);
  dropit:
 	skb_free_datagram(sk, skb);
  out:
@@ -685,12 +698,6 @@
 		wake_up_interruptible(sk->sleep);
 }
 
-typedef struct {
-	struct sk_buff *skb;
-	unsigned offset;
-	size_t count;
-} skb_reader_t;
-
 /*
  * Copy from an skb into memory and shrink the skb.
  */
@@ -781,50 +788,43 @@
 tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
 	struct rpc_rqst *req;
-	struct iovec *iov;
-	char *p;
-	unsigned long skip;
-	size_t len, used;
-	int n;
+	struct xdr_buf *rcvbuf;
+	size_t len;
 
 	/* Find and lock the request corresponding to this xid */
+	spin_lock(&xprt->sock_lock);
 	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
 	if (!req) {
 		xprt->tcp_flags &= ~XPRT_COPY_DATA;
 		dprintk("RPC:      XID %08x request not found!\n",
 				xprt->tcp_xid);
+		spin_unlock(&xprt->sock_lock);
 		return;
 	}
-	skip = xprt->tcp_copied;
-	iov = req->rq_rvec;
-	for (n = req->rq_rnr; n != 0; n--, iov++) {
-		if (skip >= iov->iov_len) {
-			skip -= iov->iov_len;
-			continue;
-		}
-		p = iov->iov_base;
-		len = iov->iov_len;
-		if (skip) {
-			p += skip;
-			len -= skip;
-			skip = 0;
-		}
-		if (xprt->tcp_offset + len > xprt->tcp_reclen)
-			len = xprt->tcp_reclen - xprt->tcp_offset;
-		used = tcp_copy_data(desc, p, len);
-		xprt->tcp_copied += used;
-		xprt->tcp_offset += used;
-		if (used != len)
-			break;
-		if (xprt->tcp_copied == req->rq_rlen) {
+
+	rcvbuf = &req->rq_rcv_buf;
+	len = desc->count;
+	if (len > xprt->tcp_reclen - xprt->tcp_offset) {
+		skb_reader_t my_desc;
+
+		len = xprt->tcp_reclen - xprt->tcp_offset;
+		memcpy(&my_desc, desc, sizeof(my_desc));
+		my_desc.count = len;
+		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
+					  &my_desc, tcp_copy_data);
+		desc->count -= len;
+		desc->offset += len;
+	} else
+		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
+					  desc, tcp_copy_data);
+	xprt->tcp_copied += len;
+	xprt->tcp_offset += len;
+
+	if (xprt->tcp_copied == req->rq_rlen)
+		xprt->tcp_flags &= ~XPRT_COPY_DATA;
+	else if (xprt->tcp_offset == xprt->tcp_reclen) {
+		if (xprt->tcp_flags & XPRT_LAST_FRAG)
 			xprt->tcp_flags &= ~XPRT_COPY_DATA;
-			break;
-		}
-		if (xprt->tcp_offset == xprt->tcp_reclen) {
-			if (xprt->tcp_flags & XPRT_LAST_FRAG)
-				xprt->tcp_flags &= ~XPRT_COPY_DATA;
-			break;
-		}
 	}
 
 	if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
@@ -832,7 +832,7 @@
 				req->rq_task->tk_pid);
 		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
 	}
-	rpc_unlock_task(req->rq_task); 
+	spin_unlock(&xprt->sock_lock);
 	tcp_check_recm(xprt);
 }
 
@@ -932,7 +932,7 @@
 		xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
 
 		spin_lock(&xprt->sock_lock);
-		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
+		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
 			rpc_wake_up_task(xprt->snd_task);
 		spin_unlock(&xprt->sock_lock);
 		break;
@@ -949,8 +949,10 @@
 }
 
 /*
- * The following 2 routines allow a task to sleep while socket memory is
- * low.
+ * Called when more output buffer space is available for this socket.
+ * We try not to wake our writers until they can make "significant"
+ * progress, otherwise we'll waste resources thrashing sock_sendmsg
+ * with a bunch of small requests.
  */
 static void
 xprt_write_space(struct sock *sk)
@@ -964,22 +966,40 @@
 		return;
 
 	/* Wait until we have enough socket memory */
-	if (!sock_writeable(sk))
+	if (xprt->stream) {
+		/* from net/ipv4/tcp.c:tcp_write_space */
+		if (tcp_wspace(sk) < tcp_min_write_space(sk))
+			return;
+	} else {
+		/* from net/core/sock.c:sock_def_write_space */
+		if (!sock_writeable(sk))
+			return;
+	}
+
+	if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
 		return;
 
-	if (!xprt_test_and_set_wspace(xprt)) {
-		spin_lock(&xprt->sock_lock);
-		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
-			rpc_wake_up_task(xprt->snd_task);
-		spin_unlock(&xprt->sock_lock);
-	}
+	spin_lock_bh(&xprt->sock_lock);
+	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
+		rpc_wake_up_task(xprt->snd_task);
+	spin_unlock_bh(&xprt->sock_lock);
+	if (sk->sleep && waitqueue_active(sk->sleep))
+		wake_up_interruptible(sk->sleep);
+}
 
-	if (test_bit(SOCK_NOSPACE, &sock->flags)) {
-		if (sk->sleep && waitqueue_active(sk->sleep)) {
-			clear_bit(SOCK_NOSPACE, &sock->flags);
-			wake_up_interruptible(sk->sleep);
-		}
-	}
+/*
+ * Exponential backoff for UDP retries
+ */
+static inline int
+xprt_expbackoff(struct rpc_task *task, struct rpc_rqst *req)
+{
+	int backoff;
+
+	req->rq_ntimeo++;
+	backoff = min(rpc_ntimeo(&task->tk_client->cl_rtt), XPRT_MAX_BACKOFF);
+	if (req->rq_ntimeo < (1 << backoff))
+		return 1;
+	return 0;
 }
 
 /*
@@ -989,16 +1009,31 @@
 xprt_timer(struct rpc_task *task)
 {
 	struct rpc_rqst	*req = task->tk_rqstp;
+	struct rpc_xprt *xprt = req->rq_xprt;
 
-	if (req)
-		xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
+	spin_lock(&xprt->sock_lock);
+	if (req->rq_received)
+		goto out;
+
+	if (!xprt->nocong) {
+		if (xprt_expbackoff(task, req)) {
+			rpc_add_timer(task, xprt_timer);
+			goto out_unlock;
+		}
+		rpc_inc_timeo(&task->tk_client->cl_rtt);
+		xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
+	}
+	req->rq_nresend++;
 
 	dprintk("RPC: %4d xprt_timer (%s request)\n",
 		task->tk_pid, req ? "pending" : "backlogged");
 
 	task->tk_status  = -ETIMEDOUT;
+out:
 	task->tk_timeout = 0;
 	rpc_wake_up_task(task);
+out_unlock:
+	spin_unlock(&xprt->sock_lock);
 }
 
 /*
@@ -1034,37 +1069,35 @@
 		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
 	}
 
-	if (!xprt_lock_write(xprt, task))
+	spin_lock_bh(&xprt->sock_lock);
+	if (!__xprt_lock_write(xprt, task)) {
+		spin_unlock_bh(&xprt->sock_lock);
 		return;
+	}
+	if (list_empty(&req->rq_list)) {
+		list_add_tail(&req->rq_list, &xprt->recv);
+		req->rq_received = 0;
+	}
+	spin_unlock_bh(&xprt->sock_lock);
 
-#ifdef RPC_PROFILE
-	req->rq_xtime = jiffies;
-#endif
 	do_xprt_transmit(task);
 }
 
 static void
 do_xprt_transmit(struct rpc_task *task)
 {
+	struct rpc_clnt *clnt = task->tk_client;
 	struct rpc_rqst	*req = task->tk_rqstp;
 	struct rpc_xprt	*xprt = req->rq_xprt;
 	int status, retry = 0;
 
 
-	/* For fast networks/servers we have to put the request on
-	 * the pending list now:
-	 * Note that we don't want the task timing out during the
-	 * call to xprt_sendmsg(), so we initially disable the timeout,
-	 * and then reset it later...
-	 */
-	xprt_receive(task);
-
 	/* Continue transmitting the packet/record. We must be careful
 	 * to cope with writespace callbacks arriving _after_ we have
 	 * called xprt_sendmsg().
 	 */
 	while (1) {
-		xprt_clear_wspace(xprt);
+		req->rq_xtime = jiffies;
 		status = xprt_sendmsg(xprt, req);
 
 		if (status < 0)
@@ -1078,7 +1111,7 @@
 		} else {
 			if (status >= req->rq_slen)
 				goto out_receive;
-			status = -ENOMEM;
+			status = -EAGAIN;
 			break;
 		}
 
@@ -1090,31 +1123,28 @@
 		if (retry++ > 50)
 			break;
 	}
-	rpc_unlock_task(task);
 
 	/* Note: at this point, task->tk_sleeping has not yet been set,
 	 *	 hence there is no danger of the waking up task being put on
 	 *	 schedq, and being picked up by a parallel run of rpciod().
 	 */
-	rpc_wake_up_task(task);
-	if (!RPC_IS_RUNNING(task))
-		goto out_release;
 	if (req->rq_received)
 		goto out_release;
 
 	task->tk_status = status;
 
 	switch (status) {
-	case -ENOMEM:
-		/* Protect against (udp|tcp)_write_space */
-		spin_lock_bh(&xprt->sock_lock);
-		if (!xprt_wspace(xprt)) {
-			task->tk_timeout = req->rq_timeout.to_current;
-			rpc_sleep_on(&xprt->sending, task, NULL, NULL);
-		}
-		spin_unlock_bh(&xprt->sock_lock);
-		return;
 	case -EAGAIN:
+		if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
+			/* Protect against races with xprt_write_space */
+			spin_lock_bh(&xprt->sock_lock);
+			if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
+				task->tk_timeout = req->rq_timeout.to_current;
+				rpc_sleep_on(&xprt->pending, task, NULL, NULL);
+			}
+			spin_unlock_bh(&xprt->sock_lock);
+			return;
+		}
 		/* Keep holding the socket if it is blocked */
 		rpc_delay(task, HZ>>4);
 		return;
@@ -1126,35 +1156,26 @@
 		if (xprt->stream)
 			xprt_disconnect(xprt);
 		req->rq_bytes_sent = 0;
-		goto out_release;
 	}
-
+ out_release:
+	xprt_release_write(xprt, task);
+	return;
  out_receive:
 	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
 	/* Set the task's receive timeout value */
-	task->tk_timeout = req->rq_timeout.to_current;
-	rpc_add_timer(task, xprt_timer);
-	rpc_unlock_task(task);
- out_release:
-	xprt_release_write(xprt, task);
-}
-
-/*
- * Queue the task for a reply to our call.
- * When the callback is invoked, the congestion window should have
- * been updated already.
- */
-void
-xprt_receive(struct rpc_task *task)
-{
-	struct rpc_rqst	*req = task->tk_rqstp;
-	struct rpc_xprt	*xprt = req->rq_xprt;
-
-	dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
-
-	req->rq_received = 0;
-	task->tk_timeout = 0;
-	rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
+	if (!xprt->nocong) {
+		task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt,
+				rpcproc_timer(clnt, task->tk_msg.rpc_proc));
+		req->rq_ntimeo = 0;
+		if (task->tk_timeout > req->rq_timeout.to_maxval)
+			task->tk_timeout = req->rq_timeout.to_maxval;
+	} else
+		task->tk_timeout = req->rq_timeout.to_current;
+	spin_lock_bh(&xprt->sock_lock);
+	if (!req->rq_received)
+		rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
+	__xprt_release_write(xprt, task);
+	spin_unlock_bh(&xprt->sock_lock);
 }
 
 /*
@@ -1169,9 +1190,7 @@
 	if (task->tk_rqstp)
 		return 0;
 
-	dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
-				task->tk_pid, xprt->cong, xprt->cwnd);
-	spin_lock_bh(&xprt->xprt_lock);
+	spin_lock(&xprt->xprt_lock);
 	xprt_reserve_status(task);
 	if (task->tk_rqstp) {
 		task->tk_timeout = 0;
@@ -1182,7 +1201,7 @@
 		task->tk_status = -EAGAIN;
 		rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
 	}
-	spin_unlock_bh(&xprt->xprt_lock);
+	spin_unlock(&xprt->xprt_lock);
 	dprintk("RPC: %4d xprt_reserve returns %d\n",
 				task->tk_pid, task->tk_status);
 	return task->tk_status;
@@ -1204,18 +1223,13 @@
 	} else if (task->tk_rqstp) {
 		/* We've already been given a request slot: NOP */
 	} else {
-		if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))
+		if (!(req = xprt->free))
 			goto out_nofree;
-		/* OK: There's room for us. Grab a free slot and bump
-		 * congestion value */
+		/* OK: There's room for us. Grab a free slot */
 		xprt->free     = req->rq_next;
 		req->rq_next   = NULL;
-		xprt->cong    += RPC_CWNDSCALE;
 		task->tk_rqstp = req;
 		xprt_request_init(task, xprt);
-
-		if (xprt->free)
-			xprt_clear_backlog(xprt);
 	}
 
 	return;
@@ -1244,6 +1258,7 @@
 	req->rq_xid     = xid++;
 	if (!xid)
 		xid++;
+	INIT_LIST_HEAD(&req->rq_list);
 }
 
 /*
@@ -1255,27 +1270,25 @@
 	struct rpc_xprt	*xprt = task->tk_xprt;
 	struct rpc_rqst	*req;
 
-	if (xprt->snd_task == task) {
-		if (xprt->stream)
-			xprt_disconnect(xprt);
-		xprt_release_write(xprt, task);
-	}
 	if (!(req = task->tk_rqstp))
 		return;
+	spin_lock_bh(&xprt->sock_lock);
+	__xprt_release_write(xprt, task);
+	__xprt_put_cong(xprt, req);
+	if (!list_empty(&req->rq_list))
+		list_del(&req->rq_list);
+	spin_unlock_bh(&xprt->sock_lock);
 	task->tk_rqstp = NULL;
 	memset(req, 0, sizeof(*req));	/* mark unused */
 
 	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
 
-	spin_lock_bh(&xprt->xprt_lock);
+	spin_lock(&xprt->xprt_lock);
 	req->rq_next = xprt->free;
 	xprt->free   = req;
 
-	/* Decrease congestion value. */
-	xprt->cong -= RPC_CWNDSCALE;
-
 	xprt_clear_backlog(xprt);
-	spin_unlock_bh(&xprt->xprt_lock);
+	spin_unlock(&xprt->xprt_lock);
 }
 
 /*
@@ -1331,11 +1344,12 @@
 		xprt->nocong = 1;
 	} else
 		xprt->cwnd = RPC_INITCWND;
-	xprt->congtime = jiffies;
 	spin_lock_init(&xprt->sock_lock);
 	spin_lock_init(&xprt->xprt_lock);
 	init_waitqueue_head(&xprt->cong_wait);
 
+	INIT_LIST_HEAD(&xprt->recv);
+
 	/* Set timeout parameters */
 	if (to) {
 		xprt->timeout = *to;
@@ -1344,9 +1358,10 @@
 	} else
 		xprt_default_timeout(&xprt->timeout, xprt->prot);
 
-	xprt->pending = RPC_INIT_WAITQ("xprt_pending");
-	xprt->sending = RPC_INIT_WAITQ("xprt_sending");
-	xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
+	INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
+	INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
+	INIT_RPC_WAITQ(&xprt->resend, "xprt_resend");
+	INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");
 
 	/* initialize free list */
 	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
@@ -1420,6 +1435,27 @@
 }
 
 /*
+ * Set socket buffer length
+ */
+void
+xprt_sock_setbufsize(struct rpc_xprt *xprt)
+{
+	struct sock *sk = xprt->inet;
+
+	if (xprt->stream)
+		return;
+	if (xprt->rcvsize) {
+		sk->userlocks |= SOCK_RCVBUF_LOCK;
+		sk->rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2;
+	}
+	if (xprt->sndsize) {
+		sk->userlocks |= SOCK_SNDBUF_LOCK;
+		sk->sndbuf = xprt->sndsize * RPC_MAXCONG * 2;
+		sk->write_space(sk);
+	}
+}
+
+/*
  * Create a client socket given the protocol and peer address.
  */
 static struct socket *
@@ -1477,6 +1513,7 @@
 {
 	xprt->shutdown = 1;
 	rpc_wake_up(&xprt->sending);
+	rpc_wake_up(&xprt->resend);
 	rpc_wake_up(&xprt->pending);
 	rpc_wake_up(&xprt->backlog);
 	if (waitqueue_active(&xprt->cong_wait))
@@ -1488,8 +1525,6 @@
  */
 int
 xprt_clear_backlog(struct rpc_xprt *xprt) {
-	if (RPCXPRT_CONGESTED(xprt))
-		return 0;
 	rpc_wake_up_next(&xprt->backlog);
 	if (waitqueue_active(&xprt->cong_wait))
 		wake_up(&xprt->cong_wait);
