patch-1.3.82 linux/fs/nfs/rpcsock.c

Next file: linux/fs/nfs/sock.c
Previous file: linux/fs/nfs/proc.c
Back to the patch index
Back to the overall index

diff -u --recursive --new-file v1.3.81/linux/fs/nfs/rpcsock.c linux/fs/nfs/rpcsock.c
@@ -6,17 +6,27 @@
  *  this:
  *
  *  -	When a process places a call, it allocates a request slot if
- *	one is available. Otherwise, it sleeps on the backlog queue.
- *  -	The first process on the receive queue waits for the next RPC reply,
+ *	one is available. Otherwise, it sleeps on the backlog queue
+ *	(rpc_reserve).
+ *  -	Then, the message is transmitted via rpc_send (exported by name of
+ *	rpc_transmit).
+ *  -	Finally, the process waits for the call to complete (rpc_doio):
+ *	The first process on the receive queue waits for the next RPC packet,
  *	and peeks at the XID. If it finds a matching request, it receives
  *	the datagram on behalf of that process and wakes it up. Otherwise,
  *	the datagram is discarded.
  *  -	If the process having received the datagram was the first one on
  *	the receive queue, it wakes up the next one to listen for replies.
- *  -	It then removes itself from the request queue. If there are more
- *	callers waiting on the backlog queue, they are woken up, too.
+ *  -	It then removes itself from the request queue (rpc_release).
+ *	If there are more callers waiting on the backlog queue, they are
+ *	woken up, too.
  *
- *  Copyright (C) 1995, Olaf Kirch <okir@monad.swb.de>
+ * Mar 1996:
+ *  -	Split up large functions into smaller chunks as per Linus' coding
+ *	style. Found an interesting bug this way, too.
+ *  -	Added entry points for nfsiod.
+ *
+ *  Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
  */
 
 #include <linux/types.h>
@@ -31,70 +41,115 @@
 #include <linux/mm.h>
 #include <linux/rpcsock.h>
 
+#include <linux/udp.h>
+#include <net/sock.h>
+
 #include <asm/segment.h>
 
 #define msleep(sec)	{ current->timeout = sec * HZ / 1000; \
 			  current->state = TASK_INTERRUPTIBLE; \
 			  schedule(); \
 			}
-			
-#ifdef DEBUG_NFS			
-#define dprintk(x)		printk(x)
+
+#undef DEBUG_RPC
+#ifdef DEBUG_RPC			
+#define dprintk(args...)	printk(## args)
 #else
-#define	dprintk(x)
+#define	dprintk(args...)
 #endif
 
+
+/*
+ * Insert new request into wait list. We make sure list is sorted by
+ * increasing timeout value.
+ */
 static inline void
 rpc_insque(struct rpc_sock *rsock, struct rpc_wait *slot)
 {
-	struct rpc_wait	*tmp;
+	struct rpc_wait	*next = rsock->pending;
 
-	if ((tmp = rsock->tail) != NULL) {
-		tmp->next = slot;
-	} else {
-		rsock->head = slot;
-	}
-	rsock->tail = slot;
-	slot->prev = tmp;
-	slot->next = NULL;
-	dprintk(("RPC: inserted %08lx into queue.\n", (long)slot));
-	dprintk(("RPC: head = %08lx, tail = %08lx.\n",
-			(long) rsock->head, (long) rsock->tail));
+	slot->w_next = next;
+	slot->w_prev = NULL;
+	if (next)
+		next->w_prev = slot;
+	rsock->pending = slot;
+	slot->w_queued = 1;
+
+	dprintk("RPC: inserted %p into queue\n", slot);
 }
 
