Add socket options II

This commit is contained in:
Felix Kollmann 2024-02-18 14:05:57 +01:00
parent 84bc9c6512
commit e5298359f7

View File

@ -93,6 +93,12 @@ pub const ZSocketType = enum(c_int) {
pub const ZSocketOptionTag = enum {
ReceiveTimeout,
ReceiveHighWaterMark,
ReceiveBufferSize,
SendTimeout,
SendHighWaterMark,
SendBufferSize,
};
pub const ZSocketOption = union(ZSocketOptionTag) {
@ -109,6 +115,86 @@ pub const ZSocketOption = union(ZSocketOptionTag) {
///
/// For more details, see http://api.zeromq.org/3-0:zmq-setsockopt
ReceiveTimeout: i32,
/// ZMQ_RCVHWM: Set high water mark for inbound messages
///
/// The ZMQ_RCVHWM option shall set the high water mark for inbound messages on the specified socket.
/// The high water mark is a hard limit on the maximum number of outstanding messages ØMQ shall
/// queue in memory for any single peer that the specified socket is communicating with.
///
/// If this limit has been reached the socket shall enter an exceptional state and depending on the socket type,
/// ØMQ shall take appropriate action such as blocking or dropping sent messages.
///
/// Refer to the individual socket descriptions in zmq_socket(3) for details on the exact action taken for each socket type.
///
/// Unit: message count
/// Default: 0 (unlimited)
///
/// For more details, see http://api.zeromq.org/3-0:zmq-setsockopt
ReceiveHighWaterMark: i32,
/// ZMQ_RCVBUF: Set kernel receive buffer size
///
/// The ZMQ_RCVBUF option shall set the underlying kernel receive buffer size
/// for the socket to the specified size in bytes. A value of zero means leave
/// the OS default unchanged.
///
/// For details refer to your operating system documentation for the SO_RCVBUF socket option.
///
/// Unit: bytes
/// Default: 0 (use kernel default)
///
/// For more details, see http://api.zeromq.org/3-0:zmq-setsockopt
ReceiveBufferSize: i32,
/// ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN
///
/// Sets the timeout for send operation on the socket.
/// If the value is 0, zmq_send(3) will return immediately, with a
/// EAGAIN error if the message cannot be sent. If the value is -1,
/// it will block until the message is sent. For all other values,
/// it will try to send the message for that amount of time before
/// returning with an EAGAIN error.
///
/// Unit: milliseconds
/// Default: -1 (infinite)
///
/// For more details, see http://api.zeromq.org/3-0:zmq-setsockopt
SendTimeout: i32,
/// ZMQ_SNDHWM: Set high water mark for outbound messages
///
/// The ZMQ_SNDHWM option shall set the high water mark for outbound messages
/// on the specified socket. The high water mark is a hard limit on the maximum
/// number of outstanding messages ØMQ shall queue in memory for any single peer
/// that the specified socket is communicating with.
///
/// If this limit has been reached the socket shall enter an exceptional state
/// and depending on the socket type, ØMQ shall take appropriate action such as
/// blocking or dropping sent messages.
///
/// Refer to the individual socket descriptions
/// in zmq_socket(3) for details on the exact action taken for each socket type.
///
/// Unit: message count
/// Default: 0 (unlimited)
///
/// For more details, see http://api.zeromq.org/3-0:zmq-setsockopt
SendHighWaterMark: i32,
/// ZMQ_SNDBUF: Set kernel transmit buffer size
///
/// The ZMQ_SNDBUF option shall set the underlying kernel transmit buffer size
/// for the socket to the specified size in bytes. A value of zero means leave
/// the OS default unchanged.
///
/// For details please refer to your operating system documentation for the SO_SNDBUF socket option.
///
/// Unit: bytes
/// Default: 0 (use kernel default)
///
/// For more details, see http://api.zeromq.org/3-0:zmq-setsockopt
SendBufferSize: i32,
};
/// System level socket, which allows for opening outgoing and
@ -301,6 +387,13 @@ pub const ZSocket = struct {
pub fn setSocketOption(self: *ZSocket, opt: ZSocketOption) !void {
switch (opt) {
.ReceiveTimeout => c.zsock_set_rcvtimeo(self.socket, @intCast(opt.ReceiveTimeout)),
.ReceiveHighWaterMark => c.zsock_set_rcvhwm(self.socket, @intCast(opt.ReceiveHighWaterMark)),
.ReceiveBufferSize => c.zsock_set_rcvbuf(self.socket, @intCast(opt.ReceiveBufferSize)),
.SendTimeout => c.zsock_set_sndtimeo(self.socket, @intCast(opt.SendTimeout)),
.SendHighWaterMark => c.zsock_set_sndhwm(self.socket, @intCast(opt.SendHighWaterMark)),
.SendBufferSize => c.zsock_set_sndbuf(self.socket, @intCast(opt.SendBufferSize)),
//else => return error.UnknownOption,
}
}
@ -309,6 +402,13 @@ pub const ZSocket = struct {
pub fn getSocketOption(self: *ZSocket, opt: *ZSocketOption) !void {
switch (opt.*) {
.ReceiveTimeout => opt.ReceiveTimeout = c.zsock_rcvtimeo(self.socket),
.ReceiveHighWaterMark => opt.ReceiveHighWaterMark = c.zsock_rcvhwm(self.socket),
.ReceiveBufferSize => opt.ReceiveBufferSize = c.zsock_rcvbuf(self.socket),
.SendTimeout => opt.SendTimeout = c.zsock_sndtimeo(self.socket),
.SendHighWaterMark => opt.SendHighWaterMark = c.zsock_sndhwm(self.socket),
.SendBufferSize => opt.SendBufferSize = c.zsock_sndbuf(self.socket),
//else => return error.UnknownOption,
}
}
@ -444,10 +544,11 @@ test "ZSocket - roundtrip json" {
test "ZSocket - receive timeout" {
const allocator = std.testing.allocator;
// bind the incoming socket
// create the socket
var incoming = try ZSocket.init(allocator, ZSocketType.Rep);
defer incoming.deinit();
// set the receive timeout
{
var timeout = ZSocketOption{ .ReceiveTimeout = undefined };
try incoming.getSocketOption(&timeout);
@ -462,9 +563,43 @@ test "ZSocket - receive timeout" {
try std.testing.expectEqual(@as(i32, 500), timeout.ReceiveTimeout);
}
// bind the port
const port = try incoming.bind("tcp://127.0.0.1:!");
try std.testing.expect(port >= 0xC000);
// try to receive the message
try std.testing.expectError(error.ReceiveFrameInterrupted, incoming.receive());
}
test "ZSocket - send timeout" {
const allocator = std.testing.allocator;
// create the socket
var socket = try ZSocket.init(allocator, ZSocketType.Pair);
defer socket.deinit();
// set the send timeout
{
var timeout = ZSocketOption{ .SendTimeout = undefined };
try socket.getSocketOption(&timeout);
try std.testing.expectEqual(@as(i32, -1), timeout.SendTimeout);
}
try socket.setSocketOption(.{ .SendTimeout = 500 });
{
var timeout = ZSocketOption{ .SendTimeout = undefined };
try socket.getSocketOption(&timeout);
try std.testing.expectEqual(@as(i32, 500), timeout.SendTimeout);
}
// bind the port
const port = try socket.bind("tcp://127.0.0.1:!");
try std.testing.expect(port >= 0xC000);
// try to send the message
var frame = try zframe.ZFrame.initEmpty();
defer frame.deinit();
try std.testing.expectError(error.SendFrameFailed, socket.send(&frame, .{}));
}