CIFS: SMBD: Implement RDMA memory registration

Memory registration is used for transferring payload via RDMA read or write.
After I/O is done, memory registrations are recovered and reused. This
process can be time consuming and is done in a work queue.

Signed-off-by: Long Li <longli@microsoft.com>
Reviewed-by: Pavel Shilovsky <pshilov@microsoft.com>
Reviewed-by: Ronnie Sahlberg <lsahlber@redhat.com>
Signed-off-by: Steve French <smfrench@gmail.com>
This commit is contained in:
Long Li 2017-11-22 17:38:44 -07:00 committed by Steve French
parent 9762c2d080
commit c739858334
2 changed files with 474 additions and 0 deletions

View File

@ -48,6 +48,9 @@ static int smbd_post_send_page(struct smbd_connection *info,
struct page *page, unsigned long offset,
size_t size, int remaining_data_length);
static void destroy_mr_list(struct smbd_connection *info);
static int allocate_mr_list(struct smbd_connection *info);
/* SMBD version number */
#define SMBD_V1 0x0100
@ -198,6 +201,12 @@ static void smbd_destroy_rdma_work(struct work_struct *work)
wait_event(info->wait_send_payload_pending,
atomic_read(&info->send_payload_pending) == 0);
log_rdma_event(INFO, "freeing mr list\n");
wake_up_interruptible_all(&info->wait_mr);
wait_event(info->wait_for_mr_cleanup,
atomic_read(&info->mr_used_count) == 0);
destroy_mr_list(info);
/* It's not posssible for upper layer to get to reassembly */
log_rdma_event(INFO, "drain the reassembly queue\n");
do {
@ -453,6 +462,16 @@ static bool process_negotiation_response(
}
info->max_fragmented_send_size =
le32_to_cpu(packet->max_fragmented_size);
info->rdma_readwrite_threshold =
rdma_readwrite_threshold > info->max_fragmented_send_size ?
info->max_fragmented_send_size :
rdma_readwrite_threshold;
info->max_readwrite_size = min_t(u32,
le32_to_cpu(packet->max_readwrite_size),
info->max_frmr_depth * PAGE_SIZE);
info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
return true;
}
@ -748,6 +767,12 @@ static int smbd_ia_open(
rc = -EPROTONOSUPPORT;
goto out2;
}
info->max_frmr_depth = min_t(int,
smbd_max_frmr_depth,
info->id->device->attrs.max_fast_reg_page_list_len);
info->mr_type = IB_MR_TYPE_MEM_REG;
if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
info->mr_type = IB_MR_TYPE_SG_GAPS;
info->pd = ib_alloc_pd(info->id->device, 0);
if (IS_ERR(info->pd)) {
@ -1582,6 +1607,8 @@ struct smbd_connection *_smbd_get_connection(
struct rdma_conn_param conn_param;
struct ib_qp_init_attr qp_attr;
struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
struct ib_port_immutable port_immutable;
u32 ird_ord_hdr[2];
info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
if (!info)
@ -1670,6 +1697,28 @@ struct smbd_connection *_smbd_get_connection(
memset(&conn_param, 0, sizeof(conn_param));
conn_param.initiator_depth = 0;
conn_param.responder_resources =
info->id->device->attrs.max_qp_rd_atom
< SMBD_CM_RESPONDER_RESOURCES ?
info->id->device->attrs.max_qp_rd_atom :
SMBD_CM_RESPONDER_RESOURCES;
info->responder_resources = conn_param.responder_resources;
log_rdma_mr(INFO, "responder_resources=%d\n",
info->responder_resources);
/* Need to send IRD/ORD in private data for iWARP */
info->id->device->get_port_immutable(
info->id->device, info->id->port_num, &port_immutable);
if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
ird_ord_hdr[0] = info->responder_resources;
ird_ord_hdr[1] = 1;
conn_param.private_data = ird_ord_hdr;
conn_param.private_data_len = sizeof(ird_ord_hdr);
} else {
conn_param.private_data = NULL;
conn_param.private_data_len = 0;
}
conn_param.retry_count = SMBD_CM_RETRY;
conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
conn_param.flow_control = 0;
@ -1734,8 +1783,19 @@ struct smbd_connection *_smbd_get_connection(
goto negotiation_failed;
}
rc = allocate_mr_list(info);
if (rc) {
log_rdma_mr(ERR, "memory registration allocation failed\n");
goto allocate_mr_failed;
}
return info;
allocate_mr_failed:
/* At this point, need to a full transport shutdown */
smbd_destroy(info);
return NULL;
negotiation_failed:
cancel_delayed_work_sync(&info->idle_timer_work);
destroy_caches_and_workqueue(info);
@ -2189,3 +2249,364 @@ int smbd_send(struct smbd_connection *info, struct smb_rqst *rqst)
return rc;
}
static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
{
struct smbd_mr *mr;
struct ib_cqe *cqe;
if (wc->status) {
log_rdma_mr(ERR, "status=%d\n", wc->status);
cqe = wc->wr_cqe;
mr = container_of(cqe, struct smbd_mr, cqe);
smbd_disconnect_rdma_connection(mr->conn);
}
}
/*
* The work queue function that recovers MRs
* We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
* again. Both calls are slow, so finish them in a workqueue. This will not
* block I/O path.
* There is one workqueue that recovers MRs, there is no need to lock as the
* I/O requests calling smbd_register_mr will never update the links in the
* mr_list.
*/
static void smbd_mr_recovery_work(struct work_struct *work)
{
struct smbd_connection *info =
container_of(work, struct smbd_connection, mr_recovery_work);
struct smbd_mr *smbdirect_mr;
int rc;
list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
if (smbdirect_mr->state == MR_INVALIDATED ||
smbdirect_mr->state == MR_ERROR) {
if (smbdirect_mr->state == MR_INVALIDATED) {
ib_dma_unmap_sg(
info->id->device, smbdirect_mr->sgl,
smbdirect_mr->sgl_count,
smbdirect_mr->dir);
smbdirect_mr->state = MR_READY;
} else if (smbdirect_mr->state == MR_ERROR) {
/* recover this MR entry */
rc = ib_dereg_mr(smbdirect_mr->mr);
if (rc) {
log_rdma_mr(ERR,
"ib_dereg_mr faield rc=%x\n",
rc);
smbd_disconnect_rdma_connection(info);
}
smbdirect_mr->mr = ib_alloc_mr(
info->pd, info->mr_type,
info->max_frmr_depth);
if (IS_ERR(smbdirect_mr->mr)) {
log_rdma_mr(ERR,
"ib_alloc_mr failed mr_type=%x "
"max_frmr_depth=%x\n",
info->mr_type,
info->max_frmr_depth);
smbd_disconnect_rdma_connection(info);
}
smbdirect_mr->state = MR_READY;
}
/* smbdirect_mr->state is updated by this function
* and is read and updated by I/O issuing CPUs trying
* to get a MR, the call to atomic_inc_return
* implicates a memory barrier and guarantees this
* value is updated before waking up any calls to
* get_mr() from the I/O issuing CPUs
*/
if (atomic_inc_return(&info->mr_ready_count) == 1)
wake_up_interruptible(&info->wait_mr);
}
}
}
static void destroy_mr_list(struct smbd_connection *info)
{
struct smbd_mr *mr, *tmp;
cancel_work_sync(&info->mr_recovery_work);
list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
if (mr->state == MR_INVALIDATED)
ib_dma_unmap_sg(info->id->device, mr->sgl,
mr->sgl_count, mr->dir);
ib_dereg_mr(mr->mr);
kfree(mr->sgl);
kfree(mr);
}
}
/*
* Allocate MRs used for RDMA read/write
* The number of MRs will not exceed hardware capability in responder_resources
* All MRs are kept in mr_list. The MR can be recovered after it's used
* Recovery is done in smbd_mr_recovery_work. The content of list entry changes
* as MRs are used and recovered for I/O, but the list links will not change
*/
static int allocate_mr_list(struct smbd_connection *info)
{
int i;
struct smbd_mr *smbdirect_mr, *tmp;
INIT_LIST_HEAD(&info->mr_list);
init_waitqueue_head(&info->wait_mr);
spin_lock_init(&info->mr_list_lock);
atomic_set(&info->mr_ready_count, 0);
atomic_set(&info->mr_used_count, 0);
init_waitqueue_head(&info->wait_for_mr_cleanup);
/* Allocate more MRs (2x) than hardware responder_resources */
for (i = 0; i < info->responder_resources * 2; i++) {
smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
if (!smbdirect_mr)
goto out;
smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
info->max_frmr_depth);
if (IS_ERR(smbdirect_mr->mr)) {
log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x "
"max_frmr_depth=%x\n",
info->mr_type, info->max_frmr_depth);
goto out;
}
smbdirect_mr->sgl = kcalloc(
info->max_frmr_depth,
sizeof(struct scatterlist),
GFP_KERNEL);
if (!smbdirect_mr->sgl) {
log_rdma_mr(ERR, "failed to allocate sgl\n");
ib_dereg_mr(smbdirect_mr->mr);
goto out;
}
smbdirect_mr->state = MR_READY;
smbdirect_mr->conn = info;
list_add_tail(&smbdirect_mr->list, &info->mr_list);
atomic_inc(&info->mr_ready_count);
}
INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
return 0;
out:
kfree(smbdirect_mr);
list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
ib_dereg_mr(smbdirect_mr->mr);
kfree(smbdirect_mr->sgl);
kfree(smbdirect_mr);
}
return -ENOMEM;
}
/*
* Get a MR from mr_list. This function waits until there is at least one
* MR available in the list. It may access the list while the
* smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
* as they never modify the same places. However, there may be several CPUs
* issueing I/O trying to get MR at the same time, mr_list_lock is used to
* protect this situation.
*/
static struct smbd_mr *get_mr(struct smbd_connection *info)
{
struct smbd_mr *ret;
int rc;
again:
rc = wait_event_interruptible(info->wait_mr,
atomic_read(&info->mr_ready_count) ||
info->transport_status != SMBD_CONNECTED);
if (rc) {
log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
return NULL;
}
if (info->transport_status != SMBD_CONNECTED) {
log_rdma_mr(ERR, "info->transport_status=%x\n",
info->transport_status);
return NULL;
}
spin_lock(&info->mr_list_lock);
list_for_each_entry(ret, &info->mr_list, list) {
if (ret->state == MR_READY) {
ret->state = MR_REGISTERED;
spin_unlock(&info->mr_list_lock);
atomic_dec(&info->mr_ready_count);
atomic_inc(&info->mr_used_count);
return ret;
}
}
spin_unlock(&info->mr_list_lock);
/*
* It is possible that we could fail to get MR because other processes may
* try to acquire a MR at the same time. If this is the case, retry it.
*/
goto again;
}
/*
* Register memory for RDMA read/write
* pages[]: the list of pages to register memory with
* num_pages: the number of pages to register
* tailsz: if non-zero, the bytes to register in the last page
* writing: true if this is a RDMA write (SMB read), false for RDMA read
* need_invalidate: true if this MR needs to be locally invalidated after I/O
* return value: the MR registered, NULL if failed.
*/
struct smbd_mr *smbd_register_mr(
struct smbd_connection *info, struct page *pages[], int num_pages,
int tailsz, bool writing, bool need_invalidate)
{
struct smbd_mr *smbdirect_mr;
int rc, i;
enum dma_data_direction dir;
struct ib_reg_wr *reg_wr;
struct ib_send_wr *bad_wr;
if (num_pages > info->max_frmr_depth) {
log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
num_pages, info->max_frmr_depth);
return NULL;
}
smbdirect_mr = get_mr(info);
if (!smbdirect_mr) {
log_rdma_mr(ERR, "get_mr returning NULL\n");
return NULL;
}
smbdirect_mr->need_invalidate = need_invalidate;
smbdirect_mr->sgl_count = num_pages;
sg_init_table(smbdirect_mr->sgl, num_pages);
for (i = 0; i < num_pages - 1; i++)
sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
sg_set_page(&smbdirect_mr->sgl[i], pages[i],
tailsz ? tailsz : PAGE_SIZE, 0);
dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
smbdirect_mr->dir = dir;
rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
if (!rc) {
log_rdma_mr(INFO, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
num_pages, dir, rc);
goto dma_map_error;
}
rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
NULL, PAGE_SIZE);
if (rc != num_pages) {
log_rdma_mr(INFO,
"ib_map_mr_sg failed rc = %x num_pages = %x\n",
rc, num_pages);
goto map_mr_error;
}
ib_update_fast_reg_key(smbdirect_mr->mr,
ib_inc_rkey(smbdirect_mr->mr->rkey));
reg_wr = &smbdirect_mr->wr;
reg_wr->wr.opcode = IB_WR_REG_MR;
smbdirect_mr->cqe.done = register_mr_done;
reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
reg_wr->wr.num_sge = 0;
reg_wr->wr.send_flags = IB_SEND_SIGNALED;
reg_wr->mr = smbdirect_mr->mr;
reg_wr->key = smbdirect_mr->mr->rkey;
reg_wr->access = writing ?
IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
IB_ACCESS_REMOTE_READ;
/*
* There is no need for waiting for complemtion on ib_post_send
* on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
* on the next ib_post_send when we actaully send I/O to remote peer
*/
rc = ib_post_send(info->id->qp, &reg_wr->wr, &bad_wr);
if (!rc)
return smbdirect_mr;
log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
rc, reg_wr->key);
/* If all failed, attempt to recover this MR by setting it MR_ERROR*/
map_mr_error:
ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
smbdirect_mr->sgl_count, smbdirect_mr->dir);
dma_map_error:
smbdirect_mr->state = MR_ERROR;
if (atomic_dec_and_test(&info->mr_used_count))
wake_up(&info->wait_for_mr_cleanup);
return NULL;
}
static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
{
struct smbd_mr *smbdirect_mr;
struct ib_cqe *cqe;
cqe = wc->wr_cqe;
smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
smbdirect_mr->state = MR_INVALIDATED;
if (wc->status != IB_WC_SUCCESS) {
log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
smbdirect_mr->state = MR_ERROR;
}
complete(&smbdirect_mr->invalidate_done);
}
/*
* Deregister a MR after I/O is done
* This function may wait if remote invalidation is not used
* and we have to locally invalidate the buffer to prevent data is being
* modified by remote peer after upper layer consumes it
*/
int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
{
struct ib_send_wr *wr, *bad_wr;
struct smbd_connection *info = smbdirect_mr->conn;
int rc = 0;
if (smbdirect_mr->need_invalidate) {
/* Need to finish local invalidation before returning */
wr = &smbdirect_mr->inv_wr;
wr->opcode = IB_WR_LOCAL_INV;
smbdirect_mr->cqe.done = local_inv_done;
wr->wr_cqe = &smbdirect_mr->cqe;
wr->num_sge = 0;
wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
wr->send_flags = IB_SEND_SIGNALED;
init_completion(&smbdirect_mr->invalidate_done);
rc = ib_post_send(info->id->qp, wr, &bad_wr);
if (rc) {
log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
smbd_disconnect_rdma_connection(info);
goto done;
}
wait_for_completion(&smbdirect_mr->invalidate_done);
smbdirect_mr->need_invalidate = false;
} else
/*
* For remote invalidation, just set it to MR_INVALIDATED
* and defer to mr_recovery_work to recover the MR for next use
*/
smbdirect_mr->state = MR_INVALIDATED;
/*
* Schedule the work to do MR recovery for future I/Os
* MR recovery is slow and we don't want it to block the current I/O
*/
queue_work(info->workqueue, &info->mr_recovery_work);
done:
if (atomic_dec_and_test(&info->mr_used_count))
wake_up(&info->wait_for_mr_cleanup);
return rc;
}

