From 84af4da65a1c18bb7458050598be5fd602f1fa01 Mon Sep 17 00:00:00 2001
From: John Dyson
Date: Sun, 30 Nov 1997 04:36:31 +0000
Subject: [PATCH] Finish up the vast majority of the AIO/LIO functionality.

Proper signal support was missing in the previous version of the AIO
code.  More tunables have been added, along with very efficient support
for VCHR files.  Kernel threads are not used for VCHR files; all work
for such files is done in the context of the requesting process.  Some
attempt has been made to charge the requesting process for resource
utilization, but more work is needed.  aio_fsync is still missing (the
original fsync system call can be used in the meantime), and aio_cancel
is essentially a noop, which is okay per POSIX.  More aio_cancel
functionality can be added later, if it is found to be needed.

The functions implemented include: aio_read, aio_write, lio_listio,
aio_error, aio_return, aio_cancel, and aio_suspend.  The code implements
the POSIX 1003.1b (formerly known as POSIX 1003.4) semantics for each of
these.  The async I/O is truly asynchronous, with the VCHR mode of
operation being essentially the same as physio (for appropriate files)
for maximum efficiency.  The code also supports signal delivery on
completion, is highly tunable so that resource usage can be managed, and
has been written to allow a per-process usage quota.  Both the O'Reilly
POSIX.4 book and the actual POSIX 1003.1b document were used as the
reference specs.

Any file descriptor can be used with these new system calls; I know of
no exceptions where they will not work.  (TTYs will probably work as
well.)
---
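[Note for reviewers, not part of the patch: a minimal userland sketch of
the queue/poll/reap cycle that these system calls implement.  The device
path, buffer size, and abbreviated error handling are illustrative
assumptions only.]

#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	static char buf[8192];		/* multiple of DEV_BSIZE, so a raw
					   VCHR device may take the fast path */
	struct aiocb cb;
	const struct aiocb *list[1];
	int fd, error;

	/* any file descriptor should work, per the notes above */
	fd = open("/dev/zero", O_RDONLY);
	if (fd == -1) {
		perror("open");
		return 1;
	}

	memset(&cb, 0, sizeof(cb));
	cb.aio_fildes = fd;
	cb.aio_buf = buf;
	cb.aio_nbytes = sizeof(buf);
	cb.aio_offset = 0;

	if (aio_read(&cb) == -1) {	/* queues the job and returns at once */
		perror("aio_read");
		return 1;
	}

	list[0] = &cb;
	while ((error = aio_error(&cb)) == EINPROGRESS)
		aio_suspend(list, 1, NULL);	/* sleep until a job finishes */

	if (error == 0)
		printf("read %ld bytes\n", (long)aio_return(&cb));	/* reap */
	else
		fprintf(stderr, "aio failed: %s\n", strerror(error));
	close(fd);
	return 0;
}

[On an eligible raw VCHR device the same sequence takes the new
physio-style path, with no AIO daemon thread involved.]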
 sys/kern/vfs_aio.c | 658 +++++++++++++++++++++++++++++++++++++--------
 1 file changed, 552 insertions(+), 106 deletions(-)

diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
index bbd08cb6c71b..c1f619a97e1b 100644
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -13,20 +13,11 @@
  * bad that happens because of using this software isn't the responsibility
  * of the author.  This software is distributed AS-IS.
  *
- * $Id: vfs_aio.c,v 1.12 1997/11/29 01:33:07 dyson Exp $
+ * $Id: vfs_aio.c,v 1.13 1997/11/29 02:57:46 dyson Exp $
  */

 /*
- * This file contains support for the POSIX.4 AIO facility.
- *
- * The initial version provides only the (bogus) synchronous semantics
- * but will support async in the future.  Note that a bit
- * in a private field allows the user mode subroutine to adapt
- * the kernel operations to true POSIX.4 for future compatibility.
- *
- * This code is used to support true POSIX.4 AIO/LIO with the help
- * of a user mode subroutine package.  Note that eventually more support
- * will be pushed into the kernel.
+ * This file contains support for the POSIX.4 AIO/LIO facility.
  */

 #include
@@ -59,16 +50,12 @@
 #include

-#define AIOCBLIST_CANCELLED	0x1
-#define AIOCBLIST_RUNDOWN	0x4
-#define AIOCBLIST_ASYNCFREE	0x8
-
 #if 0
 #define DEBUGAIO
 #define DIAGNOSTIC
 #endif

-#define DEBUGAIO 1
+#define DEBUGAIO 0

 static int jobrefid;
@@ -80,12 +67,37 @@ static int jobrefid;
 #define JOBST_JOBQBUF		0x5
 #define JOBST_JOBBFINISHED	0x6

+#ifndef MAX_AIO_PER_PROC
 #define MAX_AIO_PER_PROC	32
+#endif
+
+#ifndef MAX_AIO_QUEUE_PER_PROC
 #define MAX_AIO_QUEUE_PER_PROC	256 /* Bigger than AIO_LISTIO_MAX */
+#endif
+
+#ifndef MAX_AIO_PROCS
 #define MAX_AIO_PROCS		32
+#endif
+
+#ifndef MAX_AIO_QUEUE
 #define MAX_AIO_QUEUE		1024 /* Bigger than AIO_LISTIO_MAX */
-#define TARGET_AIO_PROCS	16
-#define MAX_AIO_BALLOW_PER_PROC	16
+#endif
+
+#ifndef TARGET_AIO_PROCS
+#define TARGET_AIO_PROCS	0
+#endif
+
+#ifndef MAX_BUF_AIO
+#define MAX_BUF_AIO		16
+#endif
+
+#ifndef AIOD_TIMEOUT_DEFAULT
+#define AIOD_TIMEOUT_DEFAULT	(10 * hz)
+#endif
+
+#ifndef AIOD_LIFETIME_DEFAULT
+#define AIOD_LIFETIME_DEFAULT	(30 * hz)
+#endif

 int max_aio_procs = MAX_AIO_PROCS;
 int num_aio_procs = 0;
@@ -94,11 +106,13 @@ int max_queue_count = MAX_AIO_QUEUE;
 int num_queue_count = 0;
 int num_buf_aio = 0;
 int num_aio_resv_start = 0;
+int aiod_timeout;
+int aiod_lifetime;

 int max_aio_per_proc = MAX_AIO_PER_PROC,
	max_aio_queue_per_proc=MAX_AIO_QUEUE_PER_PROC;
-int max_aio_ballow_per_proc = MAX_AIO_BALLOW_PER_PROC;
+int max_buf_aio = MAX_BUF_AIO;

 SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0, "AIO mgmt");
@@ -123,12 +137,19 @@ SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_queue,
 SYSCTL_INT(_vfs_aio, OID_AUTO, target_aio_procs,
	CTLFLAG_RW, &target_aio_procs, 0, "");

-SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_ballow_per_proc,
-	CTLFLAG_RW, &max_aio_ballow_per_proc, 0, "");
+SYSCTL_INT(_vfs_aio, OID_AUTO, max_buf_aio,
+	CTLFLAG_RW, &max_buf_aio, 0, "");

 SYSCTL_INT(_vfs_aio, OID_AUTO, num_buf_aio,
	CTLFLAG_RD, &num_buf_aio, 0, "");

+SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime,
+	CTLFLAG_RW, &aiod_lifetime, 0, "");
+
+SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_timeout,
+	CTLFLAG_RW, &aiod_timeout, 0, "");
+
+
 #if DEBUGAIO > 0
 static int debugaio;
 SYSCTL_INT(_vfs_aio, OID_AUTO, debugaio, CTLFLAG_RW, &debugaio, 0, "");
@@ -136,10 +157,17 @@ SYSCTL_INT(_vfs_aio, OID_AUTO, debugaio, CTLFLAG_RW, &debugaio, 0, "");

 #define DEBUGFLOW (debugaio & 0xff)
 #define DEBUGREQ ((debugaio & 0xff00) >> 8)
+#define DEBUGCHR (debugaio & 0x10000)

 /*
  * Job queue item
  */
+
+#define AIOCBLIST_CANCELLED	0x1
+#define AIOCBLIST_RUNDOWN	0x4
+#define AIOCBLIST_ASYNCFREE	0x8
+#define AIOCBLIST_DONE		0x10
+
 struct aiocblist {
	TAILQ_ENTRY (aiocblist) list;	/* List of jobs */
	TAILQ_ENTRY (aiocblist) plist;	/* List of jobs for proc */
@@ -149,15 +177,17 @@ struct aiocblist {
	struct buf *bp;			/* buffer pointer */
	struct proc *userproc;		/* User process */
	struct aioproclist *jobaioproc;	/* AIO process descriptor */
+	struct aio_liojob *lio;		/* optional lio job */
	struct aiocb uaiocb;		/* Kernel I/O control block */
 };

-#define AIOP_FREE	0x1	/* proc on free queue */
-#define AIOP_SCHED	0x2	/* proc explicitly scheduled */

 /*
  * AIO process info
  */
+#define AIOP_FREE	0x1	/* proc on free queue */
+#define AIOP_SCHED	0x2	/* proc explicitly scheduled */
+
 struct aioproclist {
	int aioprocflags;			/* AIO proc flags */
	TAILQ_ENTRY(aioproclist) list;		/* List of processes */
@@ -165,6 +195,25 @@ struct aioproclist {
	TAILQ_HEAD (,aiocblist) jobtorun;	/* suggested job to run */
 };

+/*
+ * data structure for lio signal management
+ */
+struct aio_liojob {
+	int	lioj_flags;
+	int	lioj_buffer_count;
+	int	lioj_buffer_finished_count;
+	int	lioj_queue_count;
+	int	lioj_queue_finished_count;
+	struct	sigevent lioj_signal;	/* signal on all I/O done */
+	TAILQ_ENTRY (aio_liojob) lioj_list;
+	struct	kaioinfo *lioj_ki;
+};
+#define	LIOJ_SIGNAL		0x1	/* signal on all done (lio) */
+#define	LIOJ_SIGNAL_POSTED	0x2	/* signal has been posted */
+
+/*
+ * per process aio data structure
+ */
 struct kaioinfo {
	int	kaio_flags;		/* per process kaio flags */
	int	kaio_maxactive_count;	/* maximum number of AIOs */
@@ -172,20 +221,26 @@ struct kaioinfo {
	int	kaio_qallowed_count;	/* maxiumu size of AIO queue */
	int	kaio_queue_count;	/* size of AIO queue */
	int	kaio_ballowed_count;	/* maximum number of buffers */
+	int	kaio_queue_finished_count; /* number of daemon jobs finished */
	int	kaio_buffer_count;	/* number of physio buffers */
+	int	kaio_buffer_finished_count; /* count of I/O done */
+	struct	proc *kaio_p;		/* process that uses this kaio block */
+	TAILQ_HEAD (,aio_liojob) kaio_liojoblist; /* list of lio jobs */
	TAILQ_HEAD (,aiocblist) kaio_jobqueue;	/* job queue for process */
	TAILQ_HEAD (,aiocblist) kaio_jobdone;	/* done queue for process */
	TAILQ_HEAD (,aiocblist) kaio_bufqueue;	/* buffer job queue for process */
	TAILQ_HEAD (,aiocblist) kaio_bufdone;	/* buffer done queue for process */
 };

-#define KAIO_RUNDOWN 0x1
-#define KAIO_WAKEUP 0x2
+#define KAIO_RUNDOWN	0x1	/* process is being run down */
+#define KAIO_WAKEUP	0x2	/* wakeup process when there is a significant
+				   event */
+

 TAILQ_HEAD (,aioproclist) aio_freeproc, aio_activeproc;
 TAILQ_HEAD(,aiocblist) aio_jobs;		/* Async job list */
 TAILQ_HEAD(,aiocblist) aio_bufjobs;		/* Phys I/O job list */
-TAILQ_HEAD(,aiocblist) aio_freejobs;
+TAILQ_HEAD(,aiocblist) aio_freejobs;		/* Pool of free jobs */

 static void aio_init_aioinfo(struct proc *p) ;
 static void aio_onceonly(void *) ;
@@ -200,7 +255,8 @@ static void aio_daemon(void *uproc);

 SYSINIT(aio, SI_SUB_VFS, SI_ORDER_ANY, aio_onceonly, NULL);

-static vm_zone_t kaio_zone=0, aiop_zone=0, aiocb_zone=0, aiol_zone=0;
+static vm_zone_t kaio_zone=0, aiop_zone=0,
+	aiocb_zone=0, aiol_zone=0, aiolio_zone=0;

 /*
  * Single AIOD vmspace shared amongst all of them
@@ -222,11 +278,16 @@ aio_onceonly(void *na)
	aiop_zone = zinit("AIOP", sizeof (struct aioproclist), 0, 0, 1);
	aiocb_zone = zinit("AIOCB", sizeof (struct aiocblist), 0, 0, 1);
	aiol_zone = zinit("AIOL", AIO_LISTIO_MAX * sizeof (int), 0, 0, 1);
+	aiolio_zone = zinit("AIOLIO",
+		AIO_LISTIO_MAX * sizeof (struct aio_liojob), 0, 0, 1);
+	aiod_timeout = AIOD_TIMEOUT_DEFAULT;
+	aiod_lifetime = AIOD_LIFETIME_DEFAULT;
	jobrefid = 1;
 }

 /*
  * Init the per-process aioinfo structure.
+ * The aioinfo limits are set per-process for user limit (resource) management.
  */
 void
 aio_init_aioinfo(struct proc *p)
@@ -235,16 +296,20 @@ aio_init_aioinfo(struct proc *p)
	if (p->p_aioinfo == NULL) {
		ki = zalloc(kaio_zone);
		p->p_aioinfo = ki;
+		ki->kaio_flags = 0;
		ki->kaio_maxactive_count = max_aio_per_proc;
		ki->kaio_active_count = 0;
		ki->kaio_qallowed_count = max_aio_queue_per_proc;
		ki->kaio_queue_count = 0;
-		ki->kaio_ballowed_count = max_aio_ballow_per_proc;
+		ki->kaio_ballowed_count = max_buf_aio;
		ki->kaio_buffer_count = 0;
+		ki->kaio_buffer_finished_count = 0;
+		ki->kaio_p = p;
		TAILQ_INIT(&ki->kaio_jobdone);
		TAILQ_INIT(&ki->kaio_jobqueue);
		TAILQ_INIT(&ki->kaio_bufdone);
		TAILQ_INIT(&ki->kaio_bufqueue);
+		TAILQ_INIT(&ki->kaio_liojoblist);
	}
 }

@@ -258,6 +323,7 @@ aio_free_entry(struct aiocblist *aiocbe)
 {
	struct kaioinfo *ki;
	struct aioproclist *aiop;
+	struct aio_liojob *lj;
	struct proc *p;
	int error;
@@ -266,6 +332,7 @@ aio_free_entry(struct aiocblist *aiocbe)

	p = aiocbe->userproc;
	ki = p->p_aioinfo;
+	lj = aiocbe->lio;
	if (ki == NULL)
		panic("aio_free_entry: missing p->p_aioinfo");
@@ -283,8 +350,15 @@ aio_free_entry(struct aiocblist *aiocbe)
		if (num_queue_count <= 0)
			panic("aio_free_entry: system wide queue size <= 0");

-		--ki->kaio_queue_count;
-		--num_queue_count;
+		if(lj) {
+			lj->lioj_queue_count--;
+			if (aiocbe->jobflags & AIOCBLIST_DONE)
+				lj->lioj_queue_finished_count--;
+		}
+		ki->kaio_queue_count--;
+		if (aiocbe->jobflags & AIOCBLIST_DONE)
+			ki->kaio_queue_finished_count--;
+		num_queue_count--;

 #if DEBUGAIO > 0
		if (DEBUGFLOW > 0)
@@ -292,8 +366,15 @@
			ki->kaio_queue_count, num_queue_count);
 #endif
	} else {
-		--ki->kaio_buffer_count;
-		--num_buf_aio;
+		if(lj) {
+			lj->lioj_buffer_count--;
+			if (aiocbe->jobflags & AIOCBLIST_DONE)
+				lj->lioj_buffer_finished_count--;
+		}
+		if (aiocbe->jobflags & AIOCBLIST_DONE)
+			ki->kaio_buffer_finished_count--;
+		ki->kaio_buffer_count--;
+		num_buf_aio--;

 #if DEBUGAIO > 0
		if (DEBUGFLOW > 0)
@@ -308,7 +389,7 @@
		ki->kaio_flags &= ~KAIO_WAKEUP;
		wakeup(p);
	}
-
+
	if ( aiocbe->jobstate == JOBST_JOBQBUF) {
		if ((error = aio_fphysio(p, aiocbe, 1)) != 0)
			return error;
@@ -324,6 +405,15 @@
		TAILQ_REMOVE(&ki->kaio_jobdone, aiocbe, plist);
	} else if ( aiocbe->jobstate == JOBST_JOBBFINISHED) {
		TAILQ_REMOVE(&ki->kaio_bufdone, aiocbe, plist);
+		if (aiocbe->bp) {
+			vunmapbuf(aiocbe->bp);
+			relpbuf(aiocbe->bp);
+			aiocbe->bp = NULL;
+		}
+	}
+	if (lj && (lj->lioj_buffer_count == 0) && (lj->lioj_queue_count == 0)) {
+		TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
+		zfree(aiolio_zone, lj);
	}
	TAILQ_INSERT_HEAD(&aio_freejobs, aiocbe, list);
	aiocbe->jobstate = JOBST_NULL;
@@ -336,16 +426,20 @@
 void
 aio_proc_rundown(struct proc *p)
 {
+	int s;
	struct kaioinfo *ki;
+	struct aio_liojob *lj, *ljn;
	struct aiocblist *aiocbe, *aiocbn;

	ki = p->p_aioinfo;
	if (ki == NULL)
		return;

-	while ((ki->kaio_active_count > 0) || (ki->kaio_buffer_count > 0)) {
+	ki->kaio_flags |= LIOJ_SIGNAL_POSTED;
+	while ((ki->kaio_active_count > 0) ||
+		(ki->kaio_buffer_count > ki->kaio_buffer_finished_count)) {
		ki->kaio_flags |= KAIO_RUNDOWN;
-		if (tsleep(p, PRIBIO, "kaiowt", 20 * hz))
+		if (tsleep(p, PRIBIO, "kaiowt", aiod_timeout))
			break;
	}

@@ -372,6 +466,47 @@ aio_proc_rundown(struct proc *p)
		if (aio_free_entry(aiocbe))
			goto restart2;
	}
+
+restart3:
+	s = splbio();
+	while (TAILQ_FIRST(&ki->kaio_bufqueue)) {
+		ki->kaio_flags |= KAIO_WAKEUP;
+		tsleep (p, PRIBIO, "aioprn", 0);
+		splx(s);
+		goto restart3;
+	}
+
+restart4:
+	s = splbio();
+	for ( aiocbe = TAILQ_FIRST(&ki->kaio_bufdone);
+		aiocbe;
+		aiocbe = aiocbn) {
+		aiocbn = TAILQ_NEXT(aiocbe, plist);
+		if (aio_free_entry(aiocbe)) {
+			splx(s);
+			goto restart4;
+		}
+	}
+	splx(s);
+
+	for ( lj = TAILQ_FIRST(&ki->kaio_liojoblist);
+		lj;
+		lj = ljn) {
+		ljn = TAILQ_NEXT(lj, lioj_list);
+		if ((lj->lioj_buffer_count == 0) && (lj->lioj_queue_count == 0)) {
+			TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
+			zfree(aiolio_zone, lj);
+		}
+	}
+
+	for ( lj = TAILQ_FIRST(&ki->kaio_liojoblist);
+		lj;
+		lj = ljn) {
+		printf("LIO job not cleaned up: B:%d, BF:%d, Q:%d, QF:%d\n",
+			lj->lioj_buffer_count, lj->lioj_buffer_finished_count,
+			lj->lioj_queue_count, lj->lioj_queue_finished_count);
+	}
+
	zfree(kaio_zone, ki);
	p->p_aioinfo = NULL;
 }
@@ -514,11 +649,13 @@ error, cnt, cnt - auio.uio_resid, auio.uio_resid, (int) offset & 0xffffffff,
 }

 /*
- * The AIO daemon.
+ * The AIO daemon; most of the actual work is done in aio_process,
+ * but the setup (and address space mgmt) is done in this routine.
  */
 static void
 aio_daemon(void *uproc)
 {
+	int s;
	struct aioproclist *aiop;
	struct vmspace *myvm, *aiovm;
	struct proc *mycp;
@@ -539,7 +676,7 @@ aio_daemon(void *uproc)
	 */
	if ((aiovm = aiovmspace) == NULL) {
		aiovmspace = myvm;
-		++myvm->vm_refcnt;
+		myvm->vm_refcnt++;
		/*
		 * Remove userland cruft from address space.
		 */
@@ -551,7 +688,7 @@ aio_daemon(void *uproc)
		myvm->vm_dsize = 0;
		myvm->vm_ssize = 0;
	} else {
-		++aiovm->vm_refcnt;
+		aiovm->vm_refcnt++;
		mycp->p_vmspace = aiovm;
		pmap_activate(mycp);
		vmspace_free(myvm);
@@ -644,6 +781,7 @@ aio_daemon(void *uproc)
			struct proc *userp;
			struct aiocb *cb;
			struct kaioinfo *ki;
+			struct aio_liojob *lj;

			cb = &aiocbe->uaiocb;
			userp = aiocbe->userproc;
@@ -663,7 +801,7 @@ aio_daemon(void *uproc)
			 * Point to the new user address space, and refer to it.
			 */
			mycp->p_vmspace = userp->p_vmspace;
-			++mycp->p_vmspace->vm_refcnt;
+			mycp->p_vmspace->vm_refcnt++;
			/*
			 * Activate the new mapping.
			 */
@@ -689,6 +827,11 @@ aio_daemon(void *uproc)
			}

			ki = userp->p_aioinfo;
+			lj = aiocbe->lio;
+
+			/*
+			 * Account for currently active jobs
+			 */
			ki->kaio_active_count++;
 #if DEBUGAIO > 0
			if (DEBUGFLOW > 0)
@@ -696,11 +839,28 @@ aio_daemon(void *uproc)
				cb->_aiocb_private.kernelinfo, userp->p_pid,
				ki->kaio_active_count, ki->kaio_queue_count);
 #endif
+			/*
+			 * Do the I/O function
+			 */
			aiocbe->jobaioproc = aiop;
			aio_process(aiocbe);
-			--ki->kaio_active_count;
+
+			/*
+			 * decrement the active job count
+			 */
			ki->kaio_active_count--;
+
+			/*
+			 * increment the completion count for wakeup/signal comparisons
+			 */
+			aiocbe->jobflags |= AIOCBLIST_DONE;
+			ki->kaio_queue_finished_count++;
+			if (lj) {
+				lj->lioj_queue_finished_count++;
+			}
			if ((ki->kaio_flags & KAIO_WAKEUP) ||
-				(ki->kaio_flags & KAIO_RUNDOWN) && (ki->kaio_active_count == 0)) {
+				(ki->kaio_flags & KAIO_RUNDOWN) &&
+				(ki->kaio_active_count == 0)) {
				ki->kaio_flags &= ~KAIO_WAKEUP;
				wakeup(userp);
			}
@@ -711,6 +871,17 @@ aio_daemon(void *uproc)
				userp->p_pid, ki->kaio_active_count, ki->kaio_queue_count);
 #endif

+			s = splbio();
+			if (lj && (lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) ==
+				LIOJ_SIGNAL) {
+				if ((lj->lioj_queue_finished_count == lj->lioj_queue_count) &&
+					(lj->lioj_buffer_finished_count == lj->lioj_buffer_count)) {
+					psignal(userp, lj->lioj_signal.sigev_signo);
+					lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
+				}
+			}
+			splx(s);
+
			aiocbe->jobstate = JOBST_JOBFINISHED;

			/*
@@ -795,13 +966,14 @@ aio_daemon(void *uproc)
		 * freeing resources.
		 */
		if (((aiop->aioprocflags & AIOP_SCHED) == 0) &&
-			tsleep(mycp, PRIBIO, "aiordy", hz*10)) {
+			tsleep(mycp, PRIBIO, "aiordy", aiod_lifetime)) {
			if ((TAILQ_FIRST(&aio_jobs) == NULL) &&
				(TAILQ_FIRST(&aiop->jobtorun) == NULL)) {
-				if (aiop->aioprocflags & AIOP_FREE) {
+				if ((aiop->aioprocflags & AIOP_FREE) &&
+					(num_aio_procs > target_aio_procs)) {
					TAILQ_REMOVE(&aio_freeproc, aiop, list);
					zfree(aiop_zone, aiop);
-					--num_aio_procs;
+					num_aio_procs--;
 #if DEBUGAIO > 0
					if (DEBUGFLOW > 2)
						printf("AIOD: Daemon exiting -- %d\n", mycp->p_pid);
@@ -817,7 +989,8 @@ aio_daemon(void *uproc)
 }

 /*
- * Create a new AIO daemon.
+ * Create a new AIO daemon.  This is mostly a kernel-thread fork routine.
+ * The AIO daemon modifies its environment itself.
  */
 static int
 aio_newproc()
@@ -845,20 +1018,22 @@ aio_newproc()
	 * Wait until daemon is started, but continue on just in case (to
	 * handle error conditions.
	 */
-	error = tsleep(np, PZERO, "aiosta", 5*hz);
-	++num_aio_procs;
+	error = tsleep(np, PZERO, "aiosta", aiod_timeout);
+	num_aio_procs++;

	return error;
 }

 /*
- * Try the high-performance physio method for eligible VCHR devices
+ * Try the high-performance physio method for eligible VCHR devices.  This
+ * routine doesn't require the use of any additional threads, and has low
+ * overhead.
  */
 int
-aio_qphysio(p, iocb)
+aio_qphysio(p, aiocbe)
	struct proc *p;
-	struct aiocblist *iocb;
+	struct aiocblist *aiocbe;
 {
	int error;
	caddr_t sa;
@@ -866,10 +1041,10 @@
	struct file *fp;
	struct buf *bp;
	int bflags;
-	struct aiocblist *aiocbe;
	struct vnode *vp;
	struct kaioinfo *ki;
	struct filedesc *fdp;
+	struct aio_liojob *lj;
	int fd;
	int majordev;
	int s;
@@ -877,42 +1052,123 @@
	dev_t dev;
	int rw;
	d_strategy_t *fstrategy;
-	return -1;
+	struct cdevsw *cdev;
+	struct bdevsw *bdev;

-	cb = &iocb->uaiocb;
-	if (cb->aio_nbytes > MAXPHYS)
+	cb = &aiocbe->uaiocb;
+	if ((cb->aio_nbytes > MAXPHYS) && (num_buf_aio >= max_buf_aio)) {
+#if DEBUGAIO > 0
+		if (DEBUGCHR) {
+			printf("AIOP: failed CHR criteria: aio_nbytes: %d, num_buf_aio: %d\n",
+				cb->aio_nbytes, num_buf_aio);
+		}
+#endif
		return -1;
+	}

	fdp = p->p_fd;
	fd = cb->aio_fildes;
	fp = fdp->fd_ofiles[fd];
+	lj = aiocbe->lio;

-	if (fp->f_type != DTYPE_VNODE)
+	if (fp->f_type != DTYPE_VNODE) {
+#if DEBUGAIO > 0
+		if (DEBUGCHR) {
+			printf("AIOP: failed CHR criteria: type != DTYPE_VNODE\n");
+		}
+#endif
		return -1;
+	}

	vp = (struct vnode *)fp->f_data;
-	if (vp->v_type != VCHR || ((cb->aio_nbytes & (DEV_BSIZE - 1)) != 0))
+	if (vp->v_type != VCHR || ((cb->aio_nbytes & (DEV_BSIZE - 1)) != 0)) {
+#if DEBUGAIO > 0
+		if (DEBUGCHR) {
+			printf("AIOP: failed CHR criteria: v_type: %d, nbytes: 0x%x\n",
+				vp->v_type, cb->aio_nbytes);
+		}
+#endif
		return -1;
+	}

-	if ((vp->v_specinfo == NULL) || (vp->v_flag & VISTTY))
+	if ((vp->v_specinfo == NULL) || (vp->v_flag & VISTTY)) {
+#if DEBUGAIO > 0
+		if (DEBUGCHR) {
+			printf("AIOP: failed CHR criteria: v_specinfo: %x, istty: %x\n",
+				vp->v_specinfo, (vp->v_flag & VISTTY));
+		}
+#endif
		return -1;
+	}

	majordev = major(vp->v_rdev);
-	if (majordev == NODEV)
+	if (majordev == NODEV) {
+#if DEBUGAIO > 0
+		if (DEBUGCHR) {
+			printf("AIOP: failed CHR criteria, NODEV?: 0x%x\n", vp->v_rdev);
+		}
+#endif
		return -1;
+	}

-	if (chrtoblk(majordev) == NODEV)
+	cdev = cdevsw[major(vp->v_rdev)];
+	if (cdev == NULL) {
+#if DEBUGAIO > 0
+		if (DEBUGCHR) {
+			printf("AIOP: failed CHR criteria: cdevsw entry missing\n");
+		}
+#endif
		return -1;
+	}

+	bdev = cdev->d_bdev;
+	if (bdev == NULL) {
+#if DEBUGAIO > 0
+		if (DEBUGCHR) {
printf("AIOP: failed CHR criteria: bdevsw entry missing\n"); + } +#endif + return -1; + } ki = p->p_aioinfo; - if (ki->kaio_buffer_count >= ki->kaio_ballowed_count) + if (ki->kaio_buffer_count >= ki->kaio_ballowed_count) { +#if DEBUGAIO > 0 + if (DEBUGCHR) { + printf("AIOP: failed CHR criteria, buffer_count(%d) > ballowed_count(%d)\n", + ki->kaio_buffer_count, ki->kaio_ballowed_count); + } +#endif return -1; + } cnt = cb->aio_nbytes; - if (cnt > MAXPHYS) + if (cnt > MAXPHYS) { +#if DEBUGAIO > 0 + if (DEBUGCHR) { + printf("AIOP: failed CHR criteria, cnt(%d) > MAXPHYS\n", cnt); + } +#endif return -1; + } + + dev = makedev(bdev->d_maj, minor(vp->v_rdev)); +#if DEBUGAIO > 0 + if (DEBUGCHR) { + printf("AIOP: device: %x\n", dev); + } +#endif + + /* + * Physical I/O is charged directly to the process, so we don't have + * to fake it. + */ + aiocbe->inputcharge = 0; + aiocbe->outputcharge = 0; ki->kaio_buffer_count++; + if (lj) { + lj->lioj_buffer_count++; + } /* create and build a buffer header for a transfer */ bp = (struct buf *)getpbuf(); @@ -938,7 +1194,6 @@ aio_qphysio(p, iocb) bp->b_iodone = aio_physwakeup; bp->b_saveaddr = bp->b_data; bp->b_data = cb->aio_buf; - bp->b_blkno = btodb(cb->aio_offset); if (rw && !useracc(bp->b_data, bp->b_bufsize, B_WRITE)) { @@ -953,37 +1208,58 @@ aio_qphysio(p, iocb) /* bring buffer into kernel space */ vmapbuf(bp); + s = splbio(); aiocbe->bp = bp; bp->b_spc = (void *)aiocbe; TAILQ_INSERT_TAIL(&aio_bufjobs, aiocbe, list); - TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist); + TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, aiocbe, plist); aiocbe->jobstate = JOBST_JOBQBUF; - ++num_buf_aio; - fstrategy = cdevsw[major(dev)]->d_strategy; + cb->_aiocb_private.status = cb->aio_nbytes; + num_buf_aio++; + fstrategy = bdev->d_strategy; bp->b_error = 0; + splx(s); /* perform transfer */ (*fstrategy)(bp); if (bp->b_error || (bp->b_flags & B_ERROR)) { + s = splbio(); error = bp->b_error; TAILQ_REMOVE(&aio_bufjobs, aiocbe, list); - TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist); + TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist); aiocbe->bp = NULL; aiocbe->jobstate = JOBST_NULL; + splx(s); vunmapbuf(bp); relpbuf(bp); - --num_buf_aio; + num_buf_aio--; +#if DEBUGAIO > 0 + if (DEBUGCHR) { + printf("AIOP: error: %d\n", error); + } +#endif + ki->kaio_buffer_count--; + if (lj) { + lj->lioj_buffer_count--; + } return error; } return 0; doerror: ki->kaio_buffer_count--; + if (lj) { + lj->lioj_buffer_count--; + } + aiocbe->bp = NULL; relpbuf(bp); return error; } +/* + * This waits/tests physio completion. + */ int aio_fphysio(p, iocb, flgwait) struct proc *p; @@ -1005,7 +1281,7 @@ aio_fphysio(p, iocb, flgwait) } while ((bp->b_flags & B_DONE) == 0) { - if (tsleep((caddr_t)bp, PCATCH|PRIBIO, "physstr", 0)) { + if (tsleep((caddr_t)bp, PRIBIO, "physstr", aiod_timeout)) { if ((bp->b_flags & B_DONE) == 0) { splx(s); return EINPROGRESS; @@ -1032,10 +1308,11 @@ aio_fphysio(p, iocb, flgwait) } /* - * Queue a new AIO request. + * Queue a new AIO request. Choosing either the threaded or direct physio + * VCHR technique is done in this code. */ static int -_aio_aqueue(struct proc *p, struct aiocb *job, int type) +_aio_aqueue(struct proc *p, struct aiocb *job, struct aio_liojob *lj, int type) { struct filedesc *fdp; struct file *fp; @@ -1164,7 +1441,7 @@ _aio_aqueue(struct proc *p, struct aiocb *job, int type) if (DEBUGFLOW > 2) printf("aio_aqueue: New job: %d... 
", jobrefid); #endif - ++jobrefid; + jobrefid++; if (jobrefid > INT_MAX) jobrefid = 1; @@ -1195,6 +1472,8 @@ _aio_aqueue(struct proc *p, struct aiocb *job, int type) aiocbe->uaiocb._aiocb_private.error = EINPROGRESS; aiocbe->userproc = p; aiocbe->jobflags = 0; + aiocbe->lio = lj; + ki = p->p_aioinfo; if ((error = aio_qphysio(p, aiocbe)) == 0) { return 0; @@ -1205,13 +1484,20 @@ _aio_aqueue(struct proc *p, struct aiocb *job, int type) return error; } - ki = p->p_aioinfo; - ++ki->kaio_queue_count; + /* + * No buffer for daemon I/O + */ + aiocbe->bp = NULL; + + ki->kaio_queue_count++; + if (lj) { + lj->lioj_queue_count++; + } TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist); TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list); aiocbe->jobstate = JOBST_JOBQGLOBAL; - ++num_queue_count; + num_queue_count++; #if DEBUGAIO > 0 if (DEBUGREQ) { printf("PROC %s, fd: %d, offset: 0x%x, address: 0x%x, size: %d\n", @@ -1241,10 +1527,10 @@ _aio_aqueue(struct proc *p, struct aiocb *job, int type) ki->kaio_maxactive_count)) { num_aio_resv_start++; if ((error = aio_newproc()) == 0) { - --num_aio_resv_start; + num_aio_resv_start--; goto retryproc; } - --num_aio_resv_start; + num_aio_resv_start--; } return error; } @@ -1268,7 +1554,7 @@ aio_aqueue(struct proc *p, struct aiocb *job, int type) if (ki->kaio_queue_count >= ki->kaio_qallowed_count) return EAGAIN; - return _aio_aqueue(p, job, type); + return _aio_aqueue(p, job, NULL, type); } /* @@ -1278,8 +1564,9 @@ aio_aqueue(struct proc *p, struct aiocb *job, int type) int aio_return(struct proc *p, struct aio_return_args *uap) { + int s; int jobref, status; - struct aiocblist *cb; + struct aiocblist *cb, *ncb; struct kaioinfo *ki; struct proc *userp; @@ -1321,6 +1608,20 @@ aio_return(struct proc *p, struct aio_return_args *uap) } } + s = splbio(); + for (cb = TAILQ_FIRST(&ki->kaio_bufdone); + cb; + cb = ncb) { + ncb = TAILQ_NEXT(cb, plist); + if (((int) cb->uaiocb._aiocb_private.kernelinfo) == jobref) { + splx(s); + p->p_retval[0] = cb->uaiocb._aiocb_private.status; + aio_free_entry(cb); + return 0; + } + } + splx(s); + #if DEBUGAIO > 0 if (DEBUGFLOW > 0) printf("(not found) status: %d, error: %d\n", @@ -1349,6 +1650,7 @@ aio_suspend(struct proc *p, struct aio_suspend_args *uap) struct kaioinfo *ki; struct aiocblist *cb; int i; + int njoblist; int error, s, timo; int *joblist; @@ -1386,29 +1688,43 @@ aio_suspend(struct proc *p, struct aio_suspend_args *uap) if (ki == NULL) return EAGAIN; + njoblist = 0; joblist = zalloc(aiol_zone); cbptr = uap->aiocbp; - for(i=0;inent;i++) { + for(i = 0; i < uap->nent; i++) { cbp = (struct aiocb *) fuword((caddr_t) &cbptr[i]); -#if DEBUGAIO > 1 + if (cbp == 0) + continue; +#if DEBUGAIO > 0 if (DEBUGFLOW > 2) printf("cbp: %x\n", cbp); #endif joblist[i] = fuword(&cbp->_aiocb_private.kernelinfo); + njoblist++; } - + if (njoblist == 0) + return 0; while (1) { for (cb = TAILQ_FIRST(&ki->kaio_jobdone); - cb; - cb = TAILQ_NEXT(cb, plist)) { - for(i=0;inent;i++) { + cb; cb = TAILQ_NEXT(cb, plist)) { + for(i = 0; i < njoblist; i++) { if (((int) cb->uaiocb._aiocb_private.kernelinfo) == joblist[i]) { -/* - printf("suspend(awake): %d, offset: %d\n", joblist[i], (int) cb->uaiocb.aio_offset & 0xffffffff); -*/ + zfree(aiol_zone, joblist); + return 0; + } + } + } + + s = splbio(); + for (cb = TAILQ_FIRST(&ki->kaio_bufdone); + cb; cb = TAILQ_NEXT(cb, plist)) { + for(i = 0; i < njoblist; i++) { + if (((int) cb->uaiocb._aiocb_private.kernelinfo) == + joblist[i]) { + splx(s); zfree(aiol_zone, joblist); return 0; } @@ -1418,20 +1734,21 @@ 
 #if DEBUGAIO > 0
	if (DEBUGFLOW > 0) {
		printf("Suspend, timeout: %d clocks, jobs:", timo);
-		for(i=0;i<uap->nent;i++)
+		for(i=0;i<njoblist;i++)
			printf(" %d", joblist[i]);
		printf("\n");
	}
 #endif

 #if DEBUGAIO > 0
	if (DEBUGFLOW > 2) {
		printf("Suspending -- waiting for all I/O's to complete: ");
-		for(i=0;i<uap->nent;i++)
+		for(i=0;i<njoblist;i++)
			printf(" %d", joblist[i]);
		printf("\n");
	}
 #endif
		ki->kaio_flags |= KAIO_WAKEUP;
		error = tsleep(p, PRIBIO|PCATCH, "aiospn", timo);
+		splx(s);

		if (error == EINTR) {
 #if DEBUGAIO > 0
@@ -1477,6 +1794,7 @@ aio_cancel(struct proc *p, struct aio_cancel_args *uap)
 int
 aio_error(struct proc *p, struct aio_error_args *uap)
 {
+	int s;
	struct aiocblist *cb;
	struct kaioinfo *ki;
	int jobref;
@@ -1510,6 +1828,30 @@ aio_error(struct proc *p, struct aio_error_args *uap)
		}
	}

+	s = splbio();
+	for (cb = TAILQ_FIRST(&ki->kaio_bufdone);
+		cb;
+		cb = TAILQ_NEXT(cb, plist)) {
+		if (((int) cb->uaiocb._aiocb_private.kernelinfo) == jobref) {
+			p->p_retval[0] = cb->uaiocb._aiocb_private.error;
+			splx(s);
+			return 0;
+		}
+	}
+
+	for (cb = TAILQ_FIRST(&ki->kaio_bufqueue);
+		cb;
+		cb = TAILQ_NEXT(cb, plist)) {
+
+		if (((int) cb->uaiocb._aiocb_private.kernelinfo) == jobref) {
+			p->p_retval[0] = EINPROGRESS;
+			splx(s);
+			return 0;
+		}
+	}
+	splx(s);
+
+
	/*
	 * Hack for lio
	 */
@@ -1536,7 +1878,7 @@ aio_read(struct proc *p, struct aio_read_args *uap)

	pmodes = fuword(&uap->aiocbp->_aiocb_private.privatemodes);
	if ((pmodes & AIO_PMODE_SYNC) == 0) {
-#if DEBUGAIO > 1
+#if DEBUGAIO > 0
		if (DEBUGFLOW > 2)
			printf("queueing aio_read\n");
 #endif
@@ -1616,7 +1958,7 @@ aio_write(struct proc *p, struct aio_write_args *uap)
	 */
	pmodes = fuword(&uap->aiocbp->_aiocb_private.privatemodes);
	if ((pmodes & AIO_PMODE_SYNC) == 0) {
-#if DEBUGAIO > 1
+#if DEBUGAIO > 0
		if (DEBUGFLOW > 2)
			printf("queing aio_write\n");
 #endif
@@ -1679,9 +2021,11 @@ lio_listio(struct proc *p, struct lio_listio_args *uap)
	struct aiocb *iocb, * const *cbptr;
	struct aiocblist *cb;
	struct kaioinfo *ki;
+	struct aio_liojob *lj;
	int error, runningcode;
	int nerror;
	int i;
+	int s;

	if ((uap->mode != LIO_NOWAIT) && (uap->mode != LIO_WAIT)) {
 #if DEBUGAIO > 0
@@ -1726,6 +2070,32 @@ lio_listio(struct proc *p, struct lio_listio_args *uap)
		return EAGAIN;
	}

+	lj = zalloc(aiolio_zone);
+	if (!lj) {
+		return EAGAIN;
+	}
+
+	lj->lioj_flags = 0;
+	lj->lioj_buffer_count = 0;
+	lj->lioj_buffer_finished_count = 0;
+	lj->lioj_queue_count = 0;
+	lj->lioj_queue_finished_count = 0;
+	lj->lioj_ki = ki;
+	TAILQ_INSERT_TAIL(&ki->kaio_liojoblist, lj, lioj_list);
+
+	/*
+	 * Setup signal
+	 */
+	if (uap->sig && (uap->mode == LIO_NOWAIT)) {
+		error = copyin(uap->sig, &lj->lioj_signal, sizeof lj->lioj_signal);
+		if (error)
+			return error;
+		lj->lioj_flags |= LIOJ_SIGNAL;
+		lj->lioj_flags &= ~LIOJ_SIGNAL_POSTED;
+	} else {
+		lj->lioj_flags &= ~LIOJ_SIGNAL;
+	}
+
	/*
	 * get pointers to the list of I/O requests
	 */
@@ -1736,12 +2106,15 @@ lio_listio(struct proc *p, struct lio_listio_args *uap)
	for(i = 0; i < uap->nent; i++) {
		iocb = (struct aiocb *) fuword((caddr_t) &cbptr[i]);
		if (((int) iocb != -1) && ((int) iocb != NULL)) {
-			error = _aio_aqueue(p, iocb, 0);
+			error = _aio_aqueue(p, iocb, lj, 0);
			if (error == 0) {
				nentqueued++;
			} else {
				nerror++;
-				printf("_aio_aqueue: error: %d\n", error);
+#if DEBUGAIO > 0
+				if (DEBUGFLOW > 0)
+					printf("_aio_aqueue: error: %d\n", error);
+#endif
			}
		}
	}
@@ -1799,18 +2172,32 @@ lio_listio(struct proc *p, struct lio_listio_args *uap)
			cb = TAILQ_NEXT(cb, plist)) {
			if (((int) cb->uaiocb._aiocb_private.kernelinfo) ==
				jobref) {
+				if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
+					curproc->p_stats->p_ru.ru_oublock +=
+						cb->outputcharge;
+					cb->outputcharge = 0;
+				} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
+					curproc->p_stats->p_ru.ru_inblock +=
+						cb->inputcharge;
+					cb->inputcharge = 0;
+				}
				found++;
				break;
			}
		}

-		if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
-			curproc->p_stats->p_ru.ru_oublock += cb->outputcharge;
-			cb->outputcharge = 0;
-		} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
-			curproc->p_stats->p_ru.ru_inblock += cb->inputcharge;
-			cb->inputcharge = 0;
+		s = splbio();
+		for (cb = TAILQ_FIRST(&ki->kaio_bufdone);
+			cb;
+			cb = TAILQ_NEXT(cb, plist)) {
+			if (((int) cb->uaiocb._aiocb_private.kernelinfo) ==
+				jobref) {
+				found++;
+				break;
+			}
		}
+		splx(s);
+
	}

	/*
@@ -1819,7 +2206,6 @@ lio_listio(struct proc *p, struct lio_listio_args *uap)
	if (found == nentqueued) {
		return runningcode;
	}
-
	ki->kaio_flags |= KAIO_WAKEUP;
	error = tsleep(p, PRIBIO|PCATCH, "aiospn", 0);
@@ -1836,23 +2222,83 @@ lio_listio(struct proc *p, struct lio_listio_args *uap)

	return runningcode;
 }

+/*
+ * This is a weird hack so that we can post a signal.  It is safe
+ * to do so from a timeout routine, but *not* from an interrupt routine.
+ */
+static void
+process_signal(void *ljarg)
+{
+	struct aio_liojob *lj = ljarg;
+	if (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL) {
+		if (lj->lioj_queue_count == lj->lioj_queue_finished_count) {
+			psignal(lj->lioj_ki->kaio_p, lj->lioj_signal.sigev_signo);
+			lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
+		}
+	}
+}
+
+/*
+ * Interrupt handler for physio; performs the necessary process wakeups
+ * and signals.
+ */
 static void
 aio_physwakeup(bp)
	struct buf *bp;
 {
-	struct aiocbe *iocb;
+	struct aiocblist *aiocbe;
	struct proc *p;
	struct kaioinfo *ki;
+	struct aio_liojob *lj;

	wakeup((caddr_t) bp);
	bp->b_flags &= ~B_CALL;
+	bp->b_flags |= B_DONE;
+
+	aiocbe = (struct aiocblist *)bp->b_spc;
+	if (aiocbe) {
+		p = bp->b_proc;
+
+		aiocbe->jobstate = JOBST_JOBBFINISHED;
+		aiocbe->uaiocb._aiocb_private.status -= bp->b_resid;
+		aiocbe->uaiocb._aiocb_private.error = 0;
+		aiocbe->jobflags |= AIOCBLIST_DONE;
+
+		if (bp->b_flags & B_ERROR) {
+			aiocbe->uaiocb._aiocb_private.error = bp->b_error;
+		}
+
+		lj = aiocbe->lio;
+		if (lj) {
+			lj->lioj_buffer_finished_count++;
+			/*
+			 * wakeup/signal if all of the interrupt jobs are done
+			 */
+			if (lj->lioj_buffer_finished_count == lj->lioj_buffer_count) {
+				/*
+				 * post a signal if it is called for
+				 */
+				if ((lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) ==
+					LIOJ_SIGNAL) {
+					lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
+					timeout(process_signal, lj, 0);
+				}
+			}
+		}

-	iocb = (struct aiocbe *)bp->b_spc;
-	if (iocb) {
		ki = p->p_aioinfo;
-		if (ki && (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP))) {
-			ki->kaio_flags &= ~KAIO_WAKEUP;
-			wakeup(p);
+		if (ki) {
+			ki->kaio_buffer_finished_count++;
+			TAILQ_REMOVE(&aio_bufjobs, aiocbe, list);
+			TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist);
+			TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist);
+			/*
+			 * and do the wakeup
+			 */
+			if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) {
+				ki->kaio_flags &= ~KAIO_WAKEUP;
+				wakeup(p);
+			}
		}
	}
 }
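[Postscript, not part of the patch: a sketch of how the new lio_listio
completion signal (the LIOJ_SIGNAL path above) is expected to be driven
from userland.  The choice of SIGUSR1, the two-buffer layout, and the
helper name are illustrative assumptions only.]

#include <sys/types.h>
#include <aio.h>
#include <signal.h>
#include <string.h>

static volatile sig_atomic_t lio_done;

static void
lio_handler(int sig)
{
	(void)sig;
	lio_done = 1;			/* every job in the list has finished */
}

int
queue_two_writes(int fd, char *b0, char *b1, size_t len)
{
	/* static: the aiocbs must stay valid until the I/O completes */
	static struct aiocb cb[2];
	struct aiocb *list[2];
	struct sigevent sev;
	int i;

	memset(cb, 0, sizeof(cb));
	memset(&sev, 0, sizeof(sev));
	for (i = 0; i < 2; i++) {
		cb[i].aio_fildes = fd;
		cb[i].aio_buf = (i == 0) ? b0 : b1;
		cb[i].aio_nbytes = len;
		cb[i].aio_offset = (off_t)len * i;
		cb[i].aio_lio_opcode = LIO_WRITE;
		list[i] = &cb[i];
	}

	signal(SIGUSR1, lio_handler);
	sev.sigev_notify = SIGEV_SIGNAL;	/* signal when ALL jobs are done */
	sev.sigev_signo = SIGUSR1;

	/* LIO_NOWAIT returns as soon as both jobs are queued */
	return lio_listio(LIO_NOWAIT, list, 2, &sev);
}

[Because LIOJ_SIGNAL_POSTED is set when the signal is delivered, the
signal fires exactly once per list regardless of how many jobs the list
contains.]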