linux_dsm_epyc7002/net/sunrpc/xprtrdma/transport.c
Linus Torvalds e6b9257280 NFS client updates for Linux 4.21
Note that there is a conflict with the rdma tree in this pull request, since
 we delete a file that has been changed in the rdma tree.  Hopefully that's
 easy enough to resolve!
 
 We also were unable to track down a maintainer for Neil Brown's changes to
 the generic cred code that are prerequisites to his RPC cred cleanup patches.
 We've been asking around for several months without any response, so
 hopefully it's okay to include those patches in this pull request.
 
 Stable bugfixes:
 - xprtrdma: Yet another double DMA-unmap # v4.20
 
 Features:
 - Allow some /proc/sys/sunrpc entries without CONFIG_SUNRPC_DEBUG
 - Per-xprt rdma receive workqueues
 - Drop support for FMR memory registration
 - Make port= mount option optional for RDMA mounts
 
 Other bugfixes and cleanups:
 - Remove unused nfs4_xdev_fs_type declaration
 - Fix comments for behavior that has changed
 - Remove generic RPC credentials by switching to 'struct cred'
 - Fix crossing mountpoints with different auth flavors
 - Various xprtrdma fixes from testing and auditing the close code
 - Fixes for disconnect issues when using xprtrdma with krb5
 - Clean up and improve xprtrdma trace points
 - Fix NFS v4.2 async copy reboot recovery
 -----BEGIN PGP SIGNATURE-----
 
 iQIzBAABCAAdFiEEnZ5MQTpR7cLU7KEp18tUv7ClQOsFAlwtO50ACgkQ18tUv7Cl
 QOtZWQ//e5Hhp2TnQZ6U+99YKedjwBHP6psH3GKSEdeHSNdlSpZ5ckgHxvMb9TBa
 6t4ecgv5P/uYLIePQ0u2ubUFc9+TlyGi7Iacx13/YhK7kihGHDPnZhfl0QbYixV7
 rwa9bFcKmOrXs8ld+Hw3P2UL22G1gMf/LHDhPNshbW7LFZmcshKz+mKTk70kwkq9
 v7tFC59p6GwV8Sr2YI2NXn2fOWsUS00sQfgj2jceJYJ8PsNa+wHYF4wPj2IY5NsE
 D5Oq2kLPbytBhCllOHgopNZaf4qb5BfqhVETyc1O+kDF3BZKUhQ1PoDi2FPinaHM
 5/d8hS+5fr3eMBsQrPWQLXYjWQFUXnkQQJvU3Bo52AIgomsk/8uBq3FvH7XmFcBd
 C8sgnuUAkAS8feMes8GCS50BTxclnGuYGdyFJyCRXoG9Kn9rMrw9EKitky6EVq0v
 NmXhW79jK84a3yDXVlAIpZ8Y9BU/HQ3GviGX8lQEdZU9YiYRzDIHvpMFwzMgqaBi
 XvLbr8PlLOm8GZokThS8QYT/G2Wu6IwfUq/AufVjVD4+HiL3duKKfWSGAvcm6aAa
 GoRF6UG+OmjWlzKojtRc1dI+sy22Fzh+DW+Mx6tuf/b/66wkmYnW7eKcV4rt6Tm5
 /JEhvTMo9q7elL/4FgCoMCcdoc5eXqQyXRXrQiOU7YHLzn2aWU0=
 =DvVW
 -----END PGP SIGNATURE-----

Merge tag 'nfs-for-4.21-1' of git://git.linux-nfs.org/projects/anna/linux-nfs

Pull NFS client updates from Anna Schumaker:
 "Stable bugfixes:
   - xprtrdma: Yet another double DMA-unmap # v4.20

  Features:
   - Allow some /proc/sys/sunrpc entries without CONFIG_SUNRPC_DEBUG
   - Per-xprt rdma receive workqueues
   - Drop support for FMR memory registration
   - Make port= mount option optional for RDMA mounts

  Other bugfixes and cleanups:
   - Remove unused nfs4_xdev_fs_type declaration
   - Fix comments for behavior that has changed
   - Remove generic RPC credentials by switching to 'struct cred'
   - Fix crossing mountpoints with different auth flavors
   - Various xprtrdma fixes from testing and auditing the close code
   - Fixes for disconnect issues when using xprtrdma with krb5
   - Clean up and improve xprtrdma trace points
   - Fix NFS v4.2 async copy reboot recovery"

