Skip to content

Commit bc1a118

Browse files
committed
add port/socketpair
1 parent a9bbba8 commit bc1a118

File tree

3 files changed

+103
-12
lines changed

3 files changed

+103
-12
lines changed

include/port/socketpair.h

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#ifndef _socketpair_h_
2+
#define _socketpair_h_
3+
4+
#if !defined(OS_WINDOWS)
5+
#include <sys/types.h> /* See NOTES */
6+
#include <sys/socket.h>
7+
8+
// https://linux.die.net/man/3/socketpair
9+
#else
10+
11+
#include "sockutil.h"
12+
#include <stdlib.h>
13+
#include <string.h>
14+
15+
/// @param[in] domain AF_UNIX/AF_LOCAL only
16+
/// @param[in] type SOCK_STREAM/SOCK_DGRAM/SOCK_SEQPACKET only
17+
/// @param[in] protocol ignore, must be 0
18+
/// @return 0-ok, -1-error
19+
static inline int socketpair(int domain, int type, int protocol, socket_t sv[2])
20+
{
21+
socket_t pr[2];
22+
socklen_t addrlen;
23+
struct addrinfo hints, *ai;
24+
struct sockaddr_storage addr;
25+
26+
memset(&hints, 0, sizeof(hints));
27+
hints.ai_family = protocol;
28+
hints.ai_socktype = type;
29+
hints.ai_flags = AI_PASSIVE;
30+
if (0 != getaddrinfo(NULL, NULL, &hints, &ai))
31+
return -1;
32+
33+
addrlen = ai->ai_addrlen;
34+
memcpy(&addr, ai->ai_addr, ai->ai_addrlen);
35+
freeaddrinfo(ai);
36+
37+
pr[0] = socket(domain, type, protocol);
38+
pr[1] = socket(domain, type, protocol);
39+
if (socket_invalid == pr[0] || socket_invalid == pr[1])
40+
goto SOCKPAIRFAILED;
41+
42+
if(0 != socket_bind(pr[0], (struct sockaddr*)&addr, addrlen))
43+
goto SOCKPAIRFAILED;
44+
45+
if (0 != socket_listen(pr[0], 1))
46+
goto SOCKPAIRFAILED;
47+
48+
if(0 != socket_connect_by_time(pr[1], (struct sockaddr*)&addr, addrlen, 10))
49+
goto SOCKPAIRFAILED;
50+
51+
sv[0] = socket_accept_by_time(pr[0], &addr, &addrlen, 10);
52+
if(socket_invalid == sv[0])
53+
goto SOCKPAIRFAILED;
54+
55+
socket_close(pr[0]); // close daemon socket
56+
socket_setnonblock(sv[0], 1);
57+
sv[1] = pr[1];
58+
return 0;
59+
60+
SOCKPAIRFAILED:
61+
if (socket_invalid != pr[0])
62+
socket_close(pr[0]);
63+
if (socket_invalid != pr[1])
64+
socket_close(pr[1]);
65+
return -1;
66+
}
67+
#endif
68+
69+
#endif /* !_socketpair_h_ */

libaio/include/aio-poll.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@ int aio_poll_destroy(struct aio_poll_t* poll);
2323
/// @param[in] socket same as aio_poll_poll socket
2424
/// @param[in] flags AIO_POLL_IN/AIO_POLL_OUT
2525
/// @param[in] param same as aio_poll_poll param
26-
typedef void (*aio_poll_callback)(int code, socket_t socket, int flags, void* param);
26+
typedef void (*aio_poll_onpoll)(int code, socket_t socket, int flags, void* param);
2727

2828
/// Async poll
2929
/// @param[in] flags AIO_POLL_IN/AIO_POLL_OUT
3030
/// @param[in] timeout poll timeout, in MS
3131
/// @param[in] callback poll callback
3232
/// @param[in] param user defined callback parameter
3333
/// @return 0-ok, other-error
34-
int aio_poll_poll(struct aio_poll_t* poll, socket_t socket, int flags, int timeout, aio_poll_callback callback, void* param);
34+
int aio_poll_poll(struct aio_poll_t* poll, socket_t socket, int flags, int timeout, aio_poll_onpoll callback, void* param);
3535

3636
#ifdef __cplusplus
3737
}

libaio/src/aio-poll.c

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33
#include "sys/thread.h"
44
#include "sys/locker.h"
55
#include "sys/pollfd.h"
6+
#include "port/socketpair.h"
67
#include "sockutil.h"
78
#include "list.h"
89
#include <errno.h>
910
#include <stdlib.h>
1011
#include <assert.h>
1112

