ceph: make readpages fully async
When we get a ->readpages() aop, submit async reads for all page ranges
in the provided page list. Lock the pages immediately, so that VFS/MM
will block until the reads complete.
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 5a3953d..5bb39a5 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -228,36 +228,128 @@
}
/*
- * Build a vector of contiguous pages from the provided page list.
+ * Finish an async read(ahead) op.
*/
-static struct page **page_vector_from_list(struct list_head *page_list,
- unsigned *nr_pages)
+static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
{
+ struct inode *inode = req->r_inode;
+ struct ceph_osd_reply_head *replyhead;
+ int rc, bytes;
+ int i;
+
+ /* parse reply */
+ replyhead = msg->front.iov_base;
+ WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
+ rc = le32_to_cpu(replyhead->result);
+ bytes = le32_to_cpu(msg->hdr.data_len);
+
+ dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
+
+ /* unlock all pages, zeroing any data we didn't read */
+ for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
+ struct page *page = req->r_pages[i];
+
+ if (bytes < (int)PAGE_CACHE_SIZE) {
+ /* zero (remainder of) page */
+ int s = bytes < 0 ? 0 : bytes;
+ zero_user_segment(page, s, PAGE_CACHE_SIZE);
+ }
+ dout("finish_read %p uptodate %p idx %lu\n", inode, page,
+ page->index);
+ flush_dcache_page(page);
+ SetPageUptodate(page);
+ unlock_page(page);
+ page_cache_release(page);
+ }
+ kfree(req->r_pages);
+}
+
+/*
+ * start an async read(ahead) operation. return nr_pages we submitted
+ * a read for on success, or negative error code.
+ */
+static int start_read(struct inode *inode, struct list_head *page_list)
+{
+ struct ceph_osd_client *osdc =
+ &ceph_inode_to_client(inode)->client->osdc;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct page *page = list_entry(page_list->prev, struct page, lru);
+ struct ceph_osd_request *req;
+ u64 off;
+ u64 len;
+ int i;
struct page **pages;
- struct page *page;
- int next_index, contig_pages = 0;
+ pgoff_t next_index;
+ int nr_pages = 0;
+ int ret;
+
+ off = page->index << PAGE_CACHE_SHIFT;
+
+ /* count pages */
+ next_index = page->index;
+ list_for_each_entry_reverse(page, page_list, lru) {
+ if (page->index != next_index)
+ break;
+ nr_pages++;
+ next_index++;
+ }
+ len = nr_pages << PAGE_CACHE_SHIFT;
+ dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
+ off, len);
+
+ req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
+ off, &len,
+ CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+ NULL, 0,
+ ci->i_truncate_seq, ci->i_truncate_size,
+ NULL, false, 1, 0);
+ if (!req)
+ return -ENOMEM;
/* build page vector */
- pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS);
+ nr_pages = len >> PAGE_CACHE_SHIFT;
+ pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
+ ret = -ENOMEM;
if (!pages)
- return ERR_PTR(-ENOMEM);
-
- BUG_ON(list_empty(page_list));
- next_index = list_entry(page_list->prev, struct page, lru)->index;
- list_for_each_entry_reverse(page, page_list, lru) {
- if (page->index == next_index) {
- dout("readpages page %d %p\n", contig_pages, page);
- pages[contig_pages] = page;
- contig_pages++;
- next_index++;
- } else {
- break;
+ goto out;
+ for (i = 0; i < nr_pages; ++i) {
+ page = list_entry(page_list->prev, struct page, lru);
+ BUG_ON(PageLocked(page));
+ list_del(&page->lru);
+
+ dout("start_read %p adding %p idx %lu\n", inode, page,
+ page->index);
+ if (add_to_page_cache_lru(page, &inode->i_data, page->index,
+ GFP_NOFS)) {
+ page_cache_release(page);
+ dout("start_read %p add_to_page_cache failed %p\n",
+ inode, page);
+ nr_pages = i;
+ goto out_pages;
}
+ pages[i] = page;
}
- *nr_pages = contig_pages;
- return pages;
+ req->r_pages = pages;
+ req->r_num_pages = nr_pages;
+ req->r_callback = finish_read;
+ req->r_inode = inode;
+
+ dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
+ ret = ceph_osdc_start_request(osdc, req, false);
+ if (ret < 0)
+ goto out_pages;
+ ceph_osdc_put_request(req);
+ return nr_pages;
+
+out_pages:
+ ceph_release_page_vector(pages, nr_pages);
+ kfree(pages);
+out:
+ ceph_osdc_put_request(req);
+ return ret;
}
+
/*
* Read multiple pages. Leave pages we don't read + unlock in page_list;
* the caller (VM) cleans them up.
@@ -266,64 +358,17 @@
struct list_head *page_list, unsigned nr_pages)
{
struct inode *inode = file->f_dentry->d_inode;
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_osd_client *osdc =
- &ceph_inode_to_client(inode)->client->osdc;
int rc = 0;
- struct page **pages;
- loff_t offset;
- u64 len;
- dout("readpages %p file %p nr_pages %d\n",
- inode, file, nr_pages);
-
- pages = page_vector_from_list(page_list, &nr_pages);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
-
- /* guess read extent */
- offset = pages[0]->index << PAGE_CACHE_SHIFT;
- len = nr_pages << PAGE_CACHE_SHIFT;
- rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
- offset, &len,
- ci->i_truncate_seq, ci->i_truncate_size,
- pages, nr_pages, 0);
- if (rc == -ENOENT)
- rc = 0;
- if (rc < 0)
- goto out;
-
- for (; !list_empty(page_list) && len > 0;
- rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) {
- struct page *page =
- list_entry(page_list->prev, struct page, lru);
-
- list_del(&page->lru);
-
- if (rc < (int)PAGE_CACHE_SIZE) {
- /* zero (remainder of) page */
- int s = rc < 0 ? 0 : rc;
- zero_user_segment(page, s, PAGE_CACHE_SIZE);
- }
-
- if (add_to_page_cache_lru(page, mapping, page->index,
- GFP_NOFS)) {
- page_cache_release(page);
- dout("readpages %p add_to_page_cache failed %p\n",
- inode, page);
- continue;
- }
- dout("readpages %p adding %p idx %lu\n", inode, page,
- page->index);
- flush_dcache_page(page);
- SetPageUptodate(page);
- unlock_page(page);
- page_cache_release(page);
+ dout("readpages %p file %p nr_pages %d\n", inode, file, nr_pages);
+ while (!list_empty(page_list)) {
+ rc = start_read(inode, page_list);
+ if (rc < 0)
+ goto out;
+ BUG_ON(rc == 0);
}
- rc = 0;
-
out:
- kfree(pages);
+ dout("readpages %p file %p ret %d\n", inode, file, rc);
return rc;
}