#include "uv.h"
#include "internal.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */
#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>
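/* On macOS, kqueue cannot report events for some kinds of file descriptors
 * (ttys and /dev/null, for example).  For those descriptors a helper thread
 * runs select(2) and forwards readiness events to the loop through a
 * uv_async_t, using a socketpair (fake_fd/int_fd) to wake the thread and to
 * interrupt a pending select() call.  The state for that workaround lives in
 * the struct below.
 */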
typedef struct uv__stream_select_s uv__stream_select_t;

struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};

# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
  (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \
   (errno == EMSGSIZE && send_handle != NULL))
#else
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
  (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
#endif /* defined(__APPLE__) */
static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);

void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
      err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

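/* Wake up the select() helper thread (if any) by writing a single byte to
 * the fake_fd end of the socketpair; the thread watches the other end and
 * re-evaluates which events it should wait for.
 */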
static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else  /* !defined(__APPLE__) */
#endif  /* !defined(__APPLE__) */
}

#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  while (1) {
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      abort();
    }

    if (r == 0)
      continue;

    if (FD_ISSET(s->int_fd, s->sread))
      while (1) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}

static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  uv_sem_post(&s->async_sem);
}

static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}

int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);

  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}

#endif /* defined(__APPLE__) */
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}

void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  QUEUE* q;

  while (!QUEUE_EMPTY(&stream->write_queue)) {
    q = QUEUE_HEAD(&stream->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_write_t, queue);
    req->error = error;

    QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  }
}

void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);

  if (stream->shutdown_req) {
    uv__req_unregister(stream->loop, stream->shutdown_req);
    stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED);
    stream->shutdown_req = NULL;
  }

  assert(stream->write_queue_size == 0);
}

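/* Called when accept() returns EMFILE or ENFILE.  Temporarily closes the
 * descriptor reserved in loop->emfile_fd to free up a slot, accepts and
 * immediately closes pending connections so their peers see a failure
 * instead of hanging, then re-reserves the spare descriptor.
 */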
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}

#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */
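/* I/O callback for listening sockets: accepts connections in a loop and
 * hands each one to connection_cb.  If the callback does not call
 * uv_accept(), polling for POLLIN is stopped until the pending fd is
 * consumed.
 */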
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

#if defined(UV_HAVE_KQUEUE)
    if (w->rcount <= 0)
      return;
#endif /* defined(UV_HAVE_KQUEUE) */

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return;

      if (err == UV_ECONNABORTED)
        continue;

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    if (stream->accepted_fd != -1) {
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
      return;
    }

    if (stream->type == UV_TCP &&
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
    }
  }
}

#undef UV_DEC_BACKLOG
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    server->accepted_fd = queued_fds->fds[0];

    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}

int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;

  switch (stream->type) {
    case UV_TCP:
      err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
      break;

    case UV_NAMED_PIPE:
      err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
      break;

    default:
      err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}

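/* Called when the write queue has been drained: stops polling for POLLOUT
 * and, if a shutdown request is pending, performs shutdown(SHUT_WR) and
 * invokes its callback.
 */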
static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(QUEUE_EMPTY(&stream->write_queue));
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  if ((stream->flags & UV_HANDLE_SHUTTING) &&
      !(stream->flags & UV_HANDLE_CLOSING) &&
      !(stream->flags & UV_HANDLE_SHUT)) {
    assert(stream->shutdown_req);

    req = stream->shutdown_req;
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_HANDLE_SHUTTING;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);

    if (err == 0)
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}

static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}

static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}

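/* Consume n written bytes from the request's buffers, advancing base/len of
 * partially written buffers in place.  Returns non-zero when every buffer in
 * the request has been fully written.
 */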
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0);
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}

static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  QUEUE_REMOVE(&req->queue);

  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}

static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}

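/* Write the request at the head of stream->write_queue.  When the request
 * carries a handle to pass (req->send_handle), its descriptor is attached as
 * SCM_RIGHTS ancillary data and sent with sendmsg(); otherwise a plain
 * writev() is used.  Transient errors leave the request queued and re-arm
 * POLLOUT.
 */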
static void uv__write(uv_stream_t* stream) {
  struct iovec* iov;
  QUEUE* q;
  uv_write_t* req;
  int iovmax;
  int iovcnt;
  ssize_t n;
  int err;

start:

  assert(uv__stream_fd(stream) >= 0);

  if (QUEUE_EMPTY(&stream->write_queue))
    return;

  q = QUEUE_HEAD(&stream->write_queue);
  req = QUEUE_DATA(q, uv_write_t, queue);
  assert(req->handle == stream);

  assert(sizeof(uv_buf_t) == sizeof(struct iovec));
  iov = (struct iovec*) &(req->bufs[req->write_index]);
  iovcnt = req->nbufs - req->write_index;

  iovmax = uv__getiovmax();

  if (iovcnt > iovmax)
    iovcnt = iovmax;

  if (req->send_handle) {
    int fd_to_send;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    union {
      char data[64];
      struct cmsghdr alias;
    } scratch;

    if (uv__is_closing(req->send_handle)) {
      err = UV_EBADF;
      goto error;
    }

    fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);

    memset(&scratch, 0, sizeof(scratch));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &scratch.alias;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));

    {
      void* pv = CMSG_DATA(cmsg);
      int* pi = pv;
      *pi = fd_to_send;
    }

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));

    if (n >= 0)
      req->send_handle = NULL;
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
  }

  if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
    err = UV__ERR(errno);
    goto error;
  }

  if (n >= 0 && uv__write_req_update(stream, req, n)) {
    uv__write_req_finish(req);
    return;
  }

  if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
    goto start;

  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  return;

error:
  req->error = err;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  if (!uv__io_active(&stream->io_watcher, POLLIN))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
}

static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  QUEUE* q;
  QUEUE pq;

  if (QUEUE_EMPTY(&stream->write_completed_queue))
    return;

  QUEUE_MOVE(&stream->write_completed_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    q = QUEUE_HEAD(&pq);
    req = QUEUE_DATA(q, uv_write_t, queue);
    QUEUE_REMOVE(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    if (req->cb)
      req->cb(req, req->error);
  }
}

uv_handle_type uv__handle_type(int fd) {
  struct sockaddr_storage ss;
  socklen_t sslen;
  socklen_t len;
  int type;

  memset(&ss, 0, sizeof(ss));
  sslen = sizeof(ss);

  if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
    return UV_UNKNOWN_HANDLE;

  len = sizeof type;

  if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
    return UV_UNKNOWN_HANDLE;

  if (type == SOCK_STREAM) {
#if defined(_AIX) || defined(__DragonFly__)
    if (sslen == 0)
      return UV_NAMED_PIPE;
#endif
    switch (ss.ss_family) {
      case AF_UNIX:
        return UV_NAMED_PIPE;
      case AF_INET:
      case AF_INET6:
        return UV_TCP;
    }
  }

  if (type == SOCK_DGRAM &&
      (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
    return UV_UDP;

  return UV_UNKNOWN_HANDLE;
}

static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
  stream->flags &= ~UV_HANDLE_READING;
}

static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                              sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}

#define UV__CMSG_FD_COUNT 64
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))
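/* Extract file descriptors received as SCM_RIGHTS control messages.  The
 * first fd becomes stream->accepted_fd; any further fds are parked in
 * stream->queued_fds until uv_accept() consumes them.
 */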
static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    char* start;
    char* end;
    int err;
    void* pv;
    int* pi;
    unsigned int i;
    unsigned int count;

    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr,
              "ignoring non-SCM_RIGHTS ancillary data: %d\n",
              cmsg->cmsg_type);
      continue;
    }

    pv = CMSG_DATA(cmsg);
    pi = pv;

    start = (char*) cmsg;
    end = (char*) cmsg + cmsg->cmsg_len;
    count = 0;
    while (start + CMSG_LEN(count * sizeof(*pi)) < end)
      count++;
    assert(start + CMSG_LEN(count * sizeof(*pi)) == end);

    for (i = 0; i < count; i++) {
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, pi[i]);
        if (err != 0) {
          for (; i < count; i++)
            uv__close(pi[i]);
          return err;
        }
      } else {
        stream->accepted_fd = pi[i];
      }
    }
  }

  return 0;
}

#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
# pragma clang diagnostic ignored "-Wvla-extension"
#endif
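/* Read incoming data, at most 32 buffers per invocation so that one busy
 * stream cannot starve the rest of the loop.  IPC pipes use recvmsg() so
 * that passed descriptors can be collected from the control data.
 */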
static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      } while (nread < 0 && errno == EINTR);
    } else {
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      } while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          if (!uv__io_active(&stream->io_watcher, POLLOUT))
            uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif

      stream->read_cb(stream, nread, &buf);

      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}

#ifdef __clang__
# pragma clang diagnostic pop
#endif
#undef UV__CMSG_FD_COUNT
#undef UV__CMSG_FD_SIZE
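/* Queue a shutdown request.  The actual shutdown(SHUT_WR) happens in
 * uv__drain() once all pending writes have completed.
 */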
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      stream->flags & UV_HANDLE_SHUTTING ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags |= UV_HANDLE_SHUTTING;

  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}

static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;

  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}

static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}

int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;

  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    return UV_ENOSYS;
#endif
  }

  empty_queue = (stream->write_queue_size == 0);

  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  } else if (empty_queue) {
    uv__write(stream);
  } else {
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}

int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}

void uv_try_write_cb(uv_write_t* req, int status) {
  abort();
}

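/* Synchronous best-effort write: temporarily queues a regular write request,
 * lets uv__write() push as much as the kernel will take right now, then
 * unqueues whatever is left and reports the number of bytes written (or
 * UV_EAGAIN if nothing could be written).
 */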
int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  int r;
  int has_pollout;
  size_t written;
  size_t req_size;
  uv_write_t req;

  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);

  r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
  if (r != 0)
    return r;

  written = uv__count_bufs(bufs, nbufs);
  if (req.bufs != NULL)
    req_size = uv__write_req_size(&req);
  else
    req_size = 0;
  written -= req_size;
  stream->write_queue_size -= req_size;

  QUEUE_REMOVE(&req.queue);
  uv__req_unregister(stream->loop, &req);
  if (req.bufs != req.bufsml)
    uv__free(req.bufs);
  req.bufs = NULL;

  if (!has_pollout) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (written == 0 && req_size != 0)
    return UV_EAGAIN;
  else
    return written;
}

int uv_read_start(uv_stream_t* stream,
                  uv_alloc_cb alloc_cb,
                  uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
      stream->type == UV_TTY);

  if (stream->flags & UV_HANDLE_CLOSING)
    return UV_EINVAL;

  if (!(stream->flags & UV_HANDLE_READABLE))
    return UV_ENOTCONN;

  stream->flags |= UV_HANDLE_READING;

  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}

int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}

int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}

#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */
void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}

int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}