+/*
+ * Remove request from request queue
+ */
 static inline void
 rpc_remque(struct rpc_sock *rsock, struct rpc_wait *slot)
 {
-	struct rpc_wait	*prev = slot->prev,
-			*next = slot->next;
+	struct rpc_wait	*prev = slot->w_prev,
+			*next = slot->w_next;
 
 	if (prev != NULL)
-		prev->next = next;
+		prev->w_next = next;
 	else
-		rsock->head = next;
+		rsock->pending = next;
 	if (next != NULL)
-		next->prev = prev;
-	else
-		rsock->tail = prev;
-	dprintk(("RPC: removed %08lx from queue.\n", (long)slot));
-	dprintk(("RPC: head = %08lx, tail = %08lx.\n",
-			(long) rsock->head, (long) rsock->tail));
+		next->w_prev = prev;
+
+	slot->w_queued = 0;
+	dprintk("RPC: removed %p from queue, head now %p.\n",
+			slot, rsock->pending);
 }
 
+/*
+ * Write data to socket.
+ */
 static inline int
-rpc_sendmsg(struct rpc_sock *rsock, struct msghdr *msg, int len)
+rpc_sendmsg(struct rpc_sock *rsock, struct iovec *iov, int nr, int len,
+				struct sockaddr *sap, int salen)
 {
 	struct socket	*sock = rsock->sock;
+	struct msghdr	msg;
 	unsigned long	oldfs;
 	int		result;
 
-	dprintk(("RPC: sending %d bytes (buf %p)\n", len, msg->msg_iov[0].iov_base));
+	msg.msg_iov	= iov;
+	msg.msg_iovlen	= nr;
+	msg.msg_name	= sap;
+	msg.msg_namelen = salen;
+	msg.msg_accrights = NULL;
+
+	oldfs = get_fs();
+	set_fs(get_ds());
+	result = sock->ops->sendmsg(sock, &msg, len, 0, 0);
+	set_fs(oldfs);
+
+	dprintk("RPC: rpc_sendmsg(iov %p, len %d) = %d\n", iov, len, result);
+	return result;
+}
+/*
+ * Read data from socket
+ */
+static inline int
+rpc_recvmsg(struct rpc_sock *rsock, struct iovec *iov,
+			int nr, int len, int flags)
+{
+	struct socket	*sock = rsock->sock;
+	struct sockaddr	sa;
+	struct msghdr	msg;
+	unsigned long	oldfs;
+	int		result, alen;
+
+	msg.msg_iov	= iov;
+	msg.msg_iovlen	= nr;
+	msg.msg_name	= &sa;
+	msg.msg_namelen = sizeof(sa);
+	msg.msg_accrights = NULL;
+
 	oldfs = get_fs();
 	set_fs(get_ds());
-	result = sock->ops->sendmsg(sock, msg, len, 0, 0);
+	result = sock->ops->recvmsg(sock, &msg, len, 1, flags, &alen);
 	set_fs(oldfs);
-	dprintk(("RPC: result = %d\n", result));
 
+	dprintk("RPC: rpc_recvmsg(iov %p, len %d) = %d\n", iov, len, result);
 	return result;
 }
 
@@ -109,7 +164,7 @@
 	struct file	*file = rsock->file;
 	select_table	wait_table;
 
-	dprintk(("RPC: selecting on socket...\n"));
+	dprintk("RPC: selecting on socket...\n");
 	wait_table.nr = 0;
 	wait_table.entry = &entry;
 	current->state = TASK_INTERRUPTIBLE;
@@ -125,232 +180,347 @@
 	} else if (wait_table.nr)
 		remove_wait_queue(entry.wait_address, &entry.wait);
 	current->state = TASK_RUNNING;
-	dprintk(("RPC: ...Okay, there appears to be some data.\n"));
+	dprintk("RPC: ...Okay, there appears to be some data.\n");
 	return 0;
 }
 
