/* * Ceph address space ops. * * There are a few funny things going on here. * * The page->private field is used to reference a struct * ceph_snap_context for _every_ dirty page. This indicates which * snapshot the page was logically dirtied in, and thus which snap * context needs to be associated with the osd write during writeback. * * Similarly, struct ceph_inode_info maintains a set of counters to * count dirty pages on the inode. In the absence of snapshots, * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count. * * When a snapshot is taken (that is, when the client receives * notification that a snapshot was taken), each inode with caps and * with dirty pages (dirty pages implies there is a cap) gets a new * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending * order, new snaps go to the tail). The i_wrbuffer_ref_head count is * moved to capsnap->dirty. (Unless a sync write is currently in * progress. In that case, the capsnap is said to be "pending", new * writes cannot start, and the capsnap isn't "finalized" until the * write completes (or fails) and a final size/mtime for the inode for * that snap can be settled upon.) i_wrbuffer_ref_head is reset to 0. * * On writeback, we must submit writes to the osd IN SNAP ORDER. So, * we look for the first capsnap in i_cap_snaps and write out pages in * that snap context _only_. Then we move on to the next capsnap, * eventually reaching the "live" or "head" context (i.e., pages that * are not yet snapped) and are writing the most recently dirtied * pages. * * Invalidate and so forth must take care to ensure the dirty page * accounting is preserved.
*/
/* * Reference snap context in folio->private. Also set * PagePrivate so that we get invalidate_folio callback.
*/
VM_WARN_ON_FOLIO(folio->private, folio);
folio_attach_private(folio, snapc);
/* * If we are truncating the full folio (i.e. offset == 0), adjust the * dirty folio counters appropriately. Only called if there is private * data on the folio.
*/ staticvoid ceph_invalidate_folio(struct folio *folio, size_t offset,
size_t length)
{ struct inode *inode = folio->mapping->host; struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_snap_context *snapc;
if (priv) { /* Readahead is disabled by posix_fadvise POSIX_FADV_RANDOM */ if (priv->file_ra_disabled)
max_pages = 0; else
max_pages = priv->file_ra_pages;
}
/* Readahead is disabled */ if (!max_pages) return;
max_len = max_pages << PAGE_SHIFT;
/* * Try to expand the length forward by rounding up it to the next * block, but do not exceed the file size, unless the original * request already exceeds it.
*/
new_end = umin(round_up(end, lo->stripe_unit), rreq->i_size); if (new_end > end && new_end <= rreq->start + max_len)
rreq->len = new_end - rreq->start;
/* Try to expand the start downward */
div_u64_rem(rreq->start, lo->stripe_unit, &blockoff); if (rreq->len + blockoff <= max_len) {
rreq->start -= blockoff;
rreq->len += blockoff;
}
}
/* Truncate the extent at the end of the current block */
ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len,
&objno, &objoff, &xlen);
rreq->io_streams[0].sreq_max_len = umin(xlen, fsc->mount_options->rsize); return 0;
}
if (ceph_inode_is_shutdown(inode)) {
err = -EIO; goto out;
}
if (ceph_has_inline_data(ci) && ceph_netfs_issue_op_inline(subreq)) return;
// TODO: This rounding here is slightly dodgy. It *should* work, for // now, as the cache only deals in blocks that are a multiple of // PAGE_SIZE and fscrypt blocks are at most PAGE_SIZE. What needs to // happen is for the fscrypt driving to be moved into netfslib and the // data in the cache also to be stored encrypted.
len = subreq->len;
ceph_fscrypt_adjust_off_and_len(inode, &off, &len);
/* * FIXME: For now, use CEPH_OSD_DATA_TYPE_PAGES instead of _ITER for * encrypted inodes. We'd need infrastructure that handles an iov_iter * instead of page arrays, and we don't have that as of yet. Once the * dust settles on the write helpers and encrypt/decrypt routines for * netfs, we should be able to rework this.
*/ if (IS_ENCRYPTED(inode)) { struct page **pages;
size_t page_off;
/* * FIXME: io_iter.count needs to be corrected to aligned * length. Otherwise, iov_iter_get_pages_alloc2() operates * with the initial unaligned length value. As a result, * ceph_msg_data_cursor_init() triggers BUG_ON() in the case * if msg->sparse_read_total > msg->data_length.
*/
subreq->io_iter.count = len;
/* * Get ref for the oldest snapc for an inode with dirty data... that is, the * only snap context we are allowed to write back.
*/ staticstruct ceph_snap_context *
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl, struct ceph_snap_context *page_snapc)
{ struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_snap_context *snapc = NULL; struct ceph_cap_snap *capsnap = NULL;
/* verify this is a writeable snap context */
snapc = page_snap_context(&folio->page); if (!snapc) {
doutc(cl, "%llx.%llx folio %p not dirty?\n", ceph_vinop(inode),
folio); return 0;
}
oldest = get_oldest_context(inode, &ceph_wbc, snapc); if (snapc->seq > oldest->seq) {
doutc(cl, "%llx.%llx folio %p snapc %p not writeable - noop\n",
ceph_vinop(inode), folio, snapc); /* we should only noop if called by kswapd */
WARN_ON(!(current->flags & PF_MEMALLOC));
ceph_put_snap_context(oldest);
folio_redirty_for_writepage(wbc, folio); return 0;
}
ceph_put_snap_context(oldest);
/* is this a partial page at end of file? */ if (page_off >= ceph_wbc.i_size) {
doutc(cl, "%llx.%llx folio at %lu beyond eof %llu\n",
ceph_vinop(inode), folio->index, ceph_wbc.i_size);
folio_invalidate(folio, 0, folio_size(folio)); return 0;
}
if (ceph_wbc.i_size < page_off + len)
len = ceph_wbc.i_size - page_off;
/* * We lost the cache cap, need to truncate the page before * it is unlocked, otherwise we'd truncate it later in the * page truncation thread, possibly losing some data that * raced its way in
*/
remove_page = !(ceph_caps_issued(ci) &
(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
/* clean all pages */ for (i = 0; i < req->r_num_ops; i++) { if (req->r_ops[i].op != CEPH_OSD_OP_WRITE) {
pr_warn_client(cl, "%llx.%llx incorrect op %d req %p index %d tid %llu\n",
ceph_vinop(inode), req->r_ops[i].op, req, i,
req->r_tid); break;
}
/* find oldest snap context with dirty data */
ceph_wbc->snapc = get_oldest_context(inode, ceph_wbc, NULL); if (!ceph_wbc->snapc) { /* hmm, why does writepages get called when there
is no dirty data? */
doutc(cl, " no snap context with dirty data?\n"); return -ENODATA;
}
if (ceph_wbc->head_snapc && ceph_wbc->snapc != ceph_wbc->last_snapc) { /* where to start/end? */ if (wbc->range_cyclic) {
ceph_wbc->index = ceph_wbc->start_index;
ceph_wbc->end = -1; if (ceph_wbc->index > 0)
ceph_wbc->should_loop = true;
doutc(cl, " cyclic, start at %lu\n", ceph_wbc->index);
} else {
ceph_wbc->index = wbc->range_start >> PAGE_SHIFT;
ceph_wbc->end = wbc->range_end >> PAGE_SHIFT; if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
ceph_wbc->range_whole = true;
doutc(cl, " not cyclic, %lu to %lu\n",
ceph_wbc->index, ceph_wbc->end);
}
} elseif (!ceph_wbc->head_snapc) { /* Do not respect wbc->range_{start,end}. Dirty pages * in that range can be associated with newer snapc. * They are not writeable until we write all dirty pages
* associated with 'snapc' get written */ if (ceph_wbc->index > 0)
ceph_wbc->should_loop = true;
doutc(cl, " non-head snapc, range whole\n");
}
/* * We have something to write. If this is * the first locked page this time through, * calculate max possible write size and * allocate a page array
*/ if (ceph_wbc->locked_pages == 0) {
ceph_allocate_page_array(mapping, ceph_wbc, folio);
} elseif (!is_folio_index_contiguous(ceph_wbc, folio)) { if (is_num_ops_too_big(ceph_wbc)) {
folio_redirty_for_writepage(wbc, folio);
folio_unlock(folio); break;
}
if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { for (i = 0; i < folio_batch_count(&ceph_wbc->fbatch); i++) { struct folio *folio = ceph_wbc->fbatch.folios[i];
/* Format the osd request message and submit the write */
len = 0;
ceph_wbc->data_pages = ceph_wbc->pages;
ceph_wbc->op_idx = 0; for (i = 0; i < ceph_wbc->locked_pages; i++) {
u64 cur_offset;
/* * Discontinuity in page range? Ceph can handle that by just passing * multiple extents in the write op.
*/ if (offset + len != cur_offset) { /* If it's full, stop here */ if (ceph_wbc->op_idx + 1 == req->r_num_ops) break;
/* Kick off an fscache write with what we have so far. */
ceph_fscache_write_to_cache(inode, offset, len, caching);
/* Start a new extent */
osd_req_op_extent_dup_last(req, ceph_wbc->op_idx,
cur_offset - offset);
doutc(cl, "got pages at %llu~%llu\n", offset, len);
if (ceph_wbc->size_stable) {
len = min(len, ceph_wbc->i_size - offset);
} elseif (i == ceph_wbc->locked_pages) { /* writepages_finish() clears writeback pages * according to the data length, so make sure
* data length covers all locked pages */
u64 min_len = len + 1 - thp_size(page);
len = get_writepages_data_length(inode,
ceph_wbc->pages[i - 1],
offset);
len = max(len, min_len);
}
if (IS_ENCRYPTED(inode))
len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE);
doutc(cl, "got pages at %llu~%llu\n", offset, len);
if (is_forced_umount(mapping)) { /* we're in a forced umount, don't write! */ return -EIO;
}
ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc);
if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
rc = -EIO; goto out;
}
retry:
rc = ceph_define_writeback_range(mapping, wbc, &ceph_wbc); if (rc == -ENODATA) { /* hmm, why does writepages get called when there
is no dirty data? */
rc = 0; goto dec_osd_stopping_blocker;
}
if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages)
tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end);
/* * We stop writing back only if we are not doing * integrity sync. In case of integrity sync we have to * keep going until we have written all the pages * we tagged for writeback prior to entering this loop.
*/ if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
ceph_wbc.done = true;
if (ceph_wbc.should_loop && !ceph_wbc.done) { /* more to do; loop back to beginning of file */
doutc(cl, "looping back to beginning of file\n"); /* OK even when start_index == 0 */
ceph_wbc.end = ceph_wbc.start_index - 1;
/* to write dirty pages associated with next snapc,
* we need to wait until current writes complete */
ceph_wait_until_current_writes_complete(mapping, wbc, &ceph_wbc);
/*
 * Check whether the given @snapc is either writeable now, or has
 * already been written back: true when it is no newer than the oldest
 * snap context that still has dirty data on this inode.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
	struct ceph_snap_context *oldest;
	int writeable;

	oldest = get_oldest_context(inode, NULL, NULL);
	/* No oldest context means nothing dirty is ahead of us. */
	writeable = !oldest || snapc->seq <= oldest->seq;
	ceph_put_snap_context(oldest);
	return writeable;
}
/** * ceph_find_incompatible - find an incompatible context and return it * @folio: folio being dirtied * * We are only allowed to write into/dirty a folio if the folio is * clean, or already dirty within the same snap context. Returns a * conflicting context if there is one, NULL if there isn't, or a * negative error code on other errors. * * Must be called with folio lock held.
*/ staticstruct ceph_snap_context *
ceph_find_incompatible(struct folio *folio)
{ struct inode *inode = folio->mapping->host; struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode);
if (ceph_inode_is_shutdown(inode)) {
doutc(cl, " %llx.%llx folio %p is shutdown\n",
ceph_vinop(inode), folio); return ERR_PTR(-ESTALE);
}
for (;;) { struct ceph_snap_context *snapc, *oldest;
folio_wait_writeback(folio);
snapc = page_snap_context(&folio->page); if (!snapc || snapc == ci->i_head_snapc) break;
/* * this folio is already dirty in another (older) snap * context! is it writeable now?
*/
oldest = get_oldest_context(inode, NULL, NULL); if (snapc->seq > oldest->seq) { /* not writeable -- return it for the caller to deal with */
ceph_put_snap_context(oldest);
doutc(cl, " %llx.%llx folio %p snapc %p not current or oldest\n",
ceph_vinop(inode), folio, snapc); return ceph_get_snap_context(snapc);
}
ceph_put_snap_context(oldest);
/* yay, writeable, do it now (without dropping folio lock) */
doutc(cl, " %llx.%llx folio %p snapc %p not current, but oldest\n",
ceph_vinop(inode), folio, snapc); if (folio_clear_dirty_for_io(folio)) { int r = write_folio_nounlock(folio, NULL); if (r < 0) return ERR_PTR(r);
}
} return NULL;
}
snapc = ceph_find_incompatible(*foliop); if (snapc) { int r;
folio_unlock(*foliop);
folio_put(*foliop);
*foliop = NULL; if (IS_ERR(snapc)) return PTR_ERR(snapc);
ceph_queue_writeback(inode);
r = wait_event_killable(ci->i_cap_wq,
context_is_writeable_or_written(inode, snapc));
ceph_put_snap_context(snapc); return r == 0 ? -EAGAIN : r;
} return 0;
}
/* * We are only allowed to write into/dirty the page if the page is * clean, or already dirty within the same snap context.
*/ staticint ceph_write_begin(conststruct kiocb *iocb, struct address_space *mapping,
loff_t pos, unsigned len, struct folio **foliop, void **fsdata)
{ struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); int r;
r = netfs_write_begin(&ci->netfs, file, inode->i_mapping, pos, len, foliop, NULL); if (r < 0) return r;
if (!folio_test_uptodate(folio)) { /* just return that nothing was copied on a short copy */ if (copied < len) {
copied = 0; goto out;
}
folio_mark_uptodate(folio);
}
/* did file size increase? */ if (pos+copied > i_size_read(inode))
check_cap = ceph_inode_set_size(inode, pos+copied);
folio_mark_dirty(folio);
out:
folio_unlock(folio);
folio_put(folio);
if (check_cap)
ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY);
doutc(cl, "%llx.%llx %llu got cap refs on %s\n", ceph_vinop(inode),
off, ceph_cap_string(got));
if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
!ceph_has_inline_data(ci)) {
CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
ceph_add_rw_context(fi, &rw_ctx);
ret = filemap_fault(vmf);
ceph_del_rw_context(fi, &rw_ctx);
doutc(cl, "%llx.%llx %llu drop cap refs %s ret %x\n",
ceph_vinop(inode), off, ceph_cap_string(got), ret);
} else
err = -EAGAIN;
ceph_put_cap_refs(ci, got);
if (err != -EAGAIN) goto out_restore;
/* read inline data */ if (off >= PAGE_SIZE) { /* does not support inline data > PAGE_SIZE */
ret = VM_FAULT_SIGBUS;
} else { struct address_space *mapping = inode->i_mapping; struct page *page;
filemap_invalidate_lock_shared(mapping);
page = find_or_create_page(mapping, 0,
mapping_gfp_constraint(mapping, ~__GFP_FS)); if (!page) {
ret = VM_FAULT_OOM; goto out_inline;
}
err = __ceph_do_getattr(inode, page,
CEPH_STAT_CAP_INLINE_DATA, true); if (err < 0 || off >= i_size_read(inode)) {
unlock_page(page);
put_page(page);
ret = vmf_error(err); goto out_inline;
} if (err < PAGE_SIZE)
zero_user_segment(page, err, PAGE_SIZE); else
flush_dcache_page(page);
SetPageUptodate(page);
vmf->page = page;
ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
out_inline:
filemap_invalidate_unlock_shared(mapping);
doutc(cl, "%llx.%llx %llu read inline data ret %x\n",
ceph_vinop(inode), off, ret);
}
out_restore:
ceph_restore_sigs(&oldset); if (err < 0)
ret = vmf_error(err);
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.