From 7147d76254ab18021be81637f515e342b2583d6b Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Mon, 22 Jul 2024 16:16:31 +0800 Subject: [PATCH] Automatically pump `os.proc` streams when `SystemStreams` are redirected (#3275) Depends on https://github.com/com-lihaoyi/os-lib/pull/283 This moves the subprocess stream handling logic out of `Jvm.spawnSubprocess` and makes it apply to all `os.proc` invocations, greatly reducing the room for error. With this, `Jvm.spawnSubprocess` becomes a very thin wrapper around `os.proc.spawn`. We also rely directly on OS-Lib's own pumper threads to pump to our destination, rather than having them pump into in-memory buffers and then spawning our own pumper threads to pump from those buffers to the destination I spent some time looking into how to do the stdout/err handling at the process level, but couldn't find any reasonable mechanism to do so that allows us to preserve the ordering of the stdout/stderr. This is the original motivation to squishing it into one stream via `ProxyOutputStream`/`ProxyStreamPumper` and is important because otherwise you find e.g. stack traces out of order with printlns, which makes debugging very difficult. Might be possible using some socket/fifo/pipe cleverness, but not as part of this PR Added an integration test to assert on the subtleties of stdout, stderr, and their inherited alternatives. This PR is required for the test to pass --- build.sc | 3 +- .../feature/subprocess-stdout/repo/build.sc | 18 ++++ .../feature/subprocess-stdout/repo/mill | 67 ++++++++++++++ .../test/src/SubprocessStdoutTests.scala | 87 +++++++++++++++++++ .../integration/IntegrationTestSuite.scala | 33 +++++-- main/api/src/mill/api/SystemStreams.scala | 23 ++++- .../src/mill/main/client/InputPumper.java | 21 +++-- .../src/mill/main/client/MillClientMain.java | 2 +- main/util/src/mill/util/Jvm.scala | 48 ++-------- runner/src/mill/runner/MillServerMain.scala | 2 +- .../scalajslib/worker/ScalaJSWorkerImpl.scala | 7 +- 11 files changed, 249 insertions(+), 62 deletions(-) create mode 100644 integration/feature/subprocess-stdout/repo/build.sc create mode 100755 integration/feature/subprocess-stdout/repo/mill create mode 100644 integration/feature/subprocess-stdout/test/src/SubprocessStdoutTests.scala diff --git a/build.sc b/build.sc index 7a9be46507c..e46e89b25a4 100644 --- a/build.sc +++ b/build.sc @@ -153,7 +153,7 @@ object Deps { val junitInterface = ivy"com.github.sbt:junit-interface:0.13.3" val lambdaTest = ivy"de.tototec:de.tobiasroeser.lambdatest:0.8.0" val log4j2Core = ivy"org.apache.logging.log4j:log4j-core:2.23.1" - val osLib = ivy"com.lihaoyi::os-lib:0.10.2" + val osLib = ivy"com.lihaoyi::os-lib:0.10.3" val pprint = ivy"com.lihaoyi::pprint:0.9.0" val mainargs = ivy"com.lihaoyi::mainargs:0.7.0" val millModuledefsVersion = "0.10.9" @@ -618,6 +618,7 @@ object main extends MillStableScalaModule with BuildInfo { ) object api extends MillStableScalaModule with BuildInfo { + def moduleDeps = Seq(client) def buildInfoPackageName = "mill.api" def buildInfoMembers = Seq( BuildInfo.Value("millVersion", millVersion(), "Mill version."), diff --git a/integration/feature/subprocess-stdout/repo/build.sc b/integration/feature/subprocess-stdout/repo/build.sc new file mode 100644 index 00000000000..0e20b33b049 --- /dev/null +++ b/integration/feature/subprocess-stdout/repo/build.sc @@ -0,0 +1,18 @@ +import mill._ + + +def inheritInterleaved = T { + for (i <- Range.inclusive(1, 9)) { + println("print stdout" + i) + os.proc("echo", "proc stdout" + i).call(stdout = os.Inherit) + System.err.println("print stderr" + i) + os.proc("bash", "-c", s"echo proc stderr${i} >&2").call(stderr = os.Inherit) + } +} + +def inheritRaw = T{ + println("print stdoutRaw") + os.proc("echo", "proc stdoutRaw").call(stdout = os.InheritRaw) + System.err.println("print stderrRaw") + os.proc("bash", "-c", "echo proc stderrRaw >&2").call(stderr = os.InheritRaw) +} \ No newline at end of file diff --git a/integration/feature/subprocess-stdout/repo/mill b/integration/feature/subprocess-stdout/repo/mill new file mode 100755 index 00000000000..d3055fe23ef --- /dev/null +++ b/integration/feature/subprocess-stdout/repo/mill @@ -0,0 +1,67 @@ +#!/usr/bin/env sh + +# This is a wrapper script, that automatically download mill from GitHub release pages +# You can give the required mill version with MILL_VERSION env variable +# If no version is given, it falls back to the value of DEFAULT_MILL_VERSION + +set -e + +if [ -z "${DEFAULT_MILL_VERSION}" ] ; then + DEFAULT_MILL_VERSION=0.11.6 +fi + +if [ -z "$MILL_VERSION" ] ; then + if [ -f ".mill-version" ] ; then + MILL_VERSION="$(head -n 1 .mill-version 2> /dev/null)" + elif [ -f ".config/mill-version" ] ; then + MILL_VERSION="$(head -n 1 .config/mill-version 2> /dev/null)" + elif [ -f "mill" ] && [ "$0" != "mill" ] ; then + MILL_VERSION=$(grep -F "DEFAULT_MILL_VERSION=" "mill" | head -n 1 | cut -d= -f2) + else + MILL_VERSION=$DEFAULT_MILL_VERSION + fi +fi + +if [ "x${XDG_CACHE_HOME}" != "x" ] ; then + MILL_DOWNLOAD_PATH="${XDG_CACHE_HOME}/mill/download" +else + MILL_DOWNLOAD_PATH="${HOME}/.cache/mill/download" +fi +MILL_EXEC_PATH="${MILL_DOWNLOAD_PATH}/${MILL_VERSION}" + +version_remainder="$MILL_VERSION" +MILL_MAJOR_VERSION="${version_remainder%%.*}"; version_remainder="${version_remainder#*.}" +MILL_MINOR_VERSION="${version_remainder%%.*}"; version_remainder="${version_remainder#*.}" + +if [ ! -s "$MILL_EXEC_PATH" ] ; then + mkdir -p "$MILL_DOWNLOAD_PATH" + if [ "$MILL_MAJOR_VERSION" -gt 0 ] || [ "$MILL_MINOR_VERSION" -ge 5 ] ; then + ASSEMBLY="-assembly" + fi + DOWNLOAD_FILE=$MILL_EXEC_PATH-tmp-download + MILL_VERSION_TAG=$(echo $MILL_VERSION | sed -E 's/([^-]+)(-M[0-9]+)?(-.*)?/\1\2/') + MILL_DOWNLOAD_URL="https://repo1.maven.org/maven2/com/lihaoyi/mill-dist/$MILL_VERSION/mill-dist-$MILL_VERSION.jar" + curl --fail -L -o "$DOWNLOAD_FILE" "$MILL_DOWNLOAD_URL" + chmod +x "$DOWNLOAD_FILE" + mv "$DOWNLOAD_FILE" "$MILL_EXEC_PATH" + unset DOWNLOAD_FILE + unset MILL_DOWNLOAD_URL +fi + +if [ -z "$MILL_MAIN_CLI" ] ; then + MILL_MAIN_CLI="${0}" +fi + +MILL_FIRST_ARG="" + + # first arg is a long flag for "--interactive" or starts with "-i" +if [ "$1" = "--bsp" ] || [ "${1#"-i"}" != "$1" ] || [ "$1" = "--interactive" ] || [ "$1" = "--no-server" ] || [ "$1" = "--repl" ] || [ "$1" = "--help" ] ; then + # Need to preserve the first position of those listed options + MILL_FIRST_ARG=$1 + shift +fi + +unset MILL_DOWNLOAD_PATH +unset MILL_VERSION + +exec $MILL_EXEC_PATH $MILL_FIRST_ARG -D "mill.main.cli=${MILL_MAIN_CLI}" "$@" diff --git a/integration/feature/subprocess-stdout/test/src/SubprocessStdoutTests.scala b/integration/feature/subprocess-stdout/test/src/SubprocessStdoutTests.scala new file mode 100644 index 00000000000..958376340d9 --- /dev/null +++ b/integration/feature/subprocess-stdout/test/src/SubprocessStdoutTests.scala @@ -0,0 +1,87 @@ +package mill.integration + +import utest._ + +object SubprocessStdoutTests extends IntegrationTestSuite { + val tests: Tests = Tests { + initWorkspace() + + test { + val res1 = evalStdCombined("inheritInterleaved").out + // Make sure that when a lot of printed/inherited stdout/stderr is printed + // in quick succession, the output ordering is preserved and it doesn't get + // jumbled up + assert( + res1.contains( + s"""print stdout1 + |proc stdout1 + |print stderr1 + |proc stderr1 + |print stdout2 + |proc stdout2 + |print stderr2 + |proc stderr2 + |print stdout3 + |proc stdout3 + |print stderr3 + |proc stderr3 + |print stdout4 + |proc stdout4 + |print stderr4 + |proc stderr4 + |print stdout5 + |proc stdout5 + |print stderr5 + |proc stderr5 + |print stdout6 + |proc stdout6 + |print stderr6 + |proc stderr6 + |print stdout7 + |proc stdout7 + |print stderr7 + |proc stderr7 + |print stdout8 + |proc stdout8 + |print stderr8 + |proc stderr8 + |print stdout9 + |proc stdout9 + |print stderr9 + |proc stderr9""".stripMargin + ) + ) + + // Make sure subprocess output that isn't captures by all of Mill's stdout/stderr/os.Inherit + // redirects still gets pikced up from the stdout/stderr log files and displayed. They may + // be out of order from the original Mill stdout/stderr, but they should still at least turn + // up in the console somewhere and not disappear + // + val res2 = evalStdCombined("inheritRaw").out + if (integrationTestMode == "fork") { + // For `fork` tests, which represent `-i`/`--interactive`/`--no-server`, the output should + // be properly ordered since it all comes directly from the stdout/stderr of the same process + assert( + res2.contains( + """print stdoutRaw + |proc stdoutRaw + |print stderrRaw + |proc stderrRaw""".stripMargin + ) + ) + } else { + // Note that it should be out of order, because both `print`s will be captured and logged first, + // whereas the two `proc` outputs will get sent to their respective log files and only noticed + // a few milliseconds later as the files are polled for updates + assert( + res2.contains( + """print stdoutRaw + |print stderrRaw + |proc stdoutRaw + |proc stderrRaw""".stripMargin + ) + ) + } + } + } +} diff --git a/integration/src/mill/integration/IntegrationTestSuite.scala b/integration/src/mill/integration/IntegrationTestSuite.scala index 9c9cddd1997..b2224b610a5 100644 --- a/integration/src/mill/integration/IntegrationTestSuite.scala +++ b/integration/src/mill/integration/IntegrationTestSuite.scala @@ -5,7 +5,7 @@ import mill.resolve.SelectMode import mill.runner.RunnerState import os.{Path, Shellable} import utest._ - +import collection.mutable import scala.util.control.NonFatal object IntegrationTestSuite { @@ -38,20 +38,39 @@ abstract class IntegrationTestSuite extends TestSuite { } def evalTimeoutStdout(timeout: Long, s: Shellable*): IntegrationTestSuite.EvalResult = { + val output = mutable.Buffer.empty[String] + val error = mutable.Buffer.empty[String] + + evalTimeoutStdout0(timeout, output, error, s) + + } - val output = Seq.newBuilder[String] - val error = Seq.newBuilder[String] - val processOutput = os.ProcessOutput.Readlines(output += _) - val processError = os.ProcessOutput.Readlines(error += _) + def evalTimeoutStdout0( + timeout: Long, + output: mutable.Buffer[String], + error: mutable.Buffer[String], + s: Seq[Shellable] + ): IntegrationTestSuite.EvalResult = { + + val processOutput = os.ProcessOutput.Readlines(s => synchronized(output.append(s))) + val processError = os.ProcessOutput.Readlines(s => synchronized(error.append(s))) val result = evalFork(processOutput, processError, s, timeout) + IntegrationTestSuite.EvalResult( result, - output.result().mkString("\n"), - error.result().mkString("\n") + synchronized(output.mkString("\n")), + synchronized(error.mkString("\n")) ) } + // Combines stdout and stderr into a single stream; useful for testing + // against the combined output and also asserting on ordering + def evalStdCombined(s: Shellable*): IntegrationTestSuite.EvalResult = { + val combined = mutable.Buffer.empty[String] + evalTimeoutStdout0(-1, combined, combined, s) + } + val millReleaseFileOpt: Option[Path] = Option(System.getenv("MILL_TEST_LAUNCHER")).map(os.Path(_, os.pwd)) diff --git a/main/api/src/mill/api/SystemStreams.scala b/main/api/src/mill/api/SystemStreams.scala index aecd81a7566..2389ebee320 100644 --- a/main/api/src/mill/api/SystemStreams.scala +++ b/main/api/src/mill/api/SystemStreams.scala @@ -1,6 +1,7 @@ package mill.api -import java.io.{InputStream, PrintStream} +import java.io.{InputStream, OutputStream, PrintStream} +import mill.main.client.InputPumper /** * Represents a set of streams that look similar to those provided by the @@ -48,6 +49,18 @@ object SystemStreams { def originalErr: PrintStream = original.err + private class PumpedProcessInput extends os.ProcessInput { + def redirectFrom = ProcessBuilder.Redirect.PIPE + def processInput(processIn: => os.SubProcess.InputStream): Some[InputPumper] = Some( + new InputPumper(() => System.in, () => processIn, true, () => true) + ) + } + + private class PumpedProcessOutput(dest: OutputStream) extends os.ProcessOutput { + def redirectTo = ProcessBuilder.Redirect.PIPE + def processOutput(processOut: => os.SubProcess.OutputStream): Some[InputPumper] = + Some(new InputPumper(() => processOut, () => dest, false, () => true)) + } def withStreams[T](systemStreams: SystemStreams)(t: => T): T = { val in = System.in val out = System.out @@ -59,7 +72,13 @@ object SystemStreams { Console.withIn(systemStreams.in) { Console.withOut(systemStreams.out) { Console.withErr(systemStreams.err) { - t + os.Inherit.in.withValue(new PumpedProcessInput) { + os.Inherit.out.withValue(new PumpedProcessOutput(System.out)) { + os.Inherit.err.withValue(new PumpedProcessOutput(System.err)) { + t + } + } + } } } } diff --git a/main/client/src/mill/main/client/InputPumper.java b/main/client/src/mill/main/client/InputPumper.java index cfd5a242884..cbd5ecf460d 100644 --- a/main/client/src/mill/main/client/InputPumper.java +++ b/main/client/src/mill/main/client/InputPumper.java @@ -2,29 +2,34 @@ import java.io.InputStream; import java.io.OutputStream; +import java.util.function.Supplier; public class InputPumper implements Runnable{ - private InputStream src; - private OutputStream dest; + private Supplier src0; + private Supplier dest0; + private Boolean checkAvailable; private java.util.function.BooleanSupplier runningCheck; - public InputPumper(InputStream src, - OutputStream dest, + public InputPumper(Supplier src, + Supplier dest, Boolean checkAvailable){ this(src, dest, checkAvailable, () -> true); } - public InputPumper(InputStream src, - OutputStream dest, + public InputPumper(Supplier src, + Supplier dest, Boolean checkAvailable, java.util.function.BooleanSupplier runningCheck){ - this.src = src; - this.dest = dest; + this.src0 = src; + this.dest0 = dest; this.checkAvailable = checkAvailable; this.runningCheck = runningCheck; } boolean running = true; public void run() { + InputStream src = src0.get(); + OutputStream dest = dest0.get(); + byte[] buffer = new byte[1024]; try{ while(running){ diff --git a/main/client/src/mill/main/client/MillClientMain.java b/main/client/src/mill/main/client/MillClientMain.java index e6b7d3a3969..5687c8b251c 100644 --- a/main/client/src/mill/main/client/MillClientMain.java +++ b/main/client/src/mill/main/client/MillClientMain.java @@ -221,7 +221,7 @@ public static int run( InputStream outErr = ioSocket.getInputStream(); OutputStream in = ioSocket.getOutputStream(); ProxyStreamPumper outPump = new ProxyStreamPumper(outErr, stdout, stderr); - InputPumper inPump = new InputPumper(stdin, in, true); + InputPumper inPump = new InputPumper(() -> stdin, () -> in, true); Thread outThread = new Thread(outPump, "outPump"); outThread.setDaemon(true); Thread inThread = new Thread(inPump, "inPump"); diff --git a/main/util/src/mill/util/Jvm.scala b/main/util/src/mill/util/Jvm.scala index 6267852c5f7..78417d35ed7 100644 --- a/main/util/src/mill/util/Jvm.scala +++ b/main/util/src/mill/util/Jvm.scala @@ -2,7 +2,6 @@ package mill.util import mill.api.Loose.Agg import mill.api._ -import mill.main.client.InputPumper import os.{ProcessOutput, SubProcess} import java.io._ @@ -223,46 +222,13 @@ object Jvm extends CoursierSupport { workingDir: os.Path, backgroundOutputs: Option[Tuple2[ProcessOutput, ProcessOutput]] = None ): SubProcess = { - // If System.in is fake, then we pump output manually rather than relying - // on `os.Inherit`. That is because `os.Inherit` does not follow changes - // to System.in/System.out/System.err, so the subprocess's streams get sent - // to the parent process's origin outputs even if we want to direct them - // elsewhere - - if (!SystemStreams.isOriginal()) { - val process = os.proc(commandArgs).spawn( - cwd = workingDir, - env = envArgs, - stdin = if (backgroundOutputs.isEmpty) os.Pipe else "", - stdout = backgroundOutputs.map(_._1).getOrElse(os.Pipe), - stderr = backgroundOutputs.map(_._2).getOrElse(os.Pipe) - ) - - val sources = Seq( - (process.stdout, System.out, "spawnSubprocess.stdout", false, () => true), - (process.stderr, System.err, "spawnSubprocess.stderr", false, () => true), - (System.in, process.stdin, "spawnSubprocess.stdin", true, () => process.isAlive()) - ) - - for ((std, dest, name, checkAvailable, runningCheck) <- sources) { - val t = new Thread( - new InputPumper(std, dest, checkAvailable, () => runningCheck()), - name - ) - t.setDaemon(true) - t.start() - } - - process - } else { - os.proc(commandArgs).spawn( - cwd = workingDir, - env = envArgs, - stdin = if (backgroundOutputs.isEmpty) os.Inherit else "", - stdout = backgroundOutputs.map(_._1).getOrElse(os.Inherit), - stderr = backgroundOutputs.map(_._2).getOrElse(os.Inherit) - ) - } + os.proc(commandArgs).spawn( + cwd = workingDir, + env = envArgs, + stdin = if (backgroundOutputs.isEmpty) os.Inherit else "", + stdout = backgroundOutputs.map(_._1).getOrElse(os.Inherit), + stderr = backgroundOutputs.map(_._2).getOrElse(os.Inherit) + ) } def runLocal( diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index 53ab81c4e5b..0a8fd606293 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -135,7 +135,7 @@ class Server[T]( val pipedInput = new PipedInputStream() val pipedOutput = new PipedOutputStream() pipedOutput.connect(pipedInput) - val pumper = new InputPumper(in, pipedOutput, false) + val pumper = new InputPumper(() => in, () => pipedOutput, false) val pumperThread = new Thread(pumper, "proxyInputStreamThroughPumper") pumperThread.setDaemon(true) pumperThread.start() diff --git a/scalajslib/worker/1/src/mill/scalajslib/worker/ScalaJSWorkerImpl.scala b/scalajslib/worker/1/src/mill/scalajslib/worker/ScalaJSWorkerImpl.scala index 79ab9b78284..d6f2164ff72 100644 --- a/scalajslib/worker/1/src/mill/scalajslib/worker/ScalaJSWorkerImpl.scala +++ b/scalajslib/worker/1/src/mill/scalajslib/worker/ScalaJSWorkerImpl.scala @@ -305,7 +305,12 @@ class ScalaJSWorkerImpl extends ScalaJSWorkerApi { for ((std, dest, name, checkAvailable, runningCheck) <- sources) { val t = new Thread( - new mill.main.client.InputPumper(std, dest, checkAvailable, () => runningCheck()), + new mill.main.client.InputPumper( + () => std, + () => dest, + checkAvailable, + () => runningCheck() + ), name ) t.setDaemon(true)