-static inline int
-rpc_recvmsg(struct rpc_sock *rsock, struct msghdr *msg, int len,int flags)
+/*
+ * Reserve an RPC call slot. nocwait determines whether we wait in case
+ * of congestion or not.
+ */
+int
+rpc_reserve(struct rpc_sock *rsock, struct rpc_ioreq *req, int nocwait)
 {
-	struct socket	*sock = rsock->sock;
-	struct sockaddr	sa;
-	int		alen = sizeof(sa);
-	unsigned long	oldfs;
-	int		result;
+	struct rpc_wait	*slot;
 
-	dprintk(("RPC: receiving %d bytes max (buf %p)\n", len, msg->msg_iov[0].iov_base));
-	oldfs = get_fs();
-	set_fs(get_ds());
-	result = sock->ops->recvmsg(sock, msg, len, 1, flags, &alen);
-	set_fs(oldfs);
-	dprintk(("RPC: result = %d\n", result));
+	req->rq_slot = NULL;
 
-#if 0
-	if (alen != salen || memcmp(&sa, sap, alen)) {
-		dprintk(("RPC: reply address mismatch... rejected.\n"));
-		result = -EAGAIN;
+	while (!(slot = rsock->free) || rsock->cong >= rsock->cwnd) {
+		if (nocwait) {
+			current->timeout = 0;
+			return -ENOBUFS;
+		}
+		dprintk("RPC: rpc_reserve waiting on backlog\n");
+		interruptible_sleep_on(&rsock->backlog);
+		if (current->timeout == 0)
+			return -ETIMEDOUT;
+		if (current->signal & ~current->blocked)
+			return -ERESTARTSYS;
+		if (rsock->shutdown)
+			return -EIO;
 	}
-#endif
 
-	return result;
+	rsock->free = slot->w_next;
+	rsock->cong += RPC_CWNDSCALE;	/* bump congestion value */
+
+	slot->w_queued = 0;
+	slot->w_gotit = 0;
+	slot->w_req = req;
+
+	dprintk("RPC: reserved slot %p\n", slot);
+	req->rq_slot = slot;
+	return 0;
+}
+
+/*
+ * Release an RPC call slot
+ */
+void
+rpc_release(struct rpc_sock *rsock, struct rpc_ioreq *req)
+{
+	struct rpc_wait	*slot = req->rq_slot;
+
+	if (slot != NULL) {
+		dprintk("RPC: release slot %p\n", slot);
+
+		/* Wake up the next receiver */
+		if (slot == rsock->pending && slot->w_next != NULL)
+			wake_up(&slot->w_next->w_wait);
+
+		/* remove slot from queue of pending */
+		if (slot->w_queued)
+			rpc_remque(rsock, slot);
+		slot->w_next = rsock->free;
+		rsock->free = slot;
+
+		/* decrease congestion value */
+		rsock->cong -= RPC_CWNDSCALE;
+		if (rsock->cong < rsock->cwnd && rsock->backlog)
+			wake_up(&rsock->backlog);
+		if (rsock->shutdown)
+			wake_up(&rsock->shutwait);
+
+		req->rq_slot = NULL;
+	}
+}
+
+/*
+ * Adjust RPC congestion window
+ */
+static void
+rpc_cwnd_adjust(struct rpc_sock *rsock, int timeout)
+{
+	unsigned long	cwnd = rsock->cwnd;
+
+	if (!timeout) {
+		if (rsock->cong >= cwnd) {
+			/* The (cwnd >> 1) term makes sure
+			 * the result gets rounded properly. */
+			cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE +
+					(cwnd >> 1)) / cwnd;
+			if (cwnd > RPC_MAXCWND)
+				cwnd = RPC_MAXCWND;
+		}
+	} else {
+		if ((cwnd >>= 1) < RPC_CWNDSCALE)
+			cwnd = RPC_CWNDSCALE;
+		dprintk("RPC: cwnd decrease %08lx\n", cwnd);
+	}
+	dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx\n",
+			rsock->cong, rsock->cwnd, cwnd);
+
+	rsock->cwnd = cwnd;
+}
+
+static inline void
+rpc_send_check(char *where, u32 *ptr)
+{
+	if (ptr[1] != 0 || ptr[2] != htonl(2) || ptr[3] != htonl(100003)) {
+		printk("RPC: %s sending evil packet:\n"
+		       "     %08x %08x %08x %08x %08x %08x %08x %08x\n",
+		       where,
+		       ptr[0], ptr[1], ptr[2], ptr[3],
+		       ptr[4], ptr[5], ptr[6], ptr[7]);
+	}
 }
 
 /*
  * Place the actual RPC call.
+ * We have to copy the iovec because sendmsg fiddles with its contents.
  */
