Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically pump os.proc streams when SystemStreams are redirected #3275

Merged
merged 17 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ object Deps {
val junitInterface = ivy"com.github.sbt:junit-interface:0.13.3"
val lambdaTest = ivy"de.tototec:de.tobiasroeser.lambdatest:0.8.0"
val log4j2Core = ivy"org.apache.logging.log4j:log4j-core:2.23.1"
val osLib = ivy"com.lihaoyi::os-lib:0.10.2"
val osLib = ivy"com.lihaoyi::os-lib:0.10.3"
val pprint = ivy"com.lihaoyi::pprint:0.9.0"
val mainargs = ivy"com.lihaoyi::mainargs:0.7.0"
val millModuledefsVersion = "0.10.9"
Expand Down Expand Up @@ -618,6 +618,7 @@ object main extends MillStableScalaModule with BuildInfo {
)

object api extends MillStableScalaModule with BuildInfo {
def moduleDeps = Seq(client)
def buildInfoPackageName = "mill.api"
def buildInfoMembers = Seq(
BuildInfo.Value("millVersion", millVersion(), "Mill version."),
Expand Down
18 changes: 18 additions & 0 deletions integration/feature/subprocess-stdout/repo/build.sc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import mill._


def inheritInterleaved = T {
for (i <- Range.inclusive(1, 9)) {
println("print stdout" + i)
os.proc("echo", "proc stdout" + i).call(stdout = os.Inherit)
System.err.println("print stderr" + i)
os.proc("bash", "-c", s"echo proc stderr${i} >&2").call(stderr = os.Inherit)
}
}

def inheritRaw = T{
println("print stdoutRaw")
os.proc("echo", "proc stdoutRaw").call(stdout = os.InheritRaw)
System.err.println("print stderrRaw")
os.proc("bash", "-c", "echo proc stderrRaw >&2").call(stderr = os.InheritRaw)
}
67 changes: 67 additions & 0 deletions integration/feature/subprocess-stdout/repo/mill
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env sh

# This is a wrapper script, that automatically download mill from GitHub release pages
# You can give the required mill version with MILL_VERSION env variable
# If no version is given, it falls back to the value of DEFAULT_MILL_VERSION

set -e

if [ -z "${DEFAULT_MILL_VERSION}" ] ; then
DEFAULT_MILL_VERSION=0.11.6
fi

if [ -z "$MILL_VERSION" ] ; then
if [ -f ".mill-version" ] ; then
MILL_VERSION="$(head -n 1 .mill-version 2> /dev/null)"
elif [ -f ".config/mill-version" ] ; then
MILL_VERSION="$(head -n 1 .config/mill-version 2> /dev/null)"
elif [ -f "mill" ] && [ "$0" != "mill" ] ; then
MILL_VERSION=$(grep -F "DEFAULT_MILL_VERSION=" "mill" | head -n 1 | cut -d= -f2)
else
MILL_VERSION=$DEFAULT_MILL_VERSION
fi
fi

if [ "x${XDG_CACHE_HOME}" != "x" ] ; then
MILL_DOWNLOAD_PATH="${XDG_CACHE_HOME}/mill/download"
else
MILL_DOWNLOAD_PATH="${HOME}/.cache/mill/download"
fi
MILL_EXEC_PATH="${MILL_DOWNLOAD_PATH}/${MILL_VERSION}"

version_remainder="$MILL_VERSION"
MILL_MAJOR_VERSION="${version_remainder%%.*}"; version_remainder="${version_remainder#*.}"
MILL_MINOR_VERSION="${version_remainder%%.*}"; version_remainder="${version_remainder#*.}"

if [ ! -s "$MILL_EXEC_PATH" ] ; then
mkdir -p "$MILL_DOWNLOAD_PATH"
if [ "$MILL_MAJOR_VERSION" -gt 0 ] || [ "$MILL_MINOR_VERSION" -ge 5 ] ; then
ASSEMBLY="-assembly"
fi
DOWNLOAD_FILE=$MILL_EXEC_PATH-tmp-download
MILL_VERSION_TAG=$(echo $MILL_VERSION | sed -E 's/([^-]+)(-M[0-9]+)?(-.*)?/\1\2/')
MILL_DOWNLOAD_URL="https://repo1.maven.org/maven2/com/lihaoyi/mill-dist/$MILL_VERSION/mill-dist-$MILL_VERSION.jar"
curl --fail -L -o "$DOWNLOAD_FILE" "$MILL_DOWNLOAD_URL"
chmod +x "$DOWNLOAD_FILE"
mv "$DOWNLOAD_FILE" "$MILL_EXEC_PATH"
unset DOWNLOAD_FILE
unset MILL_DOWNLOAD_URL
fi

if [ -z "$MILL_MAIN_CLI" ] ; then
MILL_MAIN_CLI="${0}"
fi

MILL_FIRST_ARG=""

# first arg is a long flag for "--interactive" or starts with "-i"
if [ "$1" = "--bsp" ] || [ "${1#"-i"}" != "$1" ] || [ "$1" = "--interactive" ] || [ "$1" = "--no-server" ] || [ "$1" = "--repl" ] || [ "$1" = "--help" ] ; then
# Need to preserve the first position of those listed options
MILL_FIRST_ARG=$1
shift
fi

unset MILL_DOWNLOAD_PATH
unset MILL_VERSION

exec $MILL_EXEC_PATH $MILL_FIRST_ARG -D "mill.main.cli=${MILL_MAIN_CLI}" "$@"
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package mill.integration

import utest._

object SubprocessStdoutTests extends IntegrationTestSuite {
val tests: Tests = Tests {
initWorkspace()

test {
val res1 = evalStdCombined("inheritInterleaved").out
// Make sure that when a lot of printed/inherited stdout/stderr is printed
// in quick succession, the output ordering is preserved and it doesn't get
// jumbled up
assert(
res1.contains(
s"""print stdout1
|proc stdout1
|print stderr1
|proc stderr1
|print stdout2
|proc stdout2
|print stderr2
|proc stderr2
|print stdout3
|proc stdout3
|print stderr3
|proc stderr3
|print stdout4
|proc stdout4
|print stderr4
|proc stderr4
|print stdout5
|proc stdout5
|print stderr5
|proc stderr5
|print stdout6
|proc stdout6
|print stderr6
|proc stderr6
|print stdout7
|proc stdout7
|print stderr7
|proc stderr7
|print stdout8
|proc stdout8
|print stderr8
|proc stderr8
|print stdout9
|proc stdout9
|print stderr9
|proc stderr9""".stripMargin
)
)

// Make sure subprocess output that isn't captures by all of Mill's stdout/stderr/os.Inherit
// redirects still gets pikced up from the stdout/stderr log files and displayed. They may
// be out of order from the original Mill stdout/stderr, but they should still at least turn
// up in the console somewhere and not disappear
//
val res2 = evalStdCombined("inheritRaw").out
if (integrationTestMode == "fork") {
// For `fork` tests, which represent `-i`/`--interactive`/`--no-server`, the output should
// be properly ordered since it all comes directly from the stdout/stderr of the same process
assert(
res2.contains(
"""print stdoutRaw
|proc stdoutRaw
|print stderrRaw
|proc stderrRaw""".stripMargin
)
)
} else {
// Note that it should be out of order, because both `print`s will be captured and logged first,
// whereas the two `proc` outputs will get sent to their respective log files and only noticed
// a few milliseconds later as the files are polled for updates
assert(
res2.contains(
"""print stdoutRaw
|print stderrRaw
|proc stdoutRaw
|proc stderrRaw""".stripMargin
)
)
}
}
}
}
33 changes: 26 additions & 7 deletions integration/src/mill/integration/IntegrationTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import mill.resolve.SelectMode
import mill.runner.RunnerState
import os.{Path, Shellable}
import utest._

import collection.mutable
import scala.util.control.NonFatal

object IntegrationTestSuite {
Expand Down Expand Up @@ -38,20 +38,39 @@ abstract class IntegrationTestSuite extends TestSuite {
}

def evalTimeoutStdout(timeout: Long, s: Shellable*): IntegrationTestSuite.EvalResult = {
val output = mutable.Buffer.empty[String]
val error = mutable.Buffer.empty[String]

evalTimeoutStdout0(timeout, output, error, s)

}

val output = Seq.newBuilder[String]
val error = Seq.newBuilder[String]
val processOutput = os.ProcessOutput.Readlines(output += _)
val processError = os.ProcessOutput.Readlines(error += _)
def evalTimeoutStdout0(
timeout: Long,
output: mutable.Buffer[String],
error: mutable.Buffer[String],
s: Seq[Shellable]
): IntegrationTestSuite.EvalResult = {

val processOutput = os.ProcessOutput.Readlines(s => synchronized(output.append(s)))
val processError = os.ProcessOutput.Readlines(s => synchronized(error.append(s)))

val result = evalFork(processOutput, processError, s, timeout)

IntegrationTestSuite.EvalResult(
result,
output.result().mkString("\n"),
error.result().mkString("\n")
synchronized(output.mkString("\n")),
synchronized(error.mkString("\n"))
)
}

// Combines stdout and stderr into a single stream; useful for testing
// against the combined output and also asserting on ordering
def evalStdCombined(s: Shellable*): IntegrationTestSuite.EvalResult = {
val combined = mutable.Buffer.empty[String]
evalTimeoutStdout0(-1, combined, combined, s)
}

val millReleaseFileOpt: Option[Path] =
Option(System.getenv("MILL_TEST_LAUNCHER")).map(os.Path(_, os.pwd))

Expand Down
23 changes: 21 additions & 2 deletions main/api/src/mill/api/SystemStreams.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mill.api

import java.io.{InputStream, PrintStream}
import java.io.{InputStream, OutputStream, PrintStream}
import mill.main.client.InputPumper

/**
* Represents a set of streams that look similar to those provided by the
Expand Down Expand Up @@ -48,6 +49,18 @@ object SystemStreams {

def originalErr: PrintStream = original.err

private class PumpedProcessInput extends os.ProcessInput {
def redirectFrom = ProcessBuilder.Redirect.PIPE
def processInput(processIn: => os.SubProcess.InputStream): Some[InputPumper] = Some(
new InputPumper(() => System.in, () => processIn, true, () => true)
)
}

private class PumpedProcessOutput(dest: OutputStream) extends os.ProcessOutput {
def redirectTo = ProcessBuilder.Redirect.PIPE
def processOutput(processOut: => os.SubProcess.OutputStream): Some[InputPumper] =
Some(new InputPumper(() => processOut, () => dest, false, () => true))
}
def withStreams[T](systemStreams: SystemStreams)(t: => T): T = {
val in = System.in
val out = System.out
Expand All @@ -59,7 +72,13 @@ object SystemStreams {
Console.withIn(systemStreams.in) {
Console.withOut(systemStreams.out) {
Console.withErr(systemStreams.err) {
t
os.Inherit.in.withValue(new PumpedProcessInput) {
os.Inherit.out.withValue(new PumpedProcessOutput(System.out)) {
os.Inherit.err.withValue(new PumpedProcessOutput(System.err)) {
t
}
}
}
}
}
}
Expand Down
21 changes: 13 additions & 8 deletions main/client/src/mill/main/client/InputPumper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,34 @@

import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.Supplier;

public class InputPumper implements Runnable{
private InputStream src;
private OutputStream dest;
private Supplier<InputStream> src0;
private Supplier<OutputStream> dest0;

private Boolean checkAvailable;
private java.util.function.BooleanSupplier runningCheck;
public InputPumper(InputStream src,
OutputStream dest,
public InputPumper(Supplier<InputStream> src,
Supplier<OutputStream> dest,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making these lazily-initialized is necessary to avoid infinite recursion when initializing the pumper <-> stream <-> subprocess data structure, which is circular

Boolean checkAvailable){
this(src, dest, checkAvailable, () -> true);
}
public InputPumper(InputStream src,
OutputStream dest,
public InputPumper(Supplier<InputStream> src,
Supplier<OutputStream> dest,
Boolean checkAvailable,
java.util.function.BooleanSupplier runningCheck){
this.src = src;
this.dest = dest;
this.src0 = src;
this.dest0 = dest;
this.checkAvailable = checkAvailable;
this.runningCheck = runningCheck;
}

boolean running = true;
public void run() {
InputStream src = src0.get();
OutputStream dest = dest0.get();

byte[] buffer = new byte[1024];
try{
while(running){
Expand Down
2 changes: 1 addition & 1 deletion main/client/src/mill/main/client/MillClientMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public static int run(
InputStream outErr = ioSocket.getInputStream();
OutputStream in = ioSocket.getOutputStream();
ProxyStreamPumper outPump = new ProxyStreamPumper(outErr, stdout, stderr);
InputPumper inPump = new InputPumper(stdin, in, true);
InputPumper inPump = new InputPumper(() -> stdin, () -> in, true);
Thread outThread = new Thread(outPump, "outPump");
outThread.setDaemon(true);
Thread inThread = new Thread(inPump, "inPump");
Expand Down
Loading