rbd: define rbd_osd_req_format_op()

Define rbd_osd_req_format_op(), which encapsulates formatting
an osd op into an object request's osd request message.  Only
one op is supported right now.

Stop calling ceph_osdc_build_request() in rbd_osd_req_create().
Instead, call rbd_osd_req_format_op() in each of the callers of
rbd_osd_req_create().

This is to prepare for the next patch, in which the source ops for
an osd request will be held in the osd request itself.  Because of
that, we won't have the source op to work with until after the
request is created, so we can't format the op until then.

This an the next patch resolve:
    http://tracker.ceph.com/issues/4656

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
This commit is contained in:
Alex Elder 2013-04-03 21:32:51 -05:00 committed by Sage Weil
parent 87060c1089
commit 430c28c3cb

View File

@ -1311,29 +1311,47 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
rbd_obj_request_complete(obj_request);
}
static void rbd_osd_req_format_op(struct rbd_obj_request *obj_request,
bool write_request,
struct ceph_osd_req_op *op)
{
struct rbd_img_request *img_request = obj_request->img_request;
struct ceph_snap_context *snapc = NULL;
u64 snap_id = CEPH_NOSNAP;
struct timespec *mtime = NULL;
struct timespec now;
rbd_assert(obj_request->osd_req != NULL);
if (write_request) {
now = CURRENT_TIME;
mtime = &now;
if (img_request)
snapc = img_request->snapc;
} else if (img_request) {
snap_id = img_request->snap_id;
}
ceph_osdc_build_request(obj_request->osd_req, obj_request->offset,
1, op, snapc, snap_id, mtime);
}
static struct ceph_osd_request *rbd_osd_req_create(
struct rbd_device *rbd_dev,
bool write_request,
struct rbd_obj_request *obj_request,
struct ceph_osd_req_op *op)
struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request = obj_request->img_request;
struct ceph_snap_context *snapc = NULL;
struct ceph_osd_client *osdc;
struct ceph_osd_request *osd_req;
struct ceph_osd_data *osd_data;
struct timespec now;
struct timespec *mtime;
u64 snap_id = CEPH_NOSNAP;
u64 offset = obj_request->offset;
u64 length = obj_request->length;
if (img_request) {
rbd_assert(img_request->write_request == write_request);
if (img_request->write_request)
snapc = img_request->snapc;
else
snap_id = img_request->snap_id;
}
/* Allocate and initialize the request, for the single op */
@ -1360,16 +1378,10 @@ static struct ceph_osd_request *rbd_osd_req_create(
break;
}
if (write_request) {
if (write_request)
osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
now = CURRENT_TIME;
mtime = &now;
} else {
else
osd_req->r_flags = CEPH_OSD_FLAG_READ;
mtime = NULL; /* not needed for reads */
offset = 0; /* These are not used... */
length = 0; /* ...for osd read requests */
}
osd_req->r_callback = rbd_osd_req_callback;
osd_req->r_priv = obj_request;
@ -1380,11 +1392,6 @@ static struct ceph_osd_request *rbd_osd_req_create(
osd_req->r_file_layout = rbd_dev->layout; /* struct */
/* osd_req will get its own reference to snapc (if non-null) */
ceph_osdc_build_request(osd_req, offset, 1, op,
snapc, snap_id, mtime);
return osd_req;
}
@ -1538,6 +1545,7 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
struct rbd_device *rbd_dev = img_request->rbd_dev;
struct rbd_obj_request *obj_request = NULL;
struct rbd_obj_request *next_obj_request;
bool write_request = img_request->write_request;
unsigned int bio_offset;
u64 image_offset;
u64 resid;
@ -1545,8 +1553,7 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
: CEPH_OSD_OP_READ;
opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
bio_offset = 0;
image_offset = img_request->offset;
rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
@ -1579,17 +1586,14 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
if (!obj_request->bio_list)
goto out_partial;
/*
* Build up the op to use in building the osd
* request. Note that the contents of the op are
* copied by rbd_osd_req_create().
*/
osd_req_op_extent_init(&op, opcode, offset, length, 0, 0);
obj_request->osd_req = rbd_osd_req_create(rbd_dev,
img_request->write_request,
obj_request, &op);
write_request, obj_request);
if (!obj_request->osd_req)
goto out_partial;
osd_req_op_extent_init(&op, opcode, offset, length, 0, 0);
rbd_osd_req_format_op(obj_request, write_request, &op);
/* status and version are initially zero-filled */
rbd_img_obj_request_add(img_request, obj_request);
@ -1700,12 +1704,13 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
return -ENOMEM;
ret = -ENOMEM;
osd_req_op_watch_init(&op, CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0);
obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
obj_request, &op);
obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
if (!obj_request->osd_req)
goto out;
osd_req_op_watch_init(&op, CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0);
rbd_osd_req_format_op(obj_request, false, &op);
osdc = &rbd_dev->rbd_client->client->osdc;
obj_request->callback = rbd_obj_request_put;
ret = rbd_obj_request_submit(osdc, obj_request);
@ -1764,13 +1769,14 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
if (!obj_request)
goto out_cancel;
obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
if (!obj_request->osd_req)
goto out_cancel;
osd_req_op_watch_init(&op, CEPH_OSD_OP_WATCH,
rbd_dev->watch_event->cookie,
rbd_dev->header.obj_version, start);
obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
obj_request, &op);
if (!obj_request->osd_req)
goto out_cancel;
rbd_osd_req_format_op(obj_request, true, &op);
if (start)
ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
@ -1856,13 +1862,14 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
obj_request->pages = pages;
obj_request->page_count = page_count;
osd_req_op_cls_init(&op, CEPH_OSD_OP_CALL, class_name, method_name,
outbound, outbound_size);
obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
obj_request, &op);
obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
if (!obj_request->osd_req)
goto out;
osd_req_op_cls_init(&op, CEPH_OSD_OP_CALL, class_name, method_name,
outbound, outbound_size);
rbd_osd_req_format_op(obj_request, false, &op);
osdc = &rbd_dev->rbd_client->client->osdc;
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
@ -2061,12 +2068,13 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
obj_request->pages = pages;
obj_request->page_count = page_count;
osd_req_op_extent_init(&op, CEPH_OSD_OP_READ, offset, length, 0, 0);
obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
obj_request, &op);
obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
if (!obj_request->osd_req)
goto out;
osd_req_op_extent_init(&op, CEPH_OSD_OP_READ, offset, length, 0, 0);
rbd_osd_req_format_op(obj_request, false, &op);
osdc = &rbd_dev->rbd_client->client->osdc;
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)