View File

@ -90,6 +90,29 @@ struct smbd_connection {
int receive_credit_target;
int fragment_reassembly_remaining;
/* Memory registrations */
/* Maximum number of RDMA read/write outstanding on this connection */
int responder_resources;
/* Maximum number of SGEs in a RDMA write/read */
int max_frmr_depth;
/*
* If payload is less than or equal to the threshold,
* use RDMA send/recv to send upper layer I/O.
* If payload is more than the threshold,
* use RDMA read/write through memory registration for I/O.
*/
int rdma_readwrite_threshold;
enum ib_mr_type mr_type;
struct list_head mr_list;
spinlock_t mr_list_lock;
/* The number of available MRs ready for memory registration */
atomic_t mr_ready_count;
atomic_t mr_used_count;
wait_queue_head_t wait_mr;
struct work_struct mr_recovery_work;
/* Used by transport to wait until all MRs are returned */
wait_queue_head_t wait_for_mr_cleanup;
/* Activity accoutning */
/* Pending reqeusts issued from upper layer */
int smbd_send_pending;
@ -262,6 +285,36 @@ void smbd_destroy(struct smbd_connection *info);
int smbd_recv(struct smbd_connection *info, struct msghdr *msg);
int smbd_send(struct smbd_connection *info, struct smb_rqst *rqst);
enum mr_state {
MR_READY,
MR_REGISTERED,
MR_INVALIDATED,
MR_ERROR
};
struct smbd_mr {
struct smbd_connection *conn;
struct list_head list;
enum mr_state state;
struct ib_mr *mr;
struct scatterlist *sgl;
int sgl_count;
enum dma_data_direction dir;
union {
struct ib_reg_wr wr;
struct ib_send_wr inv_wr;
};
struct ib_cqe cqe;
bool need_invalidate;
struct completion invalidate_done;
};
/* Interfaces to register and deregister MR for RDMA read/write */
struct smbd_mr *smbd_register_mr(
struct smbd_connection *info, struct page *pages[], int num_pages,
int tailsz, bool writing, bool need_invalidate);
int smbd_deregister_mr(struct smbd_mr *mr);
#else
#define cifs_rdma_enabled(server) 0
struct smbd_connection {};