* tag 'nfs-for-4.21-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (63 commits)
  sunrpc: convert to DEFINE_SHOW_ATTRIBUTE
  sunrpc: Add xprt after nfs4_test_session_trunk()
  sunrpc: convert unnecessary GFP_ATOMIC to GFP_NOFS
  sunrpc: handle ENOMEM in rpcb_getport_async
  NFS: remove unnecessary test for IS_ERR(cred)
  xprtrdma: Prevent leak of rpcrdma_rep objects
  NFSv4.2 fix async copy reboot recovery
  xprtrdma: Don't leak freed MRs
  xprtrdma: Add documenting comment for rpcrdma_buffer_destroy
  xprtrdma: Replace outdated comment for rpcrdma_ep_post
  xprtrdma: Update comments in frwr_op_send
  SUNRPC: Fix some kernel doc complaints
  SUNRPC: Simplify defining common RPC trace events
  NFS: Fix NFSv4 symbolic trace point output
  xprtrdma: Trace mapping, alloc, and dereg failures
  xprtrdma: Add trace points for calls to transport switch methods
  xprtrdma: Relocate the xprtrdma_mr_map trace points
  xprtrdma: Clean up of xprtrdma chunk trace points
  xprtrdma: Remove unused fields from rpcrdma_ia
  xprtrdma: Cull dprintk() call sites
  ...
2019-01-02 16:35:23 -08:00

883 lines
24 KiB
C

// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
* Copyright (c) 2014-2017 Oracle. All rights reserved.
* Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
* General Public License (GPL) Version 2, available from the file
* COPYING in the main directory of this source tree, or the BSD-type
* license below:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* Neither the name of the Network Appliance, Inc. nor the names of
* its contributors may be used to endorse or promote products
* derived from this software without specific prior written
* permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
* transport.c
*
* This file contains the top-level implementation of an RPC RDMA
* transport.
*
* Naming convention: functions beginning with xprt_ are part of the
* transport switch. All others are RPC RDMA internal.
*/
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/seq_file.h>
#include <linux/smp.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
/*
 * Tunables
 *
 * Module-wide settings. When CONFIG_SUNRPC_DEBUG is enabled, each of
 * these is also the backing store of an entry in xr_tunables_table.
 */
/* Copied into cdata.max_requests by xprt_setup_rdma() */
static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
/* Requested inline read size; xprt_setup_rdma() clamps it to cdata.rsize.
 * Not static -- presumably referenced from other files; confirm.
 */
unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
/* Requested inline write size; xprt_setup_rdma() clamps it to cdata.wsize */
static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
/* Memory registration strategy; not read anywhere else in this file */
unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRWR;
/* Zero-initialized; not read anywhere else in this file */
int xprt_rdma_pad_optimize;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
/* Range limits referenced through .extra1/.extra2 in xr_tunables_table */
static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
static unsigned int zero;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;
/* Backing store for the "rdma_inline_write_padding" entry; nothing else
 * in this file reads it.
 */
static unsigned int dummy;
/* Set by xprt_rdma_init() from register_sysctl_table(); released and
 * reset to NULL by xprt_rdma_cleanup().
 */
static struct ctl_table_header *sunrpc_table_header;
/*
 * sysctl entries for the RPC/RDMA tunables. Only built when
 * CONFIG_SUNRPC_DEBUG is enabled, and hooked in as the child of the
 * "sunrpc" directory entry in sunrpc_table. Every entry is mode 0644.
 * Entries handled by proc_dointvec_minmax name their lower bound in
 * .extra1 and their upper bound in .extra2.
 */
static struct ctl_table xr_tunables_table[] = {
{
.procname = "rdma_slot_table_entries",
.data = &xprt_rdma_slot_table_entries,
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec_minmax,
.extra1 = &min_slot_table_size,
.extra2 = &max_slot_table_size
},
{
.procname = "rdma_max_inline_read",
.data = &xprt_rdma_max_inline_read,
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec_minmax,
.extra1 = &min_inline_size,
.extra2 = &max_inline_size,
},
{
.procname = "rdma_max_inline_write",
.data = &xprt_rdma_max_inline_write,
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec_minmax,
.extra1 = &min_inline_size,
.extra2 = &max_inline_size,
},
/* Backed by the placeholder variable "dummy": writes are accepted and
 * range-checked, but nothing else in this file consumes the value.
 */
{
.procname = "rdma_inline_write_padding",
.data = &dummy,
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec_minmax,
.extra1 = &zero,
.extra2 = &max_padding,
},
{
.procname = "rdma_memreg_strategy",
.data = &xprt_rdma_memreg_strategy,
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec_minmax,
.extra1 = &min_memreg,
.extra2 = &max_memreg,
},
/* The only entry without bounds: it uses plain proc_dointvec */
{
.procname = "rdma_pad_optimize",
.data = &xprt_rdma_pad_optimize,
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec,
},
/* empty terminating entry */
{ },
};
/*
 * Top-level sysctl table handed to register_sysctl_table() by
 * xprt_rdma_init(): a single read-only (0555) "sunrpc" directory whose
 * children are the entries of xr_tunables_table.
 */
