if (flags)
doutc(cl, "unused open flags: %x\n", flags);
return cpu_to_le32(wire_flags);
}
/*
 * Ceph file operations
 *
 * Implement basic open/close functionality, and implement
 * read/write.
 *
 * We implement three modes of file I/O:
 * - buffered uses the generic_file_aio_{read,write} helpers
 *
 * - synchronous is used when there is multi-client read/write
 *   sharing, avoids the page cache, and synchronously waits for an
 *   ack from the OSD.
 *
 * - direct io takes the variant of the sync path that references
 *   user pages directly.
 *
 * fsync() flushes and waits on dirty pages, but just queues metadata
 * for writeback: since the MDS can recover size and mtime there is no
 * need to wait for MDS acknowledgement.
 */
/* * How many pages to get in one call to iov_iter_get_pages(). This * determines the size of the on-stack array used as a buffer.
*/ #define ITER_GET_BVECS_PAGES 64
/*
 * iov_iter_get_pages() only considers one iov_iter segment, no matter
 * what maxsize or maxpages are given.  For ITER_BVEC that is a single
 * page.
 *
 * Attempt to get up to @maxsize bytes worth of pages from @iter.
 * Return the number of bytes in the created bio_vec array, or an error.
 */
static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize,
				    struct bio_vec **bvecs, int *num_bvecs)
{
	struct bio_vec *bv;
	size_t orig_count = iov_iter_count(iter);
	ssize_t bytes;
	int npages;

	/*
	 * Count the pages covered by the first @maxsize bytes without
	 * disturbing the iterator that __iter_get_bvecs() will consume:
	 * clamp, count, then restore the original length.
	 * (This computation was missing, leaving npages uninitialized --
	 * undefined behavior when passed to kvmalloc_array().)
	 */
	iov_iter_truncate(iter, maxsize);
	npages = iov_iter_npages(iter, INT_MAX);
	iov_iter_reexpand(iter, orig_count);

	/*
	 * __iter_get_bvecs() may populate only part of the array -- zero it
	 * out.
	 */
	bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO);
	if (!bv)
		return -ENOMEM;

	bytes = __iter_get_bvecs(iter, maxsize, bv);
	if (bytes < 0) {
		/*
		 * No pages were pinned -- just free the array.
		 */
		kvfree(bv);
		return bytes;
	}

	/* Ownership of the pinned pages transfers to the caller. */
	*bvecs = bv;
	*num_bvecs = npages;
	return bytes;
}
/*
 * Drop the page references taken by iter_get_bvecs_alloc() and free the
 * bio_vec array itself.
 *
 * @bvecs:        bio_vec array; slots with a NULL bv_page (never filled)
 *                are skipped
 * @num_bvecs:    number of entries in @bvecs
 * @should_dirty: if true, mark each page dirty (under lock) before
 *                putting it -- presumably because the pages received
 *                data from a read
 */
static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
{
	int i;

	for (i = 0; i < num_bvecs; i++) {
		if (bvecs[i].bv_page) {
			if (should_dirty)
				set_page_dirty_lock(bvecs[i].bv_page);
			put_page(bvecs[i].bv_page);
		}
	}
	kvfree(bvecs);
}
/* * Prepare an open request. Preallocate ceph_cap to avoid an * inopportune ENOMEM later.
 */ staticstruct ceph_mds_request *
