From 20f5f5698600589b32f3c4af19e856b5faf578da Mon Sep 17 00:00:00 2001 From: Vexu <15308111+Vexu@users.noreply.github.com> Date: Sun, 24 Nov 2019 14:08:51 +0200 Subject: [PATCH] uncomment event.fs.watch --- lib/std/event/fs.zig | 1180 +++++++++++++++++++++--------------------- 1 file changed, 590 insertions(+), 590 deletions(-) diff --git a/lib/std/event/fs.zig b/lib/std/event/fs.zig index 63e073fcb..c46fd7903 100644 --- a/lib/std/event/fs.zig +++ b/lib/std/event/fs.zig @@ -720,602 +720,602 @@ fn hashString(s: []const u16) u32 { return @truncate(u32, std.hash.Wyhash.hash(0, @sliceToBytes(s))); } -//pub const WatchEventError = error{ -// UserResourceLimitReached, -// SystemResources, -// AccessDenied, -// Unexpected, // TODO remove this possibility -//}; -// -//pub fn Watch(comptime V: type) type { -// return struct { -// channel: *event.Channel(Event.Error!Event), -// os_data: OsData, -// -// const OsData = switch (builtin.os) { -// .macosx, .freebsd, .netbsd, .dragonfly => struct { -// file_table: FileTable, -// table_lock: event.Lock, -// -// const FileTable = std.StringHashmap(*Put); -// const Put = struct { -// putter: anyframe, -// value_ptr: *V, -// }; -// }, -// -// .linux => LinuxOsData, -// .windows => WindowsOsData, -// -// else => @compileError("Unsupported OS"), -// }; -// -// const WindowsOsData = struct { -// table_lock: event.Lock, -// dir_table: DirTable, -// all_putters: std.atomic.Queue(anyframe), -// ref_count: std.atomic.Int(usize), -// -// const DirTable = std.StringHashMap(*Dir); -// const FileTable = std.HashMap([]const u16, V, hashString, eqlString); -// -// const Dir = struct { -// putter: anyframe, -// file_table: FileTable, -// table_lock: event.Lock, -// }; -// }; -// -// const LinuxOsData = struct { -// putter: anyframe, -// inotify_fd: i32, -// wd_table: WdTable, -// table_lock: event.Lock, -// -// const WdTable = std.AutoHashMap(i32, Dir); -// const FileTable = std.StringHashMap(V); -// -// const Dir = struct { -// dirname: []const u8, -// file_table: FileTable, -// }; -// }; -// -// const FileToHandle = std.StringHashMap(anyframe); -// -// const Self = @This(); -// -// pub const Event = struct { -// id: Id, -// data: V, -// -// pub const Id = WatchEventId; -// pub const Error = WatchEventError; -// }; -// -// pub fn create(loop: *Loop, event_buf_count: usize) !*Self { -// const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count); -// errdefer channel.destroy(); -// -// switch (builtin.os) { -// .linux => { -// const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); -// errdefer os.close(inotify_fd); -// -// var result: *Self = undefined; -// _ = try async linuxEventPutter(inotify_fd, channel, &result); -// return result; -// }, -// -// .windows => { -// const self = try loop.allocator.create(Self); -// errdefer loop.allocator.destroy(self); -// self.* = Self{ -// .channel = channel, -// .os_data = OsData{ -// .table_lock = event.Lock.init(loop), -// .dir_table = OsData.DirTable.init(loop.allocator), -// .ref_count = std.atomic.Int(usize).init(1), -// .all_putters = std.atomic.Queue(anyframe).init(), -// }, -// }; -// return self; -// }, -// -// .macosx, .freebsd, .netbsd, .dragonfly => { -// const self = try loop.allocator.create(Self); -// errdefer loop.allocator.destroy(self); -// -// self.* = Self{ -// .channel = channel, -// .os_data = OsData{ -// .table_lock = event.Lock.init(loop), -// .file_table = OsData.FileTable.init(loop.allocator), -// }, -// }; -// return self; -// }, -// else => @compileError("Unsupported OS"), -// } -// } -// -// /// All addFile calls and removeFile calls must have completed. -// pub fn destroy(self: *Self) void { -// switch (builtin.os) { -// .macosx, .freebsd, .netbsd, .dragonfly => { -// // TODO we need to cancel the frames before destroying the lock -// self.os_data.table_lock.deinit(); -// var it = self.os_data.file_table.iterator(); -// while (it.next()) |entry| { -// cancel entry.value.putter; -// self.channel.loop.allocator.free(entry.key); -// } -// self.channel.destroy(); -// }, -// .linux => cancel self.os_data.putter, -// .windows => { -// while (self.os_data.all_putters.get()) |putter_node| { -// cancel putter_node.data; -// } -// self.deref(); -// }, -// else => @compileError("Unsupported OS"), -// } -// } -// -// fn ref(self: *Self) void { -// _ = self.os_data.ref_count.incr(); -// } -// -// fn deref(self: *Self) void { -// if (self.os_data.ref_count.decr() == 1) { -// const allocator = self.channel.loop.allocator; -// self.os_data.table_lock.deinit(); -// var it = self.os_data.dir_table.iterator(); -// while (it.next()) |entry| { -// allocator.free(entry.key); -// allocator.destroy(entry.value); -// } -// self.os_data.dir_table.deinit(); -// self.channel.destroy(); -// allocator.destroy(self); -// } -// } -// -// pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V { -// switch (builtin.os) { -// .macosx, .freebsd, .netbsd, .dragonfly => return await (async addFileKEvent(self, file_path, value) catch unreachable), -// .linux => return await (async addFileLinux(self, file_path, value) catch unreachable), -// .windows => return await (async addFileWindows(self, file_path, value) catch unreachable), -// else => @compileError("Unsupported OS"), -// } -// } -// -// async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V { -// const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [_][]const u8{file_path}); -// var resolved_path_consumed = false; -// defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path); -// -// var close_op = try CloseOperation.start(self.channel.loop); -// var close_op_consumed = false; -// defer if (!close_op_consumed) close_op.finish(); -// -// const flags = if (comptime std.Target.current.isDarwin()) os.O_SYMLINK | os.O_EVTONLY else 0; -// const mode = 0; -// const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable); -// close_op.setHandle(fd); -// -// var put_data: *OsData.Put = undefined; -// const putter = try async self.kqPutEvents(close_op, value, &put_data); -// close_op_consumed = true; -// errdefer cancel putter; -// -// const result = blk: { -// const held = await (async self.os_data.table_lock.acquire() catch unreachable); -// defer held.release(); -// -// const gop = try self.os_data.file_table.getOrPut(resolved_path); -// if (gop.found_existing) { -// const prev_value = gop.kv.value.value_ptr.*; -// cancel gop.kv.value.putter; -// gop.kv.value = put_data; -// break :blk prev_value; -// } else { -// resolved_path_consumed = true; -// gop.kv.value = put_data; -// break :blk null; -// } -// }; -// -// return result; -// } -// -// async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void { -// var value_copy = value; -// var put = OsData.Put{ -// .putter = @frame(), -// .value_ptr = &value_copy, -// }; -// out_put.* = &put; -// self.channel.loop.beginOneEvent(); -// -// defer { -// close_op.finish(); -// self.channel.loop.finishOneEvent(); -// } -// -// while (true) { -// if (await (async self.channel.loop.bsdWaitKev( -// @intCast(usize, close_op.getHandle()), -// os.EVFILT_VNODE, -// os.NOTE_WRITE | os.NOTE_DELETE, -// ) catch unreachable)) |kev| { -// // TODO handle EV_ERROR -// if (kev.fflags & os.NOTE_DELETE != 0) { -// await (async self.channel.put(Self.Event{ -// .id = Event.Id.Delete, -// .data = value_copy, -// }) catch unreachable); -// } else if (kev.fflags & os.NOTE_WRITE != 0) { -// await (async self.channel.put(Self.Event{ -// .id = Event.Id.CloseWrite, -// .data = value_copy, -// }) catch unreachable); -// } -// } else |err| switch (err) { -// error.EventNotFound => unreachable, -// error.ProcessNotFound => unreachable, -// error.Overflow => unreachable, -// error.AccessDenied, error.SystemResources => |casted_err| { -// await (async self.channel.put(casted_err) catch unreachable); -// }, -// } -// } -// } -// -// async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V { -// const value_copy = value; -// -// const dirname = std.fs.path.dirname(file_path) orelse "."; -// const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname); -// var dirname_with_null_consumed = false; -// defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null); -// -// const basename = std.fs.path.basename(file_path); -// const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename); -// var basename_with_null_consumed = false; -// defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null); -// -// const wd = try os.inotify_add_watchC( -// self.os_data.inotify_fd, -// dirname_with_null.ptr, -// os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK, -// ); -// // wd is either a newly created watch or an existing one. -// -// const held = await (async self.os_data.table_lock.acquire() catch unreachable); -// defer held.release(); -// -// const gop = try self.os_data.wd_table.getOrPut(wd); -// if (!gop.found_existing) { -// gop.kv.value = OsData.Dir{ -// .dirname = dirname_with_null, -// .file_table = OsData.FileTable.init(self.channel.loop.allocator), -// }; -// dirname_with_null_consumed = true; -// } -// const dir = &gop.kv.value; -// -// const file_table_gop = try dir.file_table.getOrPut(basename_with_null); -// if (file_table_gop.found_existing) { -// const prev_value = file_table_gop.kv.value; -// file_table_gop.kv.value = value_copy; -// return prev_value; -// } else { -// file_table_gop.kv.value = value_copy; -// basename_with_null_consumed = true; -// return null; -// } -// } -// -// async fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V { -// const value_copy = value; -// // TODO we might need to convert dirname and basename to canonical file paths ("short"?) -// -// const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse "."); -// var dirname_consumed = false; -// defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname); -// -// const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, dirname); -// defer self.channel.loop.allocator.free(dirname_utf16le); -// -// // TODO https://github.com/ziglang/zig/issues/265 -// const basename = std.fs.path.basename(file_path); -// const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename); -// var basename_utf16le_null_consumed = false; -// defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null); -// const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1]; -// -// const dir_handle = try windows.CreateFileW( -// dirname_utf16le.ptr, -// windows.FILE_LIST_DIRECTORY, -// windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE, -// null, -// windows.OPEN_EXISTING, -// windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED, -// null, -// ); -// var dir_handle_consumed = false; -// defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle); -// -// const held = await (async self.os_data.table_lock.acquire() catch unreachable); -// defer held.release(); -// -// const gop = try self.os_data.dir_table.getOrPut(dirname); -// if (gop.found_existing) { -// const dir = gop.kv.value; -// const held_dir_lock = await (async dir.table_lock.acquire() catch unreachable); -// defer held_dir_lock.release(); -// -// const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null); -// if (file_gop.found_existing) { -// const prev_value = file_gop.kv.value; -// file_gop.kv.value = value_copy; -// return prev_value; -// } else { -// file_gop.kv.value = value_copy; -// basename_utf16le_null_consumed = true; -// return null; -// } -// } else { -// errdefer _ = self.os_data.dir_table.remove(dirname); -// const dir = try self.channel.loop.allocator.create(OsData.Dir); -// errdefer self.channel.loop.allocator.destroy(dir); -// -// dir.* = OsData.Dir{ -// .file_table = OsData.FileTable.init(self.channel.loop.allocator), -// .table_lock = event.Lock.init(self.channel.loop), -// .putter = undefined, -// }; -// gop.kv.value = dir; -// assert((try dir.file_table.put(basename_utf16le_no_null, value_copy)) == null); -// basename_utf16le_null_consumed = true; -// -// dir.putter = try async self.windowsDirReader(dir_handle, dir); -// dir_handle_consumed = true; -// -// dirname_consumed = true; -// -// return null; -// } -// } -// -// async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void { -// self.ref(); -// defer self.deref(); -// -// defer os.close(dir_handle); -// -// var putter_node = std.atomic.Queue(anyframe).Node{ -// .data = @frame(), -// .prev = null, -// .next = null, -// }; -// self.os_data.all_putters.put(&putter_node); -// defer _ = self.os_data.all_putters.remove(&putter_node); -// -// var resume_node = Loop.ResumeNode.Basic{ -// .base = Loop.ResumeNode{ -// .id = Loop.ResumeNode.Id.Basic, -// .handle = @frame(), -// .overlapped = windows.OVERLAPPED{ -// .Internal = 0, -// .InternalHigh = 0, -// .Offset = 0, -// .OffsetHigh = 0, -// .hEvent = null, -// }, -// }, -// }; -// var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined; -// -// // TODO handle this error not in the channel but in the setup -// _ = windows.CreateIoCompletionPort( -// dir_handle, -// self.channel.loop.os_data.io_port, -// undefined, -// undefined, -// ) catch |err| { -// await (async self.channel.put(err) catch unreachable); -// return; -// }; -// -// while (true) { -// { -// // TODO only 1 beginOneEvent for the whole function -// self.channel.loop.beginOneEvent(); -// errdefer self.channel.loop.finishOneEvent(); -// errdefer { -// _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped); -// } -// suspend { -// _ = windows.kernel32.ReadDirectoryChangesW( -// dir_handle, -// &event_buf, -// @intCast(windows.DWORD, event_buf.len), -// windows.FALSE, // watch subtree -// windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME | -// windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE | -// windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS | -// windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY, -// null, // number of bytes transferred (unused for async) -// &resume_node.base.overlapped, -// null, // completion routine - unused because we use IOCP -// ); -// } -// } -// var bytes_transferred: windows.DWORD = undefined; -// if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { -// const err = switch (windows.kernel32.GetLastError()) { -// else => |err| windows.unexpectedError(err), -// }; -// await (async self.channel.put(err) catch unreachable); -// } else { -// // can't use @bytesToSlice because of the special variable length name field -// var ptr = event_buf[0..].ptr; -// const end_ptr = ptr + bytes_transferred; -// var ev: *windows.FILE_NOTIFY_INFORMATION = undefined; -// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) { -// ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr); -// const emit = switch (ev.Action) { -// windows.FILE_ACTION_REMOVED => WatchEventId.Delete, -// windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite, -// else => null, -// }; -// if (emit) |id| { -// const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2]; -// const user_value = blk: { -// const held = await (async dir.table_lock.acquire() catch unreachable); -// defer held.release(); -// -// if (dir.file_table.get(basename_utf16le)) |entry| { -// break :blk entry.value; -// } else { -// break :blk null; -// } -// }; -// if (user_value) |v| { -// await (async self.channel.put(Event{ -// .id = id, -// .data = v, -// }) catch unreachable); -// } -// } -// if (ev.NextEntryOffset == 0) break; -// } -// } -// } -// } -// -// pub async fn removeFile(self: *Self, file_path: []const u8) ?V { -// @panic("TODO"); -// } -// -// async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void { -// const loop = channel.loop; -// -// var watch = Self{ -// .channel = channel, -// .os_data = OsData{ -// .putter = @frame(), -// .inotify_fd = inotify_fd, -// .wd_table = OsData.WdTable.init(loop.allocator), -// .table_lock = event.Lock.init(loop), -// }, -// }; -// out_watch.* = &watch; -// -// loop.beginOneEvent(); -// -// defer { -// watch.os_data.table_lock.deinit(); -// var wd_it = watch.os_data.wd_table.iterator(); -// while (wd_it.next()) |wd_entry| { -// var file_it = wd_entry.value.file_table.iterator(); -// while (file_it.next()) |file_entry| { -// loop.allocator.free(file_entry.key); -// } -// loop.allocator.free(wd_entry.value.dirname); -// } -// loop.finishOneEvent(); -// os.close(inotify_fd); -// channel.destroy(); -// } -// -// var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined; -// -// while (true) { -// const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len); -// const errno = os.linux.getErrno(rc); -// switch (errno) { -// 0 => { -// // can't use @bytesToSlice because of the special variable length name field -// var ptr = event_buf[0..].ptr; -// const end_ptr = ptr + event_buf.len; -// var ev: *os.linux.inotify_event = undefined; -// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += @sizeOf(os.linux.inotify_event) + ev.len) { -// ev = @ptrCast(*os.linux.inotify_event, ptr); -// if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) { -// const basename_ptr = ptr + @sizeOf(os.linux.inotify_event); -// const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1]; -// const user_value = blk: { -// const held = await (async watch.os_data.table_lock.acquire() catch unreachable); -// defer held.release(); -// -// const dir = &watch.os_data.wd_table.get(ev.wd).?.value; -// if (dir.file_table.get(basename_with_null)) |entry| { -// break :blk entry.value; -// } else { -// break :blk null; -// } -// }; -// if (user_value) |v| { -// await (async channel.put(Event{ -// .id = WatchEventId.CloseWrite, -// .data = v, -// }) catch unreachable); -// } -// } -// } -// }, -// os.linux.EINTR => continue, -// os.linux.EINVAL => unreachable, -// os.linux.EFAULT => unreachable, -// os.linux.EAGAIN => { -// (await (async loop.linuxWaitFd( -// inotify_fd, -// os.linux.EPOLLET | os.linux.EPOLLIN, -// ) catch unreachable)) catch |err| { -// const transformed_err = switch (err) { -// error.FileDescriptorAlreadyPresentInSet => unreachable, -// error.OperationCausesCircularLoop => unreachable, -// error.FileDescriptorNotRegistered => unreachable, -// error.FileDescriptorIncompatibleWithEpoll => unreachable, -// error.Unexpected => unreachable, -// else => |e| e, -// }; -// await (async channel.put(transformed_err) catch unreachable); -// }; -// }, -// else => unreachable, -// } -// } -// } -// }; -//} +pub const WatchEventError = error{ + UserResourceLimitReached, + SystemResources, + AccessDenied, + Unexpected, // TODO remove this possibility +}; + +pub fn Watch(comptime V: type) type { + return struct { + channel: *event.Channel(Event.Error!Event), + os_data: OsData, + + const OsData = switch (builtin.os) { + .macosx, .freebsd, .netbsd, .dragonfly => struct { + file_table: FileTable, + table_lock: event.Lock, + + const FileTable = std.StringHashmap(*Put); + const Put = struct { + putter: anyframe, + value_ptr: *V, + }; + }, + + .linux => LinuxOsData, + .windows => WindowsOsData, + + else => @compileError("Unsupported OS"), + }; + + const WindowsOsData = struct { + table_lock: event.Lock, + dir_table: DirTable, + all_putters: std.atomic.Queue(anyframe), + ref_count: std.atomic.Int(usize), + + const DirTable = std.StringHashMap(*Dir); + const FileTable = std.HashMap([]const u16, V, hashString, eqlString); + + const Dir = struct { + putter: anyframe, + file_table: FileTable, + table_lock: event.Lock, + }; + }; + + const LinuxOsData = struct { + putter: anyframe, + inotify_fd: i32, + wd_table: WdTable, + table_lock: event.Lock, + + const WdTable = std.AutoHashMap(i32, Dir); + const FileTable = std.StringHashMap(V); + + const Dir = struct { + dirname: []const u8, + file_table: FileTable, + }; + }; + + const FileToHandle = std.StringHashMap(anyframe); + + const Self = @This(); + + pub const Event = struct { + id: Id, + data: V, + + pub const Id = WatchEventId; + pub const Error = WatchEventError; + }; + + pub fn create(loop: *Loop, event_buf_count: usize) !*Self { + const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count); + errdefer channel.destroy(); + + switch (builtin.os) { + .linux => { + const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); + errdefer os.close(inotify_fd); + + var result: *Self = undefined; +// _ = try async linuxEventPutter(inotify_fd, channel, &result); + return result; + }, + + .windows => { + const self = try loop.allocator.create(Self); + errdefer loop.allocator.destroy(self); + self.* = Self{ + .channel = channel, + .os_data = OsData{ + .table_lock = event.Lock.init(loop), + .dir_table = OsData.DirTable.init(loop.allocator), + .ref_count = std.atomic.Int(usize).init(1), + .all_putters = std.atomic.Queue(anyframe).init(), + }, + }; + return self; + }, + + .macosx, .freebsd, .netbsd, .dragonfly => { + const self = try loop.allocator.create(Self); + errdefer loop.allocator.destroy(self); + + self.* = Self{ + .channel = channel, + .os_data = OsData{ + .table_lock = event.Lock.init(loop), + .file_table = OsData.FileTable.init(loop.allocator), + }, + }; + return self; + }, + else => @compileError("Unsupported OS"), + } + } + + /// All addFile calls and removeFile calls must have completed. + pub fn destroy(self: *Self) void { + switch (builtin.os) { + .macosx, .freebsd, .netbsd, .dragonfly => { + // TODO we need to cancel the frames before destroying the lock + self.os_data.table_lock.deinit(); + var it = self.os_data.file_table.iterator(); + while (it.next()) |entry| { +// cancel entry.value.putter; + self.channel.loop.allocator.free(entry.key); + } + self.channel.destroy(); + }, +// .linux => cancel self.os_data.putter, + .windows => { + while (self.os_data.all_putters.get()) |putter_node| { +// cancel putter_node.data; + } + self.deref(); + }, + else => @compileError("Unsupported OS"), + } + } + + fn ref(self: *Self) void { + _ = self.os_data.ref_count.incr(); + } + + fn deref(self: *Self) void { + if (self.os_data.ref_count.decr() == 1) { + const allocator = self.channel.loop.allocator; + self.os_data.table_lock.deinit(); + var it = self.os_data.dir_table.iterator(); + while (it.next()) |entry| { + allocator.free(entry.key); + allocator.destroy(entry.value); + } + self.os_data.dir_table.deinit(); + self.channel.destroy(); + allocator.destroy(self); + } + } + + pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V { + switch (builtin.os) { + .macosx, .freebsd, .netbsd, .dragonfly => return await (async addFileKEvent(self, file_path, value) catch unreachable), + .linux => return await (async addFileLinux(self, file_path, value) catch unreachable), + .windows => return await (async addFileWindows(self, file_path, value) catch unreachable), + else => @compileError("Unsupported OS"), + } + } + + async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V { + const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [_][]const u8{file_path}); + var resolved_path_consumed = false; + defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path); + + var close_op = try CloseOperation.start(self.channel.loop); + var close_op_consumed = false; + defer if (!close_op_consumed) close_op.finish(); + + const flags = if (comptime std.Target.current.isDarwin()) os.O_SYMLINK | os.O_EVTONLY else 0; + const mode = 0; + const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable); + close_op.setHandle(fd); + + var put_data: *OsData.Put = undefined; + const putter = try async self.kqPutEvents(close_op, value, &put_data); + close_op_consumed = true; +// errdefer cancel putter; + + const result = blk: { + const held = await (async self.os_data.table_lock.acquire() catch unreachable); + defer held.release(); + + const gop = try self.os_data.file_table.getOrPut(resolved_path); + if (gop.found_existing) { + const prev_value = gop.kv.value.value_ptr.*; +// cancel gop.kv.value.putter; + gop.kv.value = put_data; + break :blk prev_value; + } else { + resolved_path_consumed = true; + gop.kv.value = put_data; + break :blk null; + } + }; + + return result; + } + + async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void { + var value_copy = value; + var put = OsData.Put{ + .putter = @frame(), + .value_ptr = &value_copy, + }; + out_put.* = &put; + self.channel.loop.beginOneEvent(); + + defer { + close_op.finish(); + self.channel.loop.finishOneEvent(); + } + + while (true) { + if (await (async self.channel.loop.bsdWaitKev( + @intCast(usize, close_op.getHandle()), + os.EVFILT_VNODE, + os.NOTE_WRITE | os.NOTE_DELETE, + ) catch unreachable)) |kev| { + // TODO handle EV_ERROR + if (kev.fflags & os.NOTE_DELETE != 0) { + await (async self.channel.put(Self.Event{ + .id = Event.Id.Delete, + .data = value_copy, + }) catch unreachable); + } else if (kev.fflags & os.NOTE_WRITE != 0) { + await (async self.channel.put(Self.Event{ + .id = Event.Id.CloseWrite, + .data = value_copy, + }) catch unreachable); + } + } else |err| switch (err) { + error.EventNotFound => unreachable, + error.ProcessNotFound => unreachable, + error.Overflow => unreachable, + error.AccessDenied, error.SystemResources => |casted_err| { + await (async self.channel.put(casted_err) catch unreachable); + }, + } + } + } + + async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V { + const value_copy = value; + + const dirname = std.fs.path.dirname(file_path) orelse "."; + const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname); + var dirname_with_null_consumed = false; + defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null); + + const basename = std.fs.path.basename(file_path); + const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename); + var basename_with_null_consumed = false; + defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null); + + const wd = try os.inotify_add_watchC( + self.os_data.inotify_fd, + dirname_with_null.ptr, + os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK, + ); + // wd is either a newly created watch or an existing one. + + const held = await (async self.os_data.table_lock.acquire() catch unreachable); + defer held.release(); + + const gop = try self.os_data.wd_table.getOrPut(wd); + if (!gop.found_existing) { + gop.kv.value = OsData.Dir{ + .dirname = dirname_with_null, + .file_table = OsData.FileTable.init(self.channel.loop.allocator), + }; + dirname_with_null_consumed = true; + } + const dir = &gop.kv.value; + + const file_table_gop = try dir.file_table.getOrPut(basename_with_null); + if (file_table_gop.found_existing) { + const prev_value = file_table_gop.kv.value; + file_table_gop.kv.value = value_copy; + return prev_value; + } else { + file_table_gop.kv.value = value_copy; + basename_with_null_consumed = true; + return null; + } + } + + async fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V { + const value_copy = value; + // TODO we might need to convert dirname and basename to canonical file paths ("short"?) + + const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse "."); + var dirname_consumed = false; + defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname); + + const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, dirname); + defer self.channel.loop.allocator.free(dirname_utf16le); + + // TODO https://github.com/ziglang/zig/issues/265 + const basename = std.fs.path.basename(file_path); + const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename); + var basename_utf16le_null_consumed = false; + defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null); + const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1]; + + const dir_handle = try windows.CreateFileW( + dirname_utf16le.ptr, + windows.FILE_LIST_DIRECTORY, + windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE, + null, + windows.OPEN_EXISTING, + windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED, + null, + ); + var dir_handle_consumed = false; + defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle); + + const held = await (async self.os_data.table_lock.acquire() catch unreachable); + defer held.release(); + + const gop = try self.os_data.dir_table.getOrPut(dirname); + if (gop.found_existing) { + const dir = gop.kv.value; + const held_dir_lock = await (async dir.table_lock.acquire() catch unreachable); + defer held_dir_lock.release(); + + const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null); + if (file_gop.found_existing) { + const prev_value = file_gop.kv.value; + file_gop.kv.value = value_copy; + return prev_value; + } else { + file_gop.kv.value = value_copy; + basename_utf16le_null_consumed = true; + return null; + } + } else { + errdefer _ = self.os_data.dir_table.remove(dirname); + const dir = try self.channel.loop.allocator.create(OsData.Dir); + errdefer self.channel.loop.allocator.destroy(dir); + + dir.* = OsData.Dir{ + .file_table = OsData.FileTable.init(self.channel.loop.allocator), + .table_lock = event.Lock.init(self.channel.loop), + .putter = undefined, + }; + gop.kv.value = dir; + assert((try dir.file_table.put(basename_utf16le_no_null, value_copy)) == null); + basename_utf16le_null_consumed = true; + + dir.putter = try async self.windowsDirReader(dir_handle, dir); + dir_handle_consumed = true; + + dirname_consumed = true; + + return null; + } + } + + async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void { + self.ref(); + defer self.deref(); + + defer os.close(dir_handle); + + var putter_node = std.atomic.Queue(anyframe).Node{ + .data = @frame(), + .prev = null, + .next = null, + }; + self.os_data.all_putters.put(&putter_node); + defer _ = self.os_data.all_putters.remove(&putter_node); + + var resume_node = Loop.ResumeNode.Basic{ + .base = Loop.ResumeNode{ + .id = Loop.ResumeNode.Id.Basic, + .handle = @frame(), + .overlapped = windows.OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = 0, + .OffsetHigh = 0, + .hEvent = null, + }, + }, + }; + var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined; + + // TODO handle this error not in the channel but in the setup + _ = windows.CreateIoCompletionPort( + dir_handle, + self.channel.loop.os_data.io_port, + undefined, + undefined, + ) catch |err| { + await (async self.channel.put(err) catch unreachable); + return; + }; + + while (true) { + { + // TODO only 1 beginOneEvent for the whole function + self.channel.loop.beginOneEvent(); + errdefer self.channel.loop.finishOneEvent(); + errdefer { + _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped); + } + suspend { + _ = windows.kernel32.ReadDirectoryChangesW( + dir_handle, + &event_buf, + @intCast(windows.DWORD, event_buf.len), + windows.FALSE, // watch subtree + windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME | + windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE | + windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS | + windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY, + null, // number of bytes transferred (unused for async) + &resume_node.base.overlapped, + null, // completion routine - unused because we use IOCP + ); + } + } + var bytes_transferred: windows.DWORD = undefined; + if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { + const err = switch (windows.kernel32.GetLastError()) { + else => |err| windows.unexpectedError(err), + }; + await (async self.channel.put(err) catch unreachable); + } else { + // can't use @bytesToSlice because of the special variable length name field + var ptr = event_buf[0..].ptr; + const end_ptr = ptr + bytes_transferred; + var ev: *windows.FILE_NOTIFY_INFORMATION = undefined; + while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) { + ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr); + const emit = switch (ev.Action) { + windows.FILE_ACTION_REMOVED => WatchEventId.Delete, + windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite, + else => null, + }; + if (emit) |id| { + const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2]; + const user_value = blk: { + const held = await (async dir.table_lock.acquire() catch unreachable); + defer held.release(); + + if (dir.file_table.get(basename_utf16le)) |entry| { + break :blk entry.value; + } else { + break :blk null; + } + }; + if (user_value) |v| { + await (async self.channel.put(Event{ + .id = id, + .data = v, + }) catch unreachable); + } + } + if (ev.NextEntryOffset == 0) break; + } + } + } + } + + pub async fn removeFile(self: *Self, file_path: []const u8) ?V { + @panic("TODO"); + } + + async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void { + const loop = channel.loop; + + var watch = Self{ + .channel = channel, + .os_data = OsData{ + .putter = @frame(), + .inotify_fd = inotify_fd, + .wd_table = OsData.WdTable.init(loop.allocator), + .table_lock = event.Lock.init(loop), + }, + }; + out_watch.* = &watch; + + loop.beginOneEvent(); + + defer { + watch.os_data.table_lock.deinit(); + var wd_it = watch.os_data.wd_table.iterator(); + while (wd_it.next()) |wd_entry| { + var file_it = wd_entry.value.file_table.iterator(); + while (file_it.next()) |file_entry| { + loop.allocator.free(file_entry.key); + } + loop.allocator.free(wd_entry.value.dirname); + } + loop.finishOneEvent(); + os.close(inotify_fd); + channel.destroy(); + } + + var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined; + + while (true) { + const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len); + const errno = os.linux.getErrno(rc); + switch (errno) { + 0 => { + // can't use @bytesToSlice because of the special variable length name field + var ptr = event_buf[0..].ptr; + const end_ptr = ptr + event_buf.len; + var ev: *os.linux.inotify_event = undefined; + while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += @sizeOf(os.linux.inotify_event) + ev.len) { + ev = @ptrCast(*os.linux.inotify_event, ptr); + if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) { + const basename_ptr = ptr + @sizeOf(os.linux.inotify_event); + const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1]; + const user_value = blk: { + const held = await (async watch.os_data.table_lock.acquire() catch unreachable); + defer held.release(); + + const dir = &watch.os_data.wd_table.get(ev.wd).?.value; + if (dir.file_table.get(basename_with_null)) |entry| { + break :blk entry.value; + } else { + break :blk null; + } + }; + if (user_value) |v| { + await (async channel.put(Event{ + .id = WatchEventId.CloseWrite, + .data = v, + }) catch unreachable); + } + } + } + }, + os.linux.EINTR => continue, + os.linux.EINVAL => unreachable, + os.linux.EFAULT => unreachable, + os.linux.EAGAIN => { + (await (async loop.linuxWaitFd( + inotify_fd, + os.linux.EPOLLET | os.linux.EPOLLIN, + ) catch unreachable)) catch |err| { + const transformed_err = switch (err) { + error.FileDescriptorAlreadyPresentInSet => unreachable, + error.OperationCausesCircularLoop => unreachable, + error.FileDescriptorNotRegistered => unreachable, + error.FileDescriptorIncompatibleWithEpoll => unreachable, + error.Unexpected => unreachable, + else => |e| e, + }; + await (async channel.put(transformed_err) catch unreachable); + }; + }, + else => unreachable, + } + } + } + }; +} const test_tmp_dir = "std_event_fs_test"; // TODO this test is disabled until the async function rewrite is finished. -//test "write a file, watch it, write it again" { -// return error.SkipZigTest; -// const allocator = std.heap.direct_allocator; -// -// // TODO move this into event loop too -// try os.makePath(allocator, test_tmp_dir); -// defer os.deleteTree(test_tmp_dir) catch {}; -// -// var loop: Loop = undefined; -// try loop.initMultiThreaded(allocator); -// defer loop.deinit(); -// -// var result: anyerror!void = error.ResultNeverWritten; +test "write a file, watch it, write it again" { + return error.SkipZigTest; + const allocator = std.heap.direct_allocator; + + // TODO move this into event loop too + try os.makePath(allocator, test_tmp_dir); + defer os.deleteTree(test_tmp_dir) catch {}; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + var result: anyerror!void = error.ResultNeverWritten; // const handle = try async testFsWatchCantFail(&loop, &result); // defer cancel handle; -// -// loop.run(); -// return result; -//} + + loop.run(); + return result; +} fn testFsWatchCantFail(loop: *Loop, result: *(anyerror!void)) void { result.* = testFsWatch(loop);