Add socket options

This commit is contained in:
Felix Kollmann 2024-02-18 13:03:49 +01:00
parent e7026b37f7
commit 84bc9c6512

View File

@ -91,6 +91,26 @@ pub const ZSocketType = enum(c_int) {
Push = c.ZMQ_PUSH,
};
pub const ZSocketOptionTag = enum {
ReceiveTimeout,
};
pub const ZSocketOption = union(ZSocketOptionTag) {
/// ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
///
/// Sets the timeout for receive operation on the socket.
/// If the value is 0, zmq_recv(3) will return immediately, with a EAGAIN error
/// if there is no message to receive. If the value is -1, it will block until a
/// message is available. For all other values, it will wait for a message for that
/// amount of time before returning with an EAGAIN error.
///
/// Unit: milliseconds
/// Default: -1 (infinite)
///
/// For more details, see http://api.zeromq.org/3-0:zmq-setsockopt
ReceiveTimeout: i32,
};
/// System level socket, which allows for opening outgoing and
/// accepting incoming connections.
///
@ -277,6 +297,22 @@ pub const ZSocket = struct {
return zframe.ZFrame{ .frame = frame.? };
}
/// Set an option on the socket. See `ZSocketOption` for details.
pub fn setSocketOption(self: *ZSocket, opt: ZSocketOption) !void {
switch (opt) {
.ReceiveTimeout => c.zsock_set_rcvtimeo(self.socket, @intCast(opt.ReceiveTimeout)),
//else => return error.UnknownOption,
}
}
/// Get an option of the socket. See `ZSocketOption` for details.
pub fn getSocketOption(self: *ZSocket, opt: *ZSocketOption) !void {
switch (opt.*) {
.ReceiveTimeout => opt.ReceiveTimeout = c.zsock_rcvtimeo(self.socket),
//else => return error.UnknownOption,
}
}
/// Destroy the socket and clean up
pub fn deinit(self: *ZSocket) void {
var socket: ?*c.zsock_t = self.socket;
@ -404,3 +440,31 @@ test "ZSocket - roundtrip json" {
try std.testing.expectEqualDeep(outgoingObj, incomingObj.value);
}
test "ZSocket - receive timeout" {
const allocator = std.testing.allocator;
// bind the incoming socket
var incoming = try ZSocket.init(allocator, ZSocketType.Rep);
defer incoming.deinit();
{
var timeout = ZSocketOption{ .ReceiveTimeout = undefined };
try incoming.getSocketOption(&timeout);
try std.testing.expectEqual(@as(i32, -1), timeout.ReceiveTimeout);
}
try incoming.setSocketOption(.{ .ReceiveTimeout = 500 });
{
var timeout = ZSocketOption{ .ReceiveTimeout = undefined };
try incoming.getSocketOption(&timeout);
try std.testing.expectEqual(@as(i32, 500), timeout.ReceiveTimeout);
}
const port = try incoming.bind("tcp://127.0.0.1:!");
try std.testing.expect(port >= 0xC000);
// try to receive the message
try std.testing.expectError(error.ReceiveFrameInterrupted, incoming.receive());
}