Linux Archive

Linux Archive (http://www.linux-archive.org/)
-   Device-mapper Development (http://www.linux-archive.org/device-mapper-development/)
-   -   aio: Rewrite refcounting (http://www.linux-archive.org/device-mapper-development/710740-aio-rewrite-refcounting.html)

Kent Overstreet 10-09-2012 06:39 AM

aio: Rewrite refcounting
 
The refcounting before wasn't very clear; there are two refcounts in
struct kioctx, with an unclear relationship between them (or between
them and ctx->dead).

Now, reqs_active holds a refcount on users (when reqs_active is
nonzero), and the initial refcount is taken on reqs_active - when
ctx->dead goes to 1, we drop that initial refcount.

Some other semi-related cleanup too.

Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 187 +++++++++++++++++----------------------------------
include/linux/aio.h | 5 +-
2 files changed, 63 insertions(+), 129 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 95419c4..3ab12f6 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -86,8 +86,9 @@ static void aio_free_ring(struct kioctx *ctx)
put_page(info->ring_pages[i]);

if (info->mmap_size) {
- BUG_ON(ctx->mm != current->mm);
- vm_munmap(info->mmap_base, info->mmap_size);
+ down_write(&ctx->mm->mmap_sem);
+ do_munmap(ctx->mm, info->mmap_base, info->mmap_size);
+ up_write(&ctx->mm->mmap_sem);
}

if (info->ring_pages && info->ring_pages != info->internal_pages)
@@ -188,45 +189,37 @@ static int aio_setup_ring(struct kioctx *ctx)
kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK));
} while(0)

-static void ctx_rcu_free(struct rcu_head *head)
-{
- struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
- kmem_cache_free(kioctx_cachep, ctx);
-}
-
-/* __put_ioctx
- * Called when the last user of an aio context has gone away,
- * and the struct needs to be freed.
- */
-static void __put_ioctx(struct kioctx *ctx)
+static void free_ioctx(struct work_struct *work)
{
- unsigned nr_events = ctx->max_reqs;
- BUG_ON(ctx->reqs_active);
+ struct kioctx *ctx = container_of(work, struct kioctx, free_work);

cancel_delayed_work_sync(&ctx->wq);
aio_free_ring(ctx);
mmdrop(ctx->mm);
- ctx->mm = NULL;
- if (nr_events) {
- spin_lock(&aio_nr_lock);
- BUG_ON(aio_nr - nr_events > aio_nr);
- aio_nr -= nr_events;
- spin_unlock(&aio_nr_lock);
- }
- pr_debug("__put_ioctx: freeing %p\n", ctx);
- call_rcu(&ctx->rcu_head, ctx_rcu_free);
-}

-static inline int try_get_ioctx(struct kioctx *kioctx)
-{
- return atomic_inc_not_zero(&kioctx->users);
+ spin_lock(&aio_nr_lock);
+ BUG_ON(aio_nr - ctx->max_reqs > aio_nr);
+ aio_nr -= ctx->max_reqs;
+ spin_unlock(&aio_nr_lock);
+
+ synchronize_rcu();
+
+ pr_debug("__put_ioctx: freeing %p\n", ctx);
+ kmem_cache_free(kioctx_cachep, ctx);
}

static inline void put_ioctx(struct kioctx *kioctx)
{
BUG_ON(atomic_read(&kioctx->users) <= 0);
if (unlikely(atomic_dec_and_test(&kioctx->users)))
- __put_ioctx(kioctx);
+ schedule_work(&kioctx->free_work);
+}
+
+static inline void req_put_ioctx(struct kioctx *kioctx)
+{
+ BUG_ON(atomic_read(&kioctx->reqs_active) <= 0);
+ if (unlikely(atomic_dec_and_test(&kioctx->reqs_active)))
+ put_ioctx(kioctx);
}

static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
@@ -280,12 +273,15 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
atomic_inc(&mm->mm_count);

atomic_set(&ctx->users, 2);
+ atomic_set(&ctx->reqs_active, 1);
+
spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait);

INIT_LIST_HEAD(&ctx->active_reqs);
INIT_LIST_HEAD(&ctx->run_list);
+ INIT_WORK(&ctx->free_work, free_ioctx);
INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler);

