implement correct buffer wrapping logic in std.event.Channel

This commit is contained in:
Quetzal Bradley 2019-11-28 02:19:08 +00:00 committed by Andrew Kelley
parent ca61a5f0b7
commit a6c9c5f767

View File

@ -54,6 +54,10 @@ pub fn Channel(comptime T: type) type {
/// For a zero length buffer, use `[0]T{}`.
/// TODO https://github.com/ziglang/zig/issues/2765
pub fn init(self: *SelfChannel, buffer: []T) void {
// The ring buffer implementation only works with power of 2 buffer sizes
// because of relying on subtracting across zero. For example (0 -% 1) % 10 == 5
assert(buffer.len == 0 or @popCount(usize, buffer.len) == 1);
self.* = SelfChannel{
.buffer_len = 0,
.buffer_nodes = buffer,
@ -184,11 +188,11 @@ pub fn Channel(comptime T: type) type {
const get_node = &self.getters.get().?.data;
switch (get_node.data) {
GetNode.Data.Normal => |info| {
info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
info.ptr.* = self.buffer_nodes[(self.buffer_index -% self.buffer_len) % self.buffer_nodes.len];
},
GetNode.Data.OrNull => |info| {
_ = self.or_null_queue.remove(info.or_null);
info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
info.ptr.* = self.buffer_nodes[(self.buffer_index -% self.buffer_len) % self.buffer_nodes.len];
},
}
global_event_loop.onNextTick(get_node.tick_node);
@ -222,7 +226,7 @@ pub fn Channel(comptime T: type) type {
while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
const put_node = &self.putters.get().?.data;
self.buffer_nodes[self.buffer_index] = put_node.data;
self.buffer_nodes[self.buffer_index % self.buffer_nodes.len] = put_node.data;
global_event_loop.onNextTick(put_node.tick_node);
self.buffer_index +%= 1;
self.buffer_len += 1;
@ -283,6 +287,29 @@ test "std.event.Channel" {
await putter;
}
test "std.event.Channel wraparound" {
// TODO provide a way to run tests in evented I/O mode
if (!std.io.is_async) return error.SkipZigTest;
const channel_size = 2;
var buf : [channel_size]i32 = undefined;
var channel: Channel(i32) = undefined;
channel.init(&buf);
defer channel.deinit();
// add items to channel and pull them out until
// the buffer wraps around, make sure it doesn't crash.
var result : i32 = undefined;
channel.put(5);
testing.expectEqual(@as(i32, 5), channel.get());
channel.put(6);
testing.expectEqual(@as(i32, 6), channel.get());
channel.put(7);
testing.expectEqual(@as(i32, 7), channel.get());
}
async fn testChannelGetter(channel: *Channel(i32)) void {
const value1 = channel.get();
testing.expect(value1 == 1234);