static struct ctl_table sunrpc_table[] = {
{
.procname = "sunrpc",
.mode = 0555,
.child = xr_tunables_table
},
/* empty terminating entry */
{ },
};
#endif
/* Forward declaration: xprt_setup_rdma() stores the address of this ops
 * table, which is defined after the methods it points to.
 */
static const struct rpc_xprt_ops xprt_rdma_procs;
/*
 * Fill in the AF_INET-specific display strings of @xprt.
 *
 * RPC_DISPLAY_HEX_ADDR receives a kstrdup()'d copy of the IPv4 address
 * rendered as eight hex digits in host byte order (NULL if the
 * allocation fails). RPC_DISPLAY_NETID is pointed at the constant
 * RPCBIND_NETID_RDMA.
 */
static void
xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	struct sockaddr_in *in4 = (struct sockaddr_in *)sap;
	char hexaddr[20];

	xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;

	snprintf(hexaddr, sizeof(hexaddr), "%08x",
		 ntohl(in4->sin_addr.s_addr));
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] =
		kstrdup(hexaddr, GFP_KERNEL);
}
/*
 * Fill in the AF_INET6-specific display strings of @xprt.
 *
 * RPC_DISPLAY_HEX_ADDR receives a kstrdup()'d copy of the IPv6 address
 * formatted with "%pi6" (NULL if the allocation fails).
 * RPC_DISPLAY_NETID is pointed at the constant RPCBIND_NETID_RDMA6.
 */
static void
xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)sap;
	char hexaddr[40];

	xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;

	snprintf(hexaddr, sizeof(hexaddr), "%pi6", &in6->sin6_addr);
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] =
		kstrdup(hexaddr, GFP_KERNEL);
}
/**
 * xprt_rdma_format_addresses - build the display strings for a peer address
 * @xprt: transport whose address_strings[] table is filled in
 * @sap: peer socket address (AF_INET or AF_INET6)
 *
 * Fills in the hex-address and netid entries through the per-family
 * helper, then the presentation address, decimal port, hex port and
 * protocol entries. Entries produced by kstrdup() are heap allocated
 * and are NULL if that allocation fails; the netid and protocol entries
 * point at constant strings and must not be freed (see
 * xprt_rdma_free_addresses()).
 *
 * For any other address family an error is logged and no entry is
 * touched.
 */
void
xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	char buf[128];

	switch (sap->sa_family) {
	case AF_INET:
		xprt_rdma_format_addresses4(xprt, sap);
		break;
	case AF_INET6:
		xprt_rdma_format_addresses6(xprt, sap);
		break;
	default:
		pr_err("rpcrdma: Unrecognized address family\n");
		return;
	}

	/* rpc_ntop() signals failure by returning zero, and in that case
	 * the contents of @buf cannot be relied on. Publish an empty
	 * string rather than duplicating uninitialized stack memory.
	 */
	if (!rpc_ntop(sap, buf, sizeof(buf)))
		buf[0] = '\0';
	xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
}
/**
 * xprt_rdma_free_addresses - release the display strings of a transport
 * @xprt: transport whose address_strings[] entries are freed
 *
 * Every entry is passed to kfree() except RPC_DISPLAY_PROTO and
 * RPC_DISPLAY_NETID, which xprt_rdma_format_addresses() points at
 * constant strings. The freed slots are not reset.
 */
void
xprt_rdma_free_addresses(struct rpc_xprt *xprt)
{
	unsigned int idx;

	for (idx = 0; idx < RPC_DISPLAY_MAX; idx++) {
		if (idx == RPC_DISPLAY_PROTO || idx == RPC_DISPLAY_NETID)
			continue;
		kfree(xprt->address_strings[idx]);
	}
}
/**
 * xprt_rdma_connect_worker - establish connection in the background
 * @work: delayed-work item embedded in the rpcrdma_xprt
 *
 * Requester holds the xprt's send lock to prevent activity on this
 * transport while a fresh connection is being established. RPC tasks
 * sleep on the xprt's pending queue waiting for connect to complete.
 *
 * After the connect attempt the "connecting" state is cleared. If the
 * endpoint reports a positive rep_connected and the transport was not
 * already marked connected, connect statistics are updated and pending
 * tasks are woken with -EAGAIN. Otherwise, if the transport had been
 * marked connected, that mark is cleared and pending tasks are woken
 * with the result of rpcrdma_ep_connect().
 */
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_xprt *rdma = container_of(work, struct rpcrdma_xprt,
						 rx_connect_worker.work);
	struct rpc_xprt *xprt = &rdma->rx_xprt;
	int status;

	status = rpcrdma_ep_connect(&rdma->rx_ep, &rdma->rx_ia);
	xprt_clear_connecting(xprt);

	if (rdma->rx_ep.rep_connected <= 0) {
		/* Not connected: report the failure to waiters, if any */
		if (xprt_test_and_clear_connected(xprt))
			xprt_wake_pending_tasks(xprt, status);
		return;
	}

	/* Already marked connected: nothing further to account for */
	if (xprt_test_and_set_connected(xprt))
		return;

	xprt->stat.connect_count++;
	xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
	xprt_wake_pending_tasks(xprt, -EAGAIN);
}
/**
 * xprt_rdma_inject_disconnect - inject a connection fault
 * @xprt: transport context
 *
 * Emits the inject_dsc trace event, then calls rdma_disconnect() on the
 * transport's connection ID to simulate spurious connection loss. The
 * result of rdma_disconnect() is ignored.
 */
