From 4d8d513e16d308131846d98267bc844bf702e9ce Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sun, 11 Aug 2019 19:53:10 -0400 Subject: [PATCH] all tests passing --- BRANCH_TODO | 6 ++- doc/docgen.zig | 2 +- doc/langref.html.in | 81 ++++++++++--------------------- src/analyze.cpp | 2 +- src/codegen.cpp | 49 ++++++++++--------- src/ir.cpp | 3 ++ std/event/channel.zig | 11 +++-- std/event/fs.zig | 102 ++++++++++------------------------------ std/event/future.zig | 45 ++++++++---------- std/event/group.zig | 68 ++++++++------------------- std/event/io.zig | 19 ++++---- std/event/lock.zig | 54 +++++++++------------ std/event/loop.zig | 4 +- std/event/net.zig | 53 +++++++++------------ std/event/rwlock.zig | 85 ++++++++++++++++----------------- std/zig/parser_test.zig | 2 +- test/compile_errors.zig | 35 ++++---------- 17 files changed, 240 insertions(+), 381 deletions(-) diff --git a/BRANCH_TODO b/BRANCH_TODO index bd797a75a..b2b293aec 100644 --- a/BRANCH_TODO +++ b/BRANCH_TODO @@ -1,10 +1,13 @@ + * for loops need to spill the index. other payload captures probably also need to spill + * compile error (instead of crashing) for trying to get @Frame of generic function + * compile error (instead of crashing) for trying to async call and passing @Frame of wrong function + * `const result = (await a) + (await b);` this causes "Instruction does not dominate all uses" - need spill * compile error for error: expected anyframe->T, found 'anyframe' * compile error for error: expected anyframe->T, found 'i32' * await of a non async function * async call on a non async function * a test where an async function destroys its own frame in a defer * implicit cast of normal function to async function should be allowed when it is inferred to be async - * revive std.event.Loop * @typeInfo for @Frame(func) * peer type resolution of *@Frame(func) and anyframe * peer type resolution of *@Frame(func) and anyframe->T when the return type matches @@ -36,3 +39,4 @@ - it can be assumed that these are always available: the awaiter ptr, return ptr if applicable, error return trace ptr if applicable. - it can be assumed that it is never cancelled + * fix the debug info for variables of async functions diff --git a/doc/docgen.zig b/doc/docgen.zig index 3d3dcba76..92764d764 100644 --- a/doc/docgen.zig +++ b/doc/docgen.zig @@ -770,7 +770,7 @@ fn tokenizeAndPrintRaw(docgen_tokenizer: *Tokenizer, out: var, source_token: Tok .Keyword_or, .Keyword_orelse, .Keyword_packed, - .Keyword_promise, + .Keyword_anyframe, .Keyword_pub, .Keyword_resume, .Keyword_return, diff --git a/doc/langref.html.in b/doc/langref.html.in index ac381e00b..0cb76a4bd 100644 --- a/doc/langref.html.in +++ b/doc/langref.html.in @@ -6024,13 +6024,14 @@ const assert = std.debug.assert; var x: i32 = 1; -test "create a coroutine and cancel it" { - const p = try async simpleAsyncFn(); - comptime assert(@typeOf(p) == promise->void); - cancel p; +test "call an async function" { + var frame = async simpleAsyncFn(); + comptime assert(@typeOf(frame) == @Frame(simpleAsyncFn)); assert(x == 2); } -async<*std.mem.Allocator> fn simpleAsyncFn() void { +fn simpleAsyncFn() void { + x += 1; + suspend; x += 1; } {#code_end#} @@ -6041,60 +6042,33 @@ async<*std.mem.Allocator> fn simpleAsyncFn() void { return to the caller or resumer. The following code demonstrates where control flow goes:

- {#code_begin|test#} -const std = @import("std"); -const assert = std.debug.assert; - -test "coroutine suspend, resume, cancel" { - seq('a'); - const p = try async testAsyncSeq(); - seq('c'); - resume p; - seq('f'); - cancel p; - seq('g'); - - assert(std.mem.eql(u8, points, "abcdefg")); -} -async fn testAsyncSeq() void { - defer seq('e'); - - seq('b'); - suspend; - seq('d'); -} -var points = [_]u8{0} ** "abcdefg".len; -var index: usize = 0; - -fn seq(c: u8) void { - points[index] = c; - index += 1; -} - {#code_end#} +

+ TODO another test example here +

When an async function suspends itself, it must be sure that it will be resumed or canceled somehow, for example by registering its promise handle in an event loop. Use a suspend capture block to gain access to the - promise: + promise (TODO this is outdated):

{#code_begin|test#} const std = @import("std"); const assert = std.debug.assert; +var the_frame: anyframe = undefined; +var result = false; + test "coroutine suspend with block" { - const p = try async testSuspendBlock(); + _ = async testSuspendBlock(); std.debug.assert(!result); - resume a_promise; + resume the_frame; std.debug.assert(result); - cancel p; } -var a_promise: promise = undefined; -var result = false; -async fn testSuspendBlock() void { +fn testSuspendBlock() void { suspend { - comptime assert(@typeOf(@handle()) == promise->void); - a_promise = @handle(); + comptime assert(@typeOf(@frame()) == *@Frame(testSuspendBlock)); + the_frame = @frame(); } result = true; } @@ -6124,16 +6098,13 @@ const std = @import("std"); const assert = std.debug.assert; test "resume from suspend" { - var buf: [500]u8 = undefined; - var a = &std.heap.FixedBufferAllocator.init(buf[0..]).allocator; var my_result: i32 = 1; - const p = try async testResumeFromSuspend(&my_result); - cancel p; + _ = async testResumeFromSuspend(&my_result); std.debug.assert(my_result == 2); } async fn testResumeFromSuspend(my_result: *i32) void { suspend { - resume @handle(); + resume @frame(); } my_result.* += 1; suspend; @@ -6172,30 +6143,30 @@ async fn testResumeFromSuspend(my_result: *i32) void { const std = @import("std"); const assert = std.debug.assert; -var a_promise: promise = undefined; +var the_frame: anyframe = undefined; var final_result: i32 = 0; test "coroutine await" { seq('a'); - const p = async amain() catch unreachable; + _ = async amain(); seq('f'); - resume a_promise; + resume the_frame; seq('i'); assert(final_result == 1234); assert(std.mem.eql(u8, seq_points, "abcdefghi")); } async fn amain() void { seq('b'); - const p = async another() catch unreachable; + var f = async another(); seq('e'); - final_result = await p; + final_result = await f; seq('h'); } async fn another() i32 { seq('c'); suspend { seq('d'); - a_promise = @handle(); + the_frame = @frame(); } seq('g'); return 1234; diff --git a/src/analyze.cpp b/src/analyze.cpp index 7482ba92b..30aa82a21 100644 --- a/src/analyze.cpp +++ b/src/analyze.cpp @@ -5325,7 +5325,7 @@ static Error resolve_coro_frame(CodeGen *g, ZigType *frame_type) { if (*instruction->name_hint == 0) { name = buf_ptr(buf_sprintf("@local%" ZIG_PRI_usize, alloca_i)); } else { - name = instruction->name_hint; + name = buf_ptr(buf_sprintf("%s.%" ZIG_PRI_usize, instruction->name_hint, alloca_i)); } field_names.append(name); field_types.append(child_type); diff --git a/src/codegen.cpp b/src/codegen.cpp index f1a42e321..4510e7156 100644 --- a/src/codegen.cpp +++ b/src/codegen.cpp @@ -535,24 +535,24 @@ static LLVMValueRef make_fn_llvm_value(CodeGen *g, ZigFn *fn) { // use the ABI alignment, which is fine. } - unsigned init_gen_i = 0; - if (!type_has_bits(return_type)) { - // nothing to do - } else if (type_is_nonnull_ptr(return_type)) { - addLLVMAttr(llvm_fn, 0, "nonnull"); - } else if (!is_async && want_first_arg_sret(g, &fn_type->data.fn.fn_type_id)) { - // Sret pointers must not be address 0 - addLLVMArgAttr(llvm_fn, 0, "nonnull"); - addLLVMArgAttr(llvm_fn, 0, "sret"); - if (cc_want_sret_attr(cc)) { - addLLVMArgAttr(llvm_fn, 0, "noalias"); - } - init_gen_i = 1; - } - if (is_async) { addLLVMArgAttr(llvm_fn, 0, "nonnull"); } else { + unsigned init_gen_i = 0; + if (!type_has_bits(return_type)) { + // nothing to do + } else if (type_is_nonnull_ptr(return_type)) { + addLLVMAttr(llvm_fn, 0, "nonnull"); + } else if (want_first_arg_sret(g, &fn_type->data.fn.fn_type_id)) { + // Sret pointers must not be address 0 + addLLVMArgAttr(llvm_fn, 0, "nonnull"); + addLLVMArgAttr(llvm_fn, 0, "sret"); + if (cc_want_sret_attr(cc)) { + addLLVMArgAttr(llvm_fn, 0, "noalias"); + } + init_gen_i = 1; + } + // set parameter attributes FnWalk fn_walk = {}; fn_walk.id = FnWalkIdAttrs; @@ -911,7 +911,7 @@ static Buf *panic_msg_buf(PanicMsgId msg_id) { case PanicMsgIdBadResume: return buf_create_from_str("resumed an async function which already returned"); case PanicMsgIdBadAwait: - return buf_create_from_str("async function awaited/canceled twice"); + return buf_create_from_str("async function awaited twice"); case PanicMsgIdBadReturn: return buf_create_from_str("async function returned twice"); case PanicMsgIdResumedAnAwaitingFn: @@ -2350,6 +2350,10 @@ static LLVMValueRef ir_render_return_begin(CodeGen *g, IrExecutable *executable, return get_handle_value(g, g->cur_ret_ptr, operand_type, get_pointer_to_type(g, operand_type, true)); } +static void set_tail_call_if_appropriate(CodeGen *g, LLVMValueRef call_inst) { + LLVMSetTailCall(call_inst, true); +} + static LLVMValueRef ir_render_return(CodeGen *g, IrExecutable *executable, IrInstructionReturn *instruction) { if (fn_is_async(g->cur_fn)) { LLVMTypeRef usize_type_ref = g->builtin_types.entry_usize->llvm_type; @@ -2394,7 +2398,7 @@ static LLVMValueRef ir_render_return(CodeGen *g, IrExecutable *executable, IrIns LLVMValueRef their_frame_ptr = LLVMBuildIntToPtr(g->builder, masked_prev_val, get_llvm_type(g, any_frame_type), ""); LLVMValueRef call_inst = gen_resume(g, nullptr, their_frame_ptr, ResumeIdReturn, nullptr); - LLVMSetTailCall(call_inst, true); + set_tail_call_if_appropriate(g, call_inst); LLVMBuildRetVoid(g->builder); g->cur_is_after_return = false; @@ -4009,7 +4013,7 @@ static LLVMValueRef ir_render_call(CodeGen *g, IrExecutable *executable, IrInstr LLVMBasicBlockRef call_bb = gen_suspend_begin(g, "CallResume"); LLVMValueRef call_inst = gen_resume(g, fn_val, frame_result_loc, ResumeIdCall, nullptr); - LLVMSetTailCall(call_inst, true); + set_tail_call_if_appropriate(g, call_inst); LLVMBuildRetVoid(g->builder); LLVMPositionBuilderAtEnd(g->builder, call_bb); @@ -5520,7 +5524,7 @@ static LLVMValueRef ir_render_cancel(CodeGen *g, IrExecutable *executable, IrIns LLVMPositionBuilderAtEnd(g->builder, early_return_block); LLVMValueRef call_inst = gen_resume(g, nullptr, target_frame_ptr, ResumeIdAwaitEarlyReturn, awaiter_ored_val); - LLVMSetTailCall(call_inst, true); + set_tail_call_if_appropriate(g, call_inst); LLVMBuildRetVoid(g->builder); LLVMPositionBuilderAtEnd(g->builder, resume_bb); @@ -5556,8 +5560,9 @@ static LLVMValueRef ir_render_await(CodeGen *g, IrExecutable *executable, IrInst } // supply the error return trace pointer - LLVMValueRef my_err_ret_trace_val = get_cur_err_ret_trace_val(g, instruction->base.scope); - if (my_err_ret_trace_val != nullptr) { + if (codegen_fn_has_err_ret_tracing_arg(g, result_type)) { + LLVMValueRef my_err_ret_trace_val = get_cur_err_ret_trace_val(g, instruction->base.scope); + assert(my_err_ret_trace_val != nullptr); LLVMValueRef err_ret_trace_ptr_ptr = LLVMBuildStructGEP(g->builder, target_frame_ptr, frame_index_trace_arg(g, result_type), ""); LLVMBuildStore(g->builder, my_err_ret_trace_val, err_ret_trace_ptr_ptr); @@ -5588,7 +5593,7 @@ static LLVMValueRef ir_render_await(CodeGen *g, IrExecutable *executable, IrInst // Tail resume it now, so that it can complete. LLVMPositionBuilderAtEnd(g->builder, early_return_block); LLVMValueRef call_inst = gen_resume(g, nullptr, target_frame_ptr, ResumeIdAwaitEarlyReturn, awaiter_init_val); - LLVMSetTailCall(call_inst, true); + set_tail_call_if_appropriate(g, call_inst); LLVMBuildRetVoid(g->builder); // Rely on the target to resume us from suspension. diff --git a/src/ir.cpp b/src/ir.cpp index f1d4b80a2..57c50db81 100644 --- a/src/ir.cpp +++ b/src/ir.cpp @@ -15064,6 +15064,9 @@ static IrInstruction *ir_analyze_async_call(IrAnalyze *ira, IrInstructionCallSrc if (result_loc != nullptr && (type_is_invalid(result_loc->value.type) || instr_is_unreachable(result_loc))) { return result_loc; } + result_loc = ir_implicit_cast(ira, result_loc, get_pointer_to_type(ira->codegen, frame_type, false)); + if (type_is_invalid(result_loc->value.type)) + return ira->codegen->invalid_instruction; return &ir_build_call_gen(ira, &call_instruction->base, fn_entry, fn_ref, arg_count, casted_args, FnInlineAuto, true, nullptr, result_loc, frame_type)->base; } diff --git a/std/event/channel.zig b/std/event/channel.zig index c9686e37e..c4f7dca08 100644 --- a/std/event/channel.zig +++ b/std/event/channel.zig @@ -77,18 +77,19 @@ pub fn Channel(comptime T: type) type { /// must be called when all calls to put and get have suspended and no more calls occur pub fn destroy(self: *SelfChannel) void { while (self.getters.get()) |get_node| { - cancel get_node.data.tick_node.data; + resume get_node.data.tick_node.data; } while (self.putters.get()) |put_node| { - cancel put_node.data.tick_node.data; + resume put_node.data.tick_node.data; } self.loop.allocator.free(self.buffer_nodes); self.loop.allocator.destroy(self); } - /// puts a data item in the channel. The promise completes when the value has been added to the + /// puts a data item in the channel. The function returns when the value has been added to the /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter. - pub async fn put(self: *SelfChannel, data: T) void { + /// Or when the channel is destroyed. + pub fn put(self: *SelfChannel, data: T) void { var my_tick_node = Loop.NextTickNode.init(@frame()); var queue_node = std.atomic.Queue(PutNode).Node.init(PutNode{ .tick_node = &my_tick_node, @@ -114,7 +115,7 @@ pub fn Channel(comptime T: type) type { } } - /// await this function to get an item from the channel. If the buffer is empty, the promise will + /// await this function to get an item from the channel. If the buffer is empty, the frame will /// complete when the next item is put in the channel. pub async fn get(self: *SelfChannel) T { // TODO integrate this function with named return values diff --git a/std/event/fs.zig b/std/event/fs.zig index 22e9fc38c..fe2f604ac 100644 --- a/std/event/fs.zig +++ b/std/event/fs.zig @@ -76,12 +76,8 @@ pub const Request = struct { pub const PWriteVError = error{OutOfMemory} || File.WriteError; -/// data - just the inner references - must live until pwritev promise completes. +/// data - just the inner references - must live until pwritev frame completes. pub async fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void { - // workaround for https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } switch (builtin.os) { .macosx, .linux, @@ -109,7 +105,7 @@ pub async fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: us } } -/// data must outlive the returned promise +/// data must outlive the returned frame pub async fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void { if (data.len == 0) return; if (data.len == 1) return await (async pwriteWindows(loop, fd, data[0], offset) catch unreachable); @@ -123,15 +119,10 @@ pub async fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, off } pub async fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void { - // workaround for https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } - var resume_node = Loop.ResumeNode.Basic{ .base = Loop.ResumeNode{ .id = Loop.ResumeNode.Id.Basic, - .handle = @handle(), + .handle = @frame(), .overlapped = windows.OVERLAPPED{ .Internal = 0, .InternalHigh = 0, @@ -166,18 +157,13 @@ pub async fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) } } -/// iovecs must live until pwritev promise completes. +/// iovecs must live until pwritev frame completes. pub async fn pwritevPosix( loop: *Loop, fd: fd_t, iovecs: []const os.iovec_const, offset: usize, ) os.WriteError!void { - // workaround for https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } - var req_node = RequestNode{ .prev = null, .next = null, @@ -194,7 +180,7 @@ pub async fn pwritevPosix( .TickNode = Loop.NextTickNode{ .prev = null, .next = null, - .data = @handle(), + .data = @frame(), }, }, }, @@ -211,13 +197,8 @@ pub async fn pwritevPosix( pub const PReadVError = error{OutOfMemory} || File.ReadError; -/// data - just the inner references - must live until preadv promise completes. +/// data - just the inner references - must live until preadv frame completes. pub async fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize { - // workaround for https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } - assert(data.len != 0); switch (builtin.os) { .macosx, @@ -246,7 +227,7 @@ pub async fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PR } } -/// data must outlive the returned promise +/// data must outlive the returned frame pub async fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !usize { assert(data.len != 0); if (data.len == 1) return await (async preadWindows(loop, fd, data[0], offset) catch unreachable); @@ -272,15 +253,10 @@ pub async fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u6 } pub async fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize { - // workaround for https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } - var resume_node = Loop.ResumeNode.Basic{ .base = Loop.ResumeNode{ .id = Loop.ResumeNode.Id.Basic, - .handle = @handle(), + .handle = @frame(), .overlapped = windows.OVERLAPPED{ .Internal = 0, .InternalHigh = 0, @@ -314,18 +290,13 @@ pub async fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize return usize(bytes_transferred); } -/// iovecs must live until preadv promise completes +/// iovecs must live until preadv frame completes pub async fn preadvPosix( loop: *Loop, fd: fd_t, iovecs: []const os.iovec, offset: usize, ) os.ReadError!usize { - // workaround for https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } - var req_node = RequestNode{ .prev = null, .next = null, @@ -342,7 +313,7 @@ pub async fn preadvPosix( .TickNode = Loop.NextTickNode{ .prev = null, .next = null, - .data = @handle(), + .data = @frame(), }, }, }, @@ -363,11 +334,6 @@ pub async fn openPosix( flags: u32, mode: File.Mode, ) File.OpenError!fd_t { - // workaround for https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } - const path_c = try std.os.toPosixPath(path); var req_node = RequestNode{ @@ -386,7 +352,7 @@ pub async fn openPosix( .TickNode = Loop.NextTickNode{ .prev = null, .next = null, - .data = @handle(), + .data = @frame(), }, }, }, @@ -643,11 +609,6 @@ async fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) ! } async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void { - // workaround for https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } - const path_with_null = try std.cstr.addNullByte(loop.allocator, path); defer loop.allocator.free(path_with_null); @@ -667,7 +628,7 @@ async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8 .TickNode = Loop.NextTickNode{ .prev = null, .next = null, - .data = @handle(), + .data = @frame(), }, }, }, @@ -682,7 +643,7 @@ async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8 return req_node.data.msg.WriteFile.result; } -/// The promise resumes when the last data has been confirmed written, but before the file handle +/// The frame resumes when the last data has been confirmed written, but before the file handle /// is closed. /// Caller owns returned memory. pub async fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 { @@ -734,7 +695,7 @@ pub const WatchEventId = enum { // // const FileTable = std.AutoHashMap([]const u8, *Put); // const Put = struct { -// putter: promise, +// putter: anyframe, // value_ptr: *V, // }; // }, @@ -748,21 +709,21 @@ pub const WatchEventId = enum { // const WindowsOsData = struct { // table_lock: event.Lock, // dir_table: DirTable, -// all_putters: std.atomic.Queue(promise), +// all_putters: std.atomic.Queue(anyframe), // ref_count: std.atomic.Int(usize), // // const DirTable = std.AutoHashMap([]const u8, *Dir); // const FileTable = std.AutoHashMap([]const u16, V); // // const Dir = struct { -// putter: promise, +// putter: anyframe, // file_table: FileTable, // table_lock: event.Lock, // }; // }; // // const LinuxOsData = struct { -// putter: promise, +// putter: anyframe, // inotify_fd: i32, // wd_table: WdTable, // table_lock: event.Lock, @@ -776,7 +737,7 @@ pub const WatchEventId = enum { // }; // }; // -// const FileToHandle = std.AutoHashMap([]const u8, promise); +// const FileToHandle = std.AutoHashMap([]const u8, anyframe); // // const Self = @This(); // @@ -811,7 +772,7 @@ pub const WatchEventId = enum { // .table_lock = event.Lock.init(loop), // .dir_table = OsData.DirTable.init(loop.allocator), // .ref_count = std.atomic.Int(usize).init(1), -// .all_putters = std.atomic.Queue(promise).init(), +// .all_putters = std.atomic.Queue(anyframe).init(), // }, // }; // return self; @@ -926,14 +887,9 @@ pub const WatchEventId = enum { // } // // async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void { -// // TODO https://github.com/ziglang/zig/issues/1194 -// suspend { -// resume @handle(); -// } -// // var value_copy = value; // var put = OsData.Put{ -// .putter = @handle(), +// .putter = @frame(), // .value_ptr = &value_copy, // }; // out_put.* = &put; @@ -1091,18 +1047,13 @@ pub const WatchEventId = enum { // } // // async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void { -// // TODO https://github.com/ziglang/zig/issues/1194 -// suspend { -// resume @handle(); -// } -// // self.ref(); // defer self.deref(); // // defer os.close(dir_handle); // -// var putter_node = std.atomic.Queue(promise).Node{ -// .data = @handle(), +// var putter_node = std.atomic.Queue(anyframe).Node{ +// .data = @frame(), // .prev = null, // .next = null, // }; @@ -1112,7 +1063,7 @@ pub const WatchEventId = enum { // var resume_node = Loop.ResumeNode.Basic{ // .base = Loop.ResumeNode{ // .id = Loop.ResumeNode.Id.Basic, -// .handle = @handle(), +// .handle = @frame(), // .overlapped = windows.OVERLAPPED{ // .Internal = 0, // .InternalHigh = 0, @@ -1207,17 +1158,12 @@ pub const WatchEventId = enum { // } // // async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void { -// // TODO https://github.com/ziglang/zig/issues/1194 -// suspend { -// resume @handle(); -// } -// // const loop = channel.loop; // // var watch = Self{ // .channel = channel, // .os_data = OsData{ -// .putter = @handle(), +// .putter = @frame(), // .inotify_fd = inotify_fd, // .wd_table = OsData.WdTable.init(loop.allocator), // .table_lock = event.Lock.init(loop), diff --git a/std/event/future.zig b/std/event/future.zig index 2e62ace97..11a4c82fb 100644 --- a/std/event/future.zig +++ b/std/event/future.zig @@ -2,8 +2,6 @@ const std = @import("../std.zig"); const assert = std.debug.assert; const testing = std.testing; const builtin = @import("builtin"); -const AtomicRmwOp = builtin.AtomicRmwOp; -const AtomicOrder = builtin.AtomicOrder; const Lock = std.event.Lock; const Loop = std.event.Loop; @@ -23,7 +21,7 @@ pub fn Future(comptime T: type) type { available: u8, const Self = @This(); - const Queue = std.atomic.Queue(promise); + const Queue = std.atomic.Queue(anyframe); pub fn init(loop: *Loop) Self { return Self{ @@ -37,10 +35,10 @@ pub fn Future(comptime T: type) type { /// available. /// Thread-safe. pub async fn get(self: *Self) *T { - if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 2) { + if (@atomicLoad(u8, &self.available, .SeqCst) == 2) { return &self.data; } - const held = await (async self.lock.acquire() catch unreachable); + const held = self.lock.acquire(); held.release(); return &self.data; @@ -49,7 +47,7 @@ pub fn Future(comptime T: type) type { /// Gets the data without waiting for it. If it's available, a pointer is /// returned. Otherwise, null is returned. pub fn getOrNull(self: *Self) ?*T { - if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 2) { + if (@atomicLoad(u8, &self.available, .SeqCst) == 2) { return &self.data; } else { return null; @@ -62,10 +60,10 @@ pub fn Future(comptime T: type) type { /// It's not required to call start() before resolve() but it can be useful since /// this method is thread-safe. pub async fn start(self: *Self) ?*T { - const state = @cmpxchgStrong(u8, &self.available, 0, 1, AtomicOrder.SeqCst, AtomicOrder.SeqCst) orelse return null; + const state = @cmpxchgStrong(u8, &self.available, 0, 1, .SeqCst, .SeqCst) orelse return null; switch (state) { 1 => { - const held = await (async self.lock.acquire() catch unreachable); + const held = self.lock.acquire(); held.release(); return &self.data; }, @@ -77,7 +75,7 @@ pub fn Future(comptime T: type) type { /// Make the data become available. May be called only once. /// Before calling this, modify the `data` property. pub fn resolve(self: *Self) void { - const prev = @atomicRmw(u8, &self.available, AtomicRmwOp.Xchg, 2, AtomicOrder.SeqCst); + const prev = @atomicRmw(u8, &self.available, .Xchg, 2, .SeqCst); assert(prev == 0 or prev == 1); // resolve() called twice Lock.Held.release(Lock.Held{ .lock = &self.lock }); } @@ -86,7 +84,7 @@ pub fn Future(comptime T: type) type { test "std.event.Future" { // https://github.com/ziglang/zig/issues/1908 - if (builtin.single_threaded or builtin.os != builtin.Os.linux) return error.SkipZigTest; + if (builtin.single_threaded) return error.SkipZigTest; const allocator = std.heap.direct_allocator; @@ -94,38 +92,33 @@ test "std.event.Future" { try loop.initMultiThreaded(allocator); defer loop.deinit(); - const handle = try async testFuture(&loop); - defer cancel handle; + const handle = async testFuture(&loop); loop.run(); } async fn testFuture(loop: *Loop) void { - suspend { - resume @handle(); - } var future = Future(i32).init(loop); - const a = async waitOnFuture(&future) catch @panic("memory"); - const b = async waitOnFuture(&future) catch @panic("memory"); - const c = async resolveFuture(&future) catch @panic("memory"); + const a = async waitOnFuture(&future); + const b = async waitOnFuture(&future); + const c = async resolveFuture(&future); + + // TODO make this work: + //const result = (await a) + (await b); + const a_result = await a; + const b_result = await b; + const result = a_result + b_result; - const result = (await a) + (await b); cancel c; testing.expect(result == 12); } async fn waitOnFuture(future: *Future(i32)) i32 { - suspend { - resume @handle(); - } - return (await (async future.get() catch @panic("memory"))).*; + return future.get().*; } async fn resolveFuture(future: *Future(i32)) void { - suspend { - resume @handle(); - } future.data = 6; future.resolve(); } diff --git a/std/event/group.zig b/std/event/group.zig index 36235eed7..ab6d59227 100644 --- a/std/event/group.zig +++ b/std/event/group.zig @@ -2,8 +2,6 @@ const std = @import("../std.zig"); const builtin = @import("builtin"); const Lock = std.event.Lock; const Loop = std.event.Loop; -const AtomicRmwOp = builtin.AtomicRmwOp; -const AtomicOrder = builtin.AtomicOrder; const testing = std.testing; /// ReturnType must be `void` or `E!void` @@ -16,10 +14,10 @@ pub fn Group(comptime ReturnType: type) type { const Self = @This(); const Error = switch (@typeInfo(ReturnType)) { - builtin.TypeId.ErrorUnion => |payload| payload.error_set, + .ErrorUnion => |payload| payload.error_set, else => void, }; - const Stack = std.atomic.Stack(promise->ReturnType); + const Stack = std.atomic.Stack(anyframe->ReturnType); pub fn init(loop: *Loop) Self { return Self{ @@ -29,7 +27,7 @@ pub fn Group(comptime ReturnType: type) type { }; } - /// Cancel all the outstanding promises. Can be called even if wait was already called. + /// Cancel all the outstanding frames. Can be called even if wait was already called. pub fn deinit(self: *Self) void { while (self.coro_stack.pop()) |node| { cancel node.data; @@ -40,8 +38,8 @@ pub fn Group(comptime ReturnType: type) type { } } - /// Add a promise to the group. Thread-safe. - pub fn add(self: *Self, handle: promise->ReturnType) (error{OutOfMemory}!void) { + /// Add a frame to the group. Thread-safe. + pub fn add(self: *Self, handle: anyframe->ReturnType) (error{OutOfMemory}!void) { const node = try self.lock.loop.allocator.create(Stack.Node); node.* = Stack.Node{ .next = undefined, @@ -51,7 +49,7 @@ pub fn Group(comptime ReturnType: type) type { } /// Add a node to the group. Thread-safe. Cannot fail. - /// `node.data` should be the promise handle to add to the group. + /// `node.data` should be the frame handle to add to the group. /// The node's memory should be in the coroutine frame of /// the handle that is in the node, or somewhere guaranteed to live /// at least as long. @@ -59,40 +57,11 @@ pub fn Group(comptime ReturnType: type) type { self.coro_stack.push(node); } - /// This is equivalent to an async call, but the async function is added to the group, instead - /// of returning a promise. func must be async and have return type ReturnType. - /// Thread-safe. - pub fn call(self: *Self, comptime func: var, args: ...) (error{OutOfMemory}!void) { - const S = struct { - async fn asyncFunc(node: **Stack.Node, args2: ...) ReturnType { - // TODO this is a hack to make the memory following be inside the coro frame - suspend { - var my_node: Stack.Node = undefined; - node.* = &my_node; - resume @handle(); - } - - // TODO this allocation elision should be guaranteed because we await it in - // this coro frame - return await (async func(args2) catch unreachable); - } - }; - var node: *Stack.Node = undefined; - const handle = try async S.asyncFunc(&node, args); - node.* = Stack.Node{ - .next = undefined, - .data = handle, - }; - self.coro_stack.push(node); - } - /// Wait for all the calls and promises of the group to complete. /// Thread-safe. /// Safe to call any number of times. pub async fn wait(self: *Self) ReturnType { - // TODO catch unreachable because the allocation can be grouped with - // the coro frame allocation - const held = await (async self.lock.acquire() catch unreachable); + const held = self.lock.acquire(); defer held.release(); while (self.coro_stack.pop()) |node| { @@ -131,8 +100,7 @@ test "std.event.Group" { try loop.initMultiThreaded(allocator); defer loop.deinit(); - const handle = try async testGroup(&loop); - defer cancel handle; + const handle = async testGroup(&loop); loop.run(); } @@ -140,26 +108,30 @@ test "std.event.Group" { async fn testGroup(loop: *Loop) void { var count: usize = 0; var group = Group(void).init(loop); - group.add(async sleepALittle(&count) catch @panic("memory")) catch @panic("memory"); - group.call(increaseByTen, &count) catch @panic("memory"); - await (async group.wait() catch @panic("memory")); + var sleep_a_little_frame = async sleepALittle(&count); + group.add(&sleep_a_little_frame) catch @panic("memory"); + var increase_by_ten_frame = async increaseByTen(&count); + group.add(&increase_by_ten_frame) catch @panic("memory"); + group.wait(); testing.expect(count == 11); var another = Group(anyerror!void).init(loop); - another.add(async somethingElse() catch @panic("memory")) catch @panic("memory"); - another.call(doSomethingThatFails) catch @panic("memory"); - testing.expectError(error.ItBroke, await (async another.wait() catch @panic("memory"))); + var something_else_frame = async somethingElse(); + another.add(&something_else_frame) catch @panic("memory"); + var something_that_fails_frame = async doSomethingThatFails(); + another.add(&something_that_fails_frame) catch @panic("memory"); + testing.expectError(error.ItBroke, another.wait()); } async fn sleepALittle(count: *usize) void { std.time.sleep(1 * std.time.millisecond); - _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(usize, count, .Add, 1, .SeqCst); } async fn increaseByTen(count: *usize) void { var i: usize = 0; while (i < 10) : (i += 1) { - _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(usize, count, .Add, 1, .SeqCst); } } diff --git a/std/event/io.zig b/std/event/io.zig index 29419a792..4b54822e6 100644 --- a/std/event/io.zig +++ b/std/event/io.zig @@ -1,6 +1,5 @@ const std = @import("../std.zig"); const builtin = @import("builtin"); -const Allocator = std.mem.Allocator; const assert = std.debug.assert; const mem = std.mem; @@ -12,13 +11,13 @@ pub fn InStream(comptime ReadError: type) type { /// Return the number of bytes read. It may be less than buffer.len. /// If the number of bytes read is 0, it means end of stream. /// End of stream is not an error condition. - readFn: async<*Allocator> fn (self: *Self, buffer: []u8) Error!usize, + readFn: async fn (self: *Self, buffer: []u8) Error!usize, /// Return the number of bytes read. It may be less than buffer.len. /// If the number of bytes read is 0, it means end of stream. /// End of stream is not an error condition. pub async fn read(self: *Self, buffer: []u8) !usize { - return await (async self.readFn(self, buffer) catch unreachable); + return self.readFn(self, buffer); } /// Return the number of bytes read. If it is less than buffer.len @@ -26,7 +25,7 @@ pub fn InStream(comptime ReadError: type) type { pub async fn readFull(self: *Self, buffer: []u8) !usize { var index: usize = 0; while (index != buf.len) { - const amt_read = try await (async self.read(buf[index..]) catch unreachable); + const amt_read = try self.read(buf[index..]); if (amt_read == 0) return index; index += amt_read; } @@ -35,25 +34,25 @@ pub fn InStream(comptime ReadError: type) type { /// Same as `readFull` but end of stream returns `error.EndOfStream`. pub async fn readNoEof(self: *Self, buf: []u8) !void { - const amt_read = try await (async self.readFull(buf[index..]) catch unreachable); + const amt_read = try self.readFull(buf[index..]); if (amt_read < buf.len) return error.EndOfStream; } pub async fn readIntLittle(self: *Self, comptime T: type) !T { var bytes: [@sizeOf(T)]u8 = undefined; - try await (async self.readNoEof(bytes[0..]) catch unreachable); + try self.readNoEof(bytes[0..]); return mem.readIntLittle(T, &bytes); } pub async fn readIntBe(self: *Self, comptime T: type) !T { var bytes: [@sizeOf(T)]u8 = undefined; - try await (async self.readNoEof(bytes[0..]) catch unreachable); + try self.readNoEof(bytes[0..]); return mem.readIntBig(T, &bytes); } pub async fn readInt(self: *Self, comptime T: type, endian: builtin.Endian) !T { var bytes: [@sizeOf(T)]u8 = undefined; - try await (async self.readNoEof(bytes[0..]) catch unreachable); + try self.readNoEof(bytes[0..]); return mem.readInt(T, &bytes, endian); } @@ -61,7 +60,7 @@ pub fn InStream(comptime ReadError: type) type { // Only extern and packed structs have defined in-memory layout. comptime assert(@typeInfo(T).Struct.layout != builtin.TypeInfo.ContainerLayout.Auto); var res: [1]T = undefined; - try await (async self.readNoEof(@sliceToBytes(res[0..])) catch unreachable); + try self.readNoEof(@sliceToBytes(res[0..])); return res[0]; } }; @@ -72,6 +71,6 @@ pub fn OutStream(comptime WriteError: type) type { const Self = @This(); pub const Error = WriteError; - writeFn: async<*Allocator> fn (self: *Self, buffer: []u8) Error!void, + writeFn: async fn (self: *Self, buffer: []u8) Error!void, }; } diff --git a/std/event/lock.zig b/std/event/lock.zig index d86902cc0..8f2dac008 100644 --- a/std/event/lock.zig +++ b/std/event/lock.zig @@ -3,8 +3,6 @@ const builtin = @import("builtin"); const assert = std.debug.assert; const testing = std.testing; const mem = std.mem; -const AtomicRmwOp = builtin.AtomicRmwOp; -const AtomicOrder = builtin.AtomicOrder; const Loop = std.event.Loop; /// Thread-safe async/await lock. @@ -17,7 +15,7 @@ pub const Lock = struct { queue: Queue, queue_empty_bit: u8, // TODO make this a bool - const Queue = std.atomic.Queue(promise); + const Queue = std.atomic.Queue(anyframe); pub const Held = struct { lock: *Lock, @@ -30,19 +28,19 @@ pub const Lock = struct { } // We need to release the lock. - _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); - _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.lock.queue_empty_bit, .Xchg, 1, .SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 0, .SeqCst); // There might be a queue item. If we know the queue is empty, we can be done, // because the other actor will try to obtain the lock. // But if there's a queue item, we are the actor which must loop and attempt // to grab the lock again. - if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) { + if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) { return; } while (true) { - const old_bit = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + const old_bit = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 1, .SeqCst); if (old_bit != 0) { // We did not obtain the lock. Great, the queue is someone else's problem. return; @@ -55,11 +53,11 @@ pub const Lock = struct { } // Release the lock again. - _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); - _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.lock.queue_empty_bit, .Xchg, 1, .SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 0, .SeqCst); // Find out if we can be done. - if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) { + if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) { return; } } @@ -88,15 +86,11 @@ pub const Lock = struct { /// All calls to acquire() and release() must complete before calling deinit(). pub fn deinit(self: *Lock) void { assert(self.shared_bit == 0); - while (self.queue.get()) |node| cancel node.data; + while (self.queue.get()) |node| resume node.data; } pub async fn acquire(self: *Lock) Held { - // TODO explicitly put this memory in the coroutine frame #1194 - suspend { - resume @handle(); - } - var my_tick_node = Loop.NextTickNode.init(@handle()); + var my_tick_node = Loop.NextTickNode.init(@frame()); errdefer _ = self.queue.remove(&my_tick_node); // TODO test canceling an acquire suspend { @@ -107,9 +101,9 @@ pub const Lock = struct { // We set this bit so that later we can rely on the fact, that if queue_empty_bit is 1, some actor // will attempt to grab the lock. - _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.queue_empty_bit, .Xchg, 0, .SeqCst); - const old_bit = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + const old_bit = @atomicRmw(u8, &self.shared_bit, .Xchg, 1, .SeqCst); if (old_bit == 0) { if (self.queue.get()) |node| { // Whether this node is us or someone else, we tail resume it. @@ -123,8 +117,7 @@ pub const Lock = struct { }; test "std.event.Lock" { - // TODO https://github.com/ziglang/zig/issues/2377 - if (true) return error.SkipZigTest; + // TODO https://github.com/ziglang/zig/issues/1908 if (builtin.single_threaded) return error.SkipZigTest; const allocator = std.heap.direct_allocator; @@ -136,39 +129,34 @@ test "std.event.Lock" { var lock = Lock.init(&loop); defer lock.deinit(); - const handle = try async testLock(&loop, &lock); - defer cancel handle; + _ = async testLock(&loop, &lock); loop.run(); testing.expectEqualSlices(i32, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len, shared_test_data); } async fn testLock(loop: *Loop, lock: *Lock) void { - // TODO explicitly put next tick node memory in the coroutine frame #1194 - suspend { - resume @handle(); - } - const handle1 = async lockRunner(lock) catch @panic("out of memory"); + const handle1 = async lockRunner(lock); var tick_node1 = Loop.NextTickNode{ .prev = undefined, .next = undefined, - .data = handle1, + .data = &handle1, }; loop.onNextTick(&tick_node1); - const handle2 = async lockRunner(lock) catch @panic("out of memory"); + const handle2 = async lockRunner(lock); var tick_node2 = Loop.NextTickNode{ .prev = undefined, .next = undefined, - .data = handle2, + .data = &handle2, }; loop.onNextTick(&tick_node2); - const handle3 = async lockRunner(lock) catch @panic("out of memory"); + const handle3 = async lockRunner(lock); var tick_node3 = Loop.NextTickNode{ .prev = undefined, .next = undefined, - .data = handle3, + .data = &handle3, }; loop.onNextTick(&tick_node3); @@ -185,7 +173,7 @@ async fn lockRunner(lock: *Lock) void { var i: usize = 0; while (i < shared_test_data.len) : (i += 1) { - const lock_promise = async lock.acquire() catch @panic("out of memory"); + const lock_promise = async lock.acquire(); const handle = await lock_promise; defer handle.release(); diff --git a/std/event/loop.zig b/std/event/loop.zig index a4a60b509..f1febd3fd 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -457,7 +457,7 @@ pub const Loop = struct { var resume_node = ResumeNode.Basic{ .base = ResumeNode{ .id = ResumeNode.Id.Basic, - .handle = @handle(), + .handle = @frame(), .overlapped = ResumeNode.overlapped_init, }, }; @@ -469,7 +469,7 @@ pub const Loop = struct { var resume_node = ResumeNode.Basic{ .base = ResumeNode{ .id = ResumeNode.Id.Basic, - .handle = @handle(), + .handle = @frame(), .overlapped = ResumeNode.overlapped_init, }, .kev = undefined, diff --git a/std/event/net.zig b/std/event/net.zig index 46b724e32..3752c88e9 100644 --- a/std/event/net.zig +++ b/std/event/net.zig @@ -9,17 +9,17 @@ const File = std.fs.File; const fd_t = os.fd_t; pub const Server = struct { - handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, File) void, + handleRequestFn: async fn (*Server, *const std.net.Address, File) void, loop: *Loop, sockfd: ?i32, - accept_coro: ?promise, + accept_coro: ?anyframe, listen_address: std.net.Address, waiting_for_emfile_node: PromiseNode, listen_resume_node: event.Loop.ResumeNode, - const PromiseNode = std.TailQueue(promise).Node; + const PromiseNode = std.TailQueue(anyframe).Node; pub fn init(loop: *Loop) Server { // TODO can't initialize handler coroutine here because we need well defined copy elision @@ -41,7 +41,7 @@ pub const Server = struct { pub fn listen( self: *Server, address: *const std.net.Address, - handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, File) void, + handleRequestFn: async fn (*Server, *const std.net.Address, File) void, ) !void { self.handleRequestFn = handleRequestFn; @@ -53,7 +53,7 @@ pub const Server = struct { try os.listen(sockfd, os.SOMAXCONN); self.listen_address = std.net.Address.initPosix(try os.getsockname(sockfd)); - self.accept_coro = try async Server.handler(self); + self.accept_coro = async Server.handler(self); errdefer cancel self.accept_coro.?; self.listen_resume_node.handle = self.accept_coro.?; @@ -86,12 +86,7 @@ pub const Server = struct { continue; } var socket = File.openHandle(accepted_fd); - _ = async self.handleRequestFn(self, &accepted_addr, socket) catch |err| switch (err) { - error.OutOfMemory => { - socket.close(); - continue; - }, - }; + self.handleRequestFn(self, &accepted_addr, socket); } else |err| switch (err) { error.ProcessFdQuotaExceeded => @panic("TODO handle this error"), error.ConnectionAborted => continue, @@ -124,7 +119,7 @@ pub async fn connectUnixSocket(loop: *Loop, path: []const u8) !i32 { mem.copy(u8, sock_addr.path[0..], path); const size = @intCast(u32, @sizeOf(os.sa_family_t) + path.len); try os.connect_async(sockfd, &sock_addr, size); - try await try async loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); + try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); try os.getsockoptError(sockfd); return sockfd; @@ -149,7 +144,7 @@ pub async fn read(loop: *std.event.Loop, fd: fd_t, buffer: []u8) ReadError!usize .iov_len = buffer.len, }; const iovs: *const [1]os.iovec = &iov; - return await (async readvPosix(loop, fd, iovs, 1) catch unreachable); + return readvPosix(loop, fd, iovs, 1); } pub const WriteError = error{}; @@ -160,7 +155,7 @@ pub async fn write(loop: *std.event.Loop, fd: fd_t, buffer: []const u8) WriteErr .iov_len = buffer.len, }; const iovs: *const [1]os.iovec_const = &iov; - return await (async writevPosix(loop, fd, iovs, 1) catch unreachable); + return writevPosix(loop, fd, iovs, 1); } pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const os.iovec_const, count: usize) !void { @@ -174,7 +169,7 @@ pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const os.iovec_const, cou os.EINVAL => unreachable, os.EFAULT => unreachable, os.EAGAIN => { - try await (async loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT) catch unreachable); + try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT); continue; }, os.EBADF => unreachable, // always a race condition @@ -205,7 +200,7 @@ pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]os.iovec, count: os.EINVAL => unreachable, os.EFAULT => unreachable, os.EAGAIN => { - try await (async loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN) catch unreachable); + try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN); continue; }, os.EBADF => unreachable, // always a race condition @@ -232,7 +227,7 @@ pub async fn writev(loop: *Loop, fd: fd_t, data: []const []const u8) !void { }; } - return await (async writevPosix(loop, fd, iovecs.ptr, data.len) catch unreachable); + return writevPosix(loop, fd, iovecs.ptr, data.len); } pub async fn readv(loop: *Loop, fd: fd_t, data: []const []u8) !usize { @@ -246,7 +241,7 @@ pub async fn readv(loop: *Loop, fd: fd_t, data: []const []u8) !usize { }; } - return await (async readvPosix(loop, fd, iovecs.ptr, data.len) catch unreachable); + return readvPosix(loop, fd, iovecs.ptr, data.len); } pub async fn connect(loop: *Loop, _address: *const std.net.Address) !File { @@ -256,7 +251,7 @@ pub async fn connect(loop: *Loop, _address: *const std.net.Address) !File { errdefer os.close(sockfd); try os.connect_async(sockfd, &address.os_addr, @sizeOf(os.sockaddr_in)); - try await try async loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); + try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); try os.getsockoptError(sockfd); return File.openHandle(sockfd); @@ -275,17 +270,16 @@ test "listen on a port, send bytes, receive bytes" { tcp_server: Server, const Self = @This(); - async<*mem.Allocator> fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: File) void { + async fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: File) void { const self = @fieldParentPtr(Self, "tcp_server", tcp_server); var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 defer socket.close(); // TODO guarantee elision of this allocation - const next_handler = async errorableHandler(self, _addr, socket) catch unreachable; - (await next_handler) catch |err| { + const next_handler = errorableHandler(self, _addr, socket) catch |err| { std.debug.panic("unable to handle connection: {}\n", err); }; suspend { - cancel @handle(); + cancel @frame(); } } async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: File) !void { @@ -306,15 +300,14 @@ test "listen on a port, send bytes, receive bytes" { defer server.tcp_server.deinit(); try server.tcp_server.listen(&addr, MyServer.handler); - const p = try async doAsyncTest(&loop, &server.tcp_server.listen_address, &server.tcp_server); - defer cancel p; + _ = async doAsyncTest(&loop, &server.tcp_server.listen_address, &server.tcp_server); loop.run(); } async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void { errdefer @panic("test failure"); - var socket_file = try await try async connect(loop, address); + var socket_file = try connect(loop, address); defer socket_file.close(); var buf: [512]u8 = undefined; @@ -340,9 +333,9 @@ pub const OutStream = struct { }; } - async<*mem.Allocator> fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { + async fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { const self = @fieldParentPtr(OutStream, "stream", out_stream); - return await (async write(self.loop, self.fd, bytes) catch unreachable); + return write(self.loop, self.fd, bytes); } }; @@ -362,8 +355,8 @@ pub const InStream = struct { }; } - async<*mem.Allocator> fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { + async fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { const self = @fieldParentPtr(InStream, "stream", in_stream); - return await (async read(self.loop, self.fd, bytes) catch unreachable); + return read(self.loop, self.fd, bytes); } }; diff --git a/std/event/rwlock.zig b/std/event/rwlock.zig index 7b97fa24c..a5768e5b6 100644 --- a/std/event/rwlock.zig +++ b/std/event/rwlock.zig @@ -3,8 +3,6 @@ const builtin = @import("builtin"); const assert = std.debug.assert; const testing = std.testing; const mem = std.mem; -const AtomicRmwOp = builtin.AtomicRmwOp; -const AtomicOrder = builtin.AtomicOrder; const Loop = std.event.Loop; /// Thread-safe async/await lock. @@ -28,19 +26,19 @@ pub const RwLock = struct { const ReadLock = 2; }; - const Queue = std.atomic.Queue(promise); + const Queue = std.atomic.Queue(anyframe); pub const HeldRead = struct { lock: *RwLock, pub fn release(self: HeldRead) void { // If other readers still hold the lock, we're done. - if (@atomicRmw(usize, &self.lock.reader_lock_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst) != 1) { + if (@atomicRmw(usize, &self.lock.reader_lock_count, .Sub, 1, .SeqCst) != 1) { return; } - _ = @atomicRmw(u8, &self.lock.reader_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); - if (@cmpxchgStrong(u8, &self.lock.shared_state, State.ReadLock, State.Unlocked, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) { + _ = @atomicRmw(u8, &self.lock.reader_queue_empty_bit, .Xchg, 1, .SeqCst); + if (@cmpxchgStrong(u8, &self.lock.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) { // Didn't unlock. Someone else's problem. return; } @@ -61,17 +59,17 @@ pub const RwLock = struct { } // We need to release the write lock. Check if any readers are waiting to grab the lock. - if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, AtomicOrder.SeqCst) == 0) { + if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, .SeqCst) == 0) { // Switch to a read lock. - _ = @atomicRmw(u8, &self.lock.shared_state, AtomicRmwOp.Xchg, State.ReadLock, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.ReadLock, .SeqCst); while (self.lock.reader_queue.get()) |node| { self.lock.loop.onNextTick(node); } return; } - _ = @atomicRmw(u8, &self.lock.writer_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); - _ = @atomicRmw(u8, &self.lock.shared_state, AtomicRmwOp.Xchg, State.Unlocked, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.lock.writer_queue_empty_bit, .Xchg, 1, .SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.Unlocked, .SeqCst); self.lock.commonPostUnlock(); } @@ -93,17 +91,16 @@ pub const RwLock = struct { /// All calls to acquire() and release() must complete before calling deinit(). pub fn deinit(self: *RwLock) void { assert(self.shared_state == State.Unlocked); - while (self.writer_queue.get()) |node| cancel node.data; - while (self.reader_queue.get()) |node| cancel node.data; + while (self.writer_queue.get()) |node| resume node.data; + while (self.reader_queue.get()) |node| resume node.data; } pub async fn acquireRead(self: *RwLock) HeldRead { - _ = @atomicRmw(usize, &self.reader_lock_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(usize, &self.reader_lock_count, .Add, 1, .SeqCst); suspend { - // TODO explicitly put this memory in the coroutine frame #1194 var my_tick_node = Loop.NextTickNode{ - .data = @handle(), + .data = @frame(), .prev = undefined, .next = undefined, }; @@ -115,10 +112,10 @@ pub const RwLock = struct { // We set this bit so that later we can rely on the fact, that if reader_queue_empty_bit is 1, // some actor will attempt to grab the lock. - _ = @atomicRmw(u8, &self.reader_queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 0, .SeqCst); // Here we don't care if we are the one to do the locking or if it was already locked for reading. - const have_read_lock = if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst)) |old_state| old_state == State.ReadLock else true; + const have_read_lock = if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst)) |old_state| old_state == State.ReadLock else true; if (have_read_lock) { // Give out all the read locks. if (self.reader_queue.get()) |first_node| { @@ -134,9 +131,8 @@ pub const RwLock = struct { pub async fn acquireWrite(self: *RwLock) HeldWrite { suspend { - // TODO explicitly put this memory in the coroutine frame #1194 var my_tick_node = Loop.NextTickNode{ - .data = @handle(), + .data = @frame(), .prev = undefined, .next = undefined, }; @@ -148,10 +144,10 @@ pub const RwLock = struct { // We set this bit so that later we can rely on the fact, that if writer_queue_empty_bit is 1, // some actor will attempt to grab the lock. - _ = @atomicRmw(u8, &self.writer_queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 0, .SeqCst); // Here we must be the one to acquire the write lock. It cannot already be locked. - if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) == null) { + if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) == null) { // We now have a write lock. if (self.writer_queue.get()) |node| { // Whether this node is us or someone else, we tail resume it. @@ -169,8 +165,8 @@ pub const RwLock = struct { // obtain the lock. // But if there's a writer_queue item or a reader_queue item, // we are the actor which must loop and attempt to grab the lock again. - if (@atomicLoad(u8, &self.writer_queue_empty_bit, AtomicOrder.SeqCst) == 0) { - if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) { + if (@atomicLoad(u8, &self.writer_queue_empty_bit, .SeqCst) == 0) { + if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) != null) { // We did not obtain the lock. Great, the queues are someone else's problem. return; } @@ -180,13 +176,13 @@ pub const RwLock = struct { return; } // Release the lock again. - _ = @atomicRmw(u8, &self.writer_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); - _ = @atomicRmw(u8, &self.shared_state, AtomicRmwOp.Xchg, State.Unlocked, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 1, .SeqCst); + _ = @atomicRmw(u8, &self.shared_state, .Xchg, State.Unlocked, .SeqCst); continue; } - if (@atomicLoad(u8, &self.reader_queue_empty_bit, AtomicOrder.SeqCst) == 0) { - if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) { + if (@atomicLoad(u8, &self.reader_queue_empty_bit, .SeqCst) == 0) { + if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst) != null) { // We did not obtain the lock. Great, the queues are someone else's problem. return; } @@ -199,8 +195,8 @@ pub const RwLock = struct { return; } // Release the lock again. - _ = @atomicRmw(u8, &self.reader_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); - if (@cmpxchgStrong(u8, &self.shared_state, State.ReadLock, State.Unlocked, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) { + _ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 1, .SeqCst); + if (@cmpxchgStrong(u8, &self.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) { // Didn't unlock. Someone else's problem. return; } @@ -215,6 +211,9 @@ test "std.event.RwLock" { // https://github.com/ziglang/zig/issues/2377 if (true) return error.SkipZigTest; + // https://github.com/ziglang/zig/issues/1908 + if (builtin.single_threaded) return error.SkipZigTest; + const allocator = std.heap.direct_allocator; var loop: Loop = undefined; @@ -224,8 +223,7 @@ test "std.event.RwLock" { var lock = RwLock.init(&loop); defer lock.deinit(); - const handle = try async testLock(&loop, &lock); - defer cancel handle; + const handle = testLock(&loop, &lock); loop.run(); const expected_result = [1]i32{shared_it_count * @intCast(i32, shared_test_data.len)} ** shared_test_data.len; @@ -233,28 +231,31 @@ test "std.event.RwLock" { } async fn testLock(loop: *Loop, lock: *RwLock) void { - // TODO explicitly put next tick node memory in the coroutine frame #1194 - suspend { - resume @handle(); - } - var read_nodes: [100]Loop.NextTickNode = undefined; for (read_nodes) |*read_node| { - read_node.data = async readRunner(lock) catch @panic("out of memory"); + const frame = loop.allocator.create(@Frame(readRunner)) catch @panic("memory"); + read_node.data = frame; + frame.* = async readRunner(lock); loop.onNextTick(read_node); } var write_nodes: [shared_it_count]Loop.NextTickNode = undefined; for (write_nodes) |*write_node| { - write_node.data = async writeRunner(lock) catch @panic("out of memory"); + const frame = loop.allocator.create(@Frame(writeRunner)) catch @panic("memory"); + write_node.data = frame; + frame.* = async writeRunner(lock); loop.onNextTick(write_node); } for (write_nodes) |*write_node| { - await @ptrCast(promise->void, write_node.data); + const casted = @ptrCast(*const @Frame(writeRunner), write_node.data); + await casted; + loop.allocator.destroy(casted); } for (read_nodes) |*read_node| { - await @ptrCast(promise->void, read_node.data); + const casted = @ptrCast(*const @Frame(readRunner), read_node.data); + await casted; + loop.allocator.destroy(casted); } } @@ -269,7 +270,7 @@ async fn writeRunner(lock: *RwLock) void { var i: usize = 0; while (i < shared_test_data.len) : (i += 1) { std.time.sleep(100 * std.time.microsecond); - const lock_promise = async lock.acquireWrite() catch @panic("out of memory"); + const lock_promise = async lock.acquireWrite(); const handle = await lock_promise; defer handle.release(); @@ -287,7 +288,7 @@ async fn readRunner(lock: *RwLock) void { var i: usize = 0; while (i < shared_test_data.len) : (i += 1) { - const lock_promise = async lock.acquireRead() catch @panic("out of memory"); + const lock_promise = async lock.acquireRead(); const handle = await lock_promise; defer handle.release(); diff --git a/std/zig/parser_test.zig b/std/zig/parser_test.zig index 7407528bf..aec1ef96b 100644 --- a/std/zig/parser_test.zig +++ b/std/zig/parser_test.zig @@ -1183,7 +1183,7 @@ test "zig fmt: resume from suspend block" { try testCanonical( \\fn foo() void { \\ suspend { - \\ resume @handle(); + \\ resume @frame(); \\ } \\} \\ diff --git a/test/compile_errors.zig b/test/compile_errors.zig index 810e40b18..835f968e2 100644 --- a/test/compile_errors.zig +++ b/test/compile_errors.zig @@ -1403,24 +1403,14 @@ pub fn addCases(cases: *tests.CompileErrorContext) void { ); cases.add( - "@handle() called outside of function definition", - \\var handle_undef: promise = undefined; - \\var handle_dummy: promise = @handle(); + "@frame() called outside of function definition", + \\var handle_undef: anyframe = undefined; + \\var handle_dummy: anyframe = @frame(); \\export fn entry() bool { \\ return handle_undef == handle_dummy; \\} , - "tmp.zig:2:29: error: @handle() called outside of function definition", - ); - - cases.add( - "@handle() in non-async function", - \\export fn entry() bool { - \\ var handle_undef: promise = undefined; - \\ return handle_undef == @handle(); - \\} - , - "tmp.zig:3:28: error: @handle() in non-async function", + "tmp.zig:2:30: error: @frame() called outside of function definition", ); cases.add( @@ -1796,15 +1786,9 @@ pub fn addCases(cases: *tests.CompileErrorContext) void { cases.add( "suspend inside suspend block", - \\const std = @import("std",); - \\ \\export fn entry() void { - \\ var buf: [500]u8 = undefined; - \\ var a = &std.heap.FixedBufferAllocator.init(buf[0..]).allocator; - \\ const p = (async foo()) catch unreachable; - \\ cancel p; + \\ _ = async foo(); \\} - \\ \\async fn foo() void { \\ suspend { \\ suspend { @@ -1812,8 +1796,8 @@ pub fn addCases(cases: *tests.CompileErrorContext) void { \\ } \\} , - "tmp.zig:12:9: error: cannot suspend inside suspend block", - "tmp.zig:11:5: note: other suspend block here", + "tmp.zig:6:9: error: cannot suspend inside suspend block", + "tmp.zig:5:5: note: other suspend block here", ); cases.add( @@ -1854,15 +1838,14 @@ pub fn addCases(cases: *tests.CompileErrorContext) void { cases.add( "returning error from void async function", - \\const std = @import("std",); \\export fn entry() void { - \\ const p = async amain() catch unreachable; + \\ _ = async amain(); \\} \\async fn amain() void { \\ return error.ShouldBeCompileError; \\} , - "tmp.zig:6:17: error: expected type 'void', found 'error{ShouldBeCompileError}'", + "tmp.zig:5:17: error: expected type 'void', found 'error{ShouldBeCompileError}'", ); cases.add(