Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to safely exit a client with a thread blocking on read()? #46

Open
pfgithub opened this issue Sep 22, 2024 · 4 comments
Open

How to safely exit a client with a thread blocking on read()? #46

pfgithub opened this issue Sep 22, 2024 · 4 comments

Comments

@pfgithub
Copy link

This code would implement a program that lets you enter a message to send to the websocket server and sends it on enter. When you exit the program by pressing enter on windows, it just hangs and never closes the websocket connection. How do you close the connection and stop an active blocking read() call on another thread?

const std = @import("std");
const ws = @import("websocket");

const App = struct { client: ws.Client };

pub fn main() !void {
    var gpa_backing = std.heap.GeneralPurposeAllocator(.{}){};
    defer std.debug.assert(gpa_backing.deinit() == .ok);
    const gpa = gpa_backing.allocator();

    var app: App = .{
        .client = try ws.Client.init(gpa, .{
            .port = 9224,
            .host = "localhost",
        }),
    };
    defer app.client.deinit();

    try app.client.handshake("/ws", .{
        .timeout_ms = 10_000,
        .headers = "Host: localhost:9224", // separate multiple headers with \r\n
    });

    const recv_thread = try std.Thread.spawn(.{}, recvThread, .{&app});
    defer recv_thread.join();

    std.log.info("wsnc. enter to exit", .{});

    // wait for enter key to be pressed
    while (true) {
        const msg = try std.io.getStdIn().reader().readUntilDelimiterAlloc(gpa, '\n', std.math.maxInt(usize));
        defer gpa.free(msg);
        if (std.mem.eql(u8, msg, "") or std.mem.eql(u8, msg, "\r")) {
            // exit
            break;
        }
        std.log.info("sending \"{}\"...", .{std.zig.fmtEscapes(msg)});
        try app.client.writeBin(msg);
        std.log.info("-> sent", .{});
    }

    std.log.info("closing...", .{});
    try app.client.close(.{});
    std.log.info("-> closed", .{});
}

fn recvThread(self: *App) void {
    while (true) {
        const msg = (self.client.read() catch return) orelse unreachable;
        defer self.client.done(msg);

        std.log.info("received message: \"{}\"", .{std.zig.fmtEscapes(msg.data)});
    }
}
@karlseguin
Copy link
Owner

karlseguin commented Sep 22, 2024

This works fine on Mac and Linux, so I assume it's a windows issue, or an issue with Zig's window wrapper. Unfortunately, I can't test.

Can you try replacing the call to:

try app.client.close(.{});

with:

try std.posix.shutdown(app.client.stream.handle, .both); 
std.posix.close(app.client.stream.handle); 

@pfgithub
Copy link
Author

It hangs on std.posix.shutdown. I guess there's some kind of other problem because it also hangs on writeBin and setting writeTimeout has no effect.

It works fine as long as the recvThread isn't spawned (app.client.close() and std.posix.shutdown both work then), so maybe some problem with writing while there is an active read call on windows? readTimeout doesn't seem to work either so I can't see what would happen if trying to write while there isn't a read active.

Possibly a problem with zig's windows implementations of the posix fns

@karlseguin
Copy link
Owner

Would be nice if you could create a minimal reproducible example. I don't have access to windows, but I was thinking something like:

const std = @import("std");


pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var stream = try std.net.tcpConnectToHost(allocator, "127.0.0.1", 6000);
    const recv_thread = try std.Thread.spawn(.{}, recvThread, .{&stream});
    defer recv_thread.join();

    std.time.sleep(std.time.ns_per_s * 2);

    std.debug.print("closing\n", .{});
    stream.close();
    std.debug.print("closed\n", .{});
}

fn recvThread(stream: *std.net.Stream) void {
    while (true) {
        var buf: [64]u8 = undefined;
        const n = stream.read(&buf) catch |err| {
            std.debug.print("read err: {}", .{err});
            return;
        };
        if (n == 0) {
            return;
        }
        std.log.info("received: {any}\n", .{buf[0..n]});
    }
}

I believe this accurately reproduces your original code, although it's hard to tell for sure. Maybe the issue only manifests itself after a read already succeeds on a different thread (in your original code, the handshake call writes/reads from the socket)...so it might be necessary to try a few different things.

On Mac, after closing, the read returns an error error.NotOpenForReadingclosed.

@pfgithub
Copy link
Author

That code reproduces the issue on windows as long as there is a server running at that port for it to connect to. Although that code doesn't work on linux either, it hangs. The stream has to be shut down first. This reproduces the issue on windows and works as expected on linux:

const std = @import("std");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var stream = try std.net.tcpConnectToHost(allocator, "127.0.0.1", 6000);
    const recv_thread = try std.Thread.spawn(.{}, recvThread, .{&stream});
    defer recv_thread.join();

    std.time.sleep(std.time.ns_per_s * 2);

    std.debug.print("closing\n", .{});
    try std.posix.shutdown(stream.handle, .both);
    stream.close();
    std.debug.print("closed\n", .{});
}

fn recvThread(stream: *std.net.Stream) void {
    while (true) {
        var buf: [64]u8 = undefined;
        const n = stream.read(&buf) catch |err| {
            std.debug.print("read err: {}", .{err});
            return;
        };
        if (n == 0) {
            return;
        }
        std.log.info("received: {any}\n", .{buf[0..n]});
    }
}

Server code that is enough to demonstrate the issue:

const std = @import("std");

pub fn main() !void {
    var addr = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, 9224);
    var server = try addr.listen(.{});

    while (true) {
        std.log.info("waiting for connection", .{});
        const connection = try server.accept();
        std.log.info("client connected", .{});
        while (true) {
            var buf: [16]u8 = undefined;
            std.log.info("reading...", .{});
            const len = try connection.stream.read(&buf);
            std.log.info("read result: {d}", .{len});
            if (len == 0) break;
        }
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants