From ab534cc9f16697da4846820b038e17fd1bc27f35 Mon Sep 17 00:00:00 2001 From: Vexu <15308111+Vexu@users.noreply.github.com> Date: Sun, 24 Nov 2019 13:16:09 +0200 Subject: [PATCH] update event.fs to use global event loop --- lib/std/event/fs.zig | 182 +++++++++++++++++++------------------------ 1 file changed, 80 insertions(+), 102 deletions(-) diff --git a/lib/std/event/fs.zig b/lib/std/event/fs.zig index eef5cfae7..63e073fcb 100644 --- a/lib/std/event/fs.zig +++ b/lib/std/event/fs.zig @@ -10,6 +10,9 @@ const Loop = event.Loop; const fd_t = os.fd_t; const File = std.fs.File; +const global_event_loop = Loop.instance orelse + @compileError("std.event.fs currently only works with event-based I/O"); + pub const RequestNode = std.atomic.Queue(Request).Node; pub const Request = struct { @@ -86,7 +89,7 @@ pub const Request = struct { pub const PWriteVError = error{OutOfMemory} || File.WriteError; /// data - just the inner references - must live until pwritev frame completes. -pub fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void { +pub fn pwritev(allocator: *Allocator, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void { switch (builtin.os) { .macosx, .linux, @@ -94,8 +97,8 @@ pub fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) P .netbsd, .dragonfly, => { - const iovecs = try loop.allocator.alloc(os.iovec_const, data.len); - defer loop.allocator.free(iovecs); + const iovecs = try allocator.alloc(os.iovec_const, data.len); + defer allocator.free(iovecs); for (data) |buf, i| { iovecs[i] = os.iovec_const{ @@ -104,31 +107,31 @@ pub fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) P }; } - return pwritevPosix(loop, fd, iovecs, offset); + return pwritevPosix(fd, iovecs, offset); }, .windows => { - const data_copy = try std.mem.dupe(loop.allocator, []const u8, data); - defer loop.allocator.free(data_copy); - return pwritevWindows(loop, fd, data, offset); + const data_copy = try std.mem.dupe(allocator, []const u8, data); + defer allocator.free(data_copy); + return pwritevWindows(fd, data, offset); }, else => @compileError("Unsupported OS"), } } /// data must outlive the returned frame -pub fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void { +pub fn pwritevWindows(fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void { if (data.len == 0) return; - if (data.len == 1) return pwriteWindows(loop, fd, data[0], offset); + if (data.len == 1) return pwriteWindows(fd, data[0], offset); // TODO do these in parallel var off = offset; for (data) |buf| { - try pwriteWindows(loop, fd, buf, off); + try pwriteWindows(fd, buf, off); off += buf.len; } } -pub fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void { +pub fn pwriteWindows(fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void { var resume_node = Loop.ResumeNode.Basic{ .base = Loop.ResumeNode{ .id = Loop.ResumeNode.Id.Basic, @@ -143,9 +146,9 @@ pub fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.Wi }, }; // TODO only call create io completion port once per fd - _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined); - loop.beginOneEvent(); - errdefer loop.finishOneEvent(); + _ = windows.CreateIoCompletionPort(fd, global_event_loop.os_data.io_port, undefined, undefined); + global_event_loop.beginOneEvent(); + errdefer global_event_loop.finishOneEvent(); errdefer { _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped); @@ -168,12 +171,7 @@ pub fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.Wi } /// iovecs must live until pwritev frame completes. -pub fn pwritevPosix( - loop: *Loop, - fd: fd_t, - iovecs: []const os.iovec_const, - offset: usize, -) os.WriteError!void { +pub fn pwritevPosix(fd: fd_t, iovecs: []const os.iovec_const, offset: usize) os.WriteError!void { var req_node = RequestNode{ .prev = null, .next = null, @@ -196,21 +194,17 @@ pub fn pwritevPosix( }, }; - errdefer loop.posixFsCancel(&req_node); + errdefer global_event_loop.posixFsCancel(&req_node); suspend { - loop.posixFsRequest(&req_node); + global_event_loop.posixFsRequest(&req_node); } return req_node.data.msg.PWriteV.result; } /// iovecs must live until pwritev frame completes. -pub fn writevPosix( - loop: *Loop, - fd: fd_t, - iovecs: []const os.iovec_const, -) os.WriteError!void { +pub fn writevPosix(fd: fd_t, iovecs: []const os.iovec_const) os.WriteError!void { var req_node = RequestNode{ .prev = null, .next = null, @@ -233,7 +227,7 @@ pub fn writevPosix( }; suspend { - loop.posixFsRequest(&req_node); + global_event_loop.posixFsRequest(&req_node); } return req_node.data.msg.WriteV.result; @@ -242,7 +236,7 @@ pub fn writevPosix( pub const PReadVError = error{OutOfMemory} || File.ReadError; /// data - just the inner references - must live until preadv frame completes. -pub fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize { +pub fn preadv(allocator: *Allocator, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize { assert(data.len != 0); switch (builtin.os) { .macosx, @@ -251,8 +245,8 @@ pub fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVEr .netbsd, .dragonfly, => { - const iovecs = try loop.allocator.alloc(os.iovec, data.len); - defer loop.allocator.free(iovecs); + const iovecs = try allocator.alloc(os.iovec, data.len); + defer allocator.free(iovecs); for (data) |buf, i| { iovecs[i] = os.iovec{ @@ -261,21 +255,21 @@ pub fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVEr }; } - return preadvPosix(loop, fd, iovecs, offset); + return preadvPosix(fd, iovecs, offset); }, .windows => { - const data_copy = try std.mem.dupe(loop.allocator, []u8, data); - defer loop.allocator.free(data_copy); - return preadvWindows(loop, fd, data_copy, offset); + const data_copy = try std.mem.dupe(allocator, []u8, data); + defer allocator.free(data_copy); + return preadvWindows(fd, data_copy, offset); }, else => @compileError("Unsupported OS"), } } /// data must outlive the returned frame -pub fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !usize { +pub fn preadvWindows(fd: fd_t, data: []const []u8, offset: u64) !usize { assert(data.len != 0); - if (data.len == 1) return preadWindows(loop, fd, data[0], offset); + if (data.len == 1) return preadWindows(fd, data[0], offset); // TODO do these in parallel? var off: usize = 0; @@ -283,7 +277,7 @@ pub fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !us var inner_off: usize = 0; while (true) { const v = data[iov_i]; - const amt_read = try preadWindows(loop, fd, v[inner_off .. v.len - inner_off], offset + off); + const amt_read = try preadWindows(fd, v[inner_off .. v.len - inner_off], offset + off); off += amt_read; inner_off += amt_read; if (inner_off == v.len) { @@ -297,7 +291,7 @@ pub fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !us } } -pub fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize { +pub fn preadWindows(fd: fd_t, data: []u8, offset: u64) !usize { var resume_node = Loop.ResumeNode.Basic{ .base = Loop.ResumeNode{ .id = Loop.ResumeNode.Id.Basic, @@ -312,9 +306,9 @@ pub fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize { }, }; // TODO only call create io completion port once per fd - _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined) catch undefined; - loop.beginOneEvent(); - errdefer loop.finishOneEvent(); + _ = windows.CreateIoCompletionPort(fd, global_event_loop.os_data.io_port, undefined, undefined) catch undefined; + global_event_loop.beginOneEvent(); + errdefer global_event_loop.finishOneEvent(); errdefer { _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped); @@ -336,12 +330,7 @@ pub fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize { } /// iovecs must live until preadv frame completes -pub fn preadvPosix( - loop: *Loop, - fd: fd_t, - iovecs: []const os.iovec, - offset: usize, -) os.ReadError!usize { +pub fn preadvPosix(fd: fd_t, iovecs: []const os.iovec, offset: usize) os.ReadError!usize { var req_node = RequestNode{ .prev = null, .next = null, @@ -364,21 +353,16 @@ pub fn preadvPosix( }, }; - errdefer loop.posixFsCancel(&req_node); + errdefer global_event_loop.posixFsCancel(&req_node); suspend { - loop.posixFsRequest(&req_node); + global_event_loop.posixFsRequest(&req_node); } return req_node.data.msg.PReadV.result; } -pub fn openPosix( - loop: *Loop, - path: []const u8, - flags: u32, - mode: File.Mode, -) File.OpenError!fd_t { +pub fn openPosix(path: []const u8, flags: u32, mode: File.Mode) File.OpenError!fd_t { const path_c = try std.os.toPosixPath(path); var req_node = RequestNode{ @@ -403,21 +387,21 @@ pub fn openPosix( }, }; - errdefer loop.posixFsCancel(&req_node); + errdefer global_event_loop.posixFsCancel(&req_node); suspend { - loop.posixFsRequest(&req_node); + global_event_loop.posixFsRequest(&req_node); } return req_node.data.msg.Open.result; } -pub fn openRead(loop: *Loop, path: []const u8) File.OpenError!fd_t { +pub fn openRead(path: []const u8) File.OpenError!fd_t { switch (builtin.os) { .macosx, .linux, .freebsd, .netbsd, .dragonfly => { const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; const flags = O_LARGEFILE | os.O_RDONLY | os.O_CLOEXEC; - return openPosix(loop, path, flags, File.default_mode); + return openPosix(path, flags, File.default_mode); }, .windows => return windows.CreateFile( @@ -436,12 +420,12 @@ pub fn openRead(loop: *Loop, path: []const u8) File.OpenError!fd_t { /// Creates if does not exist. Truncates the file if it exists. /// Uses the default mode. -pub fn openWrite(loop: *Loop, path: []const u8) File.OpenError!fd_t { - return openWriteMode(loop, path, File.default_mode); +pub fn openWrite(path: []const u8) File.OpenError!fd_t { + return openWriteMode(path, File.default_mode); } /// Creates if does not exist. Truncates the file if it exists. -pub fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenError!fd_t { +pub fn openWriteMode(path: []const u8, mode: File.Mode) File.OpenError!fd_t { switch (builtin.os) { .macosx, .linux, @@ -451,7 +435,7 @@ pub fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenEr => { const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; const flags = O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC; - return openPosix(loop, path, flags, File.default_mode); + return openPosix(path, flags, File.default_mode); }, .windows => return windows.CreateFile( path, @@ -467,16 +451,12 @@ pub fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenEr } /// Creates if does not exist. Does not truncate. -pub fn openReadWrite( - loop: *Loop, - path: []const u8, - mode: File.Mode, -) File.OpenError!fd_t { +pub fn openReadWrite(path: []const u8, mode: File.Mode) File.OpenError!fd_t { switch (builtin.os) { .macosx, .linux, .freebsd, .netbsd, .dragonfly => { const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; const flags = O_LARGEFILE | os.O_RDWR | os.O_CREAT | os.O_CLOEXEC; - return openPosix(loop, path, flags, mode); + return openPosix(path, flags, mode); }, .windows => return windows.CreateFile( @@ -500,7 +480,7 @@ pub fn openReadWrite( /// If you call `setHandle` then finishing will close the fd; otherwise finishing /// will deallocate the `CloseOperation`. pub const CloseOperation = struct { - loop: *Loop, + allocator: *Allocator, os_data: OsData, const OsData = switch (builtin.os) { @@ -518,10 +498,10 @@ pub const CloseOperation = struct { close_req_node: RequestNode, }; - pub fn start(loop: *Loop) (error{OutOfMemory}!*CloseOperation) { - const self = try loop.allocator.create(CloseOperation); + pub fn start(allocator: *Allocator) (error{OutOfMemory}!*CloseOperation) { + const self = try allocator.create(CloseOperation); self.* = CloseOperation{ - .loop = loop, + .allocator = allocator, .os_data = switch (builtin.os) { .linux, .macosx, .freebsd, .netbsd, .dragonfly => initOsDataPosix(self), .windows => OsData{ .handle = null }, @@ -557,16 +537,16 @@ pub const CloseOperation = struct { .dragonfly, => { if (self.os_data.have_fd) { - self.loop.posixFsRequest(&self.os_data.close_req_node); + global_event_loop.posixFsRequest(&self.os_data.close_req_node); } else { - self.loop.allocator.destroy(self); + self.allocator.destroy(self); } }, .windows => { if (self.os_data.handle) |handle| { os.close(handle); } - self.loop.allocator.destroy(self); + self.allocator.destroy(self); }, else => @compileError("Unsupported OS"), } @@ -629,25 +609,25 @@ pub const CloseOperation = struct { /// contents must remain alive until writeFile completes. /// TODO make this atomic or provide writeFileAtomic and rename this one to writeFileTruncate -pub fn writeFile(loop: *Loop, path: []const u8, contents: []const u8) !void { - return writeFileMode(loop, path, contents, File.default_mode); +pub fn writeFile(allocator: *Allocator, path: []const u8, contents: []const u8) !void { + return writeFileMode(allocator, path, contents, File.default_mode); } /// contents must remain alive until writeFile completes. -pub fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void { +pub fn writeFileMode(allocator: *Allocator, path: []const u8, contents: []const u8, mode: File.Mode) !void { switch (builtin.os) { .linux, .macosx, .freebsd, .netbsd, .dragonfly, - => return writeFileModeThread(loop, path, contents, mode), - .windows => return writeFileWindows(loop, path, contents), + => return writeFileModeThread(allocator, path, contents, mode), + .windows => return writeFileWindows(path, contents), else => @compileError("Unsupported OS"), } } -fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void { +fn writeFileWindows(path: []const u8, contents: []const u8) !void { const handle = try windows.CreateFile( path, windows.GENERIC_WRITE, @@ -659,12 +639,12 @@ fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void { ); defer os.close(handle); - try pwriteWindows(loop, handle, contents, 0); + try pwriteWindows(handle, contents, 0); } -fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void { - const path_with_null = try std.cstr.addNullByte(loop.allocator, path); - defer loop.allocator.free(path_with_null); +fn writeFileModeThread(allocator: *Allocator, path: []const u8, contents: []const u8, mode: File.Mode) !void { + const path_with_null = try std.cstr.addNullByte(allocator, path); + defer allocator.free(path_with_null); var req_node = RequestNode{ .prev = null, @@ -688,10 +668,10 @@ fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode }, }; - errdefer loop.posixFsCancel(&req_node); + errdefer global_event_loop.posixFsCancel(&req_node); suspend { - loop.posixFsRequest(&req_node); + global_event_loop.posixFsRequest(&req_node); } return req_node.data.msg.WriteFile.result; @@ -700,21 +680,21 @@ fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode /// The frame resumes when the last data has been confirmed written, but before the file handle /// is closed. /// Caller owns returned memory. -pub fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 { - var close_op = try CloseOperation.start(loop); +pub fn readFile(allocator: *Allocator, file_path: []const u8, max_size: usize) ![]u8 { + var close_op = try CloseOperation.start(); defer close_op.finish(); - const fd = try openRead(loop, file_path); + const fd = try openRead(file_path); close_op.setHandle(fd); - var list = std.ArrayList(u8).init(loop.allocator); + var list = std.ArrayList(u8).init(allocator); defer list.deinit(); while (true) { try list.ensureCapacity(list.len + mem.page_size); const buf = list.items[list.len..]; const buf_array = [_][]u8{buf}; - const amt = try preadv(loop, fd, buf_array, list.len); + const amt = try preadv(fd, buf_array, list.len); list.len += amt; if (list.len > max_size) { return error.FileTooBig; @@ -1392,16 +1372,15 @@ fn testFsWatch(loop: *Loop) !void { pub const OutStream = struct { fd: fd_t, stream: Stream, - loop: *Loop, + allocator: *Allocator, offset: usize, pub const Error = File.WriteError; pub const Stream = event.io.OutStream(Error); - pub fn init(loop: *Loop, fd: fd_t, offset: usize) OutStream { + pub fn init(allocator: *Allocator, fd: fd_t, offset: usize) OutStream { return OutStream{ .fd = fd, - .loop = loop, .offset = offset, .stream = Stream{ .writeFn = writeFn }, }; @@ -1411,23 +1390,22 @@ pub const OutStream = struct { const self = @fieldParentPtr(OutStream, "stream", out_stream); const offset = self.offset; self.offset += bytes.len; - return pwritev(self.loop, self.fd, [][]const u8{bytes}, offset); + return pwritev(self.allocator, self.fd, [_][]const u8{bytes}, offset); } }; pub const InStream = struct { fd: fd_t, stream: Stream, - loop: *Loop, + allocator: *Allocator, offset: usize, pub const Error = PReadVError; // TODO make this not have OutOfMemory pub const Stream = event.io.InStream(Error); - pub fn init(loop: *Loop, fd: fd_t, offset: usize) InStream { + pub fn init(allocator: *Allocator, fd: fd_t, offset: usize) InStream { return InStream{ .fd = fd, - .loop = loop, .offset = offset, .stream = Stream{ .readFn = readFn }, }; @@ -1435,7 +1413,7 @@ pub const InStream = struct { fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { const self = @fieldParentPtr(InStream, "stream", in_stream); - const amt = try preadv(self.loop, self.fd, [][]u8{bytes}, self.offset); + const amt = try preadv(self.allocator, self.fd, [_][]u8{bytes}, self.offset); self.offset += amt; return amt; }