From 86ef6250fcce86403b322d4eb10d551c9f9b6d25 Mon Sep 17 00:00:00 2001 From: Amir Shehata Date: Mon, 15 Feb 2016 10:25:53 -0500 Subject: [PATCH] staging: lustre: Dynamic LNet Configuration (DLC) dynamic routing This is the second patch of a set of patches that enables DLC. This patch adds the following features to LNET. Currently these features are not driven by user space. - Enabling Routing on Demand. The default number of router buffers are allocated. - Disable Routing on demand. Unused router buffers are freed and used router buffers are freed when they are no longer in use. The following time routing is enabled the default router buffer values are used. It has been decided that remembering the user set router buffer values should be remembered and re-set by user space scripts. - Increase the number of router buffers on demand, by allocating new ones. - Decrease the number of router buffers. Exccess buffers are freed if they are not in use. Otherwise they are freed once they are no longer in use. Signed-off-by: Amir Shehata Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-2456 Change-Id: Id07d4ad424d8f5ba72475d4149380afe2ac54e77 Reviewed-on: http://review.whamcloud.com/9831 Reviewed-by: James Simmons Reviewed-by: Doug Oucharek Reviewed-by: Liang Zhen Reviewed-by: Oleg Drokin Signed-off-by: Greg Kroah-Hartman --- .../lustre/include/linux/lnet/lib-lnet.h | 8 +- .../lustre/include/linux/lnet/lib-types.h | 8 +- drivers/staging/lustre/lnet/lnet/api-ni.c | 4 +- drivers/staging/lustre/lnet/lnet/lib-move.c | 89 +++++- drivers/staging/lustre/lnet/lnet/router.c | 281 ++++++++++++++---- 5 files changed, 306 insertions(+), 84 deletions(-) diff --git a/drivers/staging/lustre/include/linux/lnet/lib-lnet.h b/drivers/staging/lustre/include/linux/lnet/lib-lnet.h index 77d8e37c7aca..3a1cf61018ab 100644 --- a/drivers/staging/lustre/include/linux/lnet/lib-lnet.h +++ b/drivers/staging/lustre/include/linux/lnet/lib-lnet.h @@ -461,7 +461,11 @@ int lnet_get_route(int idx, __u32 *net, __u32 *hops, void lnet_router_debugfs_init(void); void lnet_router_debugfs_fini(void); int lnet_rtrpools_alloc(int im_a_router); -void lnet_rtrpools_free(void); +void lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages); +int lnet_rtrpools_adjust(int tiny, int small, int large); +int lnet_rtrpools_enable(void); +void lnet_rtrpools_disable(void); +void lnet_rtrpools_free(int keep_pools); lnet_remotenet_t *lnet_find_net_locked(__u32 net); int lnet_islocalnid(lnet_nid_t nid); @@ -481,6 +485,8 @@ void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target, int lnet_send(lnet_nid_t nid, lnet_msg_t *msg, lnet_nid_t rtr_nid); void lnet_return_tx_credits_locked(lnet_msg_t *msg); void lnet_return_rx_credits_locked(lnet_msg_t *msg); +void lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp); +void lnet_drop_routed_msgs_locked(struct list_head *list, int cpt); /* portals functions */ /* portals attributes */ diff --git a/drivers/staging/lustre/include/linux/lnet/lib-types.h b/drivers/staging/lustre/include/linux/lnet/lib-types.h index be650d4ba5f2..b0ba9d863dcd 100644 --- a/drivers/staging/lustre/include/linux/lnet/lib-types.h +++ b/drivers/staging/lustre/include/linux/lnet/lib-types.h @@ -285,6 +285,7 @@ typedef struct lnet_ni { #define LNET_PING_FEAT_INVAL (0) /* no feature */ #define LNET_PING_FEAT_BASE (1 << 0) /* just a ping */ #define LNET_PING_FEAT_NI_STATUS (1 << 1) /* return NI status */ +#define LNET_PING_FEAT_RTE_DISABLED (1 << 2) /* Routing enabled */ #define LNET_PING_FEAT_MASK (LNET_PING_FEAT_BASE | \ LNET_PING_FEAT_NI_STATUS) @@ -410,7 +411,12 @@ typedef struct { #define LNET_PEER_HASHSIZE 503 /* prime! */ -#define LNET_NRBPOOLS 3 /* # different router buffer pools */ +#define LNET_TINY_BUF_IDX 0 +#define LNET_SMALL_BUF_IDX 1 +#define LNET_LARGE_BUF_IDX 2 + +/* # different router buffer pools */ +#define LNET_NRBPOOLS (LNET_LARGE_BUF_IDX + 1) enum { /* Didn't match anything */ diff --git a/drivers/staging/lustre/lnet/lnet/api-ni.c b/drivers/staging/lustre/lnet/lnet/api-ni.c index cd68ca73e309..06046b216c93 100644 --- a/drivers/staging/lustre/lnet/lnet/api-ni.c +++ b/drivers/staging/lustre/lnet/lnet/api-ni.c @@ -638,7 +638,7 @@ lnet_unprepare(void) lnet_msg_containers_destroy(); lnet_peer_tables_destroy(); - lnet_rtrpools_free(); + lnet_rtrpools_free(0); if (the_lnet.ln_counters) { cfs_percpt_free(the_lnet.ln_counters); @@ -1501,6 +1501,8 @@ lnet_create_ping_info(void) pinfo->pi_pid = the_lnet.ln_pid; pinfo->pi_magic = LNET_PROTO_PING_MAGIC; pinfo->pi_features = LNET_PING_FEAT_NI_STATUS; + if (!the_lnet.ln_routing) + pinfo->pi_features |= LNET_PING_FEAT_RTE_DISABLED; for (i = 0; i < n; i++) { lnet_ni_status_t *ns = &pinfo->pi_ni[i]; diff --git a/drivers/staging/lustre/lnet/lnet/lib-move.c b/drivers/staging/lustre/lnet/lnet/lib-move.c index cc8c2c558389..f2b1116e9dd9 100644 --- a/drivers/staging/lustre/lnet/lnet/lib-move.c +++ b/drivers/staging/lustre/lnet/lnet/lib-move.c @@ -945,9 +945,6 @@ lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv) rbp = lnet_msg2bufpool(msg); if (!msg->msg_rtrcredit) { - LASSERT((rbp->rbp_credits < 0) == - !list_empty(&rbp->rbp_msgs)); - msg->msg_rtrcredit = 1; rbp->rbp_credits--; if (rbp->rbp_credits < rbp->rbp_mincredits) @@ -1038,6 +1035,43 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg) } } +void +lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp) +{ + lnet_msg_t *msg; + + if (list_empty(&rbp->rbp_msgs)) + return; + msg = list_entry(rbp->rbp_msgs.next, + lnet_msg_t, msg_list); + list_del(&msg->msg_list); + + (void)lnet_post_routed_recv_locked(msg, 1); +} + +void +lnet_drop_routed_msgs_locked(struct list_head *list, int cpt) +{ + struct list_head drop; + lnet_msg_t *msg; + lnet_msg_t *tmp; + + INIT_LIST_HEAD(&drop); + + list_splice_init(list, &drop); + + lnet_net_unlock(cpt); + + list_for_each_entry_safe(msg, tmp, &drop, msg_list) { + lnet_ni_recv(msg->msg_rxpeer->lp_ni, msg->msg_private, NULL, + 0, 0, 0, msg->msg_hdr.payload_length); + list_del_init(&msg->msg_list); + lnet_finalize(NULL, msg, -ECANCELED); + } + + lnet_net_lock(cpt); +} + void lnet_return_rx_credits_locked(lnet_msg_t *msg) { @@ -1058,27 +1092,41 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg) rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]); rbp = rb->rb_pool; - LASSERT(rbp == lnet_msg2bufpool(msg)); msg->msg_kiov = NULL; msg->msg_rtrcredit = 0; - LASSERT((rbp->rbp_credits < 0) == - !list_empty(&rbp->rbp_msgs)); + LASSERT(rbp == lnet_msg2bufpool(msg)); + LASSERT((rbp->rbp_credits > 0) == !list_empty(&rbp->rbp_bufs)); - list_add(&rb->rb_list, &rbp->rbp_bufs); - rbp->rbp_credits++; - if (rbp->rbp_credits <= 0) { - msg2 = list_entry(rbp->rbp_msgs.next, - lnet_msg_t, msg_list); - list_del(&msg2->msg_list); + /* + * If routing is now turned off, we just drop this buffer and + * don't bother trying to return credits. + */ + if (!the_lnet.ln_routing) { + lnet_destroy_rtrbuf(rb, rbp->rbp_npages); + goto routing_off; + } - (void) lnet_post_routed_recv_locked(msg2, 1); + /* + * It is possible that a user has lowered the desired number of + * buffers in this pool. Make sure we never put back + * more buffers than the stated number. + */ + if (rbp->rbp_credits >= rbp->rbp_nbuffers) { + /* Discard this buffer so we don't have too many. */ + lnet_destroy_rtrbuf(rb, rbp->rbp_npages); + } else { + list_add(&rb->rb_list, &rbp->rbp_bufs); + rbp->rbp_credits++; + if (rbp->rbp_credits <= 0) + lnet_schedule_blocked_locked(rbp); } } +routing_off: if (msg->msg_peerrtrcredit) { /* give back peer router credits */ msg->msg_peerrtrcredit = 0; @@ -1087,7 +1135,14 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg) !list_empty(&rxpeer->lp_rtrq)); rxpeer->lp_rtrcredits++; - if (rxpeer->lp_rtrcredits <= 0) { + /* + * drop all messages which are queued to be routed on that + * peer. + */ + if (!the_lnet.ln_routing) { + lnet_drop_routed_msgs_locked(&rxpeer->lp_rtrq, + msg->msg_rx_cpt); + } else if (rxpeer->lp_rtrcredits <= 0) { msg2 = list_entry(rxpeer->lp_rtrq.next, lnet_msg_t, msg_list); list_del(&msg2->msg_list); @@ -1646,6 +1701,9 @@ lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg) { int rc = 0; + if (!the_lnet.ln_routing) + return -ECANCELED; + if (msg->msg_rxpeer->lp_rtrcredits <= 0 || lnet_msg2bufpool(msg)->rbp_credits <= 0) { if (!ni->ni_lnd->lnd_eager_recv) { @@ -1799,9 +1857,8 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, if (the_lnet.ln_routing && ni->ni_last_alive != ktime_get_real_seconds()) { - lnet_ni_lock(ni); - /* NB: so far here is the only place to set NI status to "up */ + lnet_ni_lock(ni); ni->ni_last_alive = ktime_get_real_seconds(); if (ni->ni_status && ni->ni_status->ns_status == LNET_NI_STATUS_DOWN) diff --git a/drivers/staging/lustre/lnet/lnet/router.c b/drivers/staging/lustre/lnet/lnet/router.c index 67566ca2db02..e3a661129163 100644 --- a/drivers/staging/lustre/lnet/lnet/router.c +++ b/drivers/staging/lustre/lnet/lnet/router.c @@ -28,8 +28,11 @@ #define LNET_NRB_TINY (LNET_NRB_TINY_MIN * 4) #define LNET_NRB_SMALL_MIN 4096 /* min value for each CPT */ #define LNET_NRB_SMALL (LNET_NRB_SMALL_MIN * 4) +#define LNET_NRB_SMALL_PAGES 1 #define LNET_NRB_LARGE_MIN 256 /* min value for each CPT */ #define LNET_NRB_LARGE (LNET_NRB_LARGE_MIN * 4) +#define LNET_NRB_LARGE_PAGES ((LNET_MTU + PAGE_CACHE_SIZE - 1) >> \ + PAGE_CACHE_SHIFT) static char *forwarding = ""; module_param(forwarding, charp, 0444); @@ -570,7 +573,8 @@ lnet_get_route(int idx, __u32 *net, __u32 *hops, *hops = route->lr_hops; *priority = route->lr_priority; *gateway = route->lr_gateway->lp_nid; - *alive = route->lr_gateway->lp_alive; + *alive = route->lr_gateway->lp_alive && + !route->lr_downis; lnet_net_unlock(cpt); return 0; } @@ -608,7 +612,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd) { lnet_ping_info_t *info = rcd->rcd_pinginfo; struct lnet_peer *gw = rcd->rcd_gateway; - lnet_route_t *rtr; + lnet_route_t *rte; if (!gw->lp_alive) return; @@ -634,11 +638,16 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd) if (!(gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS)) return; /* can't carry NI status info */ - list_for_each_entry(rtr, &gw->lp_routes, lr_gwlist) { + list_for_each_entry(rte, &gw->lp_routes, lr_gwlist) { int down = 0; int up = 0; int i; + if (gw->lp_ping_feats & LNET_PING_FEAT_RTE_DISABLED) { + rte->lr_downis = 1; + continue; + } + for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) { lnet_ni_status_t *stat = &info->pi_ni[i]; lnet_nid_t nid = stat->ns_nid; @@ -659,7 +668,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd) } if (stat->ns_status == LNET_NI_STATUS_UP) { - if (LNET_NIDNET(nid) == rtr->lr_net) { + if (LNET_NIDNET(nid) == rte->lr_net) { up = 1; break; } @@ -673,10 +682,10 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd) } if (up) { /* ignore downed NIs if NI for dest network is up */ - rtr->lr_downis = 0; + rte->lr_downis = 0; continue; } - rtr->lr_downis = down; + rte->lr_downis = down; } } @@ -1226,7 +1235,7 @@ lnet_router_checker(void *arg) return 0; } -static void +void lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages) { int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]); @@ -1273,67 +1282,103 @@ lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp, int cpt) } static void -lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp) +lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp, int cpt) { int npages = rbp->rbp_npages; - int nbuffers = 0; + struct list_head tmp; lnet_rtrbuf_t *rb; if (!rbp->rbp_nbuffers) /* not initialized or already freed */ return; - LASSERT(list_empty(&rbp->rbp_msgs)); - LASSERT(rbp->rbp_credits == rbp->rbp_nbuffers); - - while (!list_empty(&rbp->rbp_bufs)) { - LASSERT(rbp->rbp_credits > 0); - - rb = list_entry(rbp->rbp_bufs.next, - lnet_rtrbuf_t, rb_list); - list_del(&rb->rb_list); - lnet_destroy_rtrbuf(rb, npages); - nbuffers++; - } - - LASSERT(rbp->rbp_nbuffers == nbuffers); - LASSERT(rbp->rbp_credits == nbuffers); + INIT_LIST_HEAD(&tmp); + lnet_net_lock(cpt); + lnet_drop_routed_msgs_locked(&rbp->rbp_msgs, cpt); + list_splice_init(&rbp->rbp_bufs, &tmp); rbp->rbp_nbuffers = 0; rbp->rbp_credits = 0; + rbp->rbp_mincredits = 0; + lnet_net_unlock(cpt); + + /* Free buffers on the free list. */ + while (!list_empty(&tmp)) { + rb = list_entry(tmp.next, lnet_rtrbuf_t, rb_list); + list_del(&rb->rb_list); + lnet_destroy_rtrbuf(rb, npages); + } } static int -lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt) +lnet_rtrpool_adjust_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt) { + struct list_head rb_list; lnet_rtrbuf_t *rb; - int i; + int num_rb; + int num_buffers = 0; + int npages = rbp->rbp_npages; - if (rbp->rbp_nbuffers) { - LASSERT(rbp->rbp_nbuffers == nbufs); + /* + * If we are called for less buffers than already in the pool, we + * just lower the nbuffers number and excess buffers will be + * thrown away as they are returned to the free list. Credits + * then get adjusted as well. + */ + if (nbufs <= rbp->rbp_nbuffers) { + lnet_net_lock(cpt); + rbp->rbp_nbuffers = nbufs; + lnet_net_unlock(cpt); return 0; } - for (i = 0; i < nbufs; i++) { - rb = lnet_new_rtrbuf(rbp, cpt); + INIT_LIST_HEAD(&rb_list); + /* + * allocate the buffers on a local list first. If all buffers are + * allocated successfully then join this list to the rbp buffer + * list. If not then free all allocated buffers. + */ + num_rb = rbp->rbp_nbuffers; + + while (num_rb < nbufs) { + rb = lnet_new_rtrbuf(rbp, cpt); if (!rb) { - CERROR("Failed to allocate %d router bufs of %d pages\n", - nbufs, rbp->rbp_npages); - return -ENOMEM; + CERROR("Failed to allocate %d route bufs of %d pages\n", + nbufs, npages); + goto failed; } - rbp->rbp_nbuffers++; - rbp->rbp_credits++; - rbp->rbp_mincredits++; - list_add(&rb->rb_list, &rbp->rbp_bufs); - - /* No allocation "under fire" */ - /* Otherwise we'd need code to schedule blocked msgs etc */ - LASSERT(!the_lnet.ln_routing); + list_add(&rb->rb_list, &rb_list); + num_buffers++; + num_rb++; } - LASSERT(rbp->rbp_credits == nbufs); + lnet_net_lock(cpt); + + list_splice_tail(&rb_list, &rbp->rbp_bufs); + rbp->rbp_nbuffers += num_buffers; + rbp->rbp_credits += num_buffers; + rbp->rbp_mincredits = rbp->rbp_credits; + /* + * We need to schedule blocked msg using the newly + * added buffers. + */ + while (!list_empty(&rbp->rbp_bufs) && + !list_empty(&rbp->rbp_msgs)) + lnet_schedule_blocked_locked(rbp); + + lnet_net_unlock(cpt); + return 0; + +failed: + while (!list_empty(&rb_list)) { + rb = list_entry(rb_list.next, lnet_rtrbuf_t, rb_list); + list_del(&rb->rb_list); + lnet_destroy_rtrbuf(rb, npages); + } + + return -ENOMEM; } static void @@ -1348,7 +1393,7 @@ lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages) } void -lnet_rtrpools_free(void) +lnet_rtrpools_free(int keep_pools) { lnet_rtrbufpool_t *rtrp; int i; @@ -1357,17 +1402,19 @@ lnet_rtrpools_free(void) return; cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) { - lnet_rtrpool_free_bufs(&rtrp[0]); - lnet_rtrpool_free_bufs(&rtrp[1]); - lnet_rtrpool_free_bufs(&rtrp[2]); + lnet_rtrpool_free_bufs(&rtrp[LNET_TINY_BUF_IDX], i); + lnet_rtrpool_free_bufs(&rtrp[LNET_SMALL_BUF_IDX], i); + lnet_rtrpool_free_bufs(&rtrp[LNET_LARGE_BUF_IDX], i); } - cfs_percpt_free(the_lnet.ln_rtrpools); - the_lnet.ln_rtrpools = NULL; + if (!keep_pools) { + cfs_percpt_free(the_lnet.ln_rtrpools); + the_lnet.ln_rtrpools = NULL; + } } static int -lnet_nrb_tiny_calculate(int npages) +lnet_nrb_tiny_calculate(void) { int nrbs = LNET_NRB_TINY; @@ -1386,7 +1433,7 @@ lnet_nrb_tiny_calculate(int npages) } static int -lnet_nrb_small_calculate(int npages) +lnet_nrb_small_calculate(void) { int nrbs = LNET_NRB_SMALL; @@ -1405,7 +1452,7 @@ lnet_nrb_small_calculate(int npages) } static int -lnet_nrb_large_calculate(int npages) +lnet_nrb_large_calculate(void) { int nrbs = LNET_NRB_LARGE; @@ -1427,16 +1474,12 @@ int lnet_rtrpools_alloc(int im_a_router) { lnet_rtrbufpool_t *rtrp; - int large_pages; - int small_pages = 1; int nrb_tiny; int nrb_small; int nrb_large; int rc; int i; - large_pages = (LNET_MTU + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; - if (!strcmp(forwarding, "")) { /* not set either way */ if (!im_a_router) @@ -1451,15 +1494,15 @@ lnet_rtrpools_alloc(int im_a_router) return -EINVAL; } - nrb_tiny = lnet_nrb_tiny_calculate(0); + nrb_tiny = lnet_nrb_tiny_calculate(); if (nrb_tiny < 0) return -EINVAL; - nrb_small = lnet_nrb_small_calculate(small_pages); + nrb_small = lnet_nrb_small_calculate(); if (nrb_small < 0) return -EINVAL; - nrb_large = lnet_nrb_large_calculate(large_pages); + nrb_large = lnet_nrb_large_calculate(); if (nrb_large < 0) return -EINVAL; @@ -1473,18 +1516,23 @@ lnet_rtrpools_alloc(int im_a_router) } cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) { - lnet_rtrpool_init(&rtrp[0], 0); - rc = lnet_rtrpool_alloc_bufs(&rtrp[0], nrb_tiny, i); + lnet_rtrpool_init(&rtrp[LNET_TINY_BUF_IDX], 0); + rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX], + nrb_tiny, i); if (rc) goto failed; - lnet_rtrpool_init(&rtrp[1], small_pages); - rc = lnet_rtrpool_alloc_bufs(&rtrp[1], nrb_small, i); + lnet_rtrpool_init(&rtrp[LNET_SMALL_BUF_IDX], + LNET_NRB_SMALL_PAGES); + rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX], + nrb_small, i); if (rc) goto failed; - lnet_rtrpool_init(&rtrp[2], large_pages); - rc = lnet_rtrpool_alloc_bufs(&rtrp[2], nrb_large, i); + lnet_rtrpool_init(&rtrp[LNET_LARGE_BUF_IDX], + LNET_NRB_LARGE_PAGES); + rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX], + nrb_large, i); if (rc) goto failed; } @@ -1496,10 +1544,113 @@ lnet_rtrpools_alloc(int im_a_router) return 0; failed: - lnet_rtrpools_free(); + lnet_rtrpools_free(0); return rc; } +int +lnet_rtrpools_adjust(int tiny, int small, int large) +{ + int nrb = 0; + int rc = 0; + int i; + lnet_rtrbufpool_t *rtrp; + + /* + * this function doesn't revert the changes if adding new buffers + * failed. It's up to the user space caller to revert the + * changes. + */ + + if (!the_lnet.ln_routing) + return 0; + + /* + * If the provided values for each buffer pool are different than the + * configured values, we need to take action. + */ + if (tiny >= 0 && tiny != tiny_router_buffers) { + tiny_router_buffers = tiny; + nrb = lnet_nrb_tiny_calculate(); + cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) { + rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX], + nrb, i); + if (rc) + return rc; + } + } + if (small >= 0 && small != small_router_buffers) { + small_router_buffers = small; + nrb = lnet_nrb_small_calculate(); + cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) { + rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX], + nrb, i); + if (rc) + return rc; + } + } + if (large >= 0 && large != large_router_buffers) { + large_router_buffers = large; + nrb = lnet_nrb_large_calculate(); + cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) { + rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX], + nrb, i); + if (rc) + return rc; + } + } + + return 0; +} + +int +lnet_rtrpools_enable(void) +{ + int rc; + + if (the_lnet.ln_routing) + return 0; + + if (!the_lnet.ln_rtrpools) + /* + * If routing is turned off, and we have never + * initialized the pools before, just call the + * standard buffer pool allocation routine as + * if we are just configuring this for the first + * time. + */ + return lnet_rtrpools_alloc(1); + + rc = lnet_rtrpools_adjust(0, 0, 0); + if (rc) + return rc; + + lnet_net_lock(LNET_LOCK_EX); + the_lnet.ln_routing = 1; + + the_lnet.ln_ping_info->pi_features &= ~LNET_PING_FEAT_RTE_DISABLED; + lnet_net_unlock(LNET_LOCK_EX); + + return 0; +} + +void +lnet_rtrpools_disable(void) +{ + if (!the_lnet.ln_routing) + return; + + lnet_net_lock(LNET_LOCK_EX); + the_lnet.ln_routing = 0; + the_lnet.ln_ping_info->pi_features |= LNET_PING_FEAT_RTE_DISABLED; + + tiny_router_buffers = 0; + small_router_buffers = 0; + large_router_buffers = 0; + lnet_net_unlock(LNET_LOCK_EX); + lnet_rtrpools_free(1); +} + int lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, unsigned long when) {