static void
xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *rdma = rpcx_to_rdmax(xprt);

	trace_xprtrdma_op_inject_dsc(rdma);

	rdma_disconnect(rdma->rx_ia.ri_id);
}
/**
 * xprt_rdma_destroy - Full tear down of transport
 * @xprt: doomed transport context
 *
 * Caller guarantees there will be no more calls to us with
 * this @xprt.
 *
 * Waits for any queued connect work to finish, destroys the endpoint,
 * the buffers and the interface adapter state in that order, frees the
 * display strings and the rpc_xprt itself, and finally drops the module
 * reference taken by xprt_setup_rdma().
 */
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *rdma = rpcx_to_rdmax(xprt);
	struct rpcrdma_ia *ia = &rdma->rx_ia;
	struct rpcrdma_ep *ep = &rdma->rx_ep;

	trace_xprtrdma_op_destroy(rdma);

	/* No connect worker may run once teardown starts */
	cancel_delayed_work_sync(&rdma->rx_connect_worker);

	rpcrdma_ep_destroy(ep, ia);
	rpcrdma_buffer_destroy(&rdma->rx_buf);
	rpcrdma_ia_close(ia);

	xprt_rdma_free_addresses(xprt);
	xprt_free(xprt);

	module_put(THIS_MODULE);
}
/* Timeout installed as xprt->timeout by xprt_setup_rdma(): the initial
 * and the maximum value are both 60 * HZ jiffies. All other fields are
 * left zero-initialized.
 */
static const struct rpc_timeout xprt_rdma_default_timeout = {
.to_initval = 60 * HZ,
.to_maxval = 60 * HZ,
};
/**
* xprt_setup_rdma - Set up transport to use RDMA
*
* @args: rpc transport arguments
*/
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
struct rpcrdma_create_data_internal cdata;
struct rpc_xprt *xprt;
struct rpcrdma_xprt *new_xprt;
struct rpcrdma_ep *new_ep;
struct sockaddr *sap;
int rc;
if (args->addrlen > sizeof(xprt->addr))
return ERR_PTR(-EBADF);
xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0, 0);
if (!xprt)
return ERR_PTR(-ENOMEM);
/* 60 second timeout, no retries */
xprt->timeout = &xprt_rdma_default_timeout;
xprt->bind_timeout = RPCRDMA_BIND_TO;
xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
xprt->resvport = 0; /* privileged port not needed */
xprt->tsh_size = 0; /* RPC-RDMA handles framing */
xprt->ops = &xprt_rdma_procs;
/*
* Set up RDMA-specific connect data.
*/
sap = args->dstaddr;
/* Ensure xprt->addr holds valid server TCP (not RDMA)
* address, for any side protocols which peek at it */
xprt->prot = IPPROTO_TCP;
xprt->addrlen = args->addrlen;
memcpy(&xprt->addr, sap, xprt->addrlen);
if (rpc_get_port(sap))
xprt_set_bound(xprt);
xprt_rdma_format_addresses(xprt, sap);
cdata.max_requests = xprt_rdma_slot_table_entries;
cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
cdata.inline_wsize = xprt_rdma_max_inline_write;
if (cdata.inline_wsize > cdata.wsize)
cdata.inline_wsize = cdata.wsize;
cdata.inline_rsize = xprt_rdma_max_inline_read;
if (cdata.inline_rsize > cdata.rsize)
cdata.inline_rsize = cdata.rsize;
/*
* Create new transport instance, which includes initialized
* o ia
* o endpoint
* o buffers
*/
new_xprt = rpcx_to_rdmax(xprt);
rc = rpcrdma_ia_open(new_xprt);
if (rc)
goto out1;
/*
* initialize and create ep
*/
new_xprt->rx_data = cdata;
new_ep = &new_xprt->rx_ep;
rc = rpcrdma_ep_create(&new_xprt->rx_ep,
&new_xprt->rx_ia, &new_xprt->rx_data);
if (rc)
goto out2;
rc = rpcrdma_buffer_create(new_xprt);
if (rc)
goto out3;
INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
xprt_rdma_connect_worker);
xprt->max_payload = frwr_maxpages(new_xprt);
if (xprt->max_payload == 0)
goto out4;
xprt->max_payload <<= PAGE_SHIFT;
dprintk("RPC: %s: transport data payload maximum: %zu bytes\n",
__func__, xprt->max_payload);
if (!try_module_get(THIS_MODULE))
goto out4;
dprintk("RPC: %s: %s:%s\n", __func__,
xprt->address_strings[RPC_DISPLAY_ADDR],
xprt->address_strings[RPC_DISPLAY_PORT]);
trace_xprtrdma_create(new_xprt);
return xprt;
out4:
rpcrdma_buffer_destroy(&new_xprt->rx_buf);
rc = -ENODEV;
out3:
rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
out2:
rpcrdma_ia_close(&new_xprt->rx_ia);
out1:
trace_xprtrdma_op_destroy(new_xprt);
xprt_rdma_free_addresses(xprt);
xprt_free(xprt);
return ERR_PTR(rc);
}
/**
 * xprt_rdma_close - close a transport connection
 * @xprt: transport context
 *
 * Called during autoclose or device removal, and by
 * xprt_rdma_send_request() when it decides to drop the connection.
 *
 * Caller holds @xprt's send lock to prevent activity on this
 * transport while the connection is torn down. May sleep.
 *
 * If the endpoint's rep_connected is -ENODEV and no device removal is
 * pending, the function returns right after clearing the connected
 * state: the connect cookie is not advanced and
 * xprt_disconnect_done() is not called.
 */
