update event.fs to use global event loop

This commit is contained in:
Vexu 2019-11-24 13:16:09 +02:00
parent 29d7b5a80c
commit ab534cc9f1
No known key found for this signature in database
GPG Key ID: 5AEABFCAFF5CD8D6

View File

@ -10,6 +10,9 @@ const Loop = event.Loop;
const fd_t = os.fd_t;
const File = std.fs.File;
const global_event_loop = Loop.instance orelse
@compileError("std.event.fs currently only works with event-based I/O");
pub const RequestNode = std.atomic.Queue(Request).Node;
pub const Request = struct {
@ -86,7 +89,7 @@ pub const Request = struct {
pub const PWriteVError = error{OutOfMemory} || File.WriteError;
/// data - just the inner references - must live until pwritev frame completes.
pub fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void {
pub fn pwritev(allocator: *Allocator, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void {
switch (builtin.os) {
.macosx,
.linux,
@ -94,8 +97,8 @@ pub fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) P
.netbsd,
.dragonfly,
=> {
const iovecs = try loop.allocator.alloc(os.iovec_const, data.len);
defer loop.allocator.free(iovecs);
const iovecs = try allocator.alloc(os.iovec_const, data.len);
defer allocator.free(iovecs);
for (data) |buf, i| {
iovecs[i] = os.iovec_const{
@ -104,31 +107,31 @@ pub fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) P
};
}
return pwritevPosix(loop, fd, iovecs, offset);
return pwritevPosix(fd, iovecs, offset);
},
.windows => {
const data_copy = try std.mem.dupe(loop.allocator, []const u8, data);
defer loop.allocator.free(data_copy);
return pwritevWindows(loop, fd, data, offset);
const data_copy = try std.mem.dupe(allocator, []const u8, data);
defer allocator.free(data_copy);
return pwritevWindows(fd, data, offset);
},
else => @compileError("Unsupported OS"),
}
}
/// data must outlive the returned frame
pub fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void {
pub fn pwritevWindows(fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void {
if (data.len == 0) return;
if (data.len == 1) return pwriteWindows(loop, fd, data[0], offset);
if (data.len == 1) return pwriteWindows(fd, data[0], offset);
// TODO do these in parallel
var off = offset;
for (data) |buf| {
try pwriteWindows(loop, fd, buf, off);
try pwriteWindows(fd, buf, off);
off += buf.len;
}
}
pub fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void {
pub fn pwriteWindows(fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void {
var resume_node = Loop.ResumeNode.Basic{
.base = Loop.ResumeNode{
.id = Loop.ResumeNode.Id.Basic,
@ -143,9 +146,9 @@ pub fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.Wi
},
};
// TODO only call create io completion port once per fd
_ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined);
loop.beginOneEvent();
errdefer loop.finishOneEvent();
_ = windows.CreateIoCompletionPort(fd, global_event_loop.os_data.io_port, undefined, undefined);
global_event_loop.beginOneEvent();
errdefer global_event_loop.finishOneEvent();
errdefer {
_ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped);
@ -168,12 +171,7 @@ pub fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.Wi
}
/// iovecs must live until pwritev frame completes.
pub fn pwritevPosix(
loop: *Loop,
fd: fd_t,
iovecs: []const os.iovec_const,
offset: usize,
) os.WriteError!void {
pub fn pwritevPosix(fd: fd_t, iovecs: []const os.iovec_const, offset: usize) os.WriteError!void {
var req_node = RequestNode{
.prev = null,
.next = null,
@ -196,21 +194,17 @@ pub fn pwritevPosix(
},
};
errdefer loop.posixFsCancel(&req_node);
errdefer global_event_loop.posixFsCancel(&req_node);
suspend {
loop.posixFsRequest(&req_node);
global_event_loop.posixFsRequest(&req_node);
}
return req_node.data.msg.PWriteV.result;
}
/// iovecs must live until pwritev frame completes.
pub fn writevPosix(
loop: *Loop,
fd: fd_t,
iovecs: []const os.iovec_const,
) os.WriteError!void {
pub fn writevPosix(fd: fd_t, iovecs: []const os.iovec_const) os.WriteError!void {
var req_node = RequestNode{
.prev = null,
.next = null,
@ -233,7 +227,7 @@ pub fn writevPosix(
};
suspend {
loop.posixFsRequest(&req_node);
global_event_loop.posixFsRequest(&req_node);
}
return req_node.data.msg.WriteV.result;
@ -242,7 +236,7 @@ pub fn writevPosix(
pub const PReadVError = error{OutOfMemory} || File.ReadError;
/// data - just the inner references - must live until preadv frame completes.
pub fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize {
pub fn preadv(allocator: *Allocator, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize {
assert(data.len != 0);
switch (builtin.os) {
.macosx,
@ -251,8 +245,8 @@ pub fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVEr
.netbsd,
.dragonfly,
=> {
const iovecs = try loop.allocator.alloc(os.iovec, data.len);
defer loop.allocator.free(iovecs);
const iovecs = try allocator.alloc(os.iovec, data.len);
defer allocator.free(iovecs);
for (data) |buf, i| {
iovecs[i] = os.iovec{
@ -261,21 +255,21 @@ pub fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVEr
};
}
return preadvPosix(loop, fd, iovecs, offset);
return preadvPosix(fd, iovecs, offset);
},
.windows => {
const data_copy = try std.mem.dupe(loop.allocator, []u8, data);
defer loop.allocator.free(data_copy);
return preadvWindows(loop, fd, data_copy, offset);
const data_copy = try std.mem.dupe(allocator, []u8, data);
defer allocator.free(data_copy);
return preadvWindows(fd, data_copy, offset);
},
else => @compileError("Unsupported OS"),
}
}
/// data must outlive the returned frame
pub fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !usize {
pub fn preadvWindows(fd: fd_t, data: []const []u8, offset: u64) !usize {
assert(data.len != 0);
if (data.len == 1) return preadWindows(loop, fd, data[0], offset);
if (data.len == 1) return preadWindows(fd, data[0], offset);
// TODO do these in parallel?
var off: usize = 0;
@ -283,7 +277,7 @@ pub fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !us
var inner_off: usize = 0;
while (true) {
const v = data[iov_i];
const amt_read = try preadWindows(loop, fd, v[inner_off .. v.len - inner_off], offset + off);
const amt_read = try preadWindows(fd, v[inner_off .. v.len - inner_off], offset + off);
off += amt_read;
inner_off += amt_read;
if (inner_off == v.len) {
@ -297,7 +291,7 @@ pub fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !us
}
}
pub fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize {
pub fn preadWindows(fd: fd_t, data: []u8, offset: u64) !usize {
var resume_node = Loop.ResumeNode.Basic{
.base = Loop.ResumeNode{
.id = Loop.ResumeNode.Id.Basic,
@ -312,9 +306,9 @@ pub fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize {
},
};
// TODO only call create io completion port once per fd
_ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined) catch undefined;
loop.beginOneEvent();
errdefer loop.finishOneEvent();
_ = windows.CreateIoCompletionPort(fd, global_event_loop.os_data.io_port, undefined, undefined) catch undefined;
global_event_loop.beginOneEvent();
errdefer global_event_loop.finishOneEvent();
errdefer {
_ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped);
@ -336,12 +330,7 @@ pub fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize {
}
/// iovecs must live until preadv frame completes
pub fn preadvPosix(
loop: *Loop,
fd: fd_t,
iovecs: []const os.iovec,
offset: usize,
) os.ReadError!usize {
pub fn preadvPosix(fd: fd_t, iovecs: []const os.iovec, offset: usize) os.ReadError!usize {
var req_node = RequestNode{
.prev = null,
.next = null,
@ -364,21 +353,16 @@ pub fn preadvPosix(
},
};
errdefer loop.posixFsCancel(&req_node);
errdefer global_event_loop.posixFsCancel(&req_node);
suspend {
loop.posixFsRequest(&req_node);
global_event_loop.posixFsRequest(&req_node);
}
return req_node.data.msg.PReadV.result;
}
pub fn openPosix(
loop: *Loop,
path: []const u8,
flags: u32,
mode: File.Mode,
) File.OpenError!fd_t {
pub fn openPosix(path: []const u8, flags: u32, mode: File.Mode) File.OpenError!fd_t {
const path_c = try std.os.toPosixPath(path);
var req_node = RequestNode{
@ -403,21 +387,21 @@ pub fn openPosix(
},
};
errdefer loop.posixFsCancel(&req_node);
errdefer global_event_loop.posixFsCancel(&req_node);
suspend {
loop.posixFsRequest(&req_node);
global_event_loop.posixFsRequest(&req_node);
}
return req_node.data.msg.Open.result;
}
pub fn openRead(loop: *Loop, path: []const u8) File.OpenError!fd_t {
pub fn openRead(path: []const u8) File.OpenError!fd_t {
switch (builtin.os) {
.macosx, .linux, .freebsd, .netbsd, .dragonfly => {
const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0;
const flags = O_LARGEFILE | os.O_RDONLY | os.O_CLOEXEC;
return openPosix(loop, path, flags, File.default_mode);
return openPosix(path, flags, File.default_mode);
},
.windows => return windows.CreateFile(
@ -436,12 +420,12 @@ pub fn openRead(loop: *Loop, path: []const u8) File.OpenError!fd_t {
/// Creates if does not exist. Truncates the file if it exists.
/// Uses the default mode.
pub fn openWrite(loop: *Loop, path: []const u8) File.OpenError!fd_t {
return openWriteMode(loop, path, File.default_mode);
pub fn openWrite(path: []const u8) File.OpenError!fd_t {
return openWriteMode(path, File.default_mode);
}
/// Creates if does not exist. Truncates the file if it exists.
pub fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenError!fd_t {
pub fn openWriteMode(path: []const u8, mode: File.Mode) File.OpenError!fd_t {
switch (builtin.os) {
.macosx,
.linux,
@ -451,7 +435,7 @@ pub fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenEr
=> {
const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0;
const flags = O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC;
return openPosix(loop, path, flags, File.default_mode);
return openPosix(path, flags, File.default_mode);
},
.windows => return windows.CreateFile(
path,
@ -467,16 +451,12 @@ pub fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenEr
}
/// Creates if does not exist. Does not truncate.
pub fn openReadWrite(
loop: *Loop,
path: []const u8,
mode: File.Mode,
) File.OpenError!fd_t {
pub fn openReadWrite(path: []const u8, mode: File.Mode) File.OpenError!fd_t {
switch (builtin.os) {
.macosx, .linux, .freebsd, .netbsd, .dragonfly => {
const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0;
const flags = O_LARGEFILE | os.O_RDWR | os.O_CREAT | os.O_CLOEXEC;
return openPosix(loop, path, flags, mode);
return openPosix(path, flags, mode);
},
.windows => return windows.CreateFile(
@ -500,7 +480,7 @@ pub fn openReadWrite(
/// If you call `setHandle` then finishing will close the fd; otherwise finishing
/// will deallocate the `CloseOperation`.
pub const CloseOperation = struct {
loop: *Loop,
allocator: *Allocator,
os_data: OsData,
const OsData = switch (builtin.os) {
@ -518,10 +498,10 @@ pub const CloseOperation = struct {
close_req_node: RequestNode,
};
pub fn start(loop: *Loop) (error{OutOfMemory}!*CloseOperation) {
const self = try loop.allocator.create(CloseOperation);
pub fn start(allocator: *Allocator) (error{OutOfMemory}!*CloseOperation) {
const self = try allocator.create(CloseOperation);
self.* = CloseOperation{
.loop = loop,
.allocator = allocator,
.os_data = switch (builtin.os) {
.linux, .macosx, .freebsd, .netbsd, .dragonfly => initOsDataPosix(self),
.windows => OsData{ .handle = null },
@ -557,16 +537,16 @@ pub const CloseOperation = struct {
.dragonfly,
=> {
if (self.os_data.have_fd) {
self.loop.posixFsRequest(&self.os_data.close_req_node);
global_event_loop.posixFsRequest(&self.os_data.close_req_node);
} else {
self.loop.allocator.destroy(self);
self.allocator.destroy(self);
}
},
.windows => {
if (self.os_data.handle) |handle| {
os.close(handle);
}
self.loop.allocator.destroy(self);
self.allocator.destroy(self);
},
else => @compileError("Unsupported OS"),
}
@ -629,25 +609,25 @@ pub const CloseOperation = struct {
/// contents must remain alive until writeFile completes.
/// TODO make this atomic or provide writeFileAtomic and rename this one to writeFileTruncate
pub fn writeFile(loop: *Loop, path: []const u8, contents: []const u8) !void {
return writeFileMode(loop, path, contents, File.default_mode);
pub fn writeFile(allocator: *Allocator, path: []const u8, contents: []const u8) !void {
return writeFileMode(allocator, path, contents, File.default_mode);
}
/// contents must remain alive until writeFile completes.
pub fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void {
pub fn writeFileMode(allocator: *Allocator, path: []const u8, contents: []const u8, mode: File.Mode) !void {
switch (builtin.os) {
.linux,
.macosx,
.freebsd,
.netbsd,
.dragonfly,
=> return writeFileModeThread(loop, path, contents, mode),
.windows => return writeFileWindows(loop, path, contents),
=> return writeFileModeThread(allocator, path, contents, mode),
.windows => return writeFileWindows(path, contents),
else => @compileError("Unsupported OS"),
}
}
fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void {
fn writeFileWindows(path: []const u8, contents: []const u8) !void {
const handle = try windows.CreateFile(
path,
windows.GENERIC_WRITE,
@ -659,12 +639,12 @@ fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void {
);
defer os.close(handle);
try pwriteWindows(loop, handle, contents, 0);
try pwriteWindows(handle, contents, 0);
}
fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void {
const path_with_null = try std.cstr.addNullByte(loop.allocator, path);
defer loop.allocator.free(path_with_null);
fn writeFileModeThread(allocator: *Allocator, path: []const u8, contents: []const u8, mode: File.Mode) !void {
const path_with_null = try std.cstr.addNullByte(allocator, path);
defer allocator.free(path_with_null);
var req_node = RequestNode{
.prev = null,
@ -688,10 +668,10 @@ fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode
},
};
errdefer loop.posixFsCancel(&req_node);
errdefer global_event_loop.posixFsCancel(&req_node);
suspend {
loop.posixFsRequest(&req_node);
global_event_loop.posixFsRequest(&req_node);
}
return req_node.data.msg.WriteFile.result;
@ -700,21 +680,21 @@ fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode
/// The frame resumes when the last data has been confirmed written, but before the file handle
/// is closed.
/// Caller owns returned memory.
pub fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 {
var close_op = try CloseOperation.start(loop);
pub fn readFile(allocator: *Allocator, file_path: []const u8, max_size: usize) ![]u8 {
var close_op = try CloseOperation.start();
defer close_op.finish();
const fd = try openRead(loop, file_path);
const fd = try openRead(file_path);
close_op.setHandle(fd);
var list = std.ArrayList(u8).init(loop.allocator);
var list = std.ArrayList(u8).init(allocator);
defer list.deinit();
while (true) {
try list.ensureCapacity(list.len + mem.page_size);
const buf = list.items[list.len..];
const buf_array = [_][]u8{buf};
const amt = try preadv(loop, fd, buf_array, list.len);
const amt = try preadv(fd, buf_array, list.len);
list.len += amt;
if (list.len > max_size) {
return error.FileTooBig;
@ -1392,16 +1372,15 @@ fn testFsWatch(loop: *Loop) !void {
pub const OutStream = struct {
fd: fd_t,
stream: Stream,
loop: *Loop,
allocator: *Allocator,
offset: usize,
pub const Error = File.WriteError;
pub const Stream = event.io.OutStream(Error);
pub fn init(loop: *Loop, fd: fd_t, offset: usize) OutStream {
pub fn init(allocator: *Allocator, fd: fd_t, offset: usize) OutStream {
return OutStream{
.fd = fd,
.loop = loop,
.offset = offset,
.stream = Stream{ .writeFn = writeFn },
};
@ -1411,23 +1390,22 @@ pub const OutStream = struct {
const self = @fieldParentPtr(OutStream, "stream", out_stream);
const offset = self.offset;
self.offset += bytes.len;
return pwritev(self.loop, self.fd, [][]const u8{bytes}, offset);
return pwritev(self.allocator, self.fd, [_][]const u8{bytes}, offset);
}
};
pub const InStream = struct {
fd: fd_t,
stream: Stream,
loop: *Loop,
allocator: *Allocator,
offset: usize,
pub const Error = PReadVError; // TODO make this not have OutOfMemory
pub const Stream = event.io.InStream(Error);
pub fn init(loop: *Loop, fd: fd_t, offset: usize) InStream {
pub fn init(allocator: *Allocator, fd: fd_t, offset: usize) InStream {
return InStream{
.fd = fd,
.loop = loop,
.offset = offset,
.stream = Stream{ .readFn = readFn },
};
@ -1435,7 +1413,7 @@ pub const InStream = struct {
fn readFn(in_stream: *Stream, bytes: []u8) Error!usize {
const self = @fieldParentPtr(InStream, "stream", in_stream);
const amt = try preadv(self.loop, self.fd, [][]u8{bytes}, self.offset);
const amt = try preadv(self.allocator, self.fd, [_][]u8{bytes}, self.offset);
self.offset += amt;
return amt;
}