Skip to content

Commit

Permalink
deal with exception and always close stream
Browse files Browse the repository at this point in the history
Signed-off-by: zombee0 <flylucas_10@163.com>
  • Loading branch information
zombee0 committed Mar 10, 2023
1 parent c63f4c0 commit 879bc5b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
Expand Up @@ -46,6 +46,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.security.SecureRandom;

Expand Down Expand Up @@ -151,4 +152,20 @@ public static void deleteLocalFileWithRemotePath(String key) {
LOG.warn("failed on deleting file: {}. msg: {}", hadoopOutputFile.getPath(), e);
}
}

public static void closeInputStreamIgnoreException(InputStream stream) {
try {
stream.close();
} catch (IOException e) {
//ignored
}
}

public static void closeOutputStreamIgnoreException(OutputStream stream) {
try {
stream.close();
} catch (IOException e) {
//ignored
}
}
}
Expand Up @@ -181,8 +181,8 @@ public static DiskCacheEntry newDiskCacheEntry(SeekableInputStream stream, long
HadoopOutputFile tmpOutputFile = (HadoopOutputFile) IOUtil.getTmpOutputFile(
METADATA_CACHE_DISK_PATH, key);
PositionOutputStream outputStream = tmpOutputFile.createOrOverwrite();
Path localFilePath = new Path(IOUtil.remoteToLocalFilePath(METADATA_CACHE_DISK_PATH, key));
try {
Path localFilePath = new Path(IOUtil.remoteToLocalFilePath(METADATA_CACHE_DISK_PATH, key));
while (totalBytesToRead > 0) {
// read the stream in 4MB chunk
int bytesToRead = (int) Math.min(BUFFER_CHUNK_SIZE, totalBytesToRead);
Expand Down Expand Up @@ -215,11 +215,17 @@ public static DiskCacheEntry newDiskCacheEntry(SeekableInputStream stream, long
}
} catch (IOException ex) {
try {
IOUtil.closeInputStreamIgnoreException(stream);
IOUtil.closeOutputStreamIgnoreException(outputStream);
tmpOutputFile.getFileSystem().delete(tmpOutputFile.getPath(), false);
} catch (IOException ioException) {
LOG.warn("failed on deleting file : {}. msg: {}", tmpOutputFile.getPath(), ioException);
}
throw new UncheckedIOException(ex);
} catch (IllegalArgumentException e) {
IOUtil.closeInputStreamIgnoreException(stream);
IOUtil.closeOutputStreamIgnoreException(outputStream);
throw e;
}
}
}
Expand Down Expand Up @@ -455,11 +461,14 @@ public static class DiskCacheSeekableInputStream extends SeekableInputStream {
}
@Override
public void close() throws IOException {
stream.close();
diskCache.asMap().computeIfPresent(key, (k, v) -> {
v.unpin();
return v;
});
try {
stream.close();
} finally {
diskCache.asMap().computeIfPresent(key, (k, v) -> {
v.unpin();
return v;
});
}
}

@Override
Expand Down Expand Up @@ -531,10 +540,10 @@ public boolean exists() {
}

private CacheEntry newCacheEntry() {
SeekableInputStream stream = null;
try {
long fileLength = getLength();
long totalBytesToRead = fileLength;
SeekableInputStream stream;
if (contentCache.isExistOnDiskCache(location())) {
LOG.debug(location() + " hit on disk cache");
stream = contentCache.getDiskSeekableStream(location());
Expand Down Expand Up @@ -568,6 +577,9 @@ private CacheEntry newCacheEntry() {
stream.close();
return new CacheEntry(fileLength - totalBytesToRead, buffers);
} catch (IOException ex) {
if (stream != null) {
IOUtil.closeInputStreamIgnoreException(stream);
}
throw new UncheckedIOException(ex);
}
}
Expand Down

0 comments on commit 879bc5b

Please sign in to comment.