prepare_open_request(struct super_block *sb, int flags, int create_mode)
{ struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(sb); struct ceph_mds_request *req; int want_auth = USE_ANY_MDS; int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN;
/* Opens that can mutate state must be routed to the auth MDS */
if (flags & (O_WRONLY|O_RDWR|O_CREAT|O_TRUNC))
want_auth = USE_AUTH_MDS;
/*
 * NOTE(review): this capture is corrupt.  The rest of
 * prepare_open_request() (the request-allocation and setup code) is
 * missing, and the lines below appear to belong to a *different*
 * function's open/init error path: they reference 'file', 'ci', 'fi',
 * 'inode' and 'ret', none of which are declared in this scope, and
 * 'staticstruct' above is a fused token.  Recover the original source
 * before making any change in this region.
 */
if ((file->f_mode & FMODE_WRITE) && ceph_has_inline_data(ci)) {
ret = ceph_uninline_data(file); if (ret < 0) goto error;
}
return 0;
error:
/* Unwind: release fscache cookie, fmode ref, and the file info */
ceph_fscache_unuse_cookie(inode, file->f_mode & FMODE_WRITE);
ceph_put_fmode(ci, fi->fmode, 1);
kmem_cache_free(ceph_file_cachep, fi); /* wake up anyone waiting for caps on this inode */
wake_up_all(&ci->i_cap_wq); return ret;
}
/*
 * initialize private struct file data.
 * if we fail, clean up by dropping fmode reference on the ceph_inode
 *
 * @inode: inode being opened
 * @file:  file whose private data is to be set up
 * @fmode: ceph file mode (CEPH_FILE_MODE_*) to record
 *
 * Returns 0 on success or a negative errno from the underlying
 * initialization / open routine.
 */