12-
#define N_SOCKETS FD_SETSIZE
13+
#define N_SOCKETS (FD_SETSIZE-1)
1314

1415
struct aio_poll_socket_t
1516
{
@@ -19,13 +20,14 @@ struct aio_poll_socket_t
1920
int events;
2021
int revents;
2122
uint64_t expire;
22-
aio_poll_callback callback;
23+
aio_poll_onpoll callback;
2324
void* param;
2425
};
2526

2627
struct aio_poll_t
2728
{
2829
locker_t locker;
30+
socket_t pair[2];
2931
struct list_head root;
3032
struct list_head idles;
3133

@@ -79,6 +81,13 @@ struct aio_poll_t* aio_poll_create()
7981
{
8082
LIST_INIT_HEAD(&poll->root);
8183
LIST_INIT_HEAD(&poll->idles);
84+
aio_poll_init_idles(poll);
85+
86+
if (0 != socketpair(PF_INET, SOCK_DGRAM, 0, poll->pair))
87+
{
88+
free(poll);
89+
return NULL;
90+
}
8291

8392
poll->running = 1;
8493
locker_create(&poll->locker);
@@ -94,12 +103,16 @@ int aio_poll_destroy(struct aio_poll_t* poll)
94103

95104
assert(list_empty(&poll->root));
96105
poll->running = 0;
106+
socket_send_all_by_time(poll->pair[1], poll, 1, 0, 5000);
107+
97108
thread_destroy(poll->thread);
98109
locker_destroy(&poll->locker);
110+
socket_close(poll->pair[0]);
111+
socket_close(poll->pair[1]);
99112
return 0;
100113
}
101114

102-
int aio_poll_poll(struct aio_poll_t* poll, socket_t socket, int flags, int timeout, aio_poll_callback callback, void* param)
115+
int aio_poll_poll(struct aio_poll_t* poll, socket_t socket, int flags, int timeout, aio_poll_onpoll callback, void* param)
103116
{
104117
struct aio_poll_socket_t* s;
105118

@@ -120,6 +133,9 @@ int aio_poll_poll(struct aio_poll_t* poll, socket_t socket, int flags, int timeo
120133
locker_lock(&poll->locker);
121134
list_insert_after(&s->link, poll->root.prev);
122135
locker_unlock(&poll->locker);
136+
137+
// notify
138+
socket_send_all_by_time(poll->pair[1], s, 1, 0, 5000);
123139
return 0;
124140
}
125141

@@ -128,28 +144,34 @@ static int STDCALL aio_poll_worker(void* param)
128144
{
129145
int i, n, r;
130146
uint64_t now;
147+
char buf[128];
131148
struct list_head* ptr;
132149
struct aio_poll_t* poll;
133-
struct aio_poll_socket_t* links[N_SOCKETS];
150+
struct aio_poll_socket_t link0, *links[N_SOCKETS];
134151

135152
poll = (struct aio_poll_t*)param;
153+
link0.fd = poll->pair[0];
154+
link0.events = AIO_POLL_IN;
155+
136156
while (poll->running)
137157
{
158+
socket_recv_by_time(poll->pair[0], buf, sizeof(buf), 0, 0);
159+
138160
n = 0;
161+
links[n++] = &link0;
139162
locker_lock(&poll->locker);
140163
list_for_each(ptr, &poll->root)
141164
{
142165
links[n++] = list_entry(ptr, struct aio_poll_socket_t, link);
143166
}
144167
locker_unlock(&poll->locker);
145168

146-
// TODO: timeout
147-
r = aio_poll_do(links, n, 1000);
169+
r = aio_poll_do(links, n, 120000);
148170
if (r < 0)
149171
break;
150172

151173
now = system_clock();
152-
for (i = 0; i < n; i++)
174+
for (i = 1; i < n; i++)
153175
{
154176
if (0 != links[i]->revents)
155177
{
@@ -231,11 +253,11 @@ static int aio_poll_do(struct aio_poll_socket_t* s[], int n, int timeout)
231253

232254
for (r = i = 0; i < n && i < 64; i++)
233255
{
234-
s[i]->oflags = 0;
256+
s[i]->revents = 0;
235257
if (fds[i].revents & POLLIN)
236-
s[i]->oflags |= AIO_POLL_IN;
258+
s[i]->revents |= AIO_POLL_IN;
237259
if (fds[i].revents & POLLOUT)
238-
s[i]->oflags |= AIO_POLL_OUT;
260+
s[i]->revents |= AIO_POLL_OUT;
239261
}
240262

241263
return r;

0 commit comments

Comments
 (0)