patch-2.4.20 linux-2.4.20/net/sunrpc/svcsock.c

diff -urN linux-2.4.19/net/sunrpc/svcsock.c linux-2.4.20/net/sunrpc/svcsock.c
@@ -44,10 +44,19 @@
 
 /* SMP locking strategy:
  *
- * 	svc_sock->sk_lock and svc_serv->sv_lock protect their
- *	respective structures.
+ * 	svc_serv->sv_lock protects most of the state for that service.
+ *
+ *	Some flags can be set to certain values at any time
+ *	provided that certain rules are followed:
+ *
+ *	SK_BUSY  can be cleared at any time;
+ *		svc_sock_enqueue must be called afterwards.
+ *	SK_CONN, SK_DATA can be set or cleared at any time.
+ *		After a set, svc_sock_enqueue must be called.
+ *		After a clear, the socket must be read/accepted;
+ *		if this succeeds, it must be set again.
+ *	SK_CLOSE can be set at any time. It is never cleared.
  *
- *	Antideadlock ordering is sk_lock --> sv_lock.
  */
 
 #define RPCDBG_FACILITY	RPCDBG_SVCSOCK
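
The per-socket sk_lock is gone after this hunk; the flag rules above are
what make lockless manipulation of sk_flags safe.  As a rough user-space
analogy (the kernel uses set_bit()/clear_bit()/test_and_set_bit() on
sk_flags; everything below is an illustrative sketch, not patch code):

#include <stdatomic.h>
#include <stdio.h>

enum { SK_BUSY, SK_CONN, SK_DATA, SK_CLOSE };

struct svc_sock { atomic_ulong sk_flags; };

static void svc_sock_enqueue(struct svc_sock *svsk)
{
	(void)svsk;		/* would wake an idle thread here */
}

/* SK_DATA rule: clear it before the read attempt; if anything was
 * read, set it again (there may be more) before giving up the socket. */
static void svc_receive(struct svc_sock *svsk)
{
	atomic_fetch_and(&svsk->sk_flags, ~(1UL << SK_DATA));
	int got = 1;		/* pretend the read returned data */
	if (got)
		atomic_fetch_or(&svsk->sk_flags, 1UL << SK_DATA);

	/* SK_BUSY rule: whoever clears it must call enqueue afterwards. */
	atomic_fetch_and(&svsk->sk_flags, ~(1UL << SK_BUSY));
	svc_sock_enqueue(svsk);
}

int main(void)
{
	struct svc_sock s = { 1UL << SK_BUSY };
	svc_receive(&s);
	printf("flags now %#lx\n", atomic_load(&s.sk_flags));
	return 0;
}
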
@@ -62,11 +71,14 @@
 
 /*
  * Queue up an idle server thread.  Must have serv->sv_lock held.
+ * Note: this is really a stack rather than a queue, so that we only
+ * use as many different threads as we need, and the rest don't pollute
+ * the cache.
  */
 static inline void
 svc_serv_enqueue(struct svc_serv *serv, struct svc_rqst *rqstp)
 {
-	rpc_append_list(&serv->sv_threads, rqstp);
+	list_add(&rqstp->rq_list, &serv->sv_threads);
 }
 
 /*
@@ -75,7 +87,7 @@
 static inline void
 svc_serv_dequeue(struct svc_serv *serv, struct svc_rqst *rqstp)
 {
-	rpc_remove_list(&serv->sv_threads, rqstp);
+	list_del(&rqstp->rq_list);
 }
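
The LIFO behaviour comes purely from pairing list_add() (insert at the
head) with a dequeue that takes sv_threads.next.  A minimal standalone
rendering of that pairing, with the kernel's intrusive list helpers
reimplemented here for illustration only:

#include <stddef.h>
#include <stdio.h>

struct list_head { struct list_head *next, *prev; };

/* insert right after head: this is what makes sv_threads a stack */
static void list_add(struct list_head *n, struct list_head *head)
{
	n->next = head->next; n->prev = head;
	head->next->prev = n; head->next = n;
}

static void list_del(struct list_head *n)
{
	n->prev->next = n->next;
	n->next->prev = n->prev;
}

struct svc_rqst { int id; struct list_head rq_list; };

int main(void)
{
	struct list_head sv_threads = { &sv_threads, &sv_threads };
	struct svc_rqst a = { .id = 1 }, b = { .id = 2 };

	list_add(&a.rq_list, &sv_threads);	/* thread 1 goes idle */
	list_add(&b.rq_list, &sv_threads);	/* thread 2 goes idle */

	/* svc_sock_enqueue() always wakes sv_threads.next, i.e. the
	 * most recently idled thread, whose cache is still warm. */
	struct svc_rqst *hot = (struct svc_rqst *)
		((char *)sv_threads.next - offsetof(struct svc_rqst, rq_list));
	list_del(&hot->rq_list);
	printf("woken first: thread %d\n", hot->id);	/* thread 2 */
	return 0;
}
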
 
 /*
@@ -98,7 +110,6 @@
  * Queue up a socket with data pending. If there are idle nfsd
  * processes, wake 'em up.
  *
- * This must be called with svsk->sk_lock held.
  */
 static void
 svc_sock_enqueue(struct svc_sock *svsk)
@@ -106,26 +117,44 @@
 	struct svc_serv	*serv = svsk->sk_server;
 	struct svc_rqst	*rqstp;
 
-	/* NOTE: Local BH is already disabled by our caller. */
-	spin_lock(&serv->sv_lock);
+	if (!(svsk->sk_flags &
+	      ( (1<<SK_CONN)|(1<<SK_DATA)|(1<<SK_CLOSE)) ))
+		return;
+
+	spin_lock_bh(&serv->sv_lock);
 
-	if (serv->sv_threads && serv->sv_sockets)
+	if (!list_empty(&serv->sv_threads) && 
+	    !list_empty(&serv->sv_sockets))
 		printk(KERN_ERR
 			"svc_sock_enqueue: threads and sockets both waiting??\n");
 
-	if (svsk->sk_busy) {
+	if (test_bit(SK_BUSY, &svsk->sk_flags)) {
 		/* Don't enqueue socket while daemon is receiving */
 		dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
 		goto out_unlock;
 	}
 
+	if (((svsk->sk_reserved + serv->sv_bufsz)*2
+	     > sock_wspace(svsk->sk_sk))
+	    && !test_bit(SK_CLOSE, &svsk->sk_flags)
+	    && !test_bit(SK_CONN, &svsk->sk_flags)) {
+		/* Don't enqueue while not enough space for reply */
+		dprintk("svc: socket %p  no space, %d*2 > %ld, not enqueued\n",
+			svsk->sk_sk, svsk->sk_reserved+serv->sv_bufsz,
+			sock_wspace(svsk->sk_sk));
+		goto out_unlock;
+	}
+
 	/* Mark socket as busy. It will remain in this state until the
 	 * server has processed all pending data and put the socket back
 	 * on the idle list.
 	 */
-	svsk->sk_busy = 1;
+	set_bit(SK_BUSY, &svsk->sk_flags);
 
-	if ((rqstp = serv->sv_threads) != NULL) {
+	if (!list_empty(&serv->sv_threads)) {
+		rqstp = list_entry(serv->sv_threads.next,
+				   struct svc_rqst,
+				   rq_list);
 		dprintk("svc: socket %p served by daemon %p\n",
 			svsk->sk_sk, rqstp);
 		svc_serv_dequeue(serv, rqstp);
@@ -135,15 +164,17 @@
 				rqstp, rqstp->rq_sock);
 		rqstp->rq_sock = svsk;
 		svsk->sk_inuse++;
+		rqstp->rq_reserved = serv->sv_bufsz;
+		svsk->sk_reserved += rqstp->rq_reserved;
 		wake_up(&rqstp->rq_wait);
 	} else {
 		dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
-		rpc_append_list(&serv->sv_sockets, svsk);
-		svsk->sk_qued = 1;
+		list_add_tail(&svsk->sk_ready, &serv->sv_sockets);
+		set_bit(SK_QUED, &svsk->sk_flags);
 	}
 
 out_unlock:
-	spin_unlock(&serv->sv_lock);
+	spin_unlock_bh(&serv->sv_lock);
 }
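
The new admission test only enqueues a socket when its send buffer can
absorb every reply it may owe.  A standalone sketch of that predicate;
the concrete sizes are invented, and the factor of two is taken straight
from the code above (presumably slack for sk_buff overhead):

#include <stdbool.h>
#include <stdio.h>

static bool has_wspace(long wspace, int sk_reserved, int sv_bufsz)
{
	/* enqueue only if every reserved reply, plus one more
	 * worst-case reply, fits twice over */
	return (sk_reserved + sv_bufsz) * 2 <= wspace;
}

int main(void)
{
	/* bufsz 4096 with three replies (12288 bytes) reserved: */
	printf("%d\n", has_wspace(65536, 12288, 4096)); /* 1: enqueue */
	printf("%d\n", has_wspace(16384, 12288, 4096)); /* 0: wait   */
	return 0;
}
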
 
 /*
@@ -154,71 +185,69 @@
 {
 	struct svc_sock	*svsk;
 
-	if ((svsk = serv->sv_sockets) != NULL)
-		rpc_remove_list(&serv->sv_sockets, svsk);
+	if (list_empty(&serv->sv_sockets))
+		return NULL;
 
-	if (svsk) {
-		dprintk("svc: socket %p dequeued, inuse=%d\n",
-			svsk->sk_sk, svsk->sk_inuse);
-		svsk->sk_qued = 0;
-	}
+	svsk = list_entry(serv->sv_sockets.next,
+			  struct svc_sock, sk_ready);
+	list_del(&svsk->sk_ready);
+
+	dprintk("svc: socket %p dequeued, inuse=%d\n",
+		svsk->sk_sk, svsk->sk_inuse);
+	clear_bit(SK_QUED, &svsk->sk_flags);
 
 	return svsk;
 }
 
 /*
- * Having read count bytes from a socket, check whether it
+ * Having read something from a socket, check whether it
  * needs to be re-enqueued.
+ * Note: SK_DATA only gets cleared when a read-attempt finds
+ * no (or insufficient) data.
  */
 static inline void
-svc_sock_received(struct svc_sock *svsk, int count)
+svc_sock_received(struct svc_sock *svsk)
 {
-	spin_lock_bh(&svsk->sk_lock);
-	if ((svsk->sk_data -= count) < 0) {
-		printk(KERN_NOTICE "svc: sk_data negative!\n");
-		svsk->sk_data = 0;
-	}
-	svsk->sk_rqstp = NULL; /* XXX */
-	svsk->sk_busy = 0;
-	if (svsk->sk_conn || svsk->sk_data || svsk->sk_close) {
-		dprintk("svc: socket %p re-enqueued after receive\n",
-						svsk->sk_sk);
-		svc_sock_enqueue(svsk);
-	}
-	spin_unlock_bh(&svsk->sk_lock);
+	clear_bit(SK_BUSY, &svsk->sk_flags);
+	svc_sock_enqueue(svsk);
 }
 
-/*
- * Dequeue a new connection.
+
+/**
+ * svc_reserve - change the space reserved for the reply to a request.
+ * @rqstp:  The request in question
+ * @space: new max space to reserve
+ *
+ * Each request reserves some space on the output queue of the socket
+ * to make sure the reply fits.  This function reduces that reserved
+ * space to be the amount of space used already, plus @space.
+ *
  */
-static inline void
-svc_sock_accepted(struct svc_sock *svsk)
+void svc_reserve(struct svc_rqst *rqstp, int space)
 {
-	spin_lock_bh(&svsk->sk_lock);
-        svsk->sk_busy = 0;
-        svsk->sk_conn--;
-        if (svsk->sk_conn || svsk->sk_data || svsk->sk_close) {
-                dprintk("svc: socket %p re-enqueued after accept\n",
-						svsk->sk_sk);
-                svc_sock_enqueue(svsk);
-        }
-	spin_unlock_bh(&svsk->sk_lock);
+	space += rqstp->rq_resbuf.len<<2;
+
+	if (space < rqstp->rq_reserved) {
+		struct svc_sock *svsk = rqstp->rq_sock;
+		spin_lock_bh(&svsk->sk_server->sv_lock);
+		svsk->sk_reserved -= (rqstp->rq_reserved - space);
+		rqstp->rq_reserved = space;
+		spin_unlock_bh(&svsk->sk_server->sv_lock);
+
+		svc_sock_enqueue(svsk);
+	}
 }
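
A user-space sketch of the bookkeeping svc_reserve() performs; the
names mirror the patch, the sizes are invented, and `used` stands in
for rqstp->rq_resbuf.len<<2 (len counts 32-bit words, hence the shift
in the real code):

#include <stdio.h>

struct rq { int reserved, used; };
static int sk_reserved;			/* per-socket total */

static void svc_reserve(struct rq *r, int space)
{
	space += r->used;		/* space already consumed */
	if (space < r->reserved) {	/* only ever shrinks */
		sk_reserved -= r->reserved - space;
		r->reserved = space;
	}
}

int main(void)
{
	struct rq r = { .reserved = 4096 };	/* sv_bufsz at dispatch */
	sk_reserved = 4096;

	r.used = 128;		/* 128 bytes of reply written so far */
	svc_reserve(&r, 64);	/* at most 64 more bytes will follow */
	printf("rq %d sock %d\n", r.reserved, sk_reserved);
	/* prints: rq 192 sock 192 */
	return 0;
}
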
 
 /*
  * Release a socket after use.
  */
 static inline void
-svc_sock_release(struct svc_rqst *rqstp)
+svc_sock_put(struct svc_sock *svsk)
 {
-	struct svc_sock	*svsk = rqstp->rq_sock;
-	struct svc_serv	*serv = svsk->sk_server;
-
-	svc_release_skb(rqstp);
-	rqstp->rq_sock = NULL;
+	struct svc_serv *serv = svsk->sk_server;
 
 	spin_lock_bh(&serv->sv_lock);
-	if (!--(svsk->sk_inuse) && svsk->sk_dead) {
+	if (!--(svsk->sk_inuse) && test_bit(SK_DEAD, &svsk->sk_flags)) {
 		spin_unlock_bh(&serv->sv_lock);
 		dprintk("svc: releasing dead socket\n");
 		sock_release(svsk->sk_sock);
@@ -228,6 +257,31 @@
 		spin_unlock_bh(&serv->sv_lock);
 }
 
+static void
+svc_sock_release(struct svc_rqst *rqstp)
+{
+	struct svc_sock	*svsk = rqstp->rq_sock;
+
+	svc_release_skb(rqstp);
+
+	/* Reset response buffer and release
+	 * the reservation.
+	 * But first, check that enough space was reserved
+	 * for the reply, otherwise we have a bug!
+	 */
+	if ((rqstp->rq_resbuf.len<<2) >  rqstp->rq_reserved)
+		printk(KERN_ERR "RPC request reserved %d but used %d\n",
+		       rqstp->rq_reserved,
+		       rqstp->rq_resbuf.len<<2);
+
+	rqstp->rq_resbuf.buf = rqstp->rq_resbuf.base;
+	rqstp->rq_resbuf.len = 0;
+	svc_reserve(rqstp, 0);
+	rqstp->rq_sock = NULL;
+
+	svc_sock_put(svsk);
+}
+
 /*
  * External function to wake up a server waiting for data
  */
@@ -237,7 +291,10 @@
 	struct svc_rqst	*rqstp;
 
 	spin_lock_bh(&serv->sv_lock);
-	if ((rqstp = serv->sv_threads) != NULL) {
+	if (!list_empty(&serv->sv_threads)) {
+		rqstp = list_entry(serv->sv_threads.next,
+				   struct svc_rqst,
+				   rq_list);
 		dprintk("svc: daemon %p woken up.\n", rqstp);
 		/*
 		svc_serv_dequeue(serv, rqstp);
@@ -270,7 +327,13 @@
 	msg.msg_control = NULL;
 	msg.msg_controllen = 0;
 
-	msg.msg_flags	= MSG_DONTWAIT;
+	/* This was MSG_DONTWAIT, but I now want it to wait.
+	 * The only thing that it would wait for is memory and
+	 * if we are fairly low on memory, then we aren't likely
+	 * to make much progress anyway.
+	 * sk->sndtimeo is set to 30 seconds just in case.
+	 */
+	msg.msg_flags	= 0;
 
 	oldfs = get_fs(); set_fs(KERNEL_DS);
 	len = sock_sendmsg(sock, &msg, buflen);
@@ -340,6 +403,32 @@
 }
 
 /*
+ * Set socket snd and rcv buffer lengths
+ */
+static inline void
+svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv)
+{
+#if 0
+	mm_segment_t	oldfs;
+	oldfs = get_fs(); set_fs(KERNEL_DS);
+	sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
+			(char*)&snd, sizeof(snd));
+	sock_setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
+			(char*)&rcv, sizeof(rcv));
+#else
+	/* sock_setsockopt limits use to sysctl_?mem_max,
+	 * which isn't acceptable.  Until that is made conditional
+	 * on not having CAP_SYS_RESOURCE or similar, we go direct...
+	 * DaveM said I could!
+	 */
+	lock_sock(sock->sk);
+	sock->sk->sndbuf = snd * 2;
+	sock->sk->rcvbuf = rcv * 2;
+	sock->sk->userlocks |= SOCK_SNDBUF_LOCK|SOCK_RCVBUF_LOCK;
+	release_sock(sock->sk);
+#endif
+}
+/*
  * INET callback when data has been received on the socket.
  */
 static void
@@ -350,17 +439,36 @@
 	if (!svsk)
 		goto out;
 	dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n",
-		svsk, sk, count, svsk->sk_busy);
-	spin_lock_bh(&svsk->sk_lock);
-	svsk->sk_data = 1;
+		svsk, sk, count, test_bit(SK_BUSY, &svsk->sk_flags));
+	set_bit(SK_DATA, &svsk->sk_flags);
 	svc_sock_enqueue(svsk);
-	spin_unlock_bh(&svsk->sk_lock);
  out:
 	if (sk->sleep && waitqueue_active(sk->sleep))
 		wake_up_interruptible(sk->sleep);
 }
 
 /*
+ * INET callback when space is newly available on the socket.
+ */
+static void
+svc_write_space(struct sock *sk)
+{
+	struct svc_sock	*svsk = (struct svc_sock *)(sk->user_data);
+
+	if (svsk) {
+		dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
+			svsk, sk, test_bit(SK_BUSY, &svsk->sk_flags));
+		svc_sock_enqueue(svsk);
+	}
+
+	if (sk->sleep && waitqueue_active(sk->sleep)) {
+		printk(KERN_WARNING "RPC svc_write_space: some sleeping on %p\n",
+		       svsk);
+		wake_up_interruptible(sk->sleep);
+	}
+}
+
+/*
  * Receive a datagram from a UDP socket.
  */
 static int
@@ -372,20 +480,31 @@
 	u32		*data;
 	int		err, len;
 
-	svsk->sk_data = 0;
+	if (test_and_clear_bit(SK_CHNGBUF, &svsk->sk_flags))
+		/* udp sockets need large rcvbuf as all pending
+		 * requests are still in that buffer.  sndbuf must
+		 * also be large enough that there is enough space
+		 * for one reply per thread.
+		 */
+		svc_sock_setbufsize(svsk->sk_sock,
+				    (serv->sv_nrthreads+3)* serv->sv_bufsz,
+				    (serv->sv_nrthreads+3)* serv->sv_bufsz);
+
+	clear_bit(SK_DATA, &svsk->sk_flags);
 	while ((skb = skb_recv_datagram(svsk->sk_sk, 0, 1, &err)) == NULL) {
-		svc_sock_received(svsk, 0);
+		svc_sock_received(svsk);
 		if (err == -EAGAIN)
 			return err;
 		/* possibly an icmp error */
 		dprintk("svc: recvfrom returned error %d\n", -err);
 	}
+	set_bit(SK_DATA, &svsk->sk_flags); /* there may be more data... */
 
 	/* Sorry. */
 	if (skb_is_nonlinear(skb)) {
 		if (skb_linearize(skb, GFP_KERNEL) != 0) {
 			kfree_skb(skb);
-			svc_sock_received(svsk, 0);
+			svc_sock_received(svsk);
 			return 0;
 		}
 	}
@@ -393,13 +512,11 @@
 	if (skb->ip_summed != CHECKSUM_UNNECESSARY) {
 		if ((unsigned short)csum_fold(skb_checksum(skb, 0, skb->len, skb->csum))) {
 			skb_free_datagram(svsk->sk_sk, skb);
-			svc_sock_received(svsk, 0);
+			svc_sock_received(svsk);
 			return 0;
 		}
 	}
 
-	/* There may be more data */
-	svsk->sk_data = 1;
 
 	len  = skb->len - sizeof(struct udphdr);
 	data = (u32 *) (skb->data + sizeof(struct udphdr));
@@ -421,7 +538,7 @@
 
 	/* One down, maybe more to go... */
 	svsk->sk_sk->stamp = skb->stamp;
-	svc_sock_received(svsk, 0);
+	svc_sock_received(svsk);
 
 	return len;
 }
@@ -444,9 +561,6 @@
 	if (error == -ECONNREFUSED)
 		/* ICMP error on earlier request. */
 		error = svc_sendto(rqstp, bufp->iov, bufp->nriov);
-	else if (error == -EAGAIN)
-		/* Ignore and wait for re-xmit */
-		error = 0;
 
 	return error;
 }
@@ -455,9 +569,20 @@
 svc_udp_init(struct svc_sock *svsk)
 {
 	svsk->sk_sk->data_ready = svc_udp_data_ready;
+	svsk->sk_sk->write_space = svc_write_space;
 	svsk->sk_recvfrom = svc_udp_recvfrom;
 	svsk->sk_sendto = svc_udp_sendto;
 
+	/* initial setting must have enough space to
+	 * receive and respond to one request.
+	 * svc_udp_recvfrom will re-adjust if necessary
+	 */
+	svc_sock_setbufsize(svsk->sk_sock,
+			    3 * svsk->sk_server->sv_bufsz,
+			    3 * svsk->sk_server->sv_bufsz);
+
+	set_bit(SK_CHNGBUF, &svsk->sk_flags);
+
 	return 0;
 }
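
The sizing rules are worth spelling out: (nrthreads+3)*sv_bufsz when
every thread may owe a reply, 3*sv_bufsz when only a little headroom is
needed (svc_tcp_recvfrom, later in the patch, uses the asymmetric
variant).  A sketch with invented numbers:

#include <stdio.h>

int main(void)
{
	unsigned int nrthreads = 8, bufsz = 4096;	/* invented */

	/* at socket creation: one request plus one reply, with slack */
	printf("initial snd = rcv: %u\n", 3 * bufsz);

	/* UDP after SK_CHNGBUF: all pending requests sit in rcvbuf,
	 * and every thread may owe one reply */
	printf("udp snd = rcv:     %u\n", (nrthreads + 3) * bufsz);

	/* TCP (later in the patch): a reply per thread in sndbuf,
	 * but rcvbuf only needs to hold a few requests at a time */
	printf("tcp snd: %u  rcv: %u\n",
	       (nrthreads + 3) * bufsz, 3 * bufsz);
	return 0;
}
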
 
@@ -481,10 +606,8 @@
 		printk("svc: socket %p: no user data\n", sk);
 		goto out;
 	}
-	spin_lock_bh(&svsk->sk_lock);
-	svsk->sk_conn++;
+	set_bit(SK_CONN, &svsk->sk_flags);
 	svc_sock_enqueue(svsk);
-	spin_unlock_bh(&svsk->sk_lock);
  out:
 	if (sk->sleep && waitqueue_active(sk->sleep))
 		wake_up_interruptible_all(sk->sleep);
@@ -505,10 +628,8 @@
 		printk("svc: socket %p: no user data\n", sk);
 		goto out;
 	}
-	spin_lock_bh(&svsk->sk_lock);
-	svsk->sk_close = 1;
+	set_bit(SK_CLOSE, &svsk->sk_flags);
 	svc_sock_enqueue(svsk);
-	spin_unlock_bh(&svsk->sk_lock);
  out:
 	if (sk->sleep && waitqueue_active(sk->sleep))
 		wake_up_interruptible_all(sk->sleep);
@@ -523,10 +644,8 @@
 			sk, sk->user_data);
 	if (!(svsk = (struct svc_sock *)(sk->user_data)))
 		goto out;
-	spin_lock_bh(&svsk->sk_lock);
-	svsk->sk_data++;
+	set_bit(SK_DATA, &svsk->sk_flags);
 	svc_sock_enqueue(svsk);
-	spin_unlock_bh(&svsk->sk_lock);
  out:
 	if (sk->sleep && waitqueue_active(sk->sleep))
 		wake_up_interruptible(sk->sleep);
@@ -559,12 +678,15 @@
 	newsock->type = sock->type;
 	newsock->ops = ops = sock->ops;
 
+	clear_bit(SK_CONN, &svsk->sk_flags);
 	if ((err = ops->accept(sock, newsock, O_NONBLOCK)) < 0) {
-		if (net_ratelimit())
+		if (err != -EAGAIN && net_ratelimit())
 			printk(KERN_WARNING "%s: accept failed (err %d)!\n",
 				   serv->sv_name, -err);
 		goto failed;		/* aborted connection or whatever */
 	}
+	set_bit(SK_CONN, &svsk->sk_flags);
+	svc_sock_enqueue(svsk);
 
 	slen = sizeof(sin);
 	err = ops->getname(newsock, (struct sockaddr *) &sin, &slen, 1);
@@ -580,11 +702,10 @@
 	 * tell us anything. For now just warn about unpriv connections.
 	 */
 	if (ntohs(sin.sin_port) >= 1024) {
-		if (net_ratelimit())
-			printk(KERN_WARNING
-				   "%s: connect from unprivileged port: %u.%u.%u.%u:%d\n",
-				   serv->sv_name, 
-				   NIPQUAD(sin.sin_addr.s_addr), ntohs(sin.sin_port));
+		dprintk(KERN_WARNING
+			"%s: connect from unprivileged port: %u.%u.%u.%u:%d\n",
+			serv->sv_name, 
+			NIPQUAD(sin.sin_addr.s_addr), ntohs(sin.sin_port));
 	}
 
 	dprintk("%s: connect from %u.%u.%u.%u:%04x\n", serv->sv_name,
@@ -593,14 +714,45 @@
 	if (!(newsvsk = svc_setup_socket(serv, newsock, &err, 0)))
 		goto failed;
 
+	/* make sure that a write doesn't block forever when
+	 * low on memory
+	 */
+	newsock->sk->sndtimeo = HZ*30;
+
 	/* Precharge. Data may have arrived on the socket before we
 	 * installed the data_ready callback. 
 	 */
-	spin_lock_bh(&newsvsk->sk_lock);
-	newsvsk->sk_data = 1;
-	newsvsk->sk_temp = 1;
+	set_bit(SK_DATA, &newsvsk->sk_flags);
 	svc_sock_enqueue(newsvsk);
-	spin_unlock_bh(&newsvsk->sk_lock);
+
+	/* make sure that we don't have too many active connections.
+	 * If we have, something must be dropped.
+	 * We randomly choose between the newest and the oldest (in
+	 * terms of recent activity) connection and drop it.
+	 */
+	if (serv->sv_tmpcnt > (serv->sv_nrthreads+3)*10) {
+		struct svc_sock *svsk = NULL;
+		spin_lock_bh(&serv->sv_lock);
+		if (!list_empty(&serv->sv_tempsocks)) {
+			if (net_random()&1)
+				svsk = list_entry(serv->sv_tempsocks.prev,
+						  struct svc_sock,
+						  sk_list);
+			else
+				svsk = list_entry(serv->sv_tempsocks.next,
+						  struct svc_sock,
+						  sk_list);
+			set_bit(SK_CLOSE, &svsk->sk_flags);
+			svsk->sk_inuse ++;
+		}
+		spin_unlock_bh(&serv->sv_lock);
+
+		if (svsk) {
+			svc_sock_enqueue(svsk);
+			svc_sock_put(svsk);
+		}
+
+	}
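+
+/* A sketch of the drop policy's arithmetic; the thread count is
+ * invented, and rand() stands in for net_random().  Not patch code:
+ *
+ *	#include <stdio.h>
+ *	#include <stdlib.h>
+ *
+ *	int main(void)
+ *	{
+ *		int nrthreads = 8;
+ *		int limit = (nrthreads + 3) * 10;  // 110 connections
+ *		int sv_tmpcnt = 111;
+ *
+ *		if (sv_tmpcnt > limit)
+ *			puts(rand() & 1 ? "drop most recently active"
+ *					: "drop least recently active");
+ *		return 0;
+ *	}
+ */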
 
 	if (serv->sv_stats)
 		serv->sv_stats->nettcpconn++;
@@ -621,23 +773,38 @@
 	struct svc_sock	*svsk = rqstp->rq_sock;
 	struct svc_serv	*serv = svsk->sk_server;
 	struct svc_buf	*bufp = &rqstp->rq_argbuf;
-	int		len, ready, used;
+	int		len;
 
 	dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
-			svsk, svsk->sk_data, svsk->sk_conn, svsk->sk_close);
+		svsk, test_bit(SK_DATA, &svsk->sk_flags),
+		test_bit(SK_CONN, &svsk->sk_flags),
+		test_bit(SK_CLOSE, &svsk->sk_flags));
 
-	if (svsk->sk_close) {
+	if (test_bit(SK_CLOSE, &svsk->sk_flags)) {
 		svc_delete_socket(svsk);
 		return 0;
 	}
 
-	if (svsk->sk_conn) {
+	if (test_bit(SK_CONN, &svsk->sk_flags)) {
 		svc_tcp_accept(svsk);
-		svc_sock_accepted(svsk);
+		svc_sock_received(svsk);
 		return 0;
 	}
 
-	ready = svsk->sk_data;
+	if (test_and_clear_bit(SK_CHNGBUF, &svsk->sk_flags))
+		/* sndbuf needs to have room for one request
+		 * per thread, otherwise we can stall even when the
+		 * network isn't a bottleneck.
+		 * rcvbuf just needs to be able to hold a few requests.
+		 * Normally they will be removed from the queue 
+		 * as soon as a complete request arrives.
+		 */
+		svc_sock_setbufsize(svsk->sk_sock,
+				    (serv->sv_nrthreads+3) *
+				    serv->sv_bufsz,
+				    3 * serv->sv_bufsz);
+
+	clear_bit(SK_DATA, &svsk->sk_flags);
 
 	/* Receive data. If we haven't got the record length yet, get
 	 * the next four bytes. Otherwise try to gobble up as much as
@@ -652,6 +819,8 @@
 		if ((len = svc_recvfrom(rqstp, &iov, 1, want)) < 0)
 			goto error;
 		svsk->sk_tcplen += len;
+		if (len < want)
+			return 0;
 
 		svsk->sk_reclen = ntohl(svsk->sk_reclen);
 		if (!(svsk->sk_reclen & 0x80000000)) {
@@ -660,13 +829,17 @@
 			 *  bit set in the fragment length header.
 			 *  But apparently no known nfs clients send fragmented
 			 *  records. */
-			/* FIXME: shutdown socket */
-			printk(KERN_NOTICE "RPC: bad TCP reclen %08lx",
+			printk(KERN_NOTICE "RPC: bad TCP reclen 0x%08lx (non-terminal)\n",
 			       (unsigned long) svsk->sk_reclen);
-			return -EIO;
+			goto err_delete;
 		}
 		svsk->sk_reclen &= 0x7fffffff;
 		dprintk("svc: TCP record, %d bytes\n", svsk->sk_reclen);
+		if (svsk->sk_reclen > (bufp->buflen<<2)) {
+			printk(KERN_NOTICE "RPC: bad TCP reclen 0x%08lx (large)\n",
+			       (unsigned long) svsk->sk_reclen);
+			goto err_delete;
+		}
 	}
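
The four bytes gobbled above are the RPC-over-TCP record mark (RFC 1831
record marking): the top bit flags the final fragment, the low 31 bits
give the fragment length.  A standalone decoding sketch with a made-up
marker value:

#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>

int main(void)
{
	uint32_t wire = htonl(0x80000164);	/* as read off the wire */
	uint32_t reclen = ntohl(wire);

	if (!(reclen & 0x80000000))
		puts("fragmented record: unsupported, kill the connection");
	reclen &= 0x7fffffff;
	printf("record is %u bytes\n", reclen);	/* 356 */
	return 0;
}
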
 
 	/* Check whether enough data is available */
@@ -675,21 +848,12 @@
 		goto error;
 
 	if (len < svsk->sk_reclen) {
-		/* FIXME: if sk_reclen > window-size, then we will
-		 * never be able to receive the record, so should
-		 * shutdown the connection
-		 */
 		dprintk("svc: incomplete TCP record (%d of %d)\n",
 			len, svsk->sk_reclen);
-		svc_sock_received(svsk, ready);
+		svc_sock_received(svsk);
 		return -EAGAIN;	/* record not complete */
 	}
-	/* if we think there is only one more record to read, but
-	 * it is bigger than we expect, then two records must have arrived
-	 * together, so pretend we aren't using the record.. */
-	if (len > svsk->sk_reclen && ready == 1)
-		used = 0;
-	else	used = 1;
+	set_bit(SK_DATA, &svsk->sk_flags);
 
 	/* Frob argbuf */
 	bufp->iov[0].iov_base += 4;
@@ -716,20 +880,24 @@
 	svsk->sk_reclen = 0;
 	svsk->sk_tcplen = 0;
 
-	svc_sock_received(svsk, used);
+	svc_sock_received(svsk);
 	if (serv->sv_stats)
 		serv->sv_stats->nettcpcnt++;
 
 	return len;
 
-error:
+ err_delete:
+	svc_delete_socket(svsk);
+	return -EAGAIN;
+
+ error:
 	if (len == -EAGAIN) {
 		dprintk("RPC: TCP recvfrom got EAGAIN\n");
-		svc_sock_received(svsk, ready); /* Clear data ready */
+		svc_sock_received(svsk);
 	} else {
 		printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
 					svsk->sk_server->sv_name, -len);
-		svc_sock_received(svsk, 0);
+		svc_sock_received(svsk);
 	}
 
 	return len;
@@ -737,8 +905,6 @@
 
 /*
  * Send out data on TCP socket.
- * FIXME: Make the sendto call non-blocking in order not to hang
- * a daemon on a dead client. Requires write queue maintenance.
  */
 static int
 svc_tcp_sendto(struct svc_rqst *rqstp)
@@ -756,13 +922,11 @@
 
 	sent = svc_sendto(rqstp, bufp->iov, bufp->nriov);
 	if (sent != bufp->len<<2) {
-		printk(KERN_NOTICE "rpc-srv/tcp: %s: sent only %d bytes of %d - should shutdown socket\n",
+		printk(KERN_NOTICE "rpc-srv/tcp: %s: sent only %d bytes of %d - shutting down socket\n",
 		       rqstp->rq_sock->sk_server->sv_name,
 		       sent, bufp->len << 2);
-		/* FIXME: should shutdown the socket, or allocate more memort
-		 * or wait and try again or something.  Otherwise
-		 * client will get confused
-		 */
+		svc_delete_socket(rqstp->rq_sock);
+		sent = -EAGAIN;
 	}
 	return sent;
 }
@@ -782,21 +946,58 @@
 		dprintk("setting up TCP socket for reading\n");
 		sk->state_change = svc_tcp_state_change;
 		sk->data_ready = svc_tcp_data_ready;
+		sk->write_space = svc_write_space;
 
 		svsk->sk_reclen = 0;
 		svsk->sk_tcplen = 0;
+
+		/* initial setting must have enough space to
+		 * receive and respond to one request.
+		 * svc_tcp_recvfrom will re-adjust if necessary
+		 */
+		svc_sock_setbufsize(svsk->sk_sock,
+				    3 * svsk->sk_server->sv_bufsz,
+				    3 * svsk->sk_server->sv_bufsz);
+
+		set_bit(SK_CHNGBUF, &svsk->sk_flags);
 	}
 
 	return 0;
 }
 
+void
+svc_sock_update_bufs(struct svc_serv *serv)
+{
+	/*
+	 * The number of server threads has changed.
+	 * Flag all sockets so that their snd/rcv buffer
+	 * sizes get updated.
+	 * We don't resize them here, as the locking is
+	 * rather awkward at this point.
+	 */
+	struct list_head *le;
+
+	spin_lock_bh(&serv->sv_lock);
+	list_for_each(le, &serv->sv_permsocks) {
+		struct svc_sock *svsk = 
+			list_entry(le, struct svc_sock, sk_list);
+		set_bit(SK_CHNGBUF, &svsk->sk_flags);
+	}
+	list_for_each(le, &serv->sv_tempsocks) {
+		struct svc_sock *svsk =
+			list_entry(le, struct svc_sock, sk_list);
+		set_bit(SK_CHNGBUF, &svsk->sk_flags);
+	}
+	spin_unlock_bh(&serv->sv_lock);
+}
+
 /*
  * Receive the next request on any socket.
  */
 int
 svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout)
 {
-	struct svc_sock		*svsk;
+	struct svc_sock		*svsk = NULL;
 	int			len;
 	DECLARE_WAITQUEUE(wait, current);
 
@@ -820,9 +1021,28 @@
 		return -EINTR;
 
 	spin_lock_bh(&serv->sv_lock);
-	if ((svsk = svc_sock_dequeue(serv)) != NULL) {
+	if (!list_empty(&serv->sv_tempsocks)) {
+		svsk = list_entry(serv->sv_tempsocks.next,
+				  struct svc_sock, sk_list);
+		/* apparently the "standard" is that clients close
+		 * idle connections after 5 minutes, servers after
+		 * 6 minutes
+		 *   http://www.connectathon.org/talks96/nfstcp.pdf 
+		 */
+		if (CURRENT_TIME - svsk->sk_lastrecv < 6*60
+		    || test_bit(SK_BUSY, &svsk->sk_flags))
+			svsk = NULL;
+	}
+	if (svsk) {
+		set_bit(SK_BUSY, &svsk->sk_flags);
+		set_bit(SK_CLOSE, &svsk->sk_flags);
+		rqstp->rq_sock = svsk;
+		svsk->sk_inuse++;
+	} else if ((svsk = svc_sock_dequeue(serv)) != NULL) {
 		rqstp->rq_sock = svsk;
 		svsk->sk_inuse++;
+		rqstp->rq_reserved = serv->sv_bufsz;	
+		svsk->sk_reserved += rqstp->rq_reserved;
 	} else {
 		/* No data pending. Go to sleep */
 		svc_serv_enqueue(serv, rqstp);
@@ -859,6 +1079,14 @@
 		svc_sock_release(rqstp);
 		return -EAGAIN;
 	}
+	svsk->sk_lastrecv = CURRENT_TIME;
+	if (test_bit(SK_TEMP, &svsk->sk_flags)) {
+		/* push active sockets to end of list */
+		spin_lock_bh(&serv->sv_lock);
+		list_del(&svsk->sk_list);
+		list_add_tail(&svsk->sk_list, &serv->sv_tempsocks);
+		spin_unlock_bh(&serv->sv_lock);
+	}
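+	/* sk_lastrecv plus the move-to-tail above keep sv_tempsocks in
+	 * LRU order, so the 6-minute test earlier in svc_recv always
+	 * probes the least recently active connection.  A sketch of
+	 * that test (idle time invented; CURRENT_TIME is seconds on
+	 * 2.4), not patch code:
+	 *
+	 *	#include <stdio.h>
+	 *	#include <time.h>
+	 *
+	 *	int main(void)
+	 *	{
+	 *		time_t now = time(NULL);
+	 *		time_t sk_lastrecv = now - 400;  // idle 400 s
+	 *
+	 *		puts(now - sk_lastrecv < 6*60
+	 *		     ? "still fresh, keep"
+	 *		     : "idle too long, SK_CLOSE");
+	 *		return 0;
+	 *	}
+	 */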
 
 	rqstp->rq_secure  = ntohs(rqstp->rq_addr.sin_port) < 1024;
 	rqstp->rq_userset = 0;
@@ -935,8 +1163,9 @@
 	svsk->sk_sk = inet;
 	svsk->sk_ostate = inet->state_change;
 	svsk->sk_odata = inet->data_ready;
+	svsk->sk_owspace = inet->write_space;
 	svsk->sk_server = serv;
-	spin_lock_init(&svsk->sk_lock);
+	svsk->sk_lastrecv = CURRENT_TIME;
 
 	/* Initialize the socket */
 	if (sock->type == SOCK_DGRAM)
@@ -956,9 +1185,16 @@
 		return NULL;
 	}
 
+
 	spin_lock_bh(&serv->sv_lock);
-	svsk->sk_list = serv->sv_allsocks;
-	serv->sv_allsocks = svsk;
+	if (!pmap_register) {
+		set_bit(SK_TEMP, &svsk->sk_flags);
+		list_add(&svsk->sk_list, &serv->sv_tempsocks);
+		serv->sv_tmpcnt++;
+	} else {
+		clear_bit(SK_TEMP, &svsk->sk_flags);
+		list_add(&svsk->sk_list, &serv->sv_permsocks);
+	}
 	spin_unlock_bh(&serv->sv_lock);
 
 	dprintk("svc: svc_setup_socket created %p (inet %p)\n",
@@ -993,6 +1229,7 @@
 		return error;
 
 	if (sin != NULL) {
+		sock->sk->reuse = 1; /* allow address reuse */
 		error = sock->ops->bind(sock, (struct sockaddr *) sin,
 						sizeof(*sin));
 		if (error < 0)
@@ -1000,7 +1237,7 @@
 	}
 
 	if (protocol == IPPROTO_TCP) {
-		if ((error = sock->ops->listen(sock, 5)) < 0)
+		if ((error = sock->ops->listen(sock, 64)) < 0)
 			goto bummer;
 	}
 
@@ -1019,7 +1256,6 @@
 void
 svc_delete_socket(struct svc_sock *svsk)
 {
-	struct svc_sock	**rsk;
 	struct svc_serv	*serv;
 	struct sock	*sk;
 
@@ -1030,23 +1266,18 @@
 
 	sk->state_change = svsk->sk_ostate;
 	sk->data_ready = svsk->sk_odata;
+	sk->write_space = svsk->sk_owspace;
 
 	spin_lock_bh(&serv->sv_lock);
 
-	for (rsk = &serv->sv_allsocks; *rsk; rsk = &(*rsk)->sk_list) {
-		if (*rsk == svsk)
-			break;
-	}
-	if (!*rsk) {
-		spin_unlock_bh(&serv->sv_lock);
-		return;
-	}
-	*rsk = svsk->sk_list;
-	if (svsk->sk_qued)
-		rpc_remove_list(&serv->sv_sockets, svsk);
+	list_del(&svsk->sk_list);
+	if (test_bit(SK_TEMP, &svsk->sk_flags))
+		serv->sv_tmpcnt--;
+	if (test_bit(SK_QUED, &svsk->sk_flags))
+		list_del(&svsk->sk_ready);
 
 
-	svsk->sk_dead = 1;
+	set_bit(SK_DEAD, &svsk->sk_flags);
 
 	if (!svsk->sk_inuse) {
 		spin_unlock_bh(&serv->sv_lock);
@@ -1054,7 +1285,7 @@
 		kfree(svsk);
 	} else {
 		spin_unlock_bh(&serv->sv_lock);
-		printk(KERN_NOTICE "svc: server socket destroy delayed\n");
+		dprintk(KERN_NOTICE "svc: server socket destroy delayed\n");
 		/* svsk->sk_server = NULL; */
 	}
 }
