summaryrefslogtreecommitdiffstats
path: root/socket.c
diff options
context:
space:
mode:
authorKaz Kylheku <kaz@kylheku.com>2016-03-06 16:36:58 -0800
committerKaz Kylheku <kaz@kylheku.com>2016-03-06 16:36:58 -0800
commite396824ed2d6514a62302c5e5591cacfbb8cd29b (patch)
treeebae64c34700f4e9aa46292f6797f5ad189ec91d /socket.c
parent59b9b5d4f8499b45deaec7cc89ca7d3a54a9c215 (diff)
downloadtxr-e396824ed2d6514a62302c5e5591cacfbb8cd29b.tar.gz
txr-e396824ed2d6514a62302c5e5591cacfbb8cd29b.tar.bz2
txr-e396824ed2d6514a62302c5e5591cacfbb8cd29b.zip
Special implementation of dgram socket streams.
* socket.c (struct dgram_stream): New struct. (make_dgram_sock_stream, dgram_print, dgram_mark, dgram_destroy, dgram_overflow, dgram_put_byte_callback, dgram_put_string, dgram_put_char, dgram_put_byte, dgram_get_byte_callback, dgram_get_char, dgram_get_byte, dgram_unget_char, dgram_unget_byte, dgram_flush, dgram_close, dgram_get_prop, dgram_get_fd, dgram_get_sock_family, dgram_get_sock_type, dgram_get_sock_peer, dgram_set_sock_peer, dgram_get_error, dgram_get_error_str, dgram_clear_error): New static functions. (dgram_strm_ops): New static structure. (sock_listen): Check against datagram sockets which have a peer; otherwise a no-op for datagram sockets. (sock_accept): Special logic for dgram sockets. (open_sockfd): Function moved here from stream.c, and augmented to open dgram stream for dgram sockets. (open_socket): Function moved here from stream.c. (sock_load_init): Fill in some operations in dgram_strm_ops. * stream.c (generic_get_line, make_sock_stream, errno_to_string): Statics become external. (open_sockfd, open_socket): Functions removed from here, moved to socket.c. * stream.h (generic_get_line, make_sock_stream, errno_to_string): Declared.
Diffstat (limited to 'socket.c')
-rw-r--r--socket.c516
1 files changed, 488 insertions, 28 deletions
diff --git a/socket.c b/socket.c
index 49b531a8..e4bbb61f 100644
--- a/socket.c
+++ b/socket.c
@@ -33,9 +33,10 @@
#include <signal.h>
#include <dirent.h>
#include <errno.h>
-#include "config.h"
+#include <unistd.h>
#include <sys/un.h>
#include <netdb.h>
+#include "config.h"
#include ALLOCA_H
#include "lib.h"
#include "stream.h"
@@ -49,6 +50,23 @@
#include "arith.h"
#include "socket.h"
+struct dgram_stream {
+ struct strm_base a;
+ val stream;
+ val family;
+ val peer;
+ val unget_c;
+ utf8_decoder_t ud;
+ struct sockaddr_storage peer_addr;
+ socklen_t pa_len;
+ int fd;
+ int err;
+ mem_t *rx_buf;
+ mem_t *tx_buf;
+ int rx_size, rx_pos;
+ int tx_pos;
+};
+
val sockaddr_in_s, sockaddr_in6_s, sockaddr_un_s, addrinfo_s;
val flags_s, family_s, socktype_s, protocol_s, addr_s, canonname_s;
val port_s, flow_info_s, scope_id_s, path_s;
@@ -91,7 +109,6 @@ static void ipv6_scope_id_from_num(struct sockaddr_in6 *dst, val scope)
scope, nao);
}
-
static val sockaddr_in_out(struct sockaddr_in *src)
{
args_decl(args, ARGS_MIN);
@@ -235,6 +252,347 @@ static void sockaddr_in(val sockaddr, val family,
}
}
+static_forward(struct strm_ops dgram_strm_ops);
+
+static val make_dgram_sock_stream(int fd, val family, val peer,
+ mem_t *dgram, int dgram_size,
+ struct sockaddr *peer_addr, socklen_t pa_len)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *,
+ chk_malloc(sizeof *d));
+ val stream;
+
+ strm_base_init(&d->a);
+ d->stream = nil;
+ d->fd = fd;
+ d->family = d->peer = d->unget_c = nil;
+ d->err = 0;
+ d->rx_buf = dgram;
+ d->rx_size = dgram_size;
+ d->rx_pos = 0;
+ d->tx_buf = 0;
+ d->tx_pos = 0;
+ utf8_decoder_init(&d->ud);
+ if (peer_addr != 0)
+ memcpy(&d->peer_addr, peer_addr, pa_len);
+ d->pa_len = pa_len;
+ stream = cobj(coerce(mem_t *, d), stream_s, &dgram_strm_ops.cobj_ops);
+ d->stream = stream;
+ d->family = family;
+ d->peer = peer;
+ return stream;
+}
+
+static void dgram_print(val stream, val out, val pretty)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ struct strm_ops *ops = coerce(struct strm_ops *, stream->co.ops);
+ val name = static_str(ops->name);
+
+ (void) pretty;
+
+ format(out, lit("#<~a ~s ~p>"), name, num(d->fd), stream, nao);
+}
+
+static void dgram_mark(val stream)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ strm_base_mark(&d->a);
+ /* h->stream == stream and so no need to mark h->stream */
+ gc_mark(d->family);
+ gc_mark(d->peer);
+ gc_mark(d->unget_c);
+}
+
+static void dgram_destroy(val stream)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ free(d->rx_buf);
+ free(d->tx_buf);
+ d->rx_buf = d->tx_buf = 0;
+}
+
+static void dgram_overflow(val stream)
+{
+ /*
+ * Not called under present logic, because dgram_put_byte_callback keeps
+ * increasing the datagram size.
+ */
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ d->err = ENOBUFS;
+ uw_throwf(socket_error_s, lit("dgram write overflow on ~s"), stream, nao);
+}
+
+static int dgram_put_byte_callback(int b, mem_t *ctx)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, ctx);
+ d->tx_buf = chk_manage_vec(d->tx_buf, d->tx_pos, d->tx_pos + 1, 1, 0);
+ d->tx_buf[d->tx_pos++] = b;
+ return 1;
+}
+
+static val dgram_put_string(val stream, val str)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ const wchar_t *s = c_str(str);
+
+ while (*s) {
+ if (!utf8_encode(*s++, dgram_put_byte_callback, coerce(mem_t *, d)))
+ dgram_overflow(stream);
+ }
+
+ return t;
+}
+
+static val dgram_put_char(val stream, val ch)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ if (!utf8_encode(c_chr(ch), dgram_put_byte_callback, coerce(mem_t *, d)))
+ dgram_overflow(stream);
+ return t;
+}
+
+static val dgram_put_byte(val stream, int b)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ if (!dgram_put_byte_callback(b, coerce(mem_t *, d)))
+ dgram_overflow(stream);
+ return t;
+}
+
+static int dgram_get_byte_callback(mem_t *ctx)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, ctx);
+ if (d->rx_buf) {
+ return (d->rx_pos < d->rx_size) ? d->rx_buf[d->rx_pos++] : EOF;
+ } else {
+ const int dgram_size = 65536;
+ mem_t *dgram = chk_malloc(dgram_size);
+ ssize_t nbytes = -1;
+
+ uw_simple_catch_begin;
+
+ sig_save_enable;
+
+ nbytes = recv(d->fd, dgram, dgram_size, 0);
+
+ sig_restore_enable;
+
+ if (nbytes == -1) {
+ d->err = errno;
+ uw_throwf(socket_error_s,
+ lit("get-byte: recv on ~s failed: ~d/~s"),
+ d->stream, num(errno), string_utf8(strerror(errno)), nao);
+ }
+
+ uw_unwind {
+ if (nbytes == -1)
+ free(dgram);
+ }
+
+ uw_catch_end;
+
+ d->rx_buf = chk_realloc(dgram, nbytes);
+
+ if (!d->rx_buf)
+ d->rx_buf = dgram;
+
+ d->rx_size = nbytes;
+
+ return dgram_get_byte_callback(ctx);
+ }
+}
+
+static val dgram_get_char(val stream)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ val uc = d->unget_c;
+ if (uc) {
+ d->unget_c = nil;
+ return uc;
+ } else {
+ wint_t ch = utf8_decode(&d->ud, dgram_get_byte_callback,
+ coerce(mem_t *, d));
+ return (ch != WEOF) ? chr(ch) : nil;
+ }
+}
+
+static val dgram_get_byte(val stream)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ int b = dgram_get_byte_callback(coerce(mem_t *, d));
+ return b == EOF ? nil : num_fast(b);
+}
+
+static val dgram_unget_char(val stream, val ch)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+
+ if (d->unget_c){
+ d->err = EOVERFLOW;
+ uw_throwf(file_error_s, lit("unget-char overflow on ~a: "), stream, nao);
+ }
+
+ d->unget_c = ch;
+ return ch;
+}
+
+static val dgram_unget_byte(val stream, int byte)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+
+ if (d->rx_pos <= 0) {
+ d->err = EOVERFLOW;
+ uw_throwf(file_error_s,
+ lit("unget-byte: cannot push back past start of stream ~s"),
+ stream, nao);
+ }
+
+ d->rx_buf[--d->rx_pos] = byte;
+ return num_fast(byte);
+}
+
+static val dgram_flush(val stream)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ if (d->fd != -1 && d->tx_buf) {
+ if (d->peer) {
+ int nwrit = sendto(d->fd, d->tx_buf, d->tx_pos, 0,
+ coerce(struct sockaddr *, &d->peer_addr), d->pa_len);
+
+ if (nwrit != d->tx_pos) {
+ d->err = (nwrit < 0) ? errno : ENOBUFS;
+ uw_throwf(socket_error_s,
+ lit("flush-stream: sendto on ~s ~a: ~d/~s"),
+ stream,
+ (nwrit < 0) ? lit("failed") : lit("truncated"),
+ num(errno), string_utf8(strerror(errno)), nao);
+ }
+
+ free(d->tx_buf);
+ d->tx_buf = 0;
+ d->tx_pos = 0;
+ } else {
+ d->err = ENOTCONN;
+ uw_throwf(socket_error_s,
+ lit("flush-stream: cannot transmit on ~s: peer not set"),
+ stream, nao);
+ }
+ }
+ return t;
+}
+
+static val dgram_close(val stream, val throw_on_error)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ if (d->fd != -1) {
+ dgram_flush(stream);
+ close(d->fd);
+ d->fd = -1;
+ d->err = 0;
+ }
+
+ return t;
+}
+
+static val dgram_get_prop(val stream, val ind)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+
+ if (ind == fd_k)
+ return num(d->fd);
+
+ return nil;
+}
+
+static val dgram_get_error(val stream)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ if (d->err)
+ return num(d->err);
+ if (d->rx_buf && d->rx_pos == d->rx_size)
+ return t;
+ return nil;
+}
+
+static val dgram_get_error_str(val stream)
+{
+ return errno_to_string(dgram_get_error(stream));
+}
+
+static val dgram_clear_error(val stream)
+{
+ val ret = dgram_get_error(stream);
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+
+ d->err = 0;
+
+ if (d->rx_pos == d->rx_size) {
+ d->rx_pos = d->rx_size = 0;
+ free(d->rx_buf);
+ d->rx_buf = 0;
+ }
+
+ return ret;
+}
+
+static val dgram_get_fd(val stream)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ return num(d->fd);
+}
+
+static val dgram_get_sock_family(val stream)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ return d->family;
+}
+
+static val dgram_get_sock_type(val stream)
+{
+ (void) stream;
+ return num_fast(SOCK_DGRAM);
+}
+
+static val dgram_get_sock_peer(val stream)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ return d->peer;
+}
+
+static val dgram_set_sock_peer(val stream, val peer)
+{
+ struct dgram_stream *d = coerce(struct dgram_stream *, stream->co.handle);
+ sockaddr_in(peer, d->family, &d->peer_addr, &d->pa_len);
+ return d->peer = peer;
+}
+
+static_def(struct strm_ops dgram_strm_ops =
+ strm_ops_init(cobj_ops_init(eq,
+ dgram_print,
+ dgram_destroy,
+ dgram_mark,
+ cobj_hash_op),
+ wli("dgram-stream"),
+ dgram_put_string,
+ dgram_put_char,
+ dgram_put_byte,
+ generic_get_line,
+ dgram_get_char,
+ dgram_get_byte,
+ dgram_unget_char,
+ dgram_unget_byte,
+ dgram_close,
+ dgram_flush,
+ 0,
+ 0,
+ dgram_get_prop,
+ 0,
+ dgram_get_error,
+ dgram_get_error_str,
+ dgram_clear_error,
+ dgram_get_fd));
+
static val sock_bind(val sock, val sockaddr)
{
int sfd = c_num(stream_fd(sock));
@@ -274,49 +632,119 @@ static val sock_connect(val sock, val sockaddr)
static val sock_listen(val sock, val backlog)
{
- val sfd = stream_fd(sock);
+ if (sock_type(sock) == num_fast(SOCK_DGRAM)) {
+ if (sock_peer(sock)) {
+ errno = EISCONN;
+ goto failed;
+ }
+ } else {
+ val sfd = stream_fd(sock);
- if (listen(c_num(sfd), c_num(default_arg(backlog, num_fast(16)))))
- uw_throwf(socket_error_s, lit("sock-listen failed: ~d/~s"),
- num(errno), string_utf8(strerror(errno)), nao);
+ if (listen(c_num(sfd), c_num(default_arg(backlog, num_fast(16)))))
+ goto failed;
+ }
return t;
+failed:
+ uw_throwf(socket_error_s, lit("sock-listen failed: ~d/~s"),
+ num(errno), string_utf8(strerror(errno)), nao);
}
static val sock_accept(val sock, val mode_str)
{
val sfd = stream_fd(sock);
val family = sock_family(sock);
+ val type = sock_type(sock);
struct sockaddr_storage sa;
socklen_t salen;
- int afd;
- val peer;
+ val peer = nil;
+
+ if (type == num_fast(SOCK_DGRAM)) {
+ ssize_t nbytes = -1;
+ const int dgram_size = 65536;
+ mem_t *dgram = chk_malloc(dgram_size);
- sig_save_enable;
+ if (sock_peer(sock)) {
+ free(dgram);
+ errno = EISCONN;
+ goto failed;
+ }
- afd = accept(c_num(sfd), coerce(struct sockaddr *, &sa), &salen);
+ uw_simple_catch_begin;
- sig_restore_enable;
+ sig_save_enable;
- if (afd < 0)
- uw_throwf(socket_error_s, lit("accept failed: ~d/~s"),
- num(errno), string_utf8(strerror(errno)), nao);
+ nbytes = recvfrom(c_num(sfd), dgram, dgram_size, 0,
+ coerce(struct sockaddr *, &sa), &salen);
+
+ sig_restore_enable;
+
+ if (family == num_fast(AF_INET))
+ peer = sockaddr_in_out(coerce(struct sockaddr_in *, &sa));
+ else if (family == num_fast(AF_INET6))
+ peer = sockaddr_in6_out(coerce(struct sockaddr_in6 *, &sa));
+ else if (family == num_fast(AF_UNIX))
+ peer = unix_sockaddr_out(coerce(struct sockaddr_un *, &sa));
+ else {
+ free(dgram);
+ dgram = 0;
+ uw_throwf(socket_error_s, lit("sock-accept: ~s isn't a supported socket family"),
+ family, nao);
+ }
- if (family == num_fast(AF_INET))
- peer = sockaddr_in_out(coerce(struct sockaddr_in *, &sa));
- else if (family == num_fast(AF_INET6))
- peer = sockaddr_in6_out(coerce(struct sockaddr_in6 *, &sa));
- else if (family == num_fast(AF_UNIX))
- peer = unix_sockaddr_out(coerce(struct sockaddr_un *, &sa));
- else
- uw_throwf(socket_error_s, lit("accept: ~s isn't a supported socket family"),
- family, nao);
-
- {
- val stream = open_sockfd(num(afd), family, num_fast(SOCK_STREAM), mode_str);
- sock_set_peer(stream, peer);
- return stream;
+ uw_unwind {
+ if (nbytes == -1)
+ free(dgram);
+ }
+
+ uw_catch_end;
+
+ {
+ int afd = dup(c_num(sfd));
+ mem_t *shrink = chk_realloc(dgram, nbytes);
+ if (shrink)
+ dgram = shrink;
+
+ if (afd == -1) {
+ free(dgram);
+ dgram = 0;
+ uw_throwf(socket_error_s, lit("sock-accept: unable to "),
+ family, nao);
+ }
+
+ return make_dgram_sock_stream(afd, family, peer, dgram, nbytes,
+ coerce(struct sockaddr *, &sa), salen);
+ }
+ } else {
+ int afd = -1;
+ sig_save_enable;
+
+ afd = accept(c_num(sfd), coerce(struct sockaddr *, &sa), &salen);
+
+ sig_restore_enable;
+
+ if (afd < 0)
+ goto failed;
+
+ if (family == num_fast(AF_INET))
+ peer = sockaddr_in_out(coerce(struct sockaddr_in *, &sa));
+ else if (family == num_fast(AF_INET6))
+ peer = sockaddr_in6_out(coerce(struct sockaddr_in6 *, &sa));
+ else if (family == num_fast(AF_UNIX))
+ peer = unix_sockaddr_out(coerce(struct sockaddr_un *, &sa));
+ else
+ uw_throwf(socket_error_s, lit("accept: ~s isn't a supported socket family"),
+ family, nao);
+
+ {
+ val stream = open_sockfd(num(afd), family, num_fast(SOCK_STREAM), mode_str);
+ sock_set_peer(stream, peer);
+ return stream;
+ }
}
+failed:
+ uw_throwf(socket_error_s, lit("accept failed: ~d/~s"),
+ num(errno), string_utf8(strerror(errno)), nao);
}
static val sock_shutdown(val sock, val how)
@@ -332,6 +760,32 @@ static val sock_shutdown(val sock, val how)
return t;
}
+val open_sockfd(val fd, val family, val type, val mode_str_in)
+{
+ if (type == num_fast(SOCK_DGRAM)) {
+ return make_dgram_sock_stream(c_num(fd), family, nil, 0, 0, 0, 0);
+ } else {
+ struct stdio_mode m;
+ val mode_str = default_arg(mode_str_in, lit("r+"));
+ FILE *f = (errno = 0, w_fdopen(c_num(fd), c_str(normalize_mode(&m, mode_str))));
+
+ if (!f) {
+ close(c_num(fd));
+ uw_throwf(file_error_s, lit("error creating stream for socket ~a: ~d/~s"),
+ fd, num(errno), string_utf8(strerror(errno)), nao);
+ }
+
+ setvbuf(f, (char *) NULL, _IOLBF, 0);
+ return set_mode_props(m, make_sock_stream(f, family, type));
+ }
+}
+
+val open_socket(val family, val type, val mode_str)
+{
+ int fd = socket(c_num(family), c_num(type), 0);
+ return open_sockfd(num(fd), family, type, mode_str);
+}
+
void sock_load_init(void)
{
sockaddr_in_s = intern(lit("sockaddr-in"), user_package);
@@ -384,4 +838,10 @@ void sock_load_init(void)
reg_fun(intern(lit("sock-listen"), user_package), func_n2o(sock_listen, 1));
reg_fun(intern(lit("sock-accept"), user_package), func_n2o(sock_accept, 1));
reg_fun(intern(lit("sock-shutdown"), user_package), func_n2o(sock_shutdown, 1));
+
+ fill_stream_ops(&dgram_strm_ops);
+ dgram_strm_ops.get_sock_family = dgram_get_sock_family;
+ dgram_strm_ops.get_sock_type = dgram_get_sock_type;
+ dgram_strm_ops.get_sock_peer = dgram_get_sock_peer;
+ dgram_strm_ops.set_sock_peer = dgram_set_sock_peer;
}