void xprt_rdma_close(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *rdma = rpcx_to_rdmax(xprt);
	struct rpcrdma_ia *ia = &rdma->rx_ia;
	struct rpcrdma_ep *ep = &rdma->rx_ep;

	might_sleep();

	trace_xprtrdma_op_close(rdma);

	/* Prevent marshaling and sending of new requests */
	xprt_clear_connected(xprt);

	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
		/* Device removal was requested */
		rpcrdma_ia_remove(ia);
	} else {
		if (ep->rep_connected == -ENODEV)
			return;
		if (ep->rep_connected > 0)
			xprt->reestablish_timeout = 0;
		rpcrdma_ep_disconnect(ep, ia);

		/* Prepare @xprt for the next connection by reinitializing
		 * its credit grant to one (see RFC 8166, Section 3.3.3).
		 */
		rdma->rx_buf.rb_credits = 1;
		xprt->cwnd = RPC_CWNDSHIFT;
	}

	xprt->connect_cookie++;
	xprt_disconnect_done(xprt);
}
/**
 * xprt_rdma_set_port - update server port with rpcbind result
 * @xprt: controlling RPC transport
 * @port: new port value
 *
 * Stores @port in the transport's socket address and replaces the
 * decimal and hex port display strings with freshly allocated copies
 * (either may end up NULL if kstrdup() fails).
 *
 * Transport connect status is unchanged.
 */
static void
xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
{
	struct sockaddr *addr = (struct sockaddr *)&xprt->addr;
	char text[8];

	dprintk("RPC: %s: setting port for xprt %p (%s:%s) to %u\n",
		__func__, xprt,
		xprt->address_strings[RPC_DISPLAY_ADDR],
		xprt->address_strings[RPC_DISPLAY_PORT],
		port);

	rpc_set_port(addr, port);

	/* Decimal form */
	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
	snprintf(text, sizeof(text), "%u", port);
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(text, GFP_KERNEL);

	/* Hex form */
	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
	snprintf(text, sizeof(text), "%4hx", port);
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] =
		kstrdup(text, GFP_KERNEL);
}
/**
 * xprt_rdma_timer - invoked when an RPC times out
 * @xprt: controlling RPC transport
 * @task: RPC task that timed out (not used by this implementation)
 *
 * Invoked when the transport is still connected, but an RPC
 * retransmit timeout occurs.
 *
 * Since RDMA connections don't have a keep-alive, forcibly
 * disconnect and retry to connect. This drives full
 * detection of the network path, and retransmissions of
 * all pending RPCs.
 */
static void
xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
{
xprt_force_disconnect(xprt);
}
/**
 * xprt_rdma_connect - try to establish a transport connection
 * @xprt: transport state
 * @task: RPC scheduler context
 *
 * Queues the connect worker. While the endpoint's rep_connected is
 * still zero the worker is queued without delay, and a synchronous
 * @task additionally waits for it to finish. Any other rep_connected
 * value is treated as a reconnect: the worker is queued after the
 * current reestablish timeout, which is then doubled and kept within
 * [RPCRDMA_INIT_REEST_TO, RPCRDMA_MAX_REEST_TO].
 */
