diff options
Diffstat (limited to 'freebsd/sys/rpc/clnt_dg.c')
-rw-r--r-- | freebsd/sys/rpc/clnt_dg.c | 1151 |
1 files changed, 1151 insertions, 0 deletions
diff --git a/freebsd/sys/rpc/clnt_dg.c b/freebsd/sys/rpc/clnt_dg.c new file mode 100644 index 00000000..4c40a494 --- /dev/null +++ b/freebsd/sys/rpc/clnt_dg.c @@ -0,0 +1,1151 @@ +/* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ + +/*- + * SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2009, Sun Microsystems, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of Sun Microsystems, Inc. nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +/* + * Copyright (c) 1986-1991 by Sun Microsystems Inc. + */ + +#if defined(LIBC_SCCS) && !defined(lint) +#ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" +static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; +#endif +#include <sys/cdefs.h> +__FBSDID("$FreeBSD$"); + +/* + * Implements a connectionless client side RPC. + */ + +#include <sys/param.h> +#include <sys/systm.h> +#include <sys/kernel.h> +#include <sys/lock.h> +#include <sys/malloc.h> +#include <sys/mbuf.h> +#include <sys/mutex.h> +#include <sys/pcpu.h> +#include <sys/proc.h> +#include <sys/socket.h> +#include <sys/socketvar.h> +#include <sys/time.h> +#include <sys/uio.h> + +#include <net/vnet.h> + +#include <rpc/rpc.h> +#include <rpc/rpc_com.h> + + +#ifdef _FREEFALL_CONFIG +/* + * Disable RPC exponential back-off for FreeBSD.org systems. + */ +#define RPC_MAX_BACKOFF 1 /* second */ +#else +#define RPC_MAX_BACKOFF 30 /* seconds */ +#endif + +static bool_t time_not_ok(struct timeval *); +static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, + rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); +static void clnt_dg_geterr(CLIENT *, struct rpc_err *); +static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); +static void clnt_dg_abort(CLIENT *); +static bool_t clnt_dg_control(CLIENT *, u_int, void *); +static void clnt_dg_close(CLIENT *); +static void clnt_dg_destroy(CLIENT *); +static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); + +static struct clnt_ops clnt_dg_ops = { + .cl_call = clnt_dg_call, + .cl_abort = clnt_dg_abort, + .cl_geterr = clnt_dg_geterr, + .cl_freeres = clnt_dg_freeres, + .cl_close = clnt_dg_close, + .cl_destroy = clnt_dg_destroy, + .cl_control = clnt_dg_control +}; + +/* + * A pending RPC request which awaits a reply. Requests which have + * received their reply will have cr_xid set to zero and cr_mrep to + * the mbuf chain of the reply. + */ +struct cu_request { + TAILQ_ENTRY(cu_request) cr_link; + CLIENT *cr_client; /* owner */ + uint32_t cr_xid; /* XID of request */ + struct mbuf *cr_mrep; /* reply received by upcall */ + int cr_error; /* any error from upcall */ + char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ +}; + +TAILQ_HEAD(cu_request_list, cu_request); + +#define MCALL_MSG_SIZE 24 + +/* + * This structure is pointed to by the socket buffer's sb_upcallarg + * member. It is separate from the client private data to facilitate + * multiple clients sharing the same socket. The cs_lock mutex is used + * to protect all fields of this structure, the socket's receive + * buffer SOCKBUF_LOCK is used to ensure that exactly one of these + * structures is installed on the socket. + */ +struct cu_socket { + struct mtx cs_lock; + int cs_refs; /* Count of clients */ + struct cu_request_list cs_pending; /* Requests awaiting replies */ + int cs_upcallrefs; /* Refcnt of upcalls in prog.*/ +}; + +static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *); + +/* + * Private data kept per client handle + */ +struct cu_data { + int cu_threads; /* # threads in clnt_vc_call */ + bool_t cu_closing; /* TRUE if we are closing */ + bool_t cu_closed; /* TRUE if we are closed */ + struct socket *cu_socket; /* connection socket */ + bool_t cu_closeit; /* opened by library */ + struct sockaddr_storage cu_raddr; /* remote address */ + int cu_rlen; + struct timeval cu_wait; /* retransmit interval */ + struct timeval cu_total; /* total time for the call */ + struct rpc_err cu_error; + uint32_t cu_xid; + char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ + size_t cu_mcalllen; + size_t cu_sendsz; /* send size */ + size_t cu_recvsz; /* recv size */ + int cu_async; + int cu_connect; /* Use connect(). */ + int cu_connected; /* Have done connect(). */ + const char *cu_waitchan; + int cu_waitflag; + int cu_cwnd; /* congestion window */ + int cu_sent; /* number of in-flight RPCs */ + bool_t cu_cwnd_wait; +}; + +#define CWNDSCALE 256 +#define MAXCWND (32 * CWNDSCALE) + +/* + * Connection less client creation returns with client handle parameters. + * Default options are set, which the user can change using clnt_control(). + * fd should be open and bound. + * NB: The rpch->cl_auth is initialized to null authentication. + * Caller may wish to set this something more useful. + * + * sendsz and recvsz are the maximum allowable packet sizes that can be + * sent and received. Normally they are the same, but they can be + * changed to improve the program efficiency and buffer allocation. + * If they are 0, use the transport default. + * + * If svcaddr is NULL, returns NULL. + */ +CLIENT * +clnt_dg_create( + struct socket *so, + struct sockaddr *svcaddr, /* servers address */ + rpcprog_t program, /* program number */ + rpcvers_t version, /* version number */ + size_t sendsz, /* buffer recv size */ + size_t recvsz) /* buffer send size */ +{ + CLIENT *cl = NULL; /* client handle */ + struct cu_data *cu = NULL; /* private data */ + struct cu_socket *cs = NULL; + struct sockbuf *sb; + struct timeval now; + struct rpc_msg call_msg; + struct __rpc_sockinfo si; + XDR xdrs; + int error; + + if (svcaddr == NULL) { + rpc_createerr.cf_stat = RPC_UNKNOWNADDR; + return (NULL); + } + + if (!__rpc_socket2sockinfo(so, &si)) { + rpc_createerr.cf_stat = RPC_TLIERROR; + rpc_createerr.cf_error.re_errno = 0; + return (NULL); + } + + /* + * Find the receive and the send size + */ + sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); + recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); + if ((sendsz == 0) || (recvsz == 0)) { + rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ + rpc_createerr.cf_error.re_errno = 0; + return (NULL); + } + + cl = mem_alloc(sizeof (CLIENT)); + + /* + * Should be multiple of 4 for XDR. + */ + sendsz = rounddown(sendsz + 3, 4); + recvsz = rounddown(recvsz + 3, 4); + cu = mem_alloc(sizeof (*cu)); + cu->cu_threads = 0; + cu->cu_closing = FALSE; + cu->cu_closed = FALSE; + (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); + cu->cu_rlen = svcaddr->sa_len; + /* Other values can also be set through clnt_control() */ + cu->cu_wait.tv_sec = 3; /* heuristically chosen */ + cu->cu_wait.tv_usec = 0; + cu->cu_total.tv_sec = -1; + cu->cu_total.tv_usec = -1; + cu->cu_sendsz = sendsz; + cu->cu_recvsz = recvsz; + cu->cu_async = FALSE; + cu->cu_connect = FALSE; + cu->cu_connected = FALSE; + cu->cu_waitchan = "rpcrecv"; + cu->cu_waitflag = 0; + cu->cu_cwnd = MAXCWND / 2; + cu->cu_sent = 0; + cu->cu_cwnd_wait = FALSE; + (void) getmicrotime(&now); + cu->cu_xid = __RPC_GETXID(&now); + call_msg.rm_xid = cu->cu_xid; + call_msg.rm_call.cb_prog = program; + call_msg.rm_call.cb_vers = version; + xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); + if (! xdr_callhdr(&xdrs, &call_msg)) { + rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ + rpc_createerr.cf_error.re_errno = 0; + goto err2; + } + cu->cu_mcalllen = XDR_GETPOS(&xdrs); + + /* + * By default, closeit is always FALSE. It is users responsibility + * to do a close on it, else the user may use clnt_control + * to let clnt_destroy do it for him/her. + */ + cu->cu_closeit = FALSE; + cu->cu_socket = so; + error = soreserve(so, (u_long)sendsz, (u_long)recvsz); + if (error != 0) { + rpc_createerr.cf_stat = RPC_FAILED; + rpc_createerr.cf_error.re_errno = error; + goto err2; + } + + sb = &so->so_rcv; + SOCKBUF_LOCK(&so->so_rcv); +recheck_socket: + if (sb->sb_upcall) { + if (sb->sb_upcall != clnt_dg_soupcall) { + SOCKBUF_UNLOCK(&so->so_rcv); + printf("clnt_dg_create(): socket already has an incompatible upcall\n"); + goto err2; + } + cs = (struct cu_socket *) sb->sb_upcallarg; + mtx_lock(&cs->cs_lock); + cs->cs_refs++; + mtx_unlock(&cs->cs_lock); + } else { + /* + * We are the first on this socket - allocate the + * structure and install it in the socket. + */ + SOCKBUF_UNLOCK(&so->so_rcv); + cs = mem_alloc(sizeof(*cs)); + SOCKBUF_LOCK(&so->so_rcv); + if (sb->sb_upcall) { + /* + * We have lost a race with some other client. + */ + mem_free(cs, sizeof(*cs)); + goto recheck_socket; + } + mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); + cs->cs_refs = 1; + cs->cs_upcallrefs = 0; + TAILQ_INIT(&cs->cs_pending); + soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); + } + SOCKBUF_UNLOCK(&so->so_rcv); + + cl->cl_refs = 1; + cl->cl_ops = &clnt_dg_ops; + cl->cl_private = (caddr_t)(void *)cu; + cl->cl_auth = authnone_create(); + cl->cl_tp = NULL; + cl->cl_netid = NULL; + return (cl); +err2: + mem_free(cl, sizeof (CLIENT)); + mem_free(cu, sizeof (*cu)); + + return (NULL); +} + +static enum clnt_stat +clnt_dg_call( + CLIENT *cl, /* client handle */ + struct rpc_callextra *ext, /* call metadata */ + rpcproc_t proc, /* procedure number */ + struct mbuf *args, /* pointer to args */ + struct mbuf **resultsp, /* pointer to results */ + struct timeval utimeout) /* seconds to wait before giving up */ +{ + struct cu_data *cu = (struct cu_data *)cl->cl_private; + struct cu_socket *cs; + struct rpc_timers *rt; + AUTH *auth; + struct rpc_err *errp; + enum clnt_stat stat; + XDR xdrs; + struct rpc_msg reply_msg; + bool_t ok; + int retrans; /* number of re-transmits so far */ + int nrefreshes = 2; /* number of times to refresh cred */ + struct timeval *tvp; + int timeout; + int retransmit_time; + int next_sendtime, starttime, rtt, time_waited, tv = 0; + struct sockaddr *sa; + uint32_t xid = 0; + struct mbuf *mreq = NULL, *results; + struct cu_request *cr; + int error; + + cs = cu->cu_socket->so_rcv.sb_upcallarg; + cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); + + mtx_lock(&cs->cs_lock); + + if (cu->cu_closing || cu->cu_closed) { + mtx_unlock(&cs->cs_lock); + free(cr, M_RPC); + return (RPC_CANTSEND); + } + cu->cu_threads++; + + if (ext) { + auth = ext->rc_auth; + errp = &ext->rc_err; + } else { + auth = cl->cl_auth; + errp = &cu->cu_error; + } + + cr->cr_client = cl; + cr->cr_mrep = NULL; + cr->cr_error = 0; + + if (cu->cu_total.tv_usec == -1) { + tvp = &utimeout; /* use supplied timeout */ + } else { + tvp = &cu->cu_total; /* use default timeout */ + } + if (tvp->tv_sec || tvp->tv_usec) + timeout = tvtohz(tvp); + else + timeout = 0; + + if (cu->cu_connect && !cu->cu_connected) { + mtx_unlock(&cs->cs_lock); + error = soconnect(cu->cu_socket, + (struct sockaddr *)&cu->cu_raddr, curthread); + mtx_lock(&cs->cs_lock); + if (error) { + errp->re_errno = error; + errp->re_status = stat = RPC_CANTSEND; + goto out; + } + cu->cu_connected = 1; + } + if (cu->cu_connected) + sa = NULL; + else + sa = (struct sockaddr *)&cu->cu_raddr; + time_waited = 0; + retrans = 0; + if (ext && ext->rc_timers) { + rt = ext->rc_timers; + if (!rt->rt_rtxcur) + rt->rt_rtxcur = tvtohz(&cu->cu_wait); + retransmit_time = next_sendtime = rt->rt_rtxcur; + } else { + rt = NULL; + retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); + } + + starttime = ticks; + +call_again: + mtx_assert(&cs->cs_lock, MA_OWNED); + + cu->cu_xid++; + xid = cu->cu_xid; + +send_again: + mtx_unlock(&cs->cs_lock); + + mreq = m_gethdr(M_WAITOK, MT_DATA); + KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); + bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); + mreq->m_len = cu->cu_mcalllen; + + /* + * The XID is the first thing in the request. + */ + *mtod(mreq, uint32_t *) = htonl(xid); + + xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); + + if (cu->cu_async == TRUE && args == NULL) + goto get_reply; + + if ((! XDR_PUTINT32(&xdrs, &proc)) || + (! AUTH_MARSHALL(auth, xid, &xdrs, + m_copym(args, 0, M_COPYALL, M_WAITOK)))) { + errp->re_status = stat = RPC_CANTENCODEARGS; + mtx_lock(&cs->cs_lock); + goto out; + } + mreq->m_pkthdr.len = m_length(mreq, NULL); + + cr->cr_xid = xid; + mtx_lock(&cs->cs_lock); + + /* + * Try to get a place in the congestion window. + */ + while (cu->cu_sent >= cu->cu_cwnd) { + cu->cu_cwnd_wait = TRUE; + error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, + cu->cu_waitflag, "rpccwnd", 0); + if (error) { + errp->re_errno = error; + if (error == EINTR || error == ERESTART) + errp->re_status = stat = RPC_INTR; + else + errp->re_status = stat = RPC_CANTSEND; + goto out; + } + } + cu->cu_sent += CWNDSCALE; + + TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); + mtx_unlock(&cs->cs_lock); + + /* + * sosend consumes mreq. + */ + error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); + mreq = NULL; + + /* + * sub-optimal code appears here because we have + * some clock time to spare while the packets are in flight. + * (We assume that this is actually only executed once.) + */ + reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; + reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; + reply_msg.acpted_rply.ar_verf.oa_length = 0; + reply_msg.acpted_rply.ar_results.where = NULL; + reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; + + mtx_lock(&cs->cs_lock); + if (error) { + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); + errp->re_errno = error; + errp->re_status = stat = RPC_CANTSEND; + cu->cu_sent -= CWNDSCALE; + if (cu->cu_cwnd_wait) { + cu->cu_cwnd_wait = FALSE; + wakeup(&cu->cu_cwnd_wait); + } + goto out; + } + + /* + * Check to see if we got an upcall while waiting for the + * lock. + */ + if (cr->cr_error) { + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); + errp->re_errno = cr->cr_error; + errp->re_status = stat = RPC_CANTRECV; + cu->cu_sent -= CWNDSCALE; + if (cu->cu_cwnd_wait) { + cu->cu_cwnd_wait = FALSE; + wakeup(&cu->cu_cwnd_wait); + } + goto out; + } + if (cr->cr_mrep) { + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); + cu->cu_sent -= CWNDSCALE; + if (cu->cu_cwnd_wait) { + cu->cu_cwnd_wait = FALSE; + wakeup(&cu->cu_cwnd_wait); + } + goto got_reply; + } + + /* + * Hack to provide rpc-based message passing + */ + if (timeout == 0) { + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); + errp->re_status = stat = RPC_TIMEDOUT; + cu->cu_sent -= CWNDSCALE; + if (cu->cu_cwnd_wait) { + cu->cu_cwnd_wait = FALSE; + wakeup(&cu->cu_cwnd_wait); + } + goto out; + } + +get_reply: + for (;;) { + /* Decide how long to wait. */ + if (next_sendtime < timeout) + tv = next_sendtime; + else + tv = timeout; + tv -= time_waited; + + if (tv > 0) { + if (cu->cu_closing || cu->cu_closed) { + error = 0; + cr->cr_error = ESHUTDOWN; + } else { + error = msleep(cr, &cs->cs_lock, + cu->cu_waitflag, cu->cu_waitchan, tv); + } + } else { + error = EWOULDBLOCK; + } + + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); + cu->cu_sent -= CWNDSCALE; + if (cu->cu_cwnd_wait) { + cu->cu_cwnd_wait = FALSE; + wakeup(&cu->cu_cwnd_wait); + } + + if (!error) { + /* + * We were woken up by the upcall. If the + * upcall had a receive error, report that, + * otherwise we have a reply. + */ + if (cr->cr_error) { + errp->re_errno = cr->cr_error; + errp->re_status = stat = RPC_CANTRECV; + goto out; + } + + cu->cu_cwnd += (CWNDSCALE * CWNDSCALE + + cu->cu_cwnd / 2) / cu->cu_cwnd; + if (cu->cu_cwnd > MAXCWND) + cu->cu_cwnd = MAXCWND; + + if (rt) { + /* + * Add one to the time since a tick + * count of N means that the actual + * time taken was somewhere between N + * and N+1. + */ + rtt = ticks - starttime + 1; + + /* + * Update our estimate of the round + * trip time using roughly the + * algorithm described in RFC + * 2988. Given an RTT sample R: + * + * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| + * SRTT = (1-alpha) * SRTT + alpha * R + * + * where alpha = 0.125 and beta = 0.25. + * + * The initial retransmit timeout is + * SRTT + 4*RTTVAR and doubles on each + * retransmision. + */ + if (rt->rt_srtt == 0) { + rt->rt_srtt = rtt; + rt->rt_deviate = rtt / 2; + } else { + int32_t error = rtt - rt->rt_srtt; + rt->rt_srtt += error / 8; + error = abs(error) - rt->rt_deviate; + rt->rt_deviate += error / 4; + } + rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; + } + + break; + } + + /* + * The sleep returned an error so our request is still + * on the list. If we got EWOULDBLOCK, we may want to + * re-send the request. + */ + if (error != EWOULDBLOCK) { + errp->re_errno = error; + if (error == EINTR || error == ERESTART) + errp->re_status = stat = RPC_INTR; + else + errp->re_status = stat = RPC_CANTRECV; + goto out; + } + + time_waited = ticks - starttime; + + /* Check for timeout. */ + if (time_waited > timeout) { + errp->re_errno = EWOULDBLOCK; + errp->re_status = stat = RPC_TIMEDOUT; + goto out; + } + + /* Retransmit if necessary. */ + if (time_waited >= next_sendtime) { + cu->cu_cwnd /= 2; + if (cu->cu_cwnd < CWNDSCALE) + cu->cu_cwnd = CWNDSCALE; + if (ext && ext->rc_feedback) { + mtx_unlock(&cs->cs_lock); + if (retrans == 0) + ext->rc_feedback(FEEDBACK_REXMIT1, + proc, ext->rc_feedback_arg); + else + ext->rc_feedback(FEEDBACK_REXMIT2, + proc, ext->rc_feedback_arg); + mtx_lock(&cs->cs_lock); + } + if (cu->cu_closing || cu->cu_closed) { + errp->re_errno = ESHUTDOWN; + errp->re_status = stat = RPC_CANTRECV; + goto out; + } + retrans++; + /* update retransmit_time */ + if (retransmit_time < RPC_MAX_BACKOFF * hz) + retransmit_time = 2 * retransmit_time; + next_sendtime += retransmit_time; + goto send_again; + } + cu->cu_sent += CWNDSCALE; + TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); + } + +got_reply: + /* + * Now decode and validate the response. We need to drop the + * lock since xdr_replymsg may end up sleeping in malloc. + */ + mtx_unlock(&cs->cs_lock); + + if (ext && ext->rc_feedback) + ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); + + xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); + ok = xdr_replymsg(&xdrs, &reply_msg); + cr->cr_mrep = NULL; + + if (ok) { + if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && + (reply_msg.acpted_rply.ar_stat == SUCCESS)) + errp->re_status = stat = RPC_SUCCESS; + else + stat = _seterr_reply(&reply_msg, &(cu->cu_error)); + + if (errp->re_status == RPC_SUCCESS) { + results = xdrmbuf_getall(&xdrs); + if (! AUTH_VALIDATE(auth, xid, + &reply_msg.acpted_rply.ar_verf, + &results)) { + errp->re_status = stat = RPC_AUTHERROR; + errp->re_why = AUTH_INVALIDRESP; + if (retrans && + auth->ah_cred.oa_flavor == RPCSEC_GSS) { + /* + * If we retransmitted, its + * possible that we will + * receive a reply for one of + * the earlier transmissions + * (which will use an older + * RPCSEC_GSS sequence + * number). In this case, just + * go back and listen for a + * new reply. We could keep a + * record of all the seq + * numbers we have transmitted + * so far so that we could + * accept a reply for any of + * them here. + */ + XDR_DESTROY(&xdrs); + mtx_lock(&cs->cs_lock); + cu->cu_sent += CWNDSCALE; + TAILQ_INSERT_TAIL(&cs->cs_pending, + cr, cr_link); + cr->cr_mrep = NULL; + goto get_reply; + } + } else { + *resultsp = results; + } + } /* end successful completion */ + /* + * If unsuccessful AND error is an authentication error + * then refresh credentials and try again, else break + */ + else if (stat == RPC_AUTHERROR) + /* maybe our credentials need to be refreshed ... */ + if (nrefreshes > 0 && + AUTH_REFRESH(auth, &reply_msg)) { + nrefreshes--; + XDR_DESTROY(&xdrs); + mtx_lock(&cs->cs_lock); + goto call_again; + } + /* end of unsuccessful completion */ + } /* end of valid reply message */ + else { + errp->re_status = stat = RPC_CANTDECODERES; + + } + XDR_DESTROY(&xdrs); + mtx_lock(&cs->cs_lock); +out: + mtx_assert(&cs->cs_lock, MA_OWNED); + + if (mreq) + m_freem(mreq); + if (cr->cr_mrep) + m_freem(cr->cr_mrep); + + cu->cu_threads--; + if (cu->cu_closing) + wakeup(cu); + + mtx_unlock(&cs->cs_lock); + + if (auth && stat != RPC_SUCCESS) + AUTH_VALIDATE(auth, xid, NULL, NULL); + + free(cr, M_RPC); + + return (stat); +} + +static void +clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) +{ + struct cu_data *cu = (struct cu_data *)cl->cl_private; + + *errp = cu->cu_error; +} + +static bool_t +clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) +{ + XDR xdrs; + bool_t dummy; + + xdrs.x_op = XDR_FREE; + dummy = (*xdr_res)(&xdrs, res_ptr); + + return (dummy); +} + +/*ARGSUSED*/ +static void +clnt_dg_abort(CLIENT *h) +{ +} + +static bool_t +clnt_dg_control(CLIENT *cl, u_int request, void *info) +{ + struct cu_data *cu = (struct cu_data *)cl->cl_private; + struct cu_socket *cs; + struct sockaddr *addr; + + cs = cu->cu_socket->so_rcv.sb_upcallarg; + mtx_lock(&cs->cs_lock); + + switch (request) { + case CLSET_FD_CLOSE: + cu->cu_closeit = TRUE; + mtx_unlock(&cs->cs_lock); + return (TRUE); + case CLSET_FD_NCLOSE: + cu->cu_closeit = FALSE; + mtx_unlock(&cs->cs_lock); + return (TRUE); + } + + /* for other requests which use info */ + if (info == NULL) { + mtx_unlock(&cs->cs_lock); + return (FALSE); + } + switch (request) { + case CLSET_TIMEOUT: + if (time_not_ok((struct timeval *)info)) { + mtx_unlock(&cs->cs_lock); + return (FALSE); + } + cu->cu_total = *(struct timeval *)info; + break; + case CLGET_TIMEOUT: + *(struct timeval *)info = cu->cu_total; + break; + case CLSET_RETRY_TIMEOUT: + if (time_not_ok((struct timeval *)info)) { + mtx_unlock(&cs->cs_lock); + return (FALSE); + } + cu->cu_wait = *(struct timeval *)info; + break; + case CLGET_RETRY_TIMEOUT: + *(struct timeval *)info = cu->cu_wait; + break; + case CLGET_SVC_ADDR: + /* + * Slightly different semantics to userland - we use + * sockaddr instead of netbuf. + */ + memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); + break; + case CLSET_SVC_ADDR: /* set to new address */ + addr = (struct sockaddr *)info; + (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); + break; + case CLGET_XID: + *(uint32_t *)info = cu->cu_xid; + break; + + case CLSET_XID: + /* This will set the xid of the NEXT call */ + /* decrement by 1 as clnt_dg_call() increments once */ + cu->cu_xid = *(uint32_t *)info - 1; + break; + + case CLGET_VERS: + /* + * This RELIES on the information that, in the call body, + * the version number field is the fifth field from the + * beginning of the RPC header. MUST be changed if the + * call_struct is changed + */ + *(uint32_t *)info = + ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + + 4 * BYTES_PER_XDR_UNIT)); + break; + + case CLSET_VERS: + *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) + = htonl(*(uint32_t *)info); + break; + + case CLGET_PROG: + /* + * This RELIES on the information that, in the call body, + * the program number field is the fourth field from the + * beginning of the RPC header. MUST be changed if the + * call_struct is changed + */ + *(uint32_t *)info = + ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + + 3 * BYTES_PER_XDR_UNIT)); + break; + + case CLSET_PROG: + *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) + = htonl(*(uint32_t *)info); + break; + case CLSET_ASYNC: + cu->cu_async = *(int *)info; + break; + case CLSET_CONNECT: + cu->cu_connect = *(int *)info; + break; + case CLSET_WAITCHAN: + cu->cu_waitchan = (const char *)info; + break; + case CLGET_WAITCHAN: + *(const char **) info = cu->cu_waitchan; + break; + case CLSET_INTERRUPTIBLE: + if (*(int *) info) + cu->cu_waitflag = PCATCH; + else + cu->cu_waitflag = 0; + break; + case CLGET_INTERRUPTIBLE: + if (cu->cu_waitflag) + *(int *) info = TRUE; + else + *(int *) info = FALSE; + break; + default: + mtx_unlock(&cs->cs_lock); + return (FALSE); + } + mtx_unlock(&cs->cs_lock); + return (TRUE); +} + +static void +clnt_dg_close(CLIENT *cl) +{ + struct cu_data *cu = (struct cu_data *)cl->cl_private; + struct cu_socket *cs; + struct cu_request *cr; + + cs = cu->cu_socket->so_rcv.sb_upcallarg; + mtx_lock(&cs->cs_lock); + + if (cu->cu_closed) { + mtx_unlock(&cs->cs_lock); + return; + } + + if (cu->cu_closing) { + while (cu->cu_closing) + msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); + KASSERT(cu->cu_closed, ("client should be closed")); + mtx_unlock(&cs->cs_lock); + return; + } + + /* + * Abort any pending requests and wait until everyone + * has finished with clnt_vc_call. + */ + cu->cu_closing = TRUE; + TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { + if (cr->cr_client == cl) { + cr->cr_xid = 0; + cr->cr_error = ESHUTDOWN; + wakeup(cr); + } + } + + while (cu->cu_threads) + msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); + + cu->cu_closing = FALSE; + cu->cu_closed = TRUE; + + mtx_unlock(&cs->cs_lock); + wakeup(cu); +} + +static void +clnt_dg_destroy(CLIENT *cl) +{ + struct cu_data *cu = (struct cu_data *)cl->cl_private; + struct cu_socket *cs; + struct socket *so = NULL; + bool_t lastsocketref; + + cs = cu->cu_socket->so_rcv.sb_upcallarg; + clnt_dg_close(cl); + + SOCKBUF_LOCK(&cu->cu_socket->so_rcv); + mtx_lock(&cs->cs_lock); + + cs->cs_refs--; + if (cs->cs_refs == 0) { + mtx_unlock(&cs->cs_lock); + soupcall_clear(cu->cu_socket, SO_RCV); + clnt_dg_upcallsdone(cu->cu_socket, cs); + SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); + mtx_destroy(&cs->cs_lock); + mem_free(cs, sizeof(*cs)); + lastsocketref = TRUE; + } else { + mtx_unlock(&cs->cs_lock); + SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); + lastsocketref = FALSE; + } + + if (cu->cu_closeit && lastsocketref) { + so = cu->cu_socket; + cu->cu_socket = NULL; + } + + if (so) + soclose(so); + + if (cl->cl_netid && cl->cl_netid[0]) + mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); + if (cl->cl_tp && cl->cl_tp[0]) + mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); + mem_free(cu, sizeof (*cu)); + mem_free(cl, sizeof (CLIENT)); +} + +/* + * Make sure that the time is not garbage. -1 value is allowed. + */ +static bool_t +time_not_ok(struct timeval *t) +{ + return (t->tv_sec < -1 || t->tv_sec > 100000000 || + t->tv_usec < -1 || t->tv_usec > 1000000); +} + +int +clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) +{ + struct cu_socket *cs = (struct cu_socket *) arg; + struct uio uio; + struct mbuf *m; + struct mbuf *control; + struct cu_request *cr; + int error, rcvflag, foundreq; + uint32_t xid; + + cs->cs_upcallrefs++; + uio.uio_resid = 1000000000; + uio.uio_td = curthread; + do { + SOCKBUF_UNLOCK(&so->so_rcv); + m = NULL; + control = NULL; + rcvflag = MSG_DONTWAIT; + error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); + if (control) + m_freem(control); + SOCKBUF_LOCK(&so->so_rcv); + + if (error == EWOULDBLOCK) + break; + + /* + * If there was an error, wake up all pending + * requests. + */ + if (error) { + mtx_lock(&cs->cs_lock); + TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { + cr->cr_xid = 0; + cr->cr_error = error; + wakeup(cr); + } + mtx_unlock(&cs->cs_lock); + break; + } + + /* + * The XID is in the first uint32_t of the reply. + */ + if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) { + /* + * Should never happen. + */ + m_freem(m); + continue; + } + + m_copydata(m, 0, sizeof(xid), (char *)&xid); + xid = ntohl(xid); + + /* + * Attempt to match this reply with a pending request. + */ + mtx_lock(&cs->cs_lock); + foundreq = 0; + TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { + if (cr->cr_xid == xid) { + /* + * This one matches. We leave the + * reply mbuf in cr->cr_mrep. Set the + * XID to zero so that we will ignore + * any duplicated replies that arrive + * before clnt_dg_call removes it from + * the queue. + */ + cr->cr_xid = 0; + cr->cr_mrep = m; + cr->cr_error = 0; + foundreq = 1; + wakeup(cr); + break; + } + } + mtx_unlock(&cs->cs_lock); + + /* + * If we didn't find the matching request, just drop + * it - its probably a repeated reply. + */ + if (!foundreq) + m_freem(m); + } while (m); + cs->cs_upcallrefs--; + if (cs->cs_upcallrefs < 0) + panic("rpcdg upcall refcnt"); + if (cs->cs_upcallrefs == 0) + wakeup(&cs->cs_upcallrefs); + return (SU_OK); +} + +/* + * Wait for all upcalls in progress to complete. + */ +static void +clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs) +{ + + SOCKBUF_LOCK_ASSERT(&so->so_rcv); + + while (cs->cs_upcallrefs > 0) + (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0, + "rpcdgup", 0); +} |