static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
{
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int ret = 0;

	switch (inode->i_mode & S_IFMT) {
	case S_IFREG:
		/* only regular files participate in fscache */
		ceph_fscache_use_cookie(inode, file->f_mode & FMODE_WRITE);
		fallthrough;
	case S_IFDIR:
		ret = ceph_init_file_info(inode, file, fmode,
					  S_ISDIR(inode->i_mode));
		break;
	default:
		doutc(cl, "%p %llx.%llx %p 0%o (special)\n", inode,
		      ceph_vinop(inode), file, inode->i_mode);
		/*
		 * we need to drop the open ref now, since we don't
		 * have .release set to ceph_release.
		 */
		BUG_ON(inode->i_fop->release == ceph_release);

		/* call the proper open fop */
		ret = inode->i_fop->open(inode, file);
	}
	return ret;
}
/* * try renew caps after session gets killed.
*/ int ceph_renew_caps(struct inode *inode, int fmode)
{ struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); struct ceph_client *cl = mdsc->fsc->client; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_request *req; int err, flags, wanted;
/* * If we already have the requisite capabilities, we can satisfy * the open request locally (no need to request new caps from the * MDS). We do, however, need to inform the MDS (asynchronously) * if our wanted caps set expands.
*/ int ceph_open(struct inode *inode, struct file *file)
{ struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_sb_to_fs_client(inode->i_sb); struct ceph_client *cl = fsc->client; struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; struct ceph_file_info *fi = file->private_data; int err; int flags, fmode, wanted; struct dentry *dentry; char *path; bool do_sync = false; int mask = MAY_READ;
if (fi) {
doutc(cl, "file %p is already opened\n", file); return 0;
}
/* filter out O_CREAT|O_EXCL; vfs did that already. yuck. */
flags = file->f_flags & ~(O_CREAT|O_EXCL); if (S_ISDIR(inode->i_mode)) {
flags = O_DIRECTORY; /* mds likes to know */
} elseif (S_ISREG(inode->i_mode)) {
err = fscrypt_file_open(inode, file); if (err) return err;
}
/* For none EACCES cases will let the MDS do the mds auth check */ if (err == -EACCES) { return err;
} elseif (err < 0) {
do_sync = true;
err = 0;
}
}
/* snapped files are read-only */ if (ceph_snap(inode) != CEPH_NOSNAP && (file->f_mode & FMODE_WRITE)) return -EROFS;
/* trivially open snapdir */ if (ceph_snap(inode) == CEPH_SNAPDIR) { return ceph_init_file(inode, file, fmode);
}
/* * No need to block if we have caps on the auth MDS (for * write) or any MDS (for read). Update wanted set * asynchronously.
*/
spin_lock(&ci->i_ceph_lock); if (!do_sync && __ceph_is_any_real_caps(ci) &&
(((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) { int mds_wanted = __ceph_caps_mds_wanted(ci, true); int issued = __ceph_caps_issued(ci, NULL);
/* Clone the layout from a synchronous create, if the dir now has Dc caps */ staticvoid
cache_file_layout(struct inode *dst, struct inode *src)
{ struct ceph_inode_info *cdst = ceph_inode(dst); struct ceph_inode_info *csrc = ceph_inode(src);
/* * Try to set up an async create. We need caps, a file layout, and inode number, * and either a lease on the dentry or complete dir info. If any of those * criteria are not satisfied, then return false and the caller can go * synchronous.
*/ staticint try_prep_async_create(struct inode *dir, struct dentry *dentry, struct ceph_file_layout *lo, u64 *pino)
{ struct ceph_inode_info *ci = ceph_inode(dir); struct ceph_dentry_info *di = ceph_dentry(dentry); int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_CREATE;
u64 ino;
spin_lock(&ci->i_ceph_lock); /* No auth cap means no chance for Dc caps */ if (!ci->i_auth_cap) goto no_async;
/* Any delegated inos? */ if (xa_empty(&ci->i_auth_cap->session->s_delegated_inos)) goto no_async;
if (!ceph_file_layout_is_valid(&ci->i_cached_layout)) goto no_async;
if ((__ceph_caps_issued(ci, NULL) & want) != want) goto no_async;
spin_lock(&ci->i_ceph_lock); if (ci->i_auth_cap)
s = ceph_get_mds_session(ci->i_auth_cap->session);
spin_unlock(&ci->i_ceph_lock); if (s) { int err = ceph_restore_deleg_ino(s, ino); if (err)
pr_warn_client(cl, "unable to restore delegated ino 0x%llx to session: %d\n",
ino, err);
ceph_put_mds_session(s);
}
}
ceph_file_layout_to_legacy(lo, &in.layout); /* lo is private, so pool_ns can't change */
pool_ns = rcu_dereference_raw(lo->pool_ns); if (pool_ns) {
iinfo.pool_ns_len = pool_ns->len;
iinfo.pool_ns_data = pool_ns->str;
}
down_read(&mdsc->snap_rwsem);
ret = ceph_fill_inode(inode, NULL, &iinfo, NULL, req->r_session,
req->r_fmode, NULL);
up_read(&mdsc->snap_rwsem); if (ret) {
doutc(cl, "failed to fill inode: %d\n", ret);
ceph_dir_clear_complete(dir); if (!d_unhashed(dentry))
d_drop(dentry);
discard_new_inode(inode);
} else { struct dentry *dn;
doutc(cl, "d_adding new inode 0x%llx to 0x%llx/%s\n",
vino.ino, ceph_ino(dir), dentry->d_name.name);
ceph_dir_clear_ordered(dir);
ceph_init_inode_acls(inode, as_ctx); if (inode->i_state & I_NEW) { /* * If it's not I_NEW, then someone created this before * we got here. Assume the server is aware of it at * that point and don't worry about setting * CEPH_I_ASYNC_CREATE.
*/
ceph_inode(inode)->i_ceph_flags = CEPH_I_ASYNC_CREATE;
unlock_new_inode(inode);
} if (d_in_lookup(dentry) || d_really_is_negative(dentry)) { if (!d_unhashed(dentry))
d_drop(dentry);
dn = d_splice_alias(inode, dentry);
WARN_ON_ONCE(dn && dn != dentry);
}
file->f_mode |= FMODE_CREATED;
ret = finish_open(file, dentry, ceph_open);
}
if (dentry->d_name.len > NAME_MAX) return -ENAMETOOLONG;
err = ceph_wait_on_conflict_unlink(dentry); if (err) return err; /* * Do not truncate the file, since atomic_open is called before the * permission check. The caller will do the truncation afterward.
*/
flags &= ~O_TRUNC;
/*
 * Completely synchronous read and write methods.  Direct from __user
 * buffer to osd, or directly to user pages (if O_DIRECT).
 *
 * If the read spans object boundary, just do multiple reads.  (That's not
 * atomic, but good enough for now.)
 *
 * If we get a short result from the OSD, check against i_size; we need to
 * only return a short read to the caller if we hit EOF.
 */
ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos, struct iov_iter *to, int *retry_op,
u64 *last_objver)
{ struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); struct ceph_client *cl = fsc->client; struct ceph_osd_client *osdc = &fsc->client->osdc;
ssize_t ret;
u64 off = *ki_pos;
u64 len = iov_iter_count(to);
u64 i_size = i_size_read(inode); bool sparse = IS_ENCRYPTED(inode) || ceph_test_mount_opt(fsc, SPARSEREAD);
u64 objver = 0;
if (!len || !i_size) return 0; /* * flush any page cache pages in this range. this * will make concurrent normal and sync io slow, * but it will at least behave sensibly when they are * in sequence.
*/
ret = filemap_write_and_wait_range(inode->i_mapping,
off, off + len - 1); if (ret < 0) return ret;
ret = 0; while ((len = iov_iter_count(to)) > 0) { struct ceph_osd_request *req; struct page **pages; int num_pages;
size_t page_off; bool more; int idx = 0;
size_t left; struct ceph_osd_req_op *op;
u64 read_off = off;
u64 read_len = len; int extent_cnt;
/* determine new offset/length if encrypted */
ceph_fscrypt_adjust_off_and_len(inode, &read_off, &read_len);
/* adjust len downward if the request truncated the len */ if (off + len > read_off + read_len)
len = read_off + read_len - off;
more = len < iov_iter_count(to);
op = &req->r_ops[0]; if (sparse) {
extent_cnt = __ceph_sparse_read_ext_count(inode, read_len);
ret = ceph_alloc_sparse_ext_map(op, extent_cnt); if (ret) {
ceph_osdc_put_request(req); break;
}
}
num_pages = calc_pages_for(read_off, read_len);
page_off = offset_in_page(off);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) {
ceph_osdc_put_request(req);
ret = PTR_ERR(pages); break;
}
/* Fix it to go to end of extent map */ if (sparse && ret >= 0)
ret = ceph_sparse_ext_map_end(op); elseif (ret == -ENOENT)
ret = 0;
if (ret < 0) {
ceph_osdc_put_request(req); if (ret == -EBLOCKLISTED)
fsc->blocklisted = true; break;
}
if (IS_ENCRYPTED(inode)) { int fret;
fret = ceph_fscrypt_decrypt_extents(inode, pages,
read_off, op->extent.sparse_ext,
op->extent.sparse_ext_cnt); if (fret < 0) {
ret = fret;
ceph_osdc_put_request(req); break;
}
/* account for any partial block at the beginning */
fret -= (off - read_off);
/* * Short read after big offset adjustment? * Nothing is usable, just call it a zero * len read.
*/
fret = max(fret, 0);
/* account for partial block at the end */
ret = min_t(ssize_t, fret, len);
}
/* Short read but not EOF? Zero out the remainder. */ if (ret < len && (off + ret < i_size)) { int zlen = min(len - ret, i_size - off - ret); int zoff = page_off + ret;
doutc(cl, "zero gap %llu~%llu\n", off + ret,
off + ret + zlen);
ceph_zero_page_vector_range(zoff, zlen, pages);
ret += zlen;
}
if (off + ret > i_size)
left = (i_size > off) ? i_size - off : 0; else
left = ret;
while (left > 0) {
size_t plen, copied;
plen = min_t(size_t, left, PAGE_SIZE - page_off);
SetPageUptodate(pages[idx]);
copied = copy_page_to_iter(pages[idx++],
page_off, plen, to);
off += copied;
left -= copied;
page_off = 0; if (copied < plen) {
ret = -EFAULT; break;
}
}
ceph_osdc_put_request(req);
if (off >= i_size || !more) break;
}
if (ret > 0) { if (off >= i_size) {
*retry_op = CHECK_EOF;
ret = i_size - *ki_pos;
*ki_pos = i_size;
} else {
ret = off - *ki_pos;
*ki_pos = off;
}
/* r_start_latency == 0 means the request was not submitted */ if (req->r_start_latency) { if (aio_req->write)
ceph_update_write_metrics(metric, req->r_start_latency,
req->r_end_latency, len, rc); else
ceph_update_read_metrics(metric, req->r_start_latency,
req->r_end_latency, len, rc);
}
/* * To simplify error handling, allow AIO when IO within i_size * or IO can be satisfied by single OSD request.
*/ if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
(len == count || pos + count <= i_size_read(inode))) {
aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL); if (aio_req) {
aio_req->iocb = iocb;
aio_req->write = write;
aio_req->should_dirty = should_dirty;
INIT_LIST_HEAD(&aio_req->osd_reqs); if (write) {
aio_req->mtime = mtime;
swap(aio_req->prealloc_cf, *pcf);
}
} /* ignore error */
}
if (write) { /* * throw out any page cache pages in this range. this * may block.
*/
truncate_inode_pages_range(inode->i_mapping, pos,
PAGE_ALIGN(pos + len) - 1);
req->r_mtime = mtime;
}
if (aio_req) {
aio_req->total_len += len;
aio_req->num_reqs++;
atomic_inc(&aio_req->pending_reqs);
/* clamp the length to the end of first object */
ceph_calc_file_object_mapping(&ci->i_layout, write_pos,
write_len, &objnum, &objoff,
&xlen);
write_len = xlen;
/* adjust len downward if it goes beyond current object */ if (pos + len > write_pos + write_len)
len = write_pos + write_len - pos;
/* * If we had to adjust the length or position to align with a * crypto block, then we must do a read/modify/write cycle. We * use a version assertion to redrive the thing if something * changes in between.
*/
first = pos != write_pos;
last = (pos + len) != (write_pos + write_len);
rmw = first || last;
/* * The data is emplaced into the page as it would be if it were * in an array of pagecache pages.
*/
num_pages = calc_pages_for(write_pos, write_len);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) {
ret = PTR_ERR(pages); break;
}
/* Do we need to preload the pages? */ if (rmw) {
u64 first_pos = write_pos;
u64 last_pos = (write_pos + write_len) - CEPH_FSCRYPT_BLOCK_SIZE;
u64 read_len = CEPH_FSCRYPT_BLOCK_SIZE; struct ceph_osd_req_op *op;
/* We should only need to do this for encrypted inodes */
WARN_ON_ONCE(!IS_ENCRYPTED(inode));
/* No need to do two reads if first and last blocks are same */ if (first && last_pos == first_pos)
last = false;
/* * Allocate a read request for one or two extents, * depending on how the request was aligned.
*/
req = ceph_osdc_new_request(osdc, &ci->i_layout,
ci->i_vino, first ? first_pos : last_pos,
&read_len, 0, (first && last) ? 2 : 1,
CEPH_OSD_OP_SPARSE_READ, CEPH_OSD_FLAG_READ,
NULL, ci->i_truncate_seq,
ci->i_truncate_size, false); if (IS_ERR(req)) {
ceph_release_page_vector(pages, num_pages);
ret = PTR_ERR(req); break;
}
/* Something is misaligned! */ if (read_len != CEPH_FSCRYPT_BLOCK_SIZE) {
ceph_osdc_put_request(req);
ceph_release_page_vector(pages, num_pages);
ret = -EIO; break;
}
/* Add extent for first block? */
op = &req->r_ops[0];
if (first) {
osd_req_op_extent_osd_data_pages(req, 0, pages,
CEPH_FSCRYPT_BLOCK_SIZE,
offset_in_page(first_pos), false, false); /* We only expect a single extent here */
ret = __ceph_alloc_sparse_ext_map(op, 1); if (ret) {
ceph_osdc_put_request(req);
ceph_release_page_vector(pages, num_pages); break;
}
}
/* Add extent for last block */ if (last) { /* Init the other extent if first extent has been used */ if (first) {
op = &req->r_ops[1];
osd_req_op_extent_init(req, 1,
CEPH_OSD_OP_SPARSE_READ,
last_pos, CEPH_FSCRYPT_BLOCK_SIZE,
ci->i_truncate_size,
ci->i_truncate_seq);
}
ret = __ceph_alloc_sparse_ext_map(op, 1); if (ret) {
ceph_osdc_put_request(req);
ceph_release_page_vector(pages, num_pages); break;
}
ceph_osdc_start_request(osdc, req);
ret = ceph_osdc_wait_request(osdc, req);
/* FIXME: length field is wrong if there are 2 extents */
ceph_update_read_metrics(&fsc->mdsc->metric,
req->r_start_latency,
req->r_end_latency,
read_len, ret);
/* Ok if object is not already present */ if (ret == -ENOENT) { /* * If there is no object, then we can't assert * on its version. Set it to 0, and we'll use an * exclusive create instead.
*/
ceph_osdc_put_request(req);
ret = 0;
/* * zero out the soon-to-be uncopied parts of the * first and last pages.
*/ if (first)
zero_user_segment(pages[0], 0,
offset_in_page(first_pos)); if (last)
zero_user_segment(pages[num_pages - 1],
offset_in_page(last_pos),
PAGE_SIZE);
} else { if (ret < 0) {
ceph_osdc_put_request(req);
ceph_release_page_vector(pages, num_pages); break;
}
op = &req->r_ops[0]; if (op->extent.sparse_ext_cnt == 0) { if (first)
zero_user_segment(pages[0], 0,
offset_in_page(first_pos)); else
zero_user_segment(pages[num_pages - 1],
offset_in_page(last_pos),
PAGE_SIZE);
} elseif (op->extent.sparse_ext_cnt != 1 ||
ceph_sparse_ext_map_end(op) !=
CEPH_FSCRYPT_BLOCK_SIZE) {
ret = -EIO;
ceph_osdc_put_request(req);
ceph_release_page_vector(pages, num_pages); break;
}
if (first && last) {
op = &req->r_ops[1]; if (op->extent.sparse_ext_cnt == 0) {
zero_user_segment(pages[num_pages - 1],
offset_in_page(last_pos),
PAGE_SIZE);
} elseif (op->extent.sparse_ext_cnt != 1 ||
ceph_sparse_ext_map_end(op) !=
CEPH_FSCRYPT_BLOCK_SIZE) {
ret = -EIO;
ceph_osdc_put_request(req);
ceph_release_page_vector(pages, num_pages); break;
}
}
/* Grab assert version. It must be non-zero. */
assert_ver = req->r_version;
WARN_ON_ONCE(ret > 0 && assert_ver == 0);
ceph_osdc_put_request(req); if (first) {
ret = ceph_fscrypt_decrypt_block_inplace(inode,
pages[0], CEPH_FSCRYPT_BLOCK_SIZE,
offset_in_page(first_pos),
first_pos >> CEPH_FSCRYPT_BLOCK_SHIFT); if (ret < 0) {
ceph_release_page_vector(pages, num_pages); break;
}
} if (last) {
ret = ceph_fscrypt_decrypt_block_inplace(inode,
pages[num_pages - 1],
CEPH_FSCRYPT_BLOCK_SIZE,
offset_in_page(last_pos),
last_pos >> CEPH_FSCRYPT_BLOCK_SHIFT); if (ret < 0) {
ceph_release_page_vector(pages, num_pages); break;
}
}
}
}
left = len;
off = offset_in_page(pos); for (n = 0; n < num_pages; n++) {
size_t plen = min_t(size_t, left, PAGE_SIZE - off);
/* copy the data */
ret = copy_page_from_iter(pages[n], off, plen, from); if (ret != plen) {
ret = -EFAULT; break;
}
off = 0;
left -= ret;
} if (ret < 0) {
doutc(cl, "write failed with %d\n", ret);
ceph_release_page_vector(pages, num_pages); break;
}
if (IS_ENCRYPTED(inode)) {
ret = ceph_fscrypt_encrypt_pages(inode, pages,
write_pos, write_len); if (ret < 0) {
doutc(cl, "encryption failed with %d\n", ret);
ceph_release_page_vector(pages, num_pages); break;
}
}
/* Set up the assertion */ if (rmw) { /* * Set up the assertion. If we don't have a version * number, then the object doesn't exist yet. Use an * exclusive create instead of a version assertion in * that case.
*/ if (assert_ver) {
osd_req_op_init(req, 0, CEPH_OSD_OP_ASSERT_VER, 0);
req->r_ops[0].assert_ver.ver = assert_ver;
} else {
osd_req_op_init(req, 0, CEPH_OSD_OP_CREATE,
CEPH_OSD_OP_FLAG_EXCL);
}
}
ceph_osdc_start_request(osdc, req);
ret = ceph_osdc_wait_request(osdc, req);
ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
req->r_end_latency, len, ret);
ceph_osdc_put_request(req); if (ret != 0) {
doutc(cl, "osd write returned %d\n", ret); /* Version changed! Must re-do the rmw cycle */ if ((assert_ver && (ret == -ERANGE || ret == -EOVERFLOW)) ||
(!assert_ver && ret == -EEXIST)) { /* We should only ever see this on a rmw */
WARN_ON_ONCE(!rmw);
/* The version should never go backward */
WARN_ON_ONCE(ret == -EOVERFLOW);
*from = saved_iter;
/* FIXME: limit number of times we loop? */ continue;
}
ceph_set_error_write(ci); break;
}
ceph_clear_error_write(ci);
/* * We successfully wrote to a range of the file. Declare * that region of the pagecache invalid.
*/
ret = invalidate_inode_pages2_range(
inode->i_mapping,
pos >> PAGE_SHIFT,
(pos + len - 1) >> PAGE_SHIFT); if (ret < 0) {
doutc(cl, "invalidate_inode_pages2_range returned %d\n",
ret);
ret = 0;
}
pos += len;
written += len;
doutc(cl, "written %d\n", written); if (pos > i_size_read(inode)) {
check_caps = ceph_inode_set_size(inode, pos); if (check_caps)
ceph_check_caps(ceph_inode(inode),
CHECK_CAPS_AUTHONLY);
}
}
if (ret != -EOLDSNAPC && written > 0) {
ret = written;
iocb->ki_pos = pos;
}
doutc(cl, "returning %d\n", ret); return ret;
}
/* * Wrap generic_file_aio_read with checks for cap bits on the inode. * Atomically grab references, so that those bits are not released * back to the MDS mid-read. * * Hmm, the sync read case isn't actually async... should it be?
*/ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
{ struct file *filp = iocb->ki_filp; struct ceph_file_info *fi = filp->private_data;
size_t len = iov_iter_count(to); struct inode *inode = file_inode(filp); struct ceph_inode_info *ci = ceph_inode(inode); bool direct_lock = iocb->ki_flags & IOCB_DIRECT; struct ceph_client *cl = ceph_inode_to_client(inode);
ssize_t ret; int want = 0, got = 0; int retry_op = 0, read = 0;
again:
doutc(cl, "%llu~%u trying to get caps on %p %llx.%llx\n",
iocb->ki_pos, (unsigned)len, inode, ceph_vinop(inode));
if (ceph_inode_is_shutdown(inode)) return -ESTALE;
if (direct_lock)
ceph_start_io_direct(inode); else
ceph_start_io_read(inode);
if (!(fi->flags & CEPH_F_SYNC) && !direct_lock)
want |= CEPH_CAP_FILE_CACHE; if (fi->fmode & CEPH_FILE_MODE_LAZY)
want |= CEPH_CAP_FILE_LAZYIO;
ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1, &got); if (ret < 0) { if (direct_lock)
ceph_end_io_direct(inode); else
ceph_end_io_read(inode); return ret;
}
/* * Wrap filemap_splice_read with checks for cap bits on the inode. * Atomically grab references, so that those bits are not released * back to the MDS mid-read.
*/ static ssize_t ceph_splice_read(struct file *in, loff_t *ppos, struct pipe_inode_info *pipe,
size_t len, unsignedint flags)
{ struct ceph_file_info *fi = in->private_data; struct inode *inode = file_inode(in); struct ceph_inode_info *ci = ceph_inode(inode);
ssize_t ret; int want = 0, got = 0;
CEPH_DEFINE_RW_CONTEXT(rw_ctx, 0);
dout("splice_read %p %llx.%llx %llu~%zu trying to get caps on %p\n",
inode, ceph_vinop(inode), *ppos, len, inode);
if (ceph_inode_is_shutdown(inode)) return -ESTALE;
/* we might need to revert back to that point */
data = *from; if ((iocb->ki_flags & IOCB_DIRECT) && !IS_ENCRYPTED(inode))
written = ceph_direct_read_write(iocb, &data, snapc,
&prealloc_cf); else
written = ceph_sync_write(iocb, &data, pos, snapc); if (direct_lock)
ceph_end_io_direct(inode); else
ceph_end_io_write(inode); if (written > 0)
iov_iter_advance(from, written);
ceph_put_snap_context(snapc);
} else { /*
--> --------------------
--> maximum size reached
--> --------------------
Messung V0.5
¤ Dauer der Verarbeitung: 0.22 Sekunden
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.