Skip to content

Commit

Permalink
Add Windows support
Browse files Browse the repository at this point in the history
  • Loading branch information
fmeum committed Jun 4, 2024
1 parent bb3c4c6 commit 907d653
Showing 1 changed file with 112 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,38 @@
import com.google.common.flogger.GoogleLogger;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.util.OS;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.common.options.OptionsProvider;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import com.sun.nio.file.ExtendedOpenOption;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.net.UnixDomainSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channels;
import java.nio.channels.CompletionHandler;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import javax.annotation.Nullable;

Expand All @@ -42,8 +55,8 @@ public class WatchmanDiffAwareness extends LocalDiffAwareness {
private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().create();

private final ImmutableSet<String> ignoredPaths;
// The watchman channel is kept open for as long as watchfs is enabled.
@Nullable private SocketChannel watchmanChannel;
// The watchman socket is kept open for as long as watchfs is enabled.
@Nullable private WatchmanSocket watchmanSocket;
@Nullable private List<Object> nextQueryCommand;

protected WatchmanDiffAwareness(String watchRoot, ImmutableSet<Path> ignoredPaths) {
Expand Down Expand Up @@ -77,7 +90,7 @@ private void init(PathFragment watchmanPath) throws IOException {
return;
}

watchmanChannel = SocketChannel.open(UnixDomainSocketAddress.of(rawSockname));
watchmanSocket = WatchmanSocket.connect(Path.of(rawSockname));
}

@Override
Expand All @@ -87,18 +100,18 @@ public View getCurrentView(OptionsProvider options, EventHandler eventHandler)
boolean useWatchman = watchFsOptions.watchFS && !watchFsOptions.watchmanPath.isEmpty();
// See the comments on WatchServiceDiffAwareness#getCurrentView for the handling of the state
// changes below.
if (useWatchman && watchmanChannel == null) {
if (useWatchman && watchmanSocket == null) {
try {
init(watchFsOptions.watchmanPath);
} catch (IOException e) {
throw new BrokenDiffAwarenessException(
"Error encountered with watchman: " + e);
}
} else if (!useWatchman && watchmanChannel != null) {
} else if (!useWatchman && watchmanSocket != null) {
close();
throw new BrokenDiffAwarenessException("Switched off --watchfs again");
}
if (watchmanChannel == null) {
if (watchmanSocket == null) {
return EVERYTHING_MODIFIED;
}
Set<Path> modifiedAbsolutePaths;
Expand Down Expand Up @@ -133,7 +146,8 @@ public View getCurrentView(OptionsProvider options, EventHandler eventHandler)
private void watchProject(Path watchRootPath, EventHandler eventHandler)
throws IOException, BrokenDiffAwarenessException {
sendCommand(List.of("watch-project", watchRootPath.toString()));
String responseJson = new BufferedReader(Channels.newReader(watchmanChannel, UTF_8)).readLine();

String responseJson = watchmanSocket.reader.readLine();
JsonObject response = GSON.fromJson(responseJson, JsonObject.class);
if (response.has("error")) {
throw new IOException(response.get("error").getAsString());
Expand Down Expand Up @@ -167,7 +181,7 @@ private ImmutableSet<Path> collectChangesAndUpdateClockspec(EventHandler eventHa

boolean isFreshInstance = false;
ImmutableSet.Builder<Path> modifiedPaths = ImmutableSet.builder();
var json = new JsonReader(new BufferedReader(Channels.newReader(watchmanChannel, UTF_8)));
var json = new JsonReader(watchmanSocket.reader);
json.beginObject();
while (json.hasNext()) {
switch (json.nextName()) {
Expand Down Expand Up @@ -225,23 +239,106 @@ private void updateClockspec(String clockspec) {
}

private void sendCommand(List<Object> command) throws IOException {
var writer = new BufferedWriter(Channels.newWriter(watchmanChannel, UTF_8));
GSON.toJson(command, writer);
writer.write('\n');
writer.flush();
GSON.toJson(command, watchmanSocket.writer);
watchmanSocket.writer.write('\n');
watchmanSocket.writer.flush();
}

@Override
public void close() {
if (watchmanChannel != null) {
if (watchmanSocket != null) {
try {
watchmanChannel.close();
watchmanSocket.close();
} catch (IOException e) {
logger.atInfo().withCause(e).log("While closing watchman channel: %s", e.getMessage());
} finally {
watchmanChannel = null;
watchmanSocket = null;
nextQueryCommand = null;
}
}
}

/**
* A persistent connection to Watchman, either via a Unix domain socket or a Windows named pipe.
*/
private static final class WatchmanSocket implements Closeable {
private final BufferedReader reader;
private final BufferedWriter writer;

private WatchmanSocket(BufferedReader reader, BufferedWriter writer) {
this.reader = reader;
this.writer = writer;
}

static WatchmanSocket connect(Path path) throws IOException {
Reader reader;
Writer writer;
if (OS.getCurrent() == OS.WINDOWS) {
// On Windows, Watchman uses a named pipe for communication. This call mimics
// https://github.com/facebook/watchman/blob/067252e00bb111fc199daf6aeb7e6572613a153b/watchman/java/src/com/facebook/watchman/windowspipe/WindowsNamedPipe.java#L40-L49
var channel =
asAsynchronousByteChannel(
AsynchronousFileChannel.open(
path,
StandardOpenOption.READ,
StandardOpenOption.WRITE,
ExtendedOpenOption.NOSHARE_READ,
ExtendedOpenOption.NOSHARE_WRITE));
reader = new InputStreamReader(Channels.newInputStream(channel), UTF_8);
writer = new OutputStreamWriter(Channels.newOutputStream(channel), UTF_8);
} else {
var channel = SocketChannel.open(UnixDomainSocketAddress.of(path));
reader = Channels.newReader(channel, UTF_8);
writer = Channels.newWriter(channel, UTF_8);
}
return new WatchmanSocket(new BufferedReader(reader), new BufferedWriter(writer));
}

@Override
public void close() throws IOException {
reader.close();
writer.close();
}
}

/**
* Returns an {@link AsynchronousByteChannel} that reads from the beginning of the given {@link
* AsynchronousFileChannel}.
*/
private static AsynchronousByteChannel asAsynchronousByteChannel(
AsynchronousFileChannel channel) {
return new AsynchronousByteChannel() {
@Override
public boolean isOpen() {
return channel.isOpen();
}

@Override
public void close() throws IOException {
channel.close();
}

@Override
public <A> void read(
ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.read(dst, 0, attachment, handler);
}

@Override
public Future<Integer> read(ByteBuffer dst) {
return channel.read(dst, 0);
}

@Override
public <A> void write(
ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.write(src, 0, attachment, handler);
}

@Override
public Future<Integer> write(ByteBuffer src) {
return channel.write(src, 0);
}
};
}
}

0 comments on commit 907d653

Please sign in to comment.