fusefs: give priority to FUSE_INTERRUPT operations

When interrupting a FUSE operation, send the FUSE_INTERRUPT op to the daemon
ASAP, ahead of other unrelated operations.

PR:		236530
Sponsored by:	The FreeBSD Foundation
This commit is contained in:
Alan Somers 2019-04-19 21:50:23 +00:00
parent f0f7fc1be4
commit 268c28edbc
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/projects/fuse2/; revision=346418
5 changed files with 132 additions and 11 deletions

View file

@ -295,7 +295,7 @@ fuse_internal_fsync(struct vnode *vp,
} else {
fuse_insert_callback(fdi.tick,
fuse_internal_fsync_callback);
fuse_insert_message(fdi.tick);
fuse_insert_message(fdi.tick, false);
}
if (err == ENOSYS) {
/* ENOSYS means "success, and don't call again" */
@ -593,7 +593,7 @@ fuse_internal_forget_send(struct mount *mp,
ffi = fdi.indata;
ffi->nlookup = nlookup;
fuse_insert_message(fdi.tick);
fuse_insert_message(fdi.tick, false);
fdisp_destroy(&fdi);
}
@ -736,7 +736,7 @@ fuse_internal_send_init(struct fuse_data *data, struct thread *td)
fiii->flags = FUSE_POSIX_LOCKS;
fuse_insert_callback(fdi.tick, fuse_internal_init_callback);
fuse_insert_message(fdi.tick);
fuse_insert_message(fdi.tick, false);
fdisp_destroy(&fdi);
}

View file

