Skip to content

Commit

Permalink
Fix #14 Avoid blocking socket reads to infinitely block threads creat…
Browse files Browse the repository at this point in the history
…ed by Scalive in remote process
  • Loading branch information
ngocdaothanh committed Aug 6, 2016
1 parent 5ead107 commit 127f9f4
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 53 deletions.
36 changes: 36 additions & 0 deletions src/main/java/scalive/Net.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package scalive;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

public class Net {
// After this time, the REPL and completer connections should be closed,
// to avoid blocking socket reads to infinitely block threads created by Scalive in remote process
private static final int LONG_INACTIVITY = (int) TimeUnit.HOURS.toMillis(1);

public static final InetAddress LOCALHOST = getLocalHostAddress();

public static int getLocalFreePort() throws Exception {
Expand All @@ -14,6 +23,33 @@ public static int getLocalFreePort() throws Exception {
return port;
}

/**
* {@link SocketTimeoutException} will be thrown if there's no activity for a long time.
* This is to avoid blocking reads to block threads infinitely, causing leaks in the remote process.
*/
public static void throwSocketTimeoutExceptionForLongInactivity(Socket socket) throws SocketException {
socket.setSoTimeout(LONG_INACTIVITY);
}

/**
* Use socket closing as a way to notify/cleanup socket blocking read.
* The sockets are closed in the order they are given.
*/
public static Runnable getSocketCleaner(final Socket... sockets) {
return new Runnable() {
@Override
public void run() {
for (Socket socket : sockets) {
try {
socket.close();
} catch (IOException e) {
// Ignore
}
}
}
};
}

private static InetAddress getLocalHostAddress() {
try {
return InetAddress.getByAddress(new byte[] {127, 0, 0, 1});
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/scalive/client/AgentLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private static int loadAgent(String jarSearchDirs, String pid) throws Exception
final int port = Net.getLocalFreePort();

vm.loadAgent(agentJar, jarSearchDirs + " " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
Runtime.getRuntime().addShutdownHook(new Thread(AgentLoader.class.getName() + "-ShutdownHook") {
@Override
public void run() {
try {
Expand Down
14 changes: 4 additions & 10 deletions src/main/java/scalive/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import scalive.Log;
import scalive.Net;

import java.io.IOException;
import java.net.Socket;

class Client {
Expand All @@ -14,17 +13,12 @@ static void run(int port) throws Exception {
final Socket replSocket = new Socket(Net.LOCALHOST, port);
final Socket completerSocket = new Socket(Net.LOCALHOST, port);

// Try to notify the remote process to clean up when the client
// is suddenly terminated
Runtime.getRuntime().addShutdownHook(new Thread() {
// Try to notify the remote process to clean up when the client is terminated
final Runnable socketCleaner = Net.getSocketCleaner(replSocket, completerSocket);
Runtime.getRuntime().addShutdownHook(new Thread(Client.class.getName() + "-ShutdownHook") {
@Override
public void run() {
try {
replSocket.close();
completerSocket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
socketCleaner.run();
}
});

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/scalive/client/Repl.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ static void run(Socket socket, final ConsoleReader reader) throws IOException {
final InputStream in = socket.getInputStream();
final OutputStream out = socket.getOutputStream();

new Thread(new Runnable() {
new Thread(Repl.class.getName() + "-printServerOutput") {
@Override
public void run() {
try {
Expand All @@ -23,7 +23,7 @@ public void run() {
throw new RuntimeException(e);
}
}
}).start();
}.start();

readLocalInput(reader, out);
}
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/scalive/server/Agent.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package scalive.server;

import scalive.Log;

import java.io.File;
import java.io.IOException;
import java.lang.instrument.Instrumentation;
Expand Down Expand Up @@ -35,16 +33,15 @@ public static void agentmain(String agentArgs, Instrumentation inst) throws IOEx
// - The server is blocking for connections
// - VirtualMachine#loadAgent at the client does not return until this agentmain method returns
// - The client only connects to the server after VirtualMachine#loadAgent returns
new Thread(new Runnable() {
new Thread(Agent.class.getName() + "-Server") {
@Override
public void run() {
try {
Server.run(serverSocket, jarSearchDirs);
} catch (Exception e) {
throw new RuntimeException(e);
}
Log.log("Closed");
}
}).start();
}.start();
}
}
55 changes: 35 additions & 20 deletions src/main/java/scalive/server/Completer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import scala.tools.nsc.interpreter.Completion.Candidates;

import scalive.Log;
import scalive.Net;

import java.io.BufferedReader;
import java.io.IOException;
Expand All @@ -18,38 +19,52 @@
* @see scalive.client.Completer
*/
class Completer {
static void run(Socket socket, ILoopWithCompletion iloop) throws IOException {
static void run(
Socket socket, ILoopWithCompletion iloop, Runnable socketCleaner
) throws IOException, InterruptedException {
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();

BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));

while (true) {
String line = reader.readLine();
if (line == null) break;
Net.throwSocketTimeoutExceptionForLongInactivity(socket);
try {
while (true) {
// See throwSocketTimeoutExceptionForLongInactivity above
String line = reader.readLine();

int idx = line.indexOf(" ");
String cursorString = line.substring(0, idx);
int cursor = Integer.parseInt(cursorString);
String buffer = line.substring(idx + 1);
// Socket closed
if (line == null) break;

Completion completion = getCompletion(iloop);
Candidates candidates = completion.completer().complete(buffer, cursor);
int idx = line.indexOf(" ");
String cursorString = line.substring(0, idx);
int cursor = Integer.parseInt(cursorString);
String buffer = line.substring(idx + 1);

out.write(("" + candidates.cursor()).getBytes("UTF-8"));
Completion completion = getCompletion(iloop);
Candidates candidates = completion.completer().complete(buffer, cursor);

List<String> list = candidates.candidates();
Iterator<String> it = list.iterator();
while (it.hasNext()) {
String candidate = it.next();
out.write(' ');
out.write(candidate.getBytes("UTF-8"));
}
out.write(("" + candidates.cursor()).getBytes("UTF-8"));

List<String> list = candidates.candidates();
Iterator<String> it = list.iterator();
while (it.hasNext()) {
String candidate = it.next();
out.write(' ');
out.write(candidate.getBytes("UTF-8"));
}

out.write('\n');
out.flush();
out.write('\n');
out.flush();
}
} catch (IOException e) {
// Socket closed
}

socketCleaner.run();

// Before logging this out, wait a litte for System.out to be restored back to the remote process
Thread.sleep(1000);
Log.log("Completer closed");
}

Expand Down
26 changes: 14 additions & 12 deletions src/main/java/scalive/server/Repl.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import scalive.Classpath;
import scalive.Log;
import scalive.Net;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -17,39 +18,40 @@

class Repl {
/** Creates a REPL and wire IO streams of the socket to it. */
static ILoopWithCompletion run(final Socket socket, URLClassLoader cl) throws IOException {
static ILoopWithCompletion run(
final Socket socket, URLClassLoader cl, final Runnable socketCleaner
) throws IOException {
final InputStream in = socket.getInputStream();
final OutputStream out = socket.getOutputStream();

final ILoopWithCompletion iloop = new ILoopWithCompletion(in, out);
final Settings settings = getSettings(cl);

new Thread(new Runnable() {
Net.throwSocketTimeoutExceptionForLongInactivity(socket);
new Thread(Repl.class.getName() + "-iloop") {
@Override
public void run() {
overrideScalaConsole(in, out, new Runnable() {
@Override
public void run() {
// This call does not return until socket is closed,
// or repl has been closed by the client using ":q"
iloop.process(settings);
try {
iloop.process(settings);
} catch (Exception e) {
// See throwSocketTimeoutExceptionForLongInactivity above;
// just let this thread ends
}
}
});

// This code should be put outside overrideScalaConsole above
// so that the output is not redirected to the client,
// in case repl has been closed by the client using ":q"
socketCleaner.run();
Log.log("REPL closed");

try {
// In case repl has been closed by the client using ":q",
// we need to close socket to notify the client to exit
socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}).start();
}.start();

return iloop;
}
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/scalive/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ static ServerSocket open(int port) throws IOException {

static void run(
ServerSocket serverSocket, String[] jarSearchDirs
) throws IOException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException, IllegalAccessException {
) throws IOException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException, IllegalAccessException, InterruptedException {
// Accept 2 connections (blocking)
Socket replSocket = serverSocket.accept();
Log.log("REPL connected");
Expand All @@ -34,8 +34,10 @@ static void run(
URLClassLoader cl = (URLClassLoader) ClassLoader.getSystemClassLoader();
loadDependencyJars(cl, jarSearchDirs);

ILoopWithCompletion iloop = Repl.run(replSocket, cl);
Completer.run(completerSocket, iloop);
Runnable socketCleaner = Net.getSocketCleaner(replSocket, completerSocket);

ILoopWithCompletion iloop = Repl.run(replSocket, cl, socketCleaner);
Completer.run(completerSocket, iloop, socketCleaner);
}

private static void loadDependencyJars(
Expand Down

0 comments on commit 127f9f4

Please sign in to comment.