if (aio_setup_ring(ctx) < 0)
@@ -327,36 +323,25 @@ out_freectx:
*/
static void kill_ctx(struct kioctx *ctx)
{
- struct task_struct *tsk = current;
- DECLARE_WAITQUEUE(wait, tsk);
+ struct kiocb *iocb, *t;
struct io_event res;
+ int put = 0;

spin_lock_irq(&ctx->ctx_lock);
- ctx->dead = 1;
- while (!list_empty(&ctx->active_reqs)) {
- struct list_head *pos = ctx->active_reqs.next;
- struct kiocb *iocb = list_kiocb(pos);
- list_del_init(&iocb->ki_list);

- kiocb_cancel(ctx, iocb, &res);
+ if (!ctx->dead) {
+ put = 1;
+ ctx->dead = 1;
+ hlist_del_rcu(&ctx->list);
}

- if (!ctx->reqs_active)
- goto out;
-
- add_wait_queue(&ctx->wait, &wait);
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- while (ctx->reqs_active) {
- spin_unlock_irq(&ctx->ctx_lock);
- io_schedule();
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- spin_lock_irq(&ctx->ctx_lock);
- }
- __set_task_state(tsk, TASK_RUNNING);
- remove_wait_queue(&ctx->wait, &wait);
+ list_for_each_entry_safe(iocb, t, &ctx->active_reqs, ki_list)
+ kiocb_cancel(ctx, iocb, &res);

-out:
spin_unlock_irq(&ctx->ctx_lock);
+
+ if (put)
+ req_put_ioctx(ctx);
}

/* wait_on_sync_kiocb:
@@ -385,18 +370,16 @@ EXPORT_SYMBOL(wait_on_sync_kiocb);
void exit_aio(struct mm_struct *mm)
{
struct kioctx *ctx;
+ struct hlist_node *p;

- while (!hlist_empty(&mm->ioctx_list)) {
- ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
- hlist_del_rcu(&ctx->list);
-
- kill_ctx(ctx);
+ spin_lock(&mm->ioctx_lock);

+ hlist_for_each_entry(ctx, p, &mm->ioctx_list, list) {
if (1 != atomic_read(&ctx->users))
printk(KERN_DEBUG
"exit_aio:ioctx still alive: %d %d %d\n",
atomic_read(&ctx->users), ctx->dead,
- ctx->reqs_active);
+ atomic_read(&ctx->reqs_active));
/*
* We don't need to bother with munmap() here -
* exit_mmap(mm) is coming and it'll unmap everything.
@@ -408,39 +391,34 @@ void exit_aio(struct mm_struct *mm)
* all other callers have ctx->mm == current->mm.
*/
ctx->ring_info.mmap_size = 0;
- put_ioctx(ctx);
+ kill_ctx(ctx);
}
+
+ spin_unlock(&mm->ioctx_lock);
}

/* aio_get_req
- * Allocate a slot for an aio request. Increments the users count
+ * Allocate a slot for an aio request. Increments the ki_users count
* of the kioctx so that the kioctx stays around until all requests are
* complete. Returns NULL if no requests are free.
*
- * Returns with kiocb->users set to 2. The io submit code path holds
+ * Returns with kiocb->ki_users set to 2. The io submit code path holds
* an extra reference while submitting the i/o.
* This prevents races between the aio code path referencing the
* req (after submitting it) and aio_complete() freeing the req.
*/
static struct kiocb *__aio_get_req(struct kioctx *ctx)
{
- struct kiocb *req = NULL;
+ struct kiocb *req;

req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
if (unlikely(!req))
return NULL;

- req->ki_flags = 0;
+ memset(req, 0, sizeof(*req));
req->ki_users = 2;
- req->ki_key = 0;
req->ki_ctx = ctx;
- req->ki_cancel = NULL;
- req->ki_retry = NULL;
- req->ki_dtor = NULL;
- req->private = NULL;
- req->ki_iovec = NULL;
INIT_LIST_HEAD(&req->ki_run_list);
- req->ki_eventfd = NULL;

return req;
}
@@ -473,10 +451,8 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
list_del(&req->ki_batch);
list_del(&req->ki_list);
kmem_cache_free(kiocb_cachep, req);
- ctx->reqs_active--;
+ req_put_ioctx(ctx);
}
- if (unlikely(!ctx->reqs_active && ctx->dead))
- wake_up_all(&ctx->wait);
spin_unlock_irq(&ctx->ctx_lock);
}

@@ -506,7 +482,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
spin_lock_irq(&ctx->ctx_lock);
ring = kmap_atomic(ctx->ring_info.ring_pages[0]);

- avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active;
+ avail = aio_ring_avail(&ctx->ring_info, ring) - atomic_read(&ctx->reqs_active);
BUG_ON(avail < 0);
if (avail < allocated) {
/* Trim back the number of requests. */
@@ -521,7 +497,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
batch->count -= allocated;
list_for_each_entry(req, &batch->head, ki_batch) {
list_add(&req->ki_list, &ctx->active_reqs);
- ctx->reqs_active++;
+ atomic_inc(&ctx->reqs_active);
}

kunmap_atomic(ring);
@@ -546,8 +522,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx,

static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
{
- assert_spin_locked(&ctx->ctx_lock);
-
+ fput(req->ki_filp);
if (req->ki_eventfd != NULL)
eventfd_ctx_put(req->ki_eventfd);
if (req->ki_dtor)
@@ -555,10 +530,8 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
if (req->ki_iovec != &req->ki_inline_vec)
kfree(req->ki_iovec);
kmem_cache_free(kiocb_cachep, req);
- ctx->reqs_active--;

- if (unlikely(!ctx->reqs_active && ctx->dead))
- wake_up_all(&ctx->wait);
+ req_put_ioctx(ctx);
}

/* __aio_put_req
@@ -566,8 +539,8 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
*/
static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
{
- dprintk(KERN_DEBUG "aio_put(%p): f_count=%ld\n",
- req, atomic_long_read(&req->ki_filp->f_count));
+ pr_debug("req=%p f_count=%ld\n",
+ req, atomic_long_read(&req->ki_filp->f_count));

assert_spin_locked(&ctx->ctx_lock);

@@ -576,11 +549,7 @@ static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
if (likely(req->ki_users))
return;
list_del(&req->ki_list); /* remove from active_reqs */
- req->ki_cancel = NULL;
- req->ki_retry = NULL;

- fput(req->ki_filp);
- req->ki_filp = NULL;
really_put_req(ctx, req);
}

@@ -605,18 +574,13 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)

rcu_read_lock();

- hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
- /*
- * RCU protects us against accessing freed memory but
- * we have to be careful not to get a reference when the
- * reference count already dropped to 0 (ctx->dead test
- * is unreliable because of races).
- */
- if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){
+ hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list)
+ if (ctx->user_id == ctx_id){
+ BUG_ON(ctx->dead);
+ atomic_inc(&ctx->users);
ret = ctx;
break;
}
- }

rcu_read_unlock();
return ret;
@@ -1162,7 +1126,7 @@ retry:
break;
/* Try to only show up in io wait if there are ops
* in flight */
- if (ctx->reqs_active)
+ if (atomic_read(&ctx->reqs_active) > 1)
io_schedule();
else
schedule();
@@ -1197,35 +1161,6 @@ out:
return i ? i : ret;
}

-/* Take an ioctx and remove it from the list of ioctx's. Protects
- * against races with itself via ->dead.
- */
-static void io_destroy(struct kioctx *ioctx)
-{
- struct mm_struct *mm = current->mm;
- int was_dead;
-
- /* delete the entry from the list is someone else hasn't already */
- spin_lock(&mm->ioctx_lock);
- was_dead = ioctx->dead;
- ioctx->dead = 1;
- hlist_del_rcu(&ioctx->list);
- spin_unlock(&mm->ioctx_lock);
-
- dprintk("aio_release(%p)\n", ioctx);
- if (likely(!was_dead))
- put_ioctx(ioctx); /* twice for the list */
-
- kill_ctx(ioctx);
-
- /*
- * Wake up any waiters. The setting of ctx->dead must be seen
- * by other CPUs at this point. Right now, we rely on the
- * locking done by the above calls to ensure this consistency.
- */
- wake_up_all(&ioctx->wait);
-}
-
/* sys_io_setup:
* Create an aio_context capable of receiving at least nr_events.
* ctxp must not point to an aio_context that already exists, and
@@ -1261,7 +1196,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
if (!IS_ERR(ioctx)) {
ret = put_user(ioctx->user_id, ctxp);
if (ret)
- io_destroy(ioctx);
+ kill_ctx(ioctx);
put_ioctx(ioctx);
}

@@ -1279,7 +1214,7 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
{
struct kioctx *ioctx = lookup_ioctx(ctx);
if (likely(NULL != ioctx)) {
- io_destroy(ioctx);
+ kill_ctx(ioctx);
put_ioctx(ioctx);
return 0;
}
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 4cde86d..eb6e5e4 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -192,7 +192,7 @@ struct kioctx {

spinlock_t ctx_lock;

- int reqs_active;
+ atomic_t reqs_active;
struct list_head active_reqs; /* used for cancellation */
struct list_head run_list; /* used for kicked reqs */

