Skip to content

Commit

Permalink
Implemented EV_CLOSED event for epoll backend (EPOLLRDHUP).
Browse files Browse the repository at this point in the history
- Added new EV_CLOSED event - detects premature connection close
  by clients without the necessity of reading all the pending
  data. Does not depend on EV_READ and/or EV_WRITE.

- Added new EV_FEATURE_EARLY_CLOSED feature for epoll.
  Must be supported for listening to EV_CLOSED event.

- Added new regression test: test-closed.c

- All regression tests passed (test/regress and test/test.sh)

- strace output of test-closed using EV_CLOSED:
    socketpair(PF_LOCAL, SOCK_STREAM, 0, [6, 7]) = 0
    sendto(6, "test string\0", 12, 0, NULL, 0) = 12
    shutdown(6, SHUT_WR)                    = 0
    epoll_ctl(3, EPOLL_CTL_ADD, 7, {EPOLLRDHUP, {u32=7, u64=7}}) = 0
    epoll_wait(3, {{EPOLLRDHUP, {u32=7, u64=7}}}, 32, 3000) = 1
    epoll_ctl(3, EPOLL_CTL_MOD, 7, {EPOLLRDHUP, {u32=7, u64=7}}) = 0
    fstat(1, {st_mode=S_IFCHR|0620, st_rdev=makedev(136, 4), ...})
    mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYM...
    write(1, "closed_cb: detected connection close "..., 45) = 45
  • Loading branch information
dgiagio committed Jan 18, 2014
1 parent d240328 commit b1b69ac
Show file tree
Hide file tree
Showing 10 changed files with 1,297 additions and 137 deletions.
1 change: 1 addition & 0 deletions changelist-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ struct event_change {
* and write_change is unused. */
ev_uint8_t read_change;
ev_uint8_t write_change;
ev_uint8_t close_change;
};

/* Flags for read_change and write_change. */
Expand Down
1,149 changes: 1,067 additions & 82 deletions epoll.c

Large diffs are not rendered by default.

34 changes: 19 additions & 15 deletions event.c
Original file line number Diff line number Diff line change
Expand Up @@ -1526,10 +1526,11 @@ event_process_active_single_queue(struct event_base *base,
else
event_del_nolock_(ev, EVENT_DEL_NOBLOCK);
event_debug((
"event_process_active: event: %p, %s%scall %p",
"event_process_active: event: %p, %s%s%scall %p",
ev,
ev->ev_res & EV_READ ? "EV_READ " : " ",
ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_res & EV_CLOSED ? "EV_CLOSED " : " ",
ev->ev_callback));
} else {
event_queue_remove_active(base, evcb);
Expand Down Expand Up @@ -1931,7 +1932,7 @@ event_base_once(struct event_base *base, evutil_socket_t fd, short events,
eonce->cb = callback;
eonce->arg = arg;

if ((events & (EV_TIMEOUT|EV_SIGNAL|EV_READ|EV_WRITE)) == EV_TIMEOUT) {
if ((events & (EV_TIMEOUT|EV_SIGNAL|EV_READ|EV_WRITE|EV_CLOSED)) == EV_TIMEOUT) {
evtimer_assign(&eonce->ev, base, event_once_cb, eonce);

if (tv == NULL || ! evutil_timerisset(tv)) {
Expand All @@ -1941,8 +1942,8 @@ event_base_once(struct event_base *base, evutil_socket_t fd, short events,
* it fast (and order-preserving). */
activate = 1;
}
} else if (events & (EV_READ|EV_WRITE)) {
events &= EV_READ|EV_WRITE;
} else if (events & (EV_READ|EV_WRITE|EV_CLOSED)) {
events &= EV_READ|EV_WRITE|EV_CLOSED;

event_assign(&eonce->ev, base, fd, events, event_once_cb, eonce);
} else {
Expand Down Expand Up @@ -1992,9 +1993,9 @@ event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, shor
ev->ev_pncalls = NULL;

if (events & EV_SIGNAL) {
if ((events & (EV_READ|EV_WRITE)) != 0) {
if ((events & (EV_READ|EV_WRITE|EV_CLOSED)) != 0) {
event_warnx("%s: EV_SIGNAL is not compatible with "
"EV_READ or EV_WRITE", __func__);
"EV_READ, EV_WRITE or EV_CLOSED", __func__);
return -1;
}
ev->ev_closure = EV_CLOSURE_EVENT_SIGNAL;
Expand Down Expand Up @@ -2244,13 +2245,13 @@ event_pending(const struct event *ev, short event, struct timeval *tv)
event_debug_assert_is_setup_(ev);

if (ev->ev_flags & EVLIST_INSERTED)
flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL));
flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL));
if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))
flags |= ev->ev_res;
if (ev->ev_flags & EVLIST_TIMEOUT)
flags |= EV_TIMEOUT;

event &= (EV_TIMEOUT|EV_READ|EV_WRITE|EV_SIGNAL);
event &= (EV_TIMEOUT|EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL);

/* See if there is a timeout that we should report */
if (tv != NULL && (flags & event & EV_TIMEOUT)) {
Expand Down Expand Up @@ -2464,11 +2465,12 @@ event_add_nolock_(struct event *ev, const struct timeval *tv,
event_debug_assert_is_setup_(ev);

event_debug((
"event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%scall %p",
"event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%s%scall %p",
ev,
EV_SOCK_ARG(ev->ev_fd),
ev->ev_events & EV_READ ? "EV_READ " : " ",
ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_events & EV_CLOSED ? "EV_CLOSED " : " ",
tv ? "EV_TIMEOUT " : " ",
ev->ev_callback));

Expand Down Expand Up @@ -2502,9 +2504,9 @@ event_add_nolock_(struct event *ev, const struct timeval *tv,
}
#endif

if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
if (ev->ev_events & (EV_READ|EV_WRITE))
if (ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED))
res = evmap_io_add_(base, ev->ev_fd, ev);
else if (ev->ev_events & EV_SIGNAL)
res = evmap_signal_add_(base, (int)ev->ev_fd, ev);
Expand Down Expand Up @@ -2731,7 +2733,7 @@ event_del_nolock_(struct event *ev, int blocking)

if (ev->ev_flags & EVLIST_INSERTED) {
event_queue_remove_inserted(base, ev);
if (ev->ev_events & (EV_READ|EV_WRITE))
if (ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED))
res = evmap_io_del_(base, ev->ev_fd, ev);
else
res = evmap_signal_del_(base, (int)ev->ev_fd, ev);
Expand Down Expand Up @@ -3602,10 +3604,11 @@ dump_inserted_event_fn(const struct event_base *base, const struct event *e, voi
if (! (e->ev_flags & (EVLIST_INSERTED|EVLIST_TIMEOUT)))
return 0;

fprintf(output, " %p [%s "EV_SOCK_FMT"]%s%s%s%s%s",
fprintf(output, " %p [%s "EV_SOCK_FMT"]%s%s%s%s%s%s",
(void*)e, gloss, EV_SOCK_ARG(e->ev_fd),
(e->ev_events&EV_READ)?" Read":"",
(e->ev_events&EV_WRITE)?" Write":"",
(e->ev_events&EV_CLOSED)?" EOF":"",
(e->ev_events&EV_SIGNAL)?" Signal":"",
(e->ev_events&EV_PERSIST)?" Persist":"",
(e->ev_flags&EVLIST_INTERNAL)?" Internal":"");
Expand Down Expand Up @@ -3634,10 +3637,11 @@ dump_active_event_fn(const struct event_base *base, const struct event *e, void
if (! (e->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)))
return 0;

fprintf(output, " %p [%s "EV_SOCK_FMT", priority=%d]%s%s%s%s active%s%s\n",
fprintf(output, " %p [%s "EV_SOCK_FMT", priority=%d]%s%s%s%s%s active%s%s\n",
(void*)e, gloss, EV_SOCK_ARG(e->ev_fd), e->ev_pri,
(e->ev_res&EV_READ)?" Read":"",
(e->ev_res&EV_WRITE)?" Write":"",
(e->ev_res&EV_CLOSED)?" EOF":"",
(e->ev_res&EV_SIGNAL)?" Signal":"",
(e->ev_res&EV_TIMEOUT)?" Timeout":"",
(e->ev_flags&EVLIST_INTERNAL)?" [Internal]":"",
Expand Down Expand Up @@ -3677,7 +3681,7 @@ void
event_base_active_by_fd(struct event_base *base, evutil_socket_t fd, short events)
{
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
evmap_io_active_(base, fd, events & (EV_READ|EV_WRITE));
evmap_io_active_(base, fd, events & (EV_READ|EV_WRITE|EV_CLOSED));
EVBASE_RELEASE_LOCK(base, th_base_lock);
}

Expand Down
46 changes: 40 additions & 6 deletions evmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct evmap_io {
struct event_dlist events;
ev_uint16_t nread;
ev_uint16_t nwrite;
ev_uint16_t nclose;
};

/* An entry for an evmap_signal list: notes all the events that want to know
Expand Down Expand Up @@ -255,6 +256,7 @@ evmap_io_init(struct evmap_io *entry)
LIST_INIT(&entry->events);
entry->nread = 0;
entry->nwrite = 0;
entry->nclose = 0;
}


Expand All @@ -266,7 +268,7 @@ evmap_io_add_(struct event_base *base, evutil_socket_t fd, struct event *ev)
const struct eventop *evsel = base->evsel;
struct event_io_map *io = &base->io;
struct evmap_io *ctx = NULL;
int nread, nwrite, retval = 0;
int nread, nwrite, nclose, retval = 0;
short res = 0, old = 0;
struct event *old_ev;

Expand All @@ -286,11 +288,14 @@ evmap_io_add_(struct event_base *base, evutil_socket_t fd, struct event *ev)

nread = ctx->nread;
nwrite = ctx->nwrite;
nclose = ctx->nclose;

if (nread)
old |= EV_READ;
if (nwrite)
old |= EV_WRITE;
if (nclose)
old |= EV_CLOSED;

if (ev->ev_events & EV_READ) {
if (++nread == 1)
Expand All @@ -300,7 +305,11 @@ evmap_io_add_(struct event_base *base, evutil_socket_t fd, struct event *ev)
if (++nwrite == 1)
res |= EV_WRITE;
}
if (EVUTIL_UNLIKELY(nread > 0xffff || nwrite > 0xffff)) {
if (ev->ev_events & EV_CLOSED) {
if (++nclose == 1)
res |= EV_CLOSED;
}
if (EVUTIL_UNLIKELY(nread > 0xffff || nwrite > 0xffff || nclose > 0xffff)) {
event_warnx("Too many events reading or writing on fd %d",
(int)fd);
return -1;
Expand All @@ -326,6 +335,7 @@ evmap_io_add_(struct event_base *base, evutil_socket_t fd, struct event *ev)

ctx->nread = (ev_uint16_t) nread;
ctx->nwrite = (ev_uint16_t) nwrite;
ctx->nclose = (ev_uint16_t) nclose;
LIST_INSERT_HEAD(&ctx->events, ev, ev_io_next);

return (retval);
Expand All @@ -339,7 +349,7 @@ evmap_io_del_(struct event_base *base, evutil_socket_t fd, struct event *ev)
const struct eventop *evsel = base->evsel;
struct event_io_map *io = &base->io;
struct evmap_io *ctx;
int nread, nwrite, retval = 0;
int nread, nwrite, nclose, retval = 0;
short res = 0, old = 0;

if (fd < 0)
Expand All @@ -356,11 +366,14 @@ evmap_io_del_(struct event_base *base, evutil_socket_t fd, struct event *ev)

nread = ctx->nread;
nwrite = ctx->nwrite;
nclose = ctx->nclose;

if (nread)
old |= EV_READ;
if (nwrite)
old |= EV_WRITE;
if (nclose)
old |= EV_CLOSED;

if (ev->ev_events & EV_READ) {
if (--nread == 0)
Expand All @@ -372,6 +385,11 @@ evmap_io_del_(struct event_base *base, evutil_socket_t fd, struct event *ev)
res |= EV_WRITE;
EVUTIL_ASSERT(nwrite >= 0);
}
if (ev->ev_events & EV_CLOSED) {
if (--nclose == 0)
res |= EV_CLOSED;
EVUTIL_ASSERT(nclose >= 0);
}

if (res) {
void *extra = ((char*)ctx) + sizeof(struct evmap_io);
Expand All @@ -384,6 +402,7 @@ evmap_io_del_(struct event_base *base, evutil_socket_t fd, struct event *ev)

ctx->nread = nread;
ctx->nwrite = nwrite;
ctx->nclose = nclose;
LIST_REMOVE(ev, ev_io_next);

return (retval);
Expand Down Expand Up @@ -589,6 +608,8 @@ evmap_io_reinit_iter_fn(struct event_base *base, evutil_socket_t fd,
events |= EV_READ;
if (ctx->nwrite)
events |= EV_WRITE;
if (ctx->nclose)
events |= EV_CLOSED;
if (evsel->fdinfo_len)
memset(extra, 0, evsel->fdinfo_len);
if (events &&
Expand Down Expand Up @@ -856,6 +877,10 @@ event_changelist_add_(struct event_base *base, evutil_socket_t fd, short old, sh
change->write_change = EV_CHANGE_ADD |
(events & (EV_ET|EV_PERSIST|EV_SIGNAL));
}
if (events & EV_CLOSED) {
change->close_change = EV_CHANGE_ADD |
(events & (EV_ET|EV_PERSIST|EV_SIGNAL));
}

event_changelist_check(base);
return (0);
Expand Down Expand Up @@ -902,6 +927,12 @@ event_changelist_del_(struct event_base *base, evutil_socket_t fd, short old, sh
else
change->write_change = EV_CHANGE_DEL;
}
if (events & EV_CLOSED) {
if (!(change->old_events & EV_CLOSED))
change->close_change = 0;
else
change->close_change = EV_CHANGE_DEL;
}

event_changelist_check(base);
return (0);
Expand All @@ -915,7 +946,7 @@ evmap_io_check_integrity_fn(struct event_base *base, evutil_socket_t fd,
struct evmap_io *io_info, void *arg)
{
struct event *ev;
int n_read = 0, n_write = 0;
int n_read = 0, n_write = 0, n_close = 0;

/* First, make sure the list itself isn't corrupt. Otherwise,
* running LIST_FOREACH could be an exciting adventure. */
Expand All @@ -925,15 +956,18 @@ evmap_io_check_integrity_fn(struct event_base *base, evutil_socket_t fd,
EVUTIL_ASSERT(ev->ev_flags & EVLIST_INSERTED);
EVUTIL_ASSERT(ev->ev_fd == fd);
EVUTIL_ASSERT(!(ev->ev_events & EV_SIGNAL));
EVUTIL_ASSERT((ev->ev_events & (EV_READ|EV_WRITE)));
EVUTIL_ASSERT((ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED)));
if (ev->ev_events & EV_READ)
++n_read;
if (ev->ev_events & EV_WRITE)
++n_write;
if (ev->ev_events & EV_CLOSED)
++n_close;
}

EVUTIL_ASSERT(n_read == io_info->nread);
EVUTIL_ASSERT(n_write == io_info->nwrite);
EVUTIL_ASSERT(n_close == io_info->nclose);

return 0;
}
Expand All @@ -952,7 +986,7 @@ evmap_signal_check_integrity_fn(struct event_base *base,
EVUTIL_ASSERT(ev->ev_flags & EVLIST_INSERTED);
EVUTIL_ASSERT(ev->ev_fd == signum);
EVUTIL_ASSERT((ev->ev_events & EV_SIGNAL));
EVUTIL_ASSERT(!(ev->ev_events & (EV_READ|EV_WRITE)));
EVUTIL_ASSERT(!(ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED)));
}
return 0;
}
Expand Down
10 changes: 9 additions & 1 deletion include/event2/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,10 @@ enum event_method_feature {
EV_FEATURE_O1 = 0x02,
/** Require an event method that allows file descriptors as well as
* sockets. */
EV_FEATURE_FDS = 0x04
EV_FEATURE_FDS = 0x04,
/** Require an event method that detects premature connection close by
* clients without the necessity of reading all the pending data. */
EV_FEATURE_EARLY_CLOSE = 0x08
};

/**
Expand Down Expand Up @@ -904,6 +907,11 @@ int event_base_got_break(struct event_base *);
* BECOMES STABLE.
**/
#define EV_FINALIZE 0x40
/**
* Detects premature connection close by clients without the necessity of
* reading all the pending data, if supported by the backend.
**/
#define EV_CLOSED 0x80
/**@}*/

/**
Expand Down
Loading

0 comments on commit b1b69ac

Please sign in to comment.