Created
November 2, 2020 18:59
-
-
Save kevingessner/cb93fcdaee9961183fd2f8fb9ef27a8c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java | |
index 711cdd199e..713d6886d8 100644 | |
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java | |
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java | |
@@ -46,6 +46,7 @@ import io.grpc.Status.Code; | |
import io.grpc.StatusRuntimeException; | |
import io.netty.util.AbstractReferenceCounted; | |
import io.netty.util.ReferenceCounted; | |
+import java.io.FileOutputStream; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
@@ -244,7 +245,16 @@ class ByteStreamUploader extends AbstractReferenceCounted { | |
Futures.catchingAsync( | |
uploadResult, | |
StatusRuntimeException.class, | |
- (sre) -> Futures.immediateFailedFuture(new IOException(sre)), | |
+ (sre) -> {
+   // Debugging fallback for a failed upload: describe the chunker state, dump the blob to
+   // /tmp for inspection, and surface the gRPC error to callers as an IOException.
+   // The message is built first because reset() below rewinds the chunker's offset.
+   String msg = String.format("failed uploading %s for %s (at offset %d/%d, %s)", hash, chunker, chunker.getOffset(), chunker.getSize(), chunker.hasNext() ? "has-next" : "no-next");
+   IOException failure = new IOException(msg, sre);
+   // The dump is best-effort. reset() and the file write can throw IOException, and re-reading
+   // the chunker can throw StatusRuntimeException from its hash validation; none of those may
+   // replace the original upload failure, so they are attached as suppressed exceptions.
+   try {
+     chunker.reset();
+     try (FileOutputStream out = new FileOutputStream("/tmp/bazel-chunker-" + hash.toString())) {
+       while (chunker.hasNext()) {
+         chunker.next().getData().writeTo(out);
+       }
+     }
+   } catch (IOException | RuntimeException dumpFailure) {
+     failure.addSuppressed(dumpFailure);
+   }
+   return Futures.immediateFailedFuture(failure);
+ },
MoreExecutors.directExecutor()); | |
uploadsInProgress.put(hash, uploadResult); | |
@@ -281,9 +291,10 @@ class ByteStreamUploader extends AbstractReferenceCounted { | |
try { | |
chunker.reset(); | |
} catch (IOException e) { | |
- return Futures.immediateFailedFuture(e); | |
+ return Futures.immediateFailedFuture(new IOException(String.format("chunker failed for %s", hash), e)); | |
} | |
+ chunker.validating(hash); | |
UUID uploadId = UUID.randomUUID(); | |
String resourceName = uploadResourceName(instanceName, uploadId, hash, chunker.getSize()); | |
AsyncUpload newUpload = | |
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java | |
index 7ce80ee24b..591cedc940 100644 | |
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java | |
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java | |
@@ -18,7 +18,12 @@ import static com.google.common.base.Preconditions.checkNotNull; | |
import static com.google.common.base.Preconditions.checkState; | |
import com.google.common.annotations.VisibleForTesting; | |
+import io.grpc.Status; | |
+import io.grpc.StatusRuntimeException; | |
import com.google.common.base.Throwables; | |
+import com.google.common.hash.HashCode; | |
+import com.google.common.hash.Hasher; | |
+import com.google.common.hash.Hashing; | |
import com.google.common.io.ByteStreams; | |
import com.google.devtools.build.lib.actions.ActionInput; | |
import com.google.devtools.build.lib.actions.ActionInputHelper; | |
@@ -101,6 +106,8 @@ public final class Chunker { | |
private InputStream data; | |
private long offset; | |
private byte[] chunkCache; | |
+ private HashCode expectedHash; | |
+ private Hasher hasher; | |
// Set to true on the first call to next(). This is so that the Chunker can open its data source | |
// lazily on the first call to next(), as opposed to opening it in the constructor or on reset(). | |
@@ -113,6 +120,10 @@ public final class Chunker { | |
this.emptyChunk = new Chunk(ByteString.EMPTY, 0); | |
} | |
+ public void validating(HashCode hash) { // Stores the digest that is compared with the running SHA-256 once bytesLeft() reaches 0.
+ this.expectedHash = hash; // seek() sets this back to null, which disables that comparison.
+ }
+ | |
public long getOffset() { | |
return offset; | |
} | |
@@ -134,6 +145,7 @@ public final class Chunker { | |
offset = 0; | |
initialized = false; | |
chunkCache = null; | |
+ hasher = Hashing.sha256().newHasher(); | |
} | |
/** | |
@@ -142,6 +154,7 @@ public final class Chunker { | |
* <p>May close open resources in order to seek to an earlier offset. | |
*/ | |
public void seek(long toOffset) throws IOException { | |
+ expectedHash = null; | |
if (toOffset < offset) { | |
reset(); | |
} | |
@@ -204,8 +217,13 @@ public final class Chunker { | |
offset += bytesToRead; | |
ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesToRead); | |
+ hasher.putBytes(blob.toByteArray()); | |
if (bytesLeft() == 0) { | |
+ HashCode hash = hasher.hash(); | |
+ if (expectedHash != null && !expectedHash.equals(hash)) { | |
+ throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription(String.format("mismatched hash! expected %s/%d, got %s", expectedHash, offset, hash))); | |
+ } | |
data.close(); | |
data = null; | |
chunkCache = null; | |
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java | |
index a4aa232220..790a78dab6 100644 | |
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java | |
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java | |
@@ -16,6 +16,7 @@ package com.google.devtools.build.lib.remote; | |
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; | |
import static java.lang.String.format; | |
+import com.google.common.flogger.GoogleLogger; | |
import build.bazel.remote.execution.v2.Digest; | |
import build.bazel.remote.execution.v2.Directory; | |
import com.google.common.collect.ImmutableSet; | |
@@ -37,6 +38,7 @@ import java.util.Map; | |
/** A {@link RemoteCache} with additional functionality needed for remote execution. */ | |
public class RemoteExecutionCache extends RemoteCache { | |
+ private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); | |
public RemoteExecutionCache( | |
RemoteCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) { | |
@@ -74,13 +76,14 @@ public class RemoteExecutionCache extends RemoteCache { | |
throws IOException, InterruptedException { | |
Iterable<Digest> allDigests = | |
Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet()); | |
- ImmutableSet<Digest> missingDigests = | |
- getFromFuture(cacheProtocol.findMissingDigests(allDigests)); | |
+ ImmutableSet<Digest> missingDigests = ImmutableSet.copyOf(allDigests); | |
+ //getFromFuture(cacheProtocol.findMissingDigests(allDigests)); | |
Map<Digest, Path> filesToUpload = new HashMap<>(); | |
Map<Digest, ByteString> blobsToUpload = new HashMap<>(); | |
for (Digest missingDigest : missingDigests) { | |
Directory node = merkleTree.getDirectoryByDigest(missingDigest); | |
if (node != null) { | |
+ logger.atWarning().log(String.format("uploading directory blob %s", DigestUtil.toString(missingDigest))); | |
blobsToUpload.put(missingDigest, node.toByteString()); | |
continue; | |
} | |
@@ -88,15 +91,18 @@ public class RemoteExecutionCache extends RemoteCache { | |
PathOrBytes file = merkleTree.getFileByDigest(missingDigest); | |
if (file != null) { | |
if (file.getBytes() != null) { | |
+ logger.atWarning().log(String.format("uploading file blob %s for %s", DigestUtil.toString(missingDigest), file.getPath())); | |
blobsToUpload.put(missingDigest, file.getBytes()); | |
continue; | |
} | |
+ logger.atWarning().log(String.format("uploading file path %s for %s", DigestUtil.toString(missingDigest), file.getPath())); | |
filesToUpload.put(missingDigest, file.getPath()); | |
continue; | |
} | |
Message message = additionalInputs.get(missingDigest); | |
if (message != null) { | |
+ logger.atWarning().log(String.format("uploading message blob %s for %s", DigestUtil.toString(missingDigest), message.toByteString().toString())); | |
blobsToUpload.put(missingDigest, message.toByteString()); | |
continue; | |
} | |
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD | |
index 51a29ee892..c02ca5d69c 100644 | |
--- a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD | |
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD | |
@@ -18,5 +18,6 @@ java_library( | |
"//third_party:guava", | |
"//third_party/protobuf:protobuf_java", | |
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", | |
+ "//third_party/grpc:grpc-jar", | |
], | |
) | |
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java | |
index ae6a15113f..85cc6757a2 100644 | |
--- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java | |
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java | |
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; | |
import com.google.common.util.concurrent.ListenableFuture; | |
import com.google.devtools.build.lib.vfs.Path; | |
import com.google.protobuf.ByteString; | |
+import io.grpc.StatusRuntimeException; | |
import java.io.IOException; | |
import java.io.OutputStream; | |
diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java | |
index 54bbeef432..dc0f0054f4 100644 | |
--- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java | |
+++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java | |
@@ -139,6 +139,7 @@ class DirectoryTreeBuilder { | |
switch (metadata.getType()) { | |
case REGULAR_FILE: | |
Digest d = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize()); | |
+ System.out.println(String.format("adding input %s = %s = %s", path, input, metadata)); | |
currDir.addChild( | |
new FileNode( | |
path.getBaseName(), ActionInputHelper.toInputPath(input, execRoot), d)); | |
diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java | |
index 7fac053ae3..418899c823 100644 | |
--- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java | |
+++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java | |
@@ -35,6 +35,10 @@ import java.util.Map; | |
import java.util.SortedMap; | |
import java.util.concurrent.atomic.AtomicLong; | |
import javax.annotation.Nullable; | |
+import com.google.common.io.ByteStreams; | |
+import com.google.common.hash.HashCode; | |
+import com.google.common.hash.Hasher; | |
+import com.google.common.hash.Hashing; | |
/** A merkle tree representation as defined by the remote execution api. */ | |
public class MerkleTree { | |
@@ -174,6 +178,24 @@ public class MerkleTree { | |
for (DirectoryTree.FileNode file : files) { | |
b.addFiles(buildProto(file)); | |
digestPathMap.put(file.getDigest(), toPathOrBytes(file)); | |
+ try { | |
+ if (file.getPath() != null) { | |
+ try (java.io.InputStream in = file.getPath().getInputStream()) { | |
+ HashCode realHash = Hashing.sha256().hashBytes(ByteStreams.toByteArray(in)); | |
+ if (!realHash.toString().equals(file.getDigest().getHash())) { | |
+ throw new RuntimeException(String.format("merkle path %s/%s mismatch %s != %s", dirname, file.getPath(), realHash, file.getDigest().getHash())); | |
+ } | |
+ } | |
+ } | |
+ if (file.getBytes() != null) { | |
+ HashCode realHash = Hashing.sha256().hashBytes(file.getBytes().toByteArray()); | |
+ if (!realHash.toString().equals(file.getDigest().getHash())) { | |
+ throw new RuntimeException(String.format("merkle bytes %s/%s mismatch %s != %s", dirname, file.getPath(), realHash, file.getDigest().getHash())); | |
+ } | |
+ } | |
+ } catch (IOException e) { | |
+ throw new java.io.UncheckedIOException(e); | |
+ } | |
inputBytes.addAndGet(file.getDigest().getSizeBytes()); | |
} | |
for (DirectoryTree.DirectoryNode dir : dirs) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment