Page Menu
Home
FreeBSD
Search
Configure Global Search
Log In
Files
F144621054
D33144.1775830493.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
14 KB
Referenced Files
None
Subscribers
None
D33144.1775830493.diff
View Options
diff --git a/share/man/man4/aio.4 b/share/man/man4/aio.4
--- a/share/man/man4/aio.4
+++ b/share/man/man4/aio.4
@@ -27,7 +27,7 @@
.\"
.\" $FreeBSD$
.\"
-.Dd January 2, 2021
+.Dd December 5, 2021
.Dt AIO 4
.Os
.Sh NAME
@@ -170,6 +170,7 @@
to, its
.Va sigev_notify_kevent_flags
field may contain
+.Dv AIO_KEVENT_FLAG_REAP ,
.Dv EV_ONESHOT ,
.Dv EV_CLEAR , and/or
.Dv EV_DISPATCH , and its
@@ -185,8 +186,30 @@
.It Va udata Ta
value stored in
.Va aio_sigevent.sigev_value
+.It Va data Ta
+if
+.Dv AIO_KEVENT_FLAG_REAP
+is specified,
+error or return status, depending on
+.Dv EV_ERROR
+flag
.El
.Pp
+If
+.Dv AIO_KEVENT_FLAG_REAP
+is specified in
+.Va sigev_notify_kevent_flags ,
+then it is not necessary to call
+.Xr aio_error 2
+and
+.Xr aio_return 2
+in the common case.
+Note that it is still necessary to query the status
+of individual I/O operations explicitly with
+.Xr aio_error 2
+after a failed call to
+.Xr lio_listio 2 .
+.Pp
For
.Dv SIGEV_SIGNO
and
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -496,12 +496,30 @@
struct kaioinfo *ki;
struct aioliojob *lj;
struct proc *p;
+ struct thread *td;
p = job->userproc;
- MPASS(curproc == p);
ki = p->p_aioinfo;
MPASS(ki != NULL);
+ /*
+ * The thread argument here is used to find the owning process
+ * and is also passed to fo_close() which may pass it to various
+ * places such as devsw close() routines. Because of that, we
+ * need a thread pointer from the process owning the job that is
+ * persistent and won't disappear out from under us or move to
+ * another process.
+ *
+ * We use the thread that submitted the IO when freeing jobs
+ * asynchronously (i.e., AIO_KEVENT_FLAG_REAP), and we know that is safe
+ * because aio_thread_exit() waits for submitted IOs to finish. We
+ * don't want to make aio_thread_exit() wait for a user to call
+ * aio_return() though, so we drop that reference as soon as possible
+ * (namely, after updating the submitting thread's rusage counters),
+ * and use the calling thread here, in that case.
+ */
+ td = job->td ? job->td : curthread;
+
AIO_LOCK_ASSERT(ki, MA_OWNED);
MPASS(job->jobflags & KAIOCB_FINISHED);
@@ -521,7 +539,7 @@
if (lj->lioj_count == 0) {
TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
/* lio is going away, we need to destroy any knotes */
- knlist_delete(&lj->klist, curthread, 1);
+ knlist_delete(&lj->klist, td, 1);
PROC_LOCK(p);
sigqueue_take(&lj->lioj_ksi);
PROC_UNLOCK(p);
@@ -530,35 +548,15 @@
}
/* job is going away, we need to destroy any knotes */
- knlist_delete(&job->klist, curthread, 1);
+ knlist_delete(&job->klist, td, 1);
PROC_LOCK(p);
sigqueue_take(&job->ksi);
PROC_UNLOCK(p);
AIO_UNLOCK(ki);
- /*
- * The thread argument here is used to find the owning process
- * and is also passed to fo_close() which may pass it to various
- * places such as devsw close() routines. Because of that, we
- * need a thread pointer from the process owning the job that is
- * persistent and won't disappear out from under us or move to
- * another process.
- *
- * Currently, all the callers of this function call it to remove
- * a kaiocb from the current process' job list either via a
- * syscall or due to the current process calling exit() or
- * execve(). Thus, we know that p == curproc. We also know that
- * curthread can't exit since we are curthread.
- *
- * Therefore, we use curthread as the thread to pass to
- * knlist_delete(). This does mean that it is possible for the
- * thread pointer at close time to differ from the thread pointer
- * at open time, but this is already true of file descriptors in
- * a multithreaded process.
- */
if (job->fd_file)
- fdrop(job->fd_file, curthread);
+ fdrop(job->fd_file, td);
crfree(job->cred);
if (job->uiop != &job->uio)
free(job->uiop, M_IOV);
@@ -970,6 +968,18 @@
&ki->kaio_sync_task);
}
+ /*
+ * If asynchronous reap was requested, the result was stored in the
+ * knote data member by KNOTE_UNLOCKED() above, and we can clear our
+ * knote list and free the job eagerly while it still has a reference
+ * to the submitting thread (needed for fdrop()).
+ */
+ if (job->uaiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT &&
+ (job->uaiocb.aio_sigevent.sigev_notify_kevent_flags & AIO_KEVENT_FLAG_REAP)) {
+ knlist_clear(&job->klist, 1);
+ aio_free_entry(job);
+ }
+
/*
* Drop our reference to the submitting thread. This allows it to
* exit.
@@ -1074,6 +1084,12 @@
job->uaiocb._aiocb_private.error = error;
job->uaiocb._aiocb_private.status = status;
+ /* Write the result to the user space object. */
+ if (curproc->p_vmspace != job->userproc->p_vmspace)
+ aio_switch_vmspace(job);
+ job->ops->store_status(job->ujob, status);
+ job->ops->store_error(job->ujob, error);
+
/*
* Transfer the resource usage delta to the submitting thread's
* counters.
@@ -1633,6 +1649,12 @@
/* Save userspace address of the job info. */
job->ujob = ujob;
+ /*
+ * Record the ops table so that worker daemons can write result to user
+ * space with the correct ABI.
+ */
+ job->ops = ops;
+
/*
* Validate the opcode and fetch the file object for the specified
* file descriptor.
@@ -1707,7 +1729,7 @@
if (job->uaiocb.aio_sigevent.sigev_notify != SIGEV_KEVENT)
goto no_kqueue;
evflags = job->uaiocb.aio_sigevent.sigev_notify_kevent_flags;
- if ((evflags & ~(EV_CLEAR | EV_DISPATCH | EV_ONESHOT)) != 0) {
+ if ((evflags & ~(EV_CLEAR | EV_DISPATCH | EV_ONESHOT | AIO_KEVENT_FLAG_REAP)) != 0) {
error = EINVAL;
goto err3;
}
@@ -1758,6 +1780,8 @@
/*
* Take a reference to the submitting thread, so that worker daemons
* can update this thread's resource usage counters.
+ *
+ * It is also needed to free resources asynchronously, if requested.
*/
job->td = td;
atomic_add_int(&td->td_aio_count, 1);
@@ -1994,8 +2018,6 @@
td->td_retval[0] = status;
aio_free_entry(job);
AIO_UNLOCK(ki);
- ops->store_error(ujob, error);
- ops->store_status(ujob, status);
} else {
error = EINVAL;
AIO_UNLOCK(ki);
@@ -2180,41 +2202,12 @@
static int
kern_aio_error(struct thread *td, struct aiocb *ujob, struct aiocb_ops *ops)
{
- struct proc *p = td->td_proc;
- struct kaiocb *job;
- struct kaioinfo *ki;
- int status;
-
- ki = p->p_aioinfo;
- if (ki == NULL) {
- td->td_retval[0] = EINVAL;
- return (0);
- }
-
- AIO_LOCK(ki);
- TAILQ_FOREACH(job, &ki->kaio_all, allist) {
- if (job->ujob == ujob) {
- if (job->jobflags & KAIOCB_FINISHED)
- td->td_retval[0] =
- job->uaiocb._aiocb_private.error;
- else
- td->td_retval[0] = EINPROGRESS;
- AIO_UNLOCK(ki);
- return (0);
- }
- }
- AIO_UNLOCK(ki);
-
/*
- * Hack for failure of aio_aqueue.
+ * We return the value we last stored in the user space object,
+ * so that it's available after a partial failure of lio_listio().
*/
- status = ops->fetch_status(ujob);
- if (status == -1) {
- td->td_retval[0] = ops->fetch_error(ujob);
- return (0);
- }
+ td->td_retval[0] = ops->fetch_error(ujob);
- td->td_retval[0] = EINVAL;
return (0);
}
@@ -2622,8 +2615,6 @@
aio_free_entry(job);
AIO_UNLOCK(ki);
ops->store_aiocb(ujobp, ujob);
- ops->store_error(ujob, error);
- ops->store_status(ujob, status);
} else
AIO_UNLOCK(ki);
@@ -2693,6 +2684,17 @@
kn->kn_ptr.p_aio = job;
kn->kn_flags &= ~EV_FLAG1;
+ /*
+ * Since AIO_KEVENT_FLAG_REAP (EV_FLAG2) is overloaded with EV_ERROR,
+ * we clear it from flags (it's still visible inside the job object
+ * where it will affect our behavior), but implicitly enable
+ * EV_ONESHOT.
+ */
+ if (kn->kn_flags & AIO_KEVENT_FLAG_REAP) {
+ kn->kn_flags &= ~AIO_KEVENT_FLAG_REAP;
+ kn->kn_flags |= EV_ONESHOT;
+ }
+
knlist_add(&job->klist, kn, 0);
return (0);
@@ -2712,15 +2714,37 @@
}
/* kqueue filter function */
-/*ARGSUSED*/
static int
filt_aio(struct knote *kn, long hint)
{
struct kaiocb *job = kn->kn_ptr.p_aio;
- kn->kn_data = job->uaiocb._aiocb_private.error;
+ if (job->uaiocb.aio_sigevent.sigev_notify_kevent_flags & AIO_KEVENT_FLAG_REAP) {
+ /*
+ * When asynchronous reap is requested, "data" is set to either
+ * the error or the result, with a flag to say which it is.
+ * The user does not have to call aio_return().
+ */
+ if (job->uaiocb._aiocb_private.error > 0) {
+ kn->kn_flags |= EV_ERROR;
+ kn->kn_data = job->uaiocb._aiocb_private.error;
+ } else {
+ kn->kn_data = job->uaiocb._aiocb_private.status;
+ }
+ } else {
+ /*
+ * The user must call aio_return() to fetch the result and free
+ * kernel resources. To avoid the need to call aio_error(),
+ * the error is written into "data".
+ *
+ * XXX This behaviour is old but undocumented.
+ */
+ kn->kn_data = job->uaiocb._aiocb_private.error;
+ }
+
if (!(job->jobflags & KAIOCB_FINISHED))
return (0);
+
kn->kn_flags |= EV_EOF;
return (1);
}
diff --git a/sys/sys/aio.h b/sys/sys/aio.h
--- a/sys/sys/aio.h
+++ b/sys/sys/aio.h
@@ -112,6 +112,13 @@
#define aio_iov aio_buf /* I/O scatter/gather list */
#define aio_iovcnt aio_nbytes /* Length of aio_iov */
+/*
+ * A value used in aio_sigevent.sigev_notify_kevent_flags to activate asynchronous
+ * reaping of completion events, with the result stored in kevent's data member
+ * and EV_ERROR flag.
+ */
+#define AIO_KEVENT_FLAG_REAP 0x4000 /* == EV_FLAG2 */
+
#ifdef _KERNEL
typedef void aio_cancel_fn_t(struct kaiocb *);
@@ -150,6 +157,7 @@
uint64_t seqno; /* (*) job number */
aio_cancel_fn_t *cancel_fn; /* (a) backend cancel function */
aio_handle_fn_t *handle_fn; /* (c) backend handle function */
+ struct aiocb_ops *ops; /* (a) ops for writing to user space */
union { /* Backend-specific data fields */
struct { /* BIO backend */
int nbio; /* Number of remaining bios */
diff --git a/tests/sys/aio/aio_test.c b/tests/sys/aio/aio_test.c
--- a/tests/sys/aio/aio_test.c
+++ b/tests/sys/aio/aio_test.c
@@ -1980,6 +1980,142 @@
close(fd);
}
+ATF_TC_WITHOUT_HEAD(aio_kqueue_noreap);
+ATF_TC_BODY(aio_kqueue_noreap, tc)
+{
+ int kq;
+ int pipe_fds[2];
+ struct aiocb iocb;
+ struct aiocb *piocb;
+ char buffer[] = "hello world";
+ struct kevent kev;
+ struct timespec timeout = {0, 0};
+
+ ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
+
+ kq = kqueue();
+ ATF_REQUIRE(kq >= 0);
+ ATF_REQUIRE_EQ(0, pipe(pipe_fds));
+
+ /* Submit a write that will succeed. */
+ memset(&iocb, 0, sizeof(iocb));
+ iocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+ iocb.aio_sigevent.sigev_notify_kqueue = kq;
+ iocb.aio_fildes = pipe_fds[1];
+ iocb.aio_buf = buffer;
+ iocb.aio_nbytes = sizeof(buffer);
+ ATF_REQUIRE_EQ(0, aio_write(&iocb));
+
+ /* Wait for completion. */
+ ATF_REQUIRE_EQ(1, kevent(kq, NULL, 0, &kev, 1, NULL));
+ ATF_REQUIRE_EQ((uintptr_t) &iocb, kev.ident);
+ ATF_REQUIRE_EQ(0, kev.flags & EV_ERROR);
+ ATF_REQUIRE_EQ(0, kev.data); /* undocumented */
+ ATF_REQUIRE_EQ(0, aio_error(&iocb));
+
+ /* Reap the IO explicitly. */
+ ATF_REQUIRE_EQ(sizeof(buffer), aio_waitcomplete(&piocb, &timeout));
+ ATF_REQUIRE_EQ(&iocb, piocb);
+
+ /* Nothing left in the kernel's queue. */
+ ATF_REQUIRE_EQ(-1, aio_waitcomplete(&piocb, &timeout));
+ ATF_REQUIRE_EQ(NULL, piocb);
+
+ /* Submit a write that will fail. */
+ close(pipe_fds[0]);
+ memset(&iocb, 0, sizeof(iocb));
+ iocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+ iocb.aio_sigevent.sigev_notify_kqueue = kq;
+ iocb.aio_fildes = pipe_fds[1];
+ iocb.aio_buf = buffer;
+ iocb.aio_nbytes = sizeof(buffer);
+ ATF_REQUIRE_EQ(0, aio_write(&iocb));
+
+ /* Wait for completion. */
+ ATF_REQUIRE_EQ(1, kevent(kq, NULL, 0, &kev, 1, NULL));
+ ATF_REQUIRE_EQ((uintptr_t) &iocb, kev.ident);
+ ATF_REQUIRE_EQ(0, kev.flags & EV_ERROR);
+ ATF_REQUIRE_EQ(EPIPE, kev.data); /* undocumented */
+ ATF_REQUIRE_EQ(EPIPE, aio_error(&iocb));
+
+ /* Reap the IO explicitly. */
+ ATF_REQUIRE_EQ(-1, aio_waitcomplete(&piocb, &timeout));
+ ATF_REQUIRE_EQ(&iocb, piocb);
+ ATF_REQUIRE_EQ(EPIPE, errno);
+
+ /* Nothing left in the kernel's queue. */
+ ATF_REQUIRE_EQ(-1, aio_waitcomplete(&piocb, &timeout));
+ ATF_REQUIRE_EQ(NULL, piocb);
+
+ close(pipe_fds[1]);
+ close(kq);
+}
+
+ATF_TC_WITHOUT_HEAD(aio_kqueue_reap);
+ATF_TC_BODY(aio_kqueue_reap, tc)
+{
+ int kq;
+ int pipe_fds[2];
+ struct aiocb iocb;
+ struct aiocb *piocb;
+ char buffer[] = "hello world";
+ struct kevent kev;
+ struct timespec timeout = {0, 0};
+
+ ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
+
+ kq = kqueue();
+ ATF_REQUIRE(kq >= 0);
+ ATF_REQUIRE_EQ(0, pipe(pipe_fds));
+
+ /* Submit a write that will succeed. */
+ memset(&iocb, 0, sizeof(iocb));
+ iocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+ iocb.aio_sigevent.sigev_notify_kqueue = kq;
+ iocb.aio_sigevent.sigev_notify_kevent_flags = AIO_KEVENT_FLAG_REAP;
+ iocb.aio_fildes = pipe_fds[1];
+ iocb.aio_buf = buffer;
+ iocb.aio_nbytes = sizeof(buffer);
+ ATF_REQUIRE_EQ(0, aio_write(&iocb));
+
+ /* Wait for completion. */
+ ATF_REQUIRE_EQ(1, kevent(kq, NULL, 0, &kev, 1, NULL));
+ ATF_REQUIRE_EQ((uintptr_t) &iocb, kev.ident);
+ ATF_REQUIRE_EQ(0, kev.flags & EV_ERROR);
+ ATF_REQUIRE_EQ(sizeof(buffer), kev.data);
+ ATF_REQUIRE_EQ(0, aio_error(&iocb));
+
+ /* Nothing left in the kernel's queue. */
+ ATF_REQUIRE_EQ(-1, aio_waitcomplete(&piocb, &timeout));
+ ATF_REQUIRE_EQ(NULL, piocb);
+
+ /* Submit a write that will fail. */
+ close(pipe_fds[0]);
+ memset(&iocb, 0, sizeof(iocb));
+ iocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+ iocb.aio_sigevent.sigev_notify_kqueue = kq;
+ iocb.aio_sigevent.sigev_notify_kevent_flags = AIO_KEVENT_FLAG_REAP;
+ iocb.aio_fildes = pipe_fds[1];
+ iocb.aio_buf = buffer;
+ iocb.aio_nbytes = sizeof(buffer);
+ ATF_REQUIRE_EQ(0, aio_write(&iocb));
+
+ /* Wait for completion. */
+ ATF_REQUIRE_EQ(1, kevent(kq, NULL, 0, &kev, 1, NULL));
+ ATF_REQUIRE_EQ((uintptr_t) &iocb, kev.ident);
+ ATF_REQUIRE_EQ(EV_ERROR, kev.flags & EV_ERROR);
+ ATF_REQUIRE_EQ(EPIPE, kev.data);
+ ATF_REQUIRE_EQ(EPIPE, aio_error(&iocb));
+
+ /* Nothing left in the kernel's queue. */
+ ATF_REQUIRE_EQ(-1, aio_waitcomplete(&piocb, &timeout));
+ ATF_REQUIRE_EQ(NULL, piocb);
+
+ close(pipe_fds[1]);
+ close(kq);
+}
ATF_TP_ADD_TCS(tp)
{
@@ -2038,6 +2174,8 @@
ATF_TP_ADD_TC(tp, vectored_socket_poll);
ATF_TP_ADD_TC(tp, vectored_thread);
ATF_TP_ADD_TC(tp, aio_threadexit);
+ ATF_TP_ADD_TC(tp, aio_kqueue_noreap);
+ ATF_TP_ADD_TC(tp, aio_kqueue_reap);
return (atf_no_error());
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Apr 10, 2:14 PM (1 h, 22 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
28315332
Default Alt Text
D33144.1775830493.diff (14 KB)
Attached To
Mode
D33144: Reap AIO completions asynchronously when using kqueue.
Attached
Detach File
Event Timeline
Log In to Comment