static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpcrdma_xprt *rdma = rpcx_to_rdmax(xprt);

	trace_xprtrdma_op_connect(rdma);

	if (rdma->rx_ep.rep_connected == 0) {
		schedule_delayed_work(&rdma->rx_connect_worker, 0);
		if (!RPC_IS_ASYNC(task))
			flush_delayed_work(&rdma->rx_connect_worker);
		return;
	}

	/* Reconnect: use the current delay, then back off for next time */
	schedule_delayed_work(&rdma->rx_connect_worker,
			      xprt->reestablish_timeout);

	xprt->reestablish_timeout <<= 1;
	if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
		xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
	else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
		xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
}
/**
 * xprt_rdma_alloc_slot - allocate an rpc_rqst
 * @xprt: controlling RPC transport
 * @task: RPC task requesting a fresh rpc_rqst
 *
 * The rpc_rqst handed out is the rl_slot member of an rpcrdma_req
 * obtained from the transport's buffer pool.
 *
 * tk_status values:
 *	%0 if task->tk_rqstp points to a fresh rpc_rqst
 *	%-EAGAIN if no rpc_rqst is available; queued on backlog
 */
static void
xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpcrdma_xprt *rdma = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcrdma_buffer_get(&rdma->rx_buf);

	if (req) {
		task->tk_rqstp = &req->rl_slot;
		task->tk_status = 0;
		return;
	}

	/* Pool exhausted: wait on the backlog queue for a release */
	rpc_sleep_on(&xprt->backlog, task, NULL);
	task->tk_status = -EAGAIN;
}
/**
 * xprt_rdma_free_slot - release an rpc_rqst
 * @xprt: controlling RPC transport
 * @rqst: rpc_rqst to release
 *
 * Zeroes @rqst, returns the rpcrdma_req that contains it to the buffer
 * pool, then wakes the next task waiting on the backlog queue.
 */
static void
xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req;

	memset(rqst, 0, sizeof(*rqst));

	req = rpcr_to_rdmar(rqst);
	rpcrdma_buffer_put(req);

	rpc_wake_up_next(&xprt->backlog);
}
/*
 * Make sure @req owns a send buffer of at least @size bytes.
 *
 * A buffer that is already large enough is kept. Otherwise a new
 * DMA_TO_DEVICE regbuf is allocated with @flags. The previous buffer is
 * released only after that allocation succeeded, so @req keeps its old
 * buffer on failure. Each replacement adds @size to
 * rx_stats.hardway_register_count.
 *
 * Returns true if a suitable buffer is in place, false if the
 * allocation failed.
 */
static bool
rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
		    size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *fresh;

	if (req->rl_sendbuf != NULL && rdmab_length(req->rl_sendbuf) >= size)
		return true;

	fresh = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
	if (IS_ERR(fresh))
		return false;

	rpcrdma_free_regbuf(req->rl_sendbuf);
	req->rl_sendbuf = fresh;
	r_xprt->rx_stats.hardway_register_count += size;
	return true;
}
/*
 * Make sure @req owns a receive buffer of at least @size bytes.
 *
 * The rq_rcv_buf is used only if a Reply chunk is necessary.
 * The decision to use a Reply chunk is made later in
 * rpcrdma_marshal_req. This buffer is registered at that time.
 *
 * Otherwise, the associated RPC Reply arrives in a separate
 * Receive buffer, arbitrarily chosen by the HCA. The buffer
 * allocated here for the RPC Reply is not utilized in that
 * case. See rpcrdma_inline_fixup.
 *
 * A regbuf is used here to remember the buffer size. It is allocated
 * with DMA_NONE. As with the send buffer, the old regbuf is released
 * only after its replacement was allocated, and each replacement adds
 * @size to rx_stats.hardway_register_count.
 *
 * Returns true if a suitable buffer is in place, false if the
 * allocation failed.
 */
static bool
rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
		    size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *fresh;

	if (req->rl_recvbuf != NULL && rdmab_length(req->rl_recvbuf) >= size)
		return true;

	fresh = rpcrdma_alloc_regbuf(size, DMA_NONE, flags);
	if (IS_ERR(fresh))
		return false;

	rpcrdma_free_regbuf(req->rl_recvbuf);
	req->rl_recvbuf = fresh;
	r_xprt->rx_stats.hardway_register_count += size;
	return true;
}
/**
 * xprt_rdma_allocate - allocate transport resources for an RPC
 * @task: RPC task
 *
 * Ensures the request has a send buffer of rq_callsize bytes and a
 * receive buffer of rq_rcvsize bytes, then publishes their base
 * addresses in rq_buffer and rq_rbuffer. Swapper tasks allocate with
 * __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN instead of the default
 * RPCRDMA_DEF_GFP.
 *
 * Return values:
 *	%0: Success; rq_buffer points to RPC buffer to use
 *	%-ENOMEM: Out of memory, call again later
 */