-static int
-rpc_call_one(struct rpc_sock *rsock, struct rpc_wait *slot,
-		struct sockaddr *sap, int salen,
-		const int *sndbuf, int slen, int *rcvbuf, int rlen)
+static inline int
+rpc_send(struct rpc_sock *rsock, struct rpc_wait *slot)
 {
-	struct rpc_wait	*rovr = NULL;
-	int		result;
+	struct rpc_ioreq *req = slot->w_req;
+	struct iovec	iov[MAX_IOVEC];
+
+	if (rsock->shutdown)
+		return -EIO;
+
+	memcpy(iov, req->rq_svec, req->rq_snr * sizeof(iov[0]));
+	slot->w_xid = *(u32 *)(iov[0].iov_base);
+	if (!slot->w_queued)
+		rpc_insque(rsock, slot);
+
+	dprintk("rpc_send(%p, %x)\n", slot, slot->w_xid);
+	rpc_send_check("rpc_send", (u32 *) req->rq_svec[0].iov_base);
+	return rpc_sendmsg(rsock, iov, req->rq_snr, req->rq_slen,
+				req->rq_addr, req->rq_alen);
+}
+
+/*
+ * This is the same as rpc_send but for the functions exported to nfsiod
+ */
+int
+rpc_transmit(struct rpc_sock *rsock, struct rpc_ioreq *req)
+{
+	rpc_send_check("rpc_transmit", (u32 *) req->rq_svec[0].iov_base);
+	return rpc_send(rsock, req->rq_slot);
+}
+
+/*
+ * Receive and dispatch a single reply
+ */
+static inline int
+rpc_grok(struct rpc_sock *rsock)
+{
+	struct rpc_wait	*rovr;
+	struct rpc_ioreq *req;
+	struct iovec	iov[MAX_IOVEC];
 	u32		xid;
-	int		safe;
-	struct msghdr   msg;
-	struct iovec	iov;
-	
-	msg.msg_iov	=	&iov;
-	msg.msg_iovlen	=	1;
-	msg.msg_name	=	(void *)sap;
-	msg.msg_namelen	=	salen;
-	msg.msg_accrights =	NULL;
-	iov.iov_base	=	(void *)sndbuf;
-	iov.iov_len	=	slen;
-
-	dprintk(("RPC: placing one call, rsock = %08lx, slot = %08lx, "
-		"sap = %08lx, salen = %d, "
-		"sndbuf = %08lx, slen = %d, rcvbuf = %08lx, rlen = %d\n",
-		(long) rsock, (long) slot, (long) sap, 
-		salen, (long) sndbuf, slen, (long) rcvbuf, rlen));
-
-	result = rpc_sendmsg(rsock, &msg, slen);
-	if (result < 0)
-		return result;
+	int		safe, result;
 
-	do {
-		/* We are not the receiver. Wait on the side lines. */
-		if (rsock->head != slot) {
-			interruptible_sleep_on(&slot->wait);
-			if (slot->gotit)
-				break;
-			if (current->timeout != 0)
-				continue;
-			if (rsock->shutdown) {
-				printk("RPC: aborting call due to shutdown.\n");
-				return -EIO;
-			}
-			return -ETIMEDOUT;
+	iov[0].iov_base = (void *) &xid;
+	iov[0].iov_len  = sizeof(xid);
+	result = rpc_recvmsg(rsock, iov, 1, sizeof(xid), MSG_PEEK);
+
+	if (result < 0) {
+		switch (-result) {
+		case EAGAIN: case ECONNREFUSED:
+			return 0;
+		case ERESTARTSYS:
+			return result;
+		default:
+			dprintk("rpc_grok: recv error = %d\n", result);
 		}
-		
-		/* wait for data to arrive */
-		result = rpc_select(rsock);
-		if (result < 0) {
-			dprintk(("RPC: select error = %d\n", result));
+	}
+	if (result < 4) {
+		printk(KERN_WARNING "RPC: impossible RPC reply size %d\n",
+						result);
+		return 0;
+	}
+
+	dprintk("RPC: rpc_grok: got xid %08lx\n", (unsigned long) xid);
+
+	/* Look for the caller */
+	safe = 0;
+	for (rovr = rsock->pending; rovr; rovr = rovr->w_next) {
+		if (rovr->w_xid == xid)
+			break;
+		if (safe++ > RPC_MAXREQS) {
+			printk(KERN_WARNING "RPC: loop in request Q!!\n");
+			rovr = NULL;
 			break;
 		}
+	}
 
-		iov.iov_base=(void *)&xid;
-		iov.iov_len=sizeof(xid);
-		
-		result = rpc_recvmsg(rsock, &msg, sizeof(xid), MSG_PEEK);
-		if (result < 0) {
-			switch (-result) {
-			case EAGAIN: case ECONNREFUSED:
-				continue;
-			default:
-				dprintk(("rpc_call: recv error = %d\n", result));
-			case ERESTARTSYS:
-				return result;
-			}
-		}
+	if (!rovr || rovr->w_gotit) {
+		/* discard dgram */
+		dprintk("RPC: rpc_grok: %s.\n",
+			rovr? "duplicate reply" : "bad XID");
+		iov[0].iov_base = (void *) &xid;
+		iov[0].iov_len  = sizeof(xid);
+		rpc_recvmsg(rsock, iov, 1, sizeof(xid), 0);
+		return 0;
+	}
+	req = rovr->w_req;
 
-		/* Look for the caller */
-		safe = 0;
-		for (rovr = rsock->head; rovr; rovr = rovr->next) {
-			if (safe++ > NRREQS) {
-				printk("RPC: loop in request Q!!\n");
-				rovr = NULL;
-				break;
+	/* Now receive the reply... Copy the iovec first because of 
+	 * memcpy_fromiovec fiddling. */
+	memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
+	result = rpc_recvmsg(rsock, iov, req->rq_rnr, req->rq_rlen, 0);
+	rovr->w_result = result;
+	rovr->w_gotit = 1;
+
+	/* ... and wake up the process */
+	wake_up(&rovr->w_wait);
+
+	return result;
+}
+
+/*
+ * Wait for the reply to our call.
+ */
+static int
+rpc_recv(struct rpc_sock *rsock, struct rpc_wait *slot)
+{
+	int	result;
+
+	do {
+		/* If we are not the receiver, wait on the sidelines */
+		dprintk("RPC: rpc_recv TP1\n");
+		while (rsock->pending != slot) {
+			if (!slot->w_gotit)
+				interruptible_sleep_on(&slot->w_wait);
+			if (slot->w_gotit) {
+				result = slot->w_result; /* quite important */
+				return result;
 			}
-			if (rovr->xid == xid)
-				break;
+			if (current->signal & ~current->blocked)
+				return -ERESTARTSYS;
+			if (rsock->shutdown)
+				return -EIO;
+			if (current->timeout == 0)
+				return -ETIMEDOUT;
 		}
 
-		if (!rovr || rovr->gotit) {
-			/* bad XID or duplicate reply, discard dgram */
-			dprintk(("RPC: bad XID or duplicate reply.\n"));
-			iov.iov_base=(void *)&xid;
-			iov.iov_len=sizeof(xid);
-			rpc_recvmsg(rsock, &msg, sizeof(xid),0);
-			continue;
+		/* Wait for data to arrive */
+		if ((result = rpc_select(rsock)) < 0) {
+			dprintk("RPC: select error = %d\n", result);
+			break;
 		}
-		rovr->gotit = 1;
 
-		/* Now receive the reply */
-		
-		iov.iov_base=rovr->buf;
-		iov.iov_len=rovr->len;
-		
-		result = rpc_recvmsg(rsock, &msg, rovr->len, 0);
-
-		/* If this is not for ourselves, wake up the caller */
-		if (rovr != slot)
-			wake_up(&rovr->wait);
-	} while (rovr != slot);
-
-	/* This is somewhat tricky. We rely on the fact that we are able to
-	 * remove ourselves from the queues before the next reader is scheduled,
-	 * otherwise it would find that we're still at the head of the queue
-	 * and go to sleep again.
-	 */
-	if (rsock->head == slot && slot->next != NULL)
-		wake_up(&slot->next->wait);
+		/* Receive and dispatch */
+		if ((result = rpc_grok(rsock)) < 0)
+			break;
+	} while (current->timeout && !slot->w_gotit);
 
-	return result;
+	return slot->w_gotit? result : -ETIMEDOUT;
 }
 
 /*
- * Generic RPC call routine. This handles retries and timeouts etc pp
+ * Generic RPC call routine. This handles retries and timeouts etc pp.
+ *
+ * If sent is non-null, it assumes the called has already sent out the
+ * message, so it won't need to do so unless a timeout occurs.
  */
 int
-rpc_call(struct rpc_sock *rsock, struct sockaddr *sap, int addrlen,
-		const int *sndbuf, int slen, int *rcvbuf, int rlen,
-		struct rpc_timeout *strategy, int flag)
-{
-	struct rpc_wait		*slot;
-	int			result, retries;
-	unsigned long		timeout;
+rpc_doio(struct rpc_sock *rsock, struct rpc_ioreq *req,
+			struct rpc_timeout *strategy, int sent)
+{
+	struct rpc_wait	*slot;
+	int		result, retries;
+	unsigned long	timeout;
 
-	timeout = strategy->init_timeout;
+	timeout = strategy->to_initval;
 	retries = 0;
-	slot = NULL;
+	slot = req->rq_slot;
 
 	do {
-		dprintk(("RPC call TP1\n"));
+		dprintk("RPC: rpc_doio: TP1 (req %p)\n", req);
 		current->timeout = jiffies + timeout;
 		if (slot == NULL) {
-			while ((slot = rsock->free) == NULL) {
-				if (!flag) {
-					current->timeout = 0;
-					return -ENOBUFS;
-				}
-				interruptible_sleep_on(&rsock->backlog);
-				if (current->timeout == 0) {
-					result = -ETIMEDOUT;
-					goto timedout;
-				}
-				if (rsock->shutdown) {
-					dprintk(("RPC: aborting call due to shutdown.\n"));
-					current->timeout = 0;
-					return -EIO;
-				}
-			}
-			dprintk(("RPC call TP2\n"));
-			slot->gotit = 0;
-			slot->xid = *(u32 *)sndbuf;
-			slot->buf = rcvbuf;
-			slot->len = rlen;
-			rsock->free = slot->next;
+			result = rpc_reserve(rsock, req, 0);
+			if (result == -ETIMEDOUT)
+				goto timedout;
+			if (result < 0)
+				break;
+			slot = req->rq_slot;
+			rpc_send_check("rpc_doio",
+				(u32 *) req->rq_svec[0].iov_base);
 			rpc_insque(rsock, slot);
 		}
 
-		dprintk(("RPC call TP3\n"));
-		result = rpc_call_one(rsock, slot, sap, addrlen,
-					sndbuf, slen, rcvbuf, rlen);
-		if (result != -ETIMEDOUT)
+		/* This check is for loopback NFS. Sometimes replies come
+		 * in before biod has called rpc_doio... */
+		if (slot->w_gotit) {
+			result = slot->w_result;
 			break;
+		}
+
+		dprintk("RPC: rpc_doio: TP2\n");
+		if (sent || (result = rpc_send(rsock, slot)) >= 0) {
+			result = rpc_recv(rsock, slot);
+			sent = 0;
+		}
+
+		if (result != -ETIMEDOUT) {
+			/* dprintk("RPC: rpc_recv returned %d\n", result); */
+			rpc_cwnd_adjust(rsock, 0);
+			break;
+		}
+
+		rpc_cwnd_adjust(rsock, 1);
 
 timedout:
-		dprintk(("RPC call TP4\n"));
-		dprintk(("RPC: rpc_call_one returned timeout.\n"));
-		if (strategy->exponential)
+		dprintk("RPC: rpc_recv returned timeout.\n");
+		if (strategy->to_exponential)
 			timeout <<= 1;
 		else
-			timeout += strategy->increment;
-		if (strategy->max_timeout && timeout >= strategy->max_timeout)
-			timeout = strategy->max_timeout;
-		if (strategy->retries && ++retries >= strategy->retries)
+			timeout += strategy->to_increment;
+		if (strategy->to_maxval && timeout >= strategy->to_maxval)
+			timeout = strategy->to_maxval;
+		if (strategy->to_retries && ++retries >= strategy->to_retries)
 			break;
 	} while (1);
 
-	dprintk(("RPC call TP5\n"));
+	dprintk("RPC: rpc_doio: TP3\n");
 	current->timeout = 0;
-	if (slot != NULL) {
-		dprintk(("RPC call TP6\n"));
-		rpc_remque(rsock, slot);
-		slot->next = rsock->free;
-		rsock->free = slot;
-
-		/* wake up tasks that haven't sent anything yet. (Waking
-		 * up the first one on the wait queue would be enough) */
-		if (rsock->backlog)
-			wake_up(&rsock->backlog);
-	}
+	return result;
+}
 
-	if (rsock->shutdown)
-		wake_up(&rsock->shutwait);
+/*
+ */
+int
+rpc_call(struct rpc_sock *rsock, struct rpc_ioreq *req,
+			struct rpc_timeout *strategy)
+{
+	int	result;
 
+	result = rpc_doio(rsock, req, strategy, 0);
+	if (req->rq_slot == NULL)
+		printk(KERN_WARNING "RPC: bad: rq_slot == NULL\n");
+	rpc_release(rsock, req);
 	return result;
 }
 
@@ -358,31 +528,35 @@
 rpc_makesock(struct file *file)
 {
 	struct rpc_sock	*rsock;
+	struct socket	*sock;
+	struct sock	*sk;
 	struct rpc_wait	*slot;
 	int		i;
 
-	dprintk(("RPC: make RPC socket...\n"));
+	dprintk("RPC: make RPC socket...\n");
+	sock = &file->f_inode->u.socket_i;
+	if (sock->type != SOCK_DGRAM || sock->ops->family != AF_INET) {
+		printk(KERN_WARNING "RPC: only UDP sockets supported\n");
+		return NULL;
+	}
+	sk = (struct sock *) sock->data;
+
 	if ((rsock = kmalloc(sizeof(struct rpc_sock), GFP_KERNEL)) == NULL)
 		return NULL;
 	memset(rsock, 0, sizeof(*rsock)); /* Nnnngh! */
 
-	rsock->sock = &file->f_inode->u.socket_i;
+	rsock->sock = sock;
+	rsock->inet = sk;
 	rsock->file = file;
+	rsock->cwnd = RPC_INITCWND;
 
+	dprintk("RPC: slots %p, %p, ...\n", rsock->waiting, rsock->waiting + 1);
 	rsock->free = rsock->waiting;
-	for (i = 0, slot = rsock->waiting; i < NRREQS-1; i++, slot++)
-		slot->next = slot + 1;
-	slot->next = NULL;
-
-	/* --- taken care of by memset above ---
-	rsock->backlog = NULL;
-	rsock->head = rsock->tail = NULL;
-
-	rsock->shutwait = NULL;
-	rsock->shutdown = 0;
-	 */
+	for (i = 0, slot = rsock->waiting; i < RPC_MAXREQS-1; i++, slot++)
+		slot->w_next = slot + 1;
+	slot->w_next = NULL;
 
-	dprintk(("RPC: made socket %08lx", (long) rsock));
+	dprintk("RPC: made socket %p\n", rsock);
 	return rsock;
 }
 
@@ -392,13 +566,13 @@
 	unsigned long	t0 = jiffies;
 
 	rsock->shutdown = 1;
-	while (rsock->head || rsock->backlog) {
+	while (rsock->pending || rsock->backlog) {
 		interruptible_sleep_on(&rsock->shutwait);
 		if (current->signal & ~current->blocked)
 			return -EINTR;
 #if 1
 		if (t0 && t0 - jiffies > 60 * HZ) {
-			printk("RPC: hanging in rpc_closesock.\n");
+			printk(KERN_WARNING "RPC: hanging in rpc_closesock.\n");
 			t0 = 0;
 		}
 #endif

FUNET's LINUX-ADM group, linux-adm@nic.funet.fi
TCL-scripts by Sam Shen, slshen@lbl.gov with Sam's (original) version
of this