Add support for RoutingId and RouterHandover socket options

Remove unused ZSocketOptionTag enum
Add `ZSocket - routing id` unit test
See https://github.com/nine-lives-later/zzmq/pull/4
Thanks to https://github.com/7Zifle
This commit is contained in:
Felix Kollmann 2024-05-05 15:04:23 +02:00
parent 666a45901f
commit c6ab6ee83e

View File

@ -132,19 +132,7 @@ pub const ZSocketType = enum(c_int) {
Push = c.ZMQ_PUSH,
};
pub const ZSocketOptionTag = enum {
ReceiveTimeout,
ReceiveHighWaterMark,
ReceiveBufferSize,
SendTimeout,
SendHighWaterMark,
SendBufferSize,
LingerTimeout,
};
pub const ZSocketOption = union(ZSocketOptionTag) {
pub const ZSocketOption = union(enum) {
/// ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
///
/// Sets the timeout for receive operation on the socket.
@ -251,6 +239,38 @@ pub const ZSocketOption = union(ZSocketOptionTag) {
///
/// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html
LingerTimeout: i32,
/// ZMQ_ROUTING_ID: Set socket routing id
///
/// The 'ZMQ_ROUTING_ID' option shall set the routing id of the specified 'socket' when connecting to a ROUTER socket.
///
/// A routing id must be at least one byte and at most 255 bytes long. Identities starting with a zero byte are reserved for
/// use by the 0MQ infrastructure.
///
/// If two clients use the same routing id when connecting to a ROUTER, the results shall
/// depend on the ZMQ_ROUTER_HANDOVER option setting. If that is not set (or set to the
/// default of zero), the ROUTER socket shall reject clients trying to connect with an
/// already-used routing id. If that option is set to 1, the ROUTER socket shall hand-over
/// the connection to the new client and disconnect the existing one.
///
/// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html
RoutingId: []u8,
/// ZMQ_ROUTER_HANDOVER: handle duplicate client routing ids on ROUTER sockets
///
/// If two clients use the same routing id when connecting to a ROUTER,
/// the results shall depend on the ZMQ_ROUTER_HANDOVER option setting.
///
/// If that is not set (or set to the default of false), the ROUTER socket shall reject
/// clients trying to connect with an already-used routing id.
///
/// If that option is set to true, the ROUTER socket shall hand-over the connection
/// to the new client and disconnect the existing one.
///
/// Default: false (reject client)
///
/// For more details, see https://libzmq.readthedocs.io/en/latest/zmq_setsockopt.html
RouterHandover: bool,
};
/// System level socket, which allows for opening outgoing and
@ -492,20 +512,51 @@ pub const ZSocket = struct {
var result: c_int = 0;
switch (opt) {
.ReceiveTimeout => result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVTIMEO, &opt.ReceiveTimeout, @sizeOf(@TypeOf(opt.ReceiveTimeout))),
.ReceiveHighWaterMark => result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVHWM, &opt.ReceiveHighWaterMark, @sizeOf(@TypeOf(opt.ReceiveHighWaterMark))),
.ReceiveBufferSize => result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVBUF, &opt.ReceiveBufferSize, @sizeOf(@TypeOf(opt.ReceiveBufferSize))),
.ReceiveTimeout => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVTIMEO, &opt.ReceiveTimeout, @sizeOf(@TypeOf(opt.ReceiveTimeout)));
},
.ReceiveHighWaterMark => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVHWM, &opt.ReceiveHighWaterMark, @sizeOf(@TypeOf(opt.ReceiveHighWaterMark)));
},
.ReceiveBufferSize => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_RCVBUF, &opt.ReceiveBufferSize, @sizeOf(@TypeOf(opt.ReceiveBufferSize)));
},
.SendTimeout => result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDTIMEO, &opt.SendTimeout, @sizeOf(@TypeOf(opt.SendTimeout))),
.SendHighWaterMark => result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDHWM, &opt.SendHighWaterMark, @sizeOf(@TypeOf(opt.SendHighWaterMark))),
.SendBufferSize => result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDBUF, &opt.SendBufferSize, @sizeOf(@TypeOf(opt.SendBufferSize))),
.SendTimeout => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDTIMEO, &opt.SendTimeout, @sizeOf(@TypeOf(opt.SendTimeout)));
},
.SendHighWaterMark => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDHWM, &opt.SendHighWaterMark, @sizeOf(@TypeOf(opt.SendHighWaterMark)));
},
.SendBufferSize => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_SNDBUF, &opt.SendBufferSize, @sizeOf(@TypeOf(opt.SendBufferSize)));
},
.LingerTimeout => result = c.zmq_setsockopt(self.socket_, c.ZMQ_LINGER, &opt.LingerTimeout, @sizeOf(@TypeOf(opt.LingerTimeout))),
.LingerTimeout => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_LINGER, &opt.LingerTimeout, @sizeOf(@TypeOf(opt.LingerTimeout)));
},
.RoutingId => {
result = c.zmq_setsockopt(self.socket_, c.ZMQ_ROUTING_ID, opt.RoutingId.ptr, opt.RoutingId.len);
},
.RouterHandover => {
const v: c_int = @intFromBool(opt.RouterHandover);
result = c.zmq_setsockopt(self.socket_, c.ZMQ_ROUTER_HANDOVER, &v, @sizeOf(@TypeOf(v)));
},
//else => return error.UnknownOption,
}
if (result < 0) return error.SetFailed;
if (result < 0) {
switch (c.zmq_errno()) {
c.EINVAL => return error.OptionOrValueInvalid,
c.ETERM => return error.ZContextTerminated,
c.ENOTSOCK => return error.SocketInvalid,
c.EINTR => return error.Interrupted,
else => return error.SetFailed,
}
}
}
/// Get an option of the socket. See `ZSocketOption` for details.
@ -551,10 +602,25 @@ pub const ZSocket = struct {
result = c.zmq_getsockopt(self.socket_, c.ZMQ_LINGER, &opt.LingerTimeout, &length);
},
.RoutingId => {
result = c.zmq_getsockopt(self.socket_, c.ZMQ_ROUTING_ID, opt.RoutingId.ptr, &opt.RoutingId.len);
},
.RouterHandover => {
return error.UnknownOption; // ZMQ_ROUTER_HANDOVER cannot be retrieved
},
//else => return error.UnknownOption,
}
if (result < 0) return error.GetFailed;
if (result < 0) {
switch (c.zmq_errno()) {
c.EINVAL => return error.OptionOrValueInvalid,
c.ETERM => return error.ZContextTerminated,
c.ENOTSOCK => return error.SocketInvalid,
c.EINTR => return error.Interrupted,
else => return error.GetFailed,
}
}
}
/// Destroy the socket and clean up
@ -802,3 +868,40 @@ test "ZSocket - send timeout" {
// try again, the owner should be lost
try std.testing.expectError(error.MessageOwnershipLost, socket.send(&message, .{}));
}
test "ZSocket - routing id" {
const allocator = std.testing.allocator;
// create the context
var context = try zcontext.ZContext.init(allocator);
defer context.deinit();
// create the socket
var socket = try ZSocket.init(ZSocketType.Router, &context);
defer socket.deinit();
// set the routing id
{
var v = ZSocketOption{ .RoutingId = undefined };
try socket.getSocketOption(&v);
try std.testing.expectEqualStrings("", v.RoutingId);
}
try socket.setSocketOption(.{ .RoutingId = @constCast("myRoutingID") });
{
var routingId: [255]u8 = undefined;
var v = ZSocketOption{ .RoutingId = &routingId };
try socket.getSocketOption(&v);
try std.testing.expectEqualStrings("myRoutingID", v.RoutingId);
}
// set the router handover
try socket.setSocketOption(.{ .RouterHandover = true });
{
var v = ZSocketOption{ .RouterHandover = undefined };
try std.testing.expectError(error.UnknownOption, socket.getSocketOption(&v));
}
}