[PATCH] ocfs2: fix hang in dlm lock resource mastery

fixes hangs in lock mastery related to refcounting on the mle structure

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
This commit is contained in:
Kurt Hackel 2006-03-02 18:09:26 -08:00 committed by Mark Fasheh
parent a74e1f0e8a
commit 9c6510a5bf

View File

@ -792,7 +792,15 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
mlog_errno(ret);
if (mle->master != O2NM_MAX_NODES) {
/* found a master ! */
if (mle->master <= nodenum)
break;
/* if our master request has not reached the master
* yet, keep going until it does. this is how the
* master will know that asserts are needed back to
* the lower nodes. */
mlog(0, "%s:%.*s: requests only up to %u but master "
"is %u, keep going\n", dlm->name, namelen,
lockid, nodenum, mle->master);
}
}
@ -860,7 +868,19 @@ static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
/* check if another node has already become the owner */
spin_lock(&res->spinlock);
if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
res->lockname.len, res->lockname.name, res->owner);
spin_unlock(&res->spinlock);
/* this will cause the master to re-assert across
* the whole cluster, freeing up mles */
ret = dlm_do_master_request(mle, res->owner);
if (ret < 0) {
/* give recovery a chance to run */
mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
msleep(500);
goto recheck;
}
ret = 0;
goto leave;
}
spin_unlock(&res->spinlock);
@ -1244,13 +1264,14 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
{
u8 response = DLM_MASTER_RESP_MAYBE;
struct dlm_ctxt *dlm = data;
struct dlm_lock_resource *res;
struct dlm_lock_resource *res = NULL;
struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
char *name;
unsigned int namelen;
int found, ret;
int set_maybe;
int dispatch_assert = 0;
if (!dlm_grab(dlm))
return DLM_MASTER_RESP_NO;
@ -1287,7 +1308,6 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
}
if (res->owner == dlm->node_num) {
u32 flags = DLM_ASSERT_MASTER_MLE_CLEANUP;
spin_unlock(&res->spinlock);
// mlog(0, "this node is the master\n");
response = DLM_MASTER_RESP_YES;
@ -1300,16 +1320,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
* caused all nodes up to this one to
* create mles. this node now needs to
* go back and clean those up. */
mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
dlm->node_num, res->lockname.len, res->lockname.name);
ret = dlm_dispatch_assert_master(dlm, res, 1,
request->node_idx,
flags);
if (ret < 0) {
mlog(ML_ERROR, "failed to dispatch assert "
"master work\n");
response = DLM_MASTER_RESP_ERROR;
}
dispatch_assert = 1;
goto send_response;
} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
spin_unlock(&res->spinlock);
@ -1357,9 +1368,13 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
}
} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
set_maybe = 0;
if (tmpmle->master == dlm->node_num)
if (tmpmle->master == dlm->node_num) {
response = DLM_MASTER_RESP_YES;
else
/* this node will be the owner.
* go back and clean the mles on any
* other nodes */
dispatch_assert = 1;
} else
response = DLM_MASTER_RESP_NO;
} else {
// mlog(0, "this node is attempting to "
@ -1398,8 +1413,8 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
mle = (struct dlm_master_list_entry *)
kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
if (!mle) {
// bad bad bad... this sucks.
response = DLM_MASTER_RESP_ERROR;
mlog_errno(-ENOMEM);
goto send_response;
}
spin_lock(&dlm->spinlock);
@ -1418,25 +1433,19 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
// mlog(0, "mle was found\n");
set_maybe = 1;
spin_lock(&tmpmle->spinlock);
if (tmpmle->master == dlm->node_num) {
mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
BUG();
}
if (tmpmle->type == DLM_MLE_BLOCK)
response = DLM_MASTER_RESP_NO;
else if (tmpmle->type == DLM_MLE_MIGRATION) {
mlog(0, "migration mle was found (%u->%u)\n",
tmpmle->master, tmpmle->new_master);
if (tmpmle->master == dlm->node_num) {
mlog(ML_ERROR, "no lockres, but migration mle "
"says that this node is master!\n");
BUG();
}
/* real master can respond on its own */
response = DLM_MASTER_RESP_NO;
} else {
if (tmpmle->master == dlm->node_num) {
response = DLM_MASTER_RESP_YES;
set_maybe = 0;
} else
response = DLM_MASTER_RESP_MAYBE;
}
if (set_maybe)
set_bit(request->node_idx, tmpmle->maybe_map);
spin_unlock(&tmpmle->spinlock);
@ -1449,6 +1458,24 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
dlm_put_mle(tmpmle);
}
send_response:
if (dispatch_assert) {
if (response != DLM_MASTER_RESP_YES)
mlog(ML_ERROR, "invalid response %d\n", response);
if (!res) {
mlog(ML_ERROR, "bad lockres while trying to assert!\n");
BUG();
}
mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
dlm->node_num, res->lockname.len, res->lockname.name);
ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
DLM_ASSERT_MASTER_MLE_CLEANUP);
if (ret < 0) {
mlog(ML_ERROR, "failed to dispatch assert master work\n");
response = DLM_MASTER_RESP_ERROR;
}
}
dlm_put(dlm);
return response;
}
@ -1471,8 +1498,11 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
int to, tmpret;
struct dlm_node_iter iter;
int ret = 0;
int reassert;
BUG_ON(namelen > O2NM_MAX_NAME_LEN);
again:
reassert = 0;
/* note that if this nodemap is empty, it returns 0 */
dlm_node_iter_init(nodemap, &iter);
@ -1504,9 +1534,17 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
"got %d.\n", namelen, lockname, to, r);
dlm_dump_lock_resources(dlm);
BUG();
} else if (r == EAGAIN) {
mlog(0, "%.*s: node %u create mles on other "
"nodes and requests a re-assert\n",
namelen, lockname, to);
reassert = 1;
}
}
if (reassert)
goto again;
return ret;
}
@ -1528,6 +1566,8 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
char *name;
unsigned int namelen;
u32 flags;
int master_request = 0;
int ret = 0;
if (!dlm_grab(dlm))
return 0;
@ -1642,11 +1682,22 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
// mlog(0, "woo! got an assert_master from node %u!\n",
// assert->node_idx);
if (mle) {
int extra_ref;
int extra_ref = 0;
int nn = -1;
spin_lock(&mle->spinlock);
extra_ref = !!(mle->type == DLM_MLE_BLOCK
|| mle->type == DLM_MLE_MIGRATION);
if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
extra_ref = 1;
else {
/* MASTER mle: if any bits set in the response map
* then the calling node needs to re-assert to clear
* up nodes that this node contacted */
while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
nn+1)) < O2NM_MAX_NODES) {
if (nn != dlm->node_num && nn != assert->node_idx)
master_request = 1;
}
}
mle->master = assert->node_idx;
atomic_set(&mle->woken, 1);
wake_up(&mle->wq);
@ -1677,10 +1728,15 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
}
done:
ret = 0;
if (res)
dlm_lockres_put(res);
dlm_put(dlm);
return 0;
if (master_request) {
mlog(0, "need to tell master to reassert\n");
ret = EAGAIN; // positive. negative would shoot down the node.
}
return ret;
kill:
/* kill the caller! */
@ -1713,6 +1769,10 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
item->u.am.request_from = request_from;
item->u.am.flags = flags;
if (ignore_higher)
mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
res->lockname.name);
spin_lock(&dlm->work_lock);
list_add_tail(&item->list, &dlm->work_list);
spin_unlock(&dlm->work_lock);