@ -238,7 +238,8 @@ fuse_interrupt_send(struct fuse_ticket *otick, int err)
fuse_insert_callback(fdi.tick, fuse_interrupt_callback);
otick->irq_unique = fdi.tick->tk_unique;
fuse_insert_message(fdi.tick);
/* Interrupt ops should be delivered ASAP */
fuse_insert_message(fdi.tick, true);
fdisp_destroy(&fdi);
} else {
/* This ticket has already been interrupted */
@ -660,8 +661,14 @@ fuse_insert_callback(struct fuse_ticket *ftick, fuse_handler_t * handler)
fuse_lck_mtx_unlock(ftick->tk_data->aw_mtx);
}
/*
* Insert a new upgoing ticket into the message queue
*
* If urgent is true, insert at the front of the queue. Otherwise, insert in
* FIFO order.
*/
void
fuse_insert_message(struct fuse_ticket *ftick)
fuse_insert_message(struct fuse_ticket *ftick, bool urgent)
{
if (ftick->tk_flag & FT_DIRTY) {
panic("FUSE: ticket reused without being refreshed");
@ -672,7 +679,10 @@ fuse_insert_message(struct fuse_ticket *ftick)
return;
}
fuse_lck_mtx_lock(ftick->tk_data->ms_mtx);
fuse_ms_push(ftick);
if (urgent)
fuse_ms_push_head(ftick);
else
fuse_ms_push(ftick);
wakeup_one(ftick->tk_data);
selwakeuppri(&ftick->tk_data->ks_rsel, PZERO + 1);
fuse_lck_mtx_unlock(ftick->tk_data->ms_mtx);
@ -972,7 +982,7 @@ fdisp_wait_answ(struct fuse_dispatcher *fdip)
fdip->answ_stat = 0;
fuse_insert_callback(fdip->tick, fuse_standard_handler);
fuse_insert_message(fdip->tick);
fuse_insert_message(fdip->tick, false);
if ((err = fticket_wait_answer(fdip->tick))) {
fuse_lck_mtx_lock(fdip->tick->tk_aw_mtx);

View file

@ -285,6 +285,7 @@ fsess_opt_brokenio(struct mount *mp)
return (fuse_fix_broken_io || (data->dataflags & FSESS_BROKENIO));
}
/* Insert a new upgoing message */
static inline void
fuse_ms_push(struct fuse_ticket *ftick)
{
@ -293,6 +294,15 @@ fuse_ms_push(struct fuse_ticket *ftick)
STAILQ_INSERT_TAIL(&ftick->tk_data->ms_head, ftick, tk_ms_link);
}
/* Insert a new upgoing message to the front of the queue */
static inline void
fuse_ms_push_head(struct fuse_ticket *ftick)
{
mtx_assert(&ftick->tk_data->ms_mtx, MA_OWNED);
refcount_acquire(&ftick->tk_refcount);
STAILQ_INSERT_HEAD(&ftick->tk_data->ms_head, ftick, tk_ms_link);
}
static inline struct fuse_ticket *
fuse_ms_pop(struct fuse_data *data)
{
@ -345,7 +355,7 @@ fuse_aw_pop(struct fuse_data *data)
struct fuse_ticket *fuse_ticket_fetch(struct fuse_data *data);
int fuse_ticket_drop(struct fuse_ticket *ftick);
void fuse_insert_callback(struct fuse_ticket *ftick, fuse_handler_t *handler);
void fuse_insert_message(struct fuse_ticket *ftick);
void fuse_insert_message(struct fuse_ticket *ftick, bool irq);
static inline bool
fuse_libabi_geq(struct fuse_data *data, uint32_t abi_maj, uint32_t abi_min)

View file

@ -592,7 +592,7 @@ fuse_vnop_create(struct vop_create_args *ap)
fri->fh = fh_id;
fri->flags = flags;
fuse_insert_callback(fdip->tick, fuse_internal_forget_callback);
fuse_insert_message(fdip->tick);
fuse_insert_message(fdip->tick, false);
goto out;
}
ASSERT_VOP_ELOCKED(*vpp, "fuse_vnop_create");

View file

@ -46,6 +46,8 @@ using namespace testing;
/* Initial size of files used by these tests */
const off_t FILESIZE = 1000;
static sem_t *signaled_semaphore;
/* Don't do anything; all we care about is that the syscall gets interrupted */
void sigusr2_handler(int __unused sig) {
if (verbosity > 1) {
@ -63,6 +65,8 @@ void* killer(void* target) {
if (verbosity > 1)
printf("Signalling! thread %p\n", target);
pthread_kill((pthread_t)target, SIGUSR2);
if (signaled_semaphore != NULL)
sem_post(signaled_semaphore);
return(NULL);
}
@ -112,13 +116,18 @@ void expect_write(uint64_t ino, uint64_t *write_unique)
}));
}
void setup_interruptor(pthread_t self)
void setup_interruptor(pthread_t target)
{
ASSERT_NE(SIG_ERR, signal(SIGUSR2, sigusr2_handler)) << strerror(errno);
ASSERT_EQ(0, pthread_create(&m_child, NULL, killer, (void*)self))
ASSERT_EQ(0, pthread_create(&m_child, NULL, killer, (void*)target))
<< strerror(errno);
}
void SetUp() {
signaled_semaphore = NULL;
FuseTest::SetUp();
}
void TearDown() {
struct sigaction sa;
@ -626,6 +635,98 @@ TEST_F(Interrupt, in_progress_read)
/* Deliberately leak fd. close(2) will be tested in release.cc */
}
/* FUSE_INTERRUPT operations should take priority over other pending ops */
TEST_F(Interrupt, priority)
{
const char FULLPATH0[] = "mountpoint/some_file.txt";
const char RELPATH0[] = "some_file.txt";
const char FULLPATH1[] = "mountpoint/other_file.txt";
const char RELPATH1[] = "other_file.txt";
const char *CONTENTS = "ijklmnop";
Sequence seq;
ssize_t bufsize = strlen(CONTENTS);
uint64_t ino0 = 42, ino1 = 43;
int fd0, fd1;
uint64_t write_unique;
pthread_t self, th0;
sem_t sem0, sem1;
ASSERT_EQ(0, sem_init(&sem0, 0, 0)) << strerror(errno);
ASSERT_EQ(0, sem_init(&sem1, 0, 0)) << strerror(errno);
self = pthread_self();
expect_lookup(RELPATH0, ino0);
expect_open(ino0, 0, 1);
expect_lookup(RELPATH1, ino1);
expect_open(ino1, 0, 1);
EXPECT_CALL(*m_mock, process(
ResultOf([=](auto in) {
return (in->header.opcode == FUSE_WRITE &&
in->header.nodeid == ino0);
}, Eq(true)),
_)
).InSequence(seq)
.WillOnce(Invoke(ReturnImmediate([&](auto in, auto out) {
write_unique = in->header.unique;
/* Let the next write proceed */
sem_post(&sem1);
/* Pause the daemon thread so it won't read the next op */
sem_wait(&sem0);
/* Finally, interrupt the original op */
out->header.error = -EINTR;
out->header.unique = write_unique;
out->header.len = sizeof(out->header);
})));
/*
* FUSE_INTERRUPT should be received before the second FUSE_WRITE, even
* though it was generated later
*/
EXPECT_CALL(*m_mock, process(
ResultOf([&](auto in) {
return (in->header.opcode == FUSE_INTERRUPT &&
in->body.interrupt.unique == write_unique);
}, Eq(true)),
_)
).InSequence(seq)
.WillOnce(Invoke(ReturnErrno(EAGAIN)));
EXPECT_CALL(*m_mock, process(
ResultOf([&](auto in) {
return (in->header.opcode == FUSE_WRITE &&
in->header.nodeid == ino1);
}, Eq(true)),
_)
).InSequence(seq)
.WillOnce(Invoke(ReturnImmediate([=](auto in , auto out) {
SET_OUT_HEADER_LEN(out, write);
out->body.write.size = in->body.write.size;
})));
fd0 = open(FULLPATH0, O_WRONLY);
ASSERT_LE(0, fd0) << strerror(errno);
fd1 = open(FULLPATH1, O_WRONLY);
ASSERT_LE(0, fd1) << strerror(errno);
/* Use a separate thread for the first write */
ASSERT_EQ(0, pthread_create(&th0, NULL, write0, (void*)(intptr_t)fd0))
<< strerror(errno);
signaled_semaphore = &sem0;
sem_wait(&sem1); /* Sequence the two writes */
setup_interruptor(th0);
ASSERT_EQ(bufsize, write(fd1, CONTENTS, bufsize)) << strerror(errno);
/* Wait awhile to make sure the signal generates no FUSE_INTERRUPT */
usleep(250'000);
pthread_join(th0, NULL);
sem_destroy(&sem1);
sem_destroy(&sem0);
}
/*
* If the FUSE filesystem receives the FUSE_INTERRUPT operation before
* processing the original, then it should wait for "some timeout" for the