@@ -202,8 +202,7 @@ struct kioctx {
struct aio_ring_info ring_info;

struct delayed_work wq;
-
- struct rcu_head rcu_head;
+ struct work_struct free_work;
};

/* prototypes */
--
1.7.10.4

--
dm-devel mailing list
dm-devel@redhat.com
https://www.redhat.com/mailman/listinfo/dm-devel

Zach Brown 10-09-2012 06:27 PM

aio: Rewrite refcounting
 
On Mon, Oct 08, 2012 at 11:39:18PM -0700, Kent Overstreet wrote:
> The refcounting before wasn't very clear; there are two refcounts in
> struct kioctx, with an unclear relationship between them (or between
> them and ctx->dead).
>
> Now, reqs_active holds a refcount on users (when reqs_active is
> nonzero), and the initial refcount is taken on reqs_active - when
> ctx->dead goes to 1, we drop that initial refcount.

I agree that it's a mess, but let's rethink this work on top of the
series I'm sending out that gets rid of the retry and cancel code. It
makes the code a lot easier to follow. (And Jens also has some patches
to take fewer locks in the submission path, we'll want to take them into
account too.)

- z

--
dm-devel mailing list
dm-devel@redhat.com
https://www.redhat.com/mailman/listinfo/dm-devel

Kent Overstreet 10-09-2012 10:21 PM

aio: Rewrite refcounting
 
On Tue, Oct 09, 2012 at 11:27:55AM -0700, Zach Brown wrote:
> On Mon, Oct 08, 2012 at 11:39:18PM -0700, Kent Overstreet wrote:
> > The refcounting before wasn't very clear; there are two refcounts in
> > struct kioctx, with an unclear relationship between them (or between
> > them and ctx->dead).
> >
> > Now, reqs_active holds a refcount on users (when reqs_active is
> > nonzero), and the initial refcount is taken on reqs_active - when
> > ctx->dead goes to 1, we drop that initial refcount.
>
> I agree that it's a mess, but let's rethink this work on top of the
> series I'm sending out that gets rid of the retry and cancel code. It
> makes the code a lot easier to follow. (And Jens also has some patches
> to take fewer locks in the submission path, we'll want to take them into
> account too.)

Alright... send it out then. Also, do you know which branch Jens has his
patches in?

--
dm-devel mailing list
dm-devel@redhat.com
https://www.redhat.com/mailman/listinfo/dm-devel

Zach Brown 10-09-2012 10:35 PM

aio: Rewrite refcounting
 
> Alright... send it out then.

Workin' on it! :)

> Also, do you know which branch Jens has his patches in?

http://git.kernel.dk/?p=linux-block.git;a=commit;h=6b6723fc3e4f24dbd80526df935ca115ead578c6

https://plus.google.com/111643045511375507360/posts

As far as I know, he hasn't had a chance to work on it recently.

- z

--
dm-devel mailing list
dm-devel@redhat.com
https://www.redhat.com/mailman/listinfo/dm-devel

Kent Overstreet 10-10-2012 12:17 AM

aio: Rewrite refcounting
 
On Tue, Oct 09, 2012 at 03:35:04PM -0700, Zach Brown wrote:
> > Alright... send it out then.
>
> Workin' on it! :)
>
> > Also, do you know which branch Jens has his patches in?
>
> http://git.kernel.dk/?p=linux-block.git;a=commit;h=6b6723fc3e4f24dbd80526df935ca115ead578c6
>
> https://plus.google.com/111643045511375507360/posts
>
> As far as I know, he hasn't had a chance to work on it recently.

Thanks - wasn't sure if I was looking at the right branch. Looking at it
now...

--
dm-devel mailing list
dm-devel@redhat.com
https://www.redhat.com/mailman/listinfo/dm-devel


All times are GMT. The time now is 03:58 AM.

VBulletin, Copyright ©2000 - 2014, Jelsoft Enterprises Ltd.
Content Relevant URLs by vBSEO ©2007, Crawlability, Inc.