static int
xprt_rdma_allocate(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_xprt *rdma = rpcx_to_rdmax(rqst->rq_xprt);
	gfp_t gfp;

	if (RPC_IS_SWAPPER(task))
		gfp = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
	else
		gfp = RPCRDMA_DEF_GFP;

	/* Send buffer first; the receive buffer is not attempted if
	 * the send buffer could not be obtained.
	 */
	if (!rpcrdma_get_sendbuf(rdma, req, rqst->rq_callsize, gfp) ||
	    !rpcrdma_get_recvbuf(rdma, req, rqst->rq_rcvsize, gfp)) {
		trace_xprtrdma_op_allocate(task, NULL);
		return -ENOMEM;
	}

	rqst->rq_buffer = req->rl_sendbuf->rg_base;
	rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
	trace_xprtrdma_op_allocate(task, req);
	return 0;
}
/**
 * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
 * @task: RPC task
 *
 * If the request is still flagged RPCRDMA_REQ_F_PENDING it is handed to
 * rpcrdma_release_rqst(). The send and receive buffers themselves are
 * not freed here.
 *
 * Caller guarantees rqst->rq_buffer is non-NULL.
 */
static void
xprt_rdma_free(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
		rpcrdma_release_rqst(rpcx_to_rdmax(rqst->rq_xprt), req);

	trace_xprtrdma_op_free(task, req);
}
/**
 * xprt_rdma_send_request - marshal and send an RPC request
 * @rqst: RPC message in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * With CONFIG_SUNRPC_BACKCHANNEL, a request without an rq_buffer is
 * handed to xprt_rdma_bc_send_reply() and its result is returned.
 *
 * Returns:
 *	%0 if the RPC message has been sent
 *	%-ENOTCONN if the caller should reconnect and call again; except
 *		when the transport was already disconnected on entry,
 *		the connection has been closed via xprt_rdma_close()
 *	%-EBADSLT if xprt_request_get_cong() refused the request
 *	any other negative value from rpcrdma_marshal_req(), passed
 *		through unchanged. The original notes list:
 *	%-EAGAIN if the caller should call again
 *	%-ENOBUFS if the caller should call again after a delay
 *	%-EMSGSIZE if encoding ran out of buffer space. The request
 *		was not sent. Do not try to send this message again.
 *	%-EIO if an I/O error occurred. The request was not sent.
 *		Do not try to send this message again.
 */
static int
xprt_rdma_send_request(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *rdma = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	int status;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (unlikely(!rqst->rq_buffer))
		return xprt_rdma_bc_send_reply(rqst);
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

	if (!xprt_connected(xprt))
		return -ENOTCONN;

	if (!xprt_request_get_cong(xprt, rqst))
		return -EBADSLT;

	status = rpcrdma_marshal_req(rdma, rqst);
	if (status < 0) {
		/* Only a lost connection triggers a close here */
		if (status != -ENOTCONN)
			return status;
		goto drop_connection;
	}

	/* Must suppress retransmit to maintain credits */
	if (rqst->rq_connect_cookie == xprt->connect_cookie)
		goto drop_connection;
	rqst->rq_xtime = ktime_get();

	__set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
	if (rpcrdma_ep_post(&rdma->rx_ia, &rdma->rx_ep, req))
		goto drop_connection;

	rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
	rqst->rq_bytes_sent = 0;

	/* An RPC with no reply will throw off credit accounting,
	 * so drop the connection to reset the credit grant.
	 */
	if (rpc_reply_expected(rqst->rq_task))
		return 0;

drop_connection:
	xprt_rdma_close(xprt);
	return -ENOTCONN;
}
/**
 * xprt_rdma_print_stats - emit the transport's statistics line
 * @xprt: transport whose counters are reported
 * @seq: seq_file receiving the output
 *
 * Writes one "xprt: rdma" line: the generic rpc_xprt counters first
 * (the local port slot is always 0), followed by the RPC/RDMA counters
 * from rx_stats. Idle time is reported in seconds and is 0 while the
 * transport is not connected.
 */
void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct rpcrdma_xprt *rdma = rpcx_to_rdmax(xprt);
	long idle_secs = 0;

	if (xprt_connected(xprt))
		idle_secs = (long)(jiffies - xprt->last_used) / HZ;

	/* Generic transport counters */
	seq_puts(seq, "\txprt:\trdma ");
	seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
		   0,	/* need a local port? */
		   xprt->stat.bind_count,
		   xprt->stat.connect_count,
		   xprt->stat.connect_time / HZ,
		   idle_secs,
		   xprt->stat.sends,
		   xprt->stat.recvs,
		   xprt->stat.bad_xids,
		   xprt->stat.req_u,
		   xprt->stat.bklog_u);

	/* RPC/RDMA chunk, copy and error counters */
	seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
		   rdma->rx_stats.read_chunk_count,
		   rdma->rx_stats.write_chunk_count,
		   rdma->rx_stats.reply_chunk_count,
		   rdma->rx_stats.total_rdma_request,
		   rdma->rx_stats.total_rdma_reply,
		   rdma->rx_stats.pullup_copy_count,
		   rdma->rx_stats.fixup_copy_count,
		   rdma->rx_stats.hardway_register_count,
		   rdma->rx_stats.failed_marshal_count,
		   rdma->rx_stats.bad_reply_count,
		   rdma->rx_stats.nomsg_call_count);

	/* Memory-region and send-context counters */
	seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
		   rdma->rx_stats.mrs_recycled,
		   rdma->rx_stats.mrs_orphaned,
		   rdma->rx_stats.mrs_allocated,
		   rdma->rx_stats.local_inv_needed,
		   rdma->rx_stats.empty_sendctx_q,
		   rdma->rx_stats.reply_waits_for_send);
}
/*
 * xprt_rdma_enable_swap - ->enable_swap method of xprt_rdma_procs
 * @xprt: transport context (unused)
 *
 * Does no transport-specific work and always returns 0.
 */
static int
xprt_rdma_enable_swap(struct rpc_xprt *xprt)
{
return 0;
}
/*
 * xprt_rdma_disable_swap - ->disable_swap method of xprt_rdma_procs
 * @xprt: transport context (unused)
 *
 * Intentionally empty, mirroring xprt_rdma_enable_swap().
 */
static void
xprt_rdma_disable_swap(struct rpc_xprt *xprt)
{
}
/*
 * Plumbing for rpc transport switch and kernel module
 *
 * Method table installed as xprt->ops by xprt_setup_rdma(). Entries
 * named xprt_rdma_* are defined in this file unless they start with
 * xprt_rdma_bc_; the remaining entries come from the locations named
 * in the trailing comments. The backchannel methods are present only
 * when CONFIG_SUNRPC_BACKCHANNEL is defined.
 */
static const struct rpc_xprt_ops xprt_rdma_procs = {
.reserve_xprt = xprt_reserve_xprt_cong,
.release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */
.alloc_slot = xprt_rdma_alloc_slot,
.free_slot = xprt_rdma_free_slot,
.release_request = xprt_release_rqst_cong, /* ditto */
.set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */
.timer = xprt_rdma_timer,
.rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */
.set_port = xprt_rdma_set_port,
.connect = xprt_rdma_connect,
.buf_alloc = xprt_rdma_allocate,
.buf_free = xprt_rdma_free,
.send_request = xprt_rdma_send_request,
.close = xprt_rdma_close,
.destroy = xprt_rdma_destroy,
.print_stats = xprt_rdma_print_stats,
.enable_swap = xprt_rdma_enable_swap,
.disable_swap = xprt_rdma_disable_swap,
.inject_disconnect = xprt_rdma_inject_disconnect,
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
.bc_setup = xprt_rdma_bc_setup,
.bc_maxpayload = xprt_rdma_bc_maxpayload,
.bc_free_rqst = xprt_rdma_bc_free_rqst,
.bc_destroy = xprt_rdma_bc_destroy,
#endif
};
/*
 * Transport class descriptor for "rdma". Registered by xprt_rdma_init()
 * and unregistered by xprt_rdma_cleanup(); its ->setup hook is
 * xprt_setup_rdma().
 */
static struct xprt_class xprt_rdma = {
.list = LIST_HEAD_INIT(xprt_rdma.list),
.name = "rdma",
.owner = THIS_MODULE,
.ident = XPRT_TRANSPORT_RDMA,
.setup = xprt_setup_rdma,
};
/**
 * xprt_rdma_cleanup - undo xprt_rdma_init()
 *
 * With CONFIG_SUNRPC_DEBUG, unregisters the sysctl table if one was
 * registered and clears the saved header. Then unregisters both the
 * xprt_rdma and xprt_rdma_bc transport classes.
 */
void xprt_rdma_cleanup(void)
{
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
if (sunrpc_table_header) {
unregister_sysctl_table(sunrpc_table_header);
sunrpc_table_header = NULL;
}
#endif
xprt_unregister_transport(&xprt_rdma);
xprt_unregister_transport(&xprt_rdma_bc);
}
/**
 * xprt_rdma_init - register the RPC/RDMA transport classes
 *
 * Registers xprt_rdma and then xprt_rdma_bc. If the second
 * registration fails, the first is undone. With CONFIG_SUNRPC_DEBUG
 * the sysctl table is registered as well, unless a header is already
 * saved; the result of that registration is not checked.
 *
 * Returns 0 on success, or the error from xprt_register_transport().
 */
int xprt_rdma_init(void)
{
	int err;

	err = xprt_register_transport(&xprt_rdma);
	if (err)
		return err;

	err = xprt_register_transport(&xprt_rdma_bc);
	if (err)
		goto out_unregister;

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (sunrpc_table_header == NULL)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
	return 0;

out_unregister:
	xprt_unregister_transport(&xprt_rdma);
	return err;
}