I want to post here a solution that I found back then, when the question was actual:
public class ZipServiceImpl implements ZipService {
private final Logger LOGGER = LoggerFactory.getLogger(ZipServiceImpl.class);
@Autowired
private AmazonS3ServiceImpl s3Service;
/**
* Method for simultaneous downloading files
* from the S3 bucket, generating a zip and
* uploading that zip into bucket
*
* @param keys
*/
@Override
public void createZip(Map<String, String> keys, String zipName) {
final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream;
try {
pipedInputStream = new PipedInputStream(pipedOutputStream);
} catch (IOException e) {
LOGGER.error("ZipServiceImpl - createZip - failed to create input stream");
throw new RuntimeException(e);
}
final Thread downloadAndZip = getDownloadAndZipThread(keys, pipedOutputStream);
final Thread upload = getUploadThread(zipName, pipedInputStream);
downloadAndZip.start();
upload.start();
try {
downloadAndZip.join();
upload.join();
} catch (InterruptedException e) {
LOGGER.error("ZipService - failed to join thread due to error: " + e);
throw new RuntimeException(e);
}
}
private Thread getDownloadAndZipThread(Map<String, String> keys, PipedOutputStream pipedOutputStream) {
return new Thread(() -> {
long start = System.currentTimeMillis();
try (final ZipArchiveOutputStream zipOutputStream = new ZipArchiveOutputStream(pipedOutputStream)) {
for (Map.Entry<String, String> entry : keys.entrySet()) {
try {
downloadAndZip(zipOutputStream, entry.getKey(), entry.getValue());
} catch (Exception e) {
LOGGER.error("ZipServiceImpl - getDownloadAndZipThread - failed to download file: " + entry.getKey());
}
}
} catch (Exception e) {
LOGGER.error("ZipService - getDownloadAndZipThread - Failed to process inputStream due to error: " + e);
throw new RuntimeException(e);
}
long executedTime = System.currentTimeMillis() - start;
LOGGER.info("ZipService - getDownloadAndZipThread - execution time: " + executedTime);
});
}
/**
* Instantiating of thread for uploading file into bucket
*
* @param filename
* @return
*/
private Thread getUploadThread(String filename, PipedInputStream pipedInputStream) {
return new Thread(() -> {
long start = System.currentTimeMillis();
try {
s3Service.multipartUpload(filename, pipedInputStream);
pipedInputStream.close();
} catch (Exception e) {
LOGGER.error("Failed to process outputStream due to error: " + e);
throw new RuntimeException(e);
}
long executedTime = System.currentTimeMillis() - start;
LOGGER.info("ZipService - getUploadThread - execution time: " + executedTime);
});
}
/**
* @param zipOutputStream -
* @param awsKey - name of the file that should be downloaded
*/
private void downloadAndZip(ZipArchiveOutputStream zipOutputStream, String awsKey, String destName) {
if (!s3Service.existsAssetByName(awsKey)) {
String error = "ZipService - downloadAndZip - file with following aws key does not exist: " + awsKey;
LOGGER.error(error);
throw new RuntimeException(error);
}
ZipArchiveEntry entry = new ZipArchiveEntry(destName);
try (InputStream inputStream = s3Service.getAssetByName(awsKey)) {
zipOutputStream.putArchiveEntry(entry);
byte[] buffer = new byte[1024];
int len;
while ((len = inputStream.read(buffer)) > 0) {
zipOutputStream.write(buffer, 0, len);
}
zipOutputStream.closeArchiveEntry();
} catch (Exception e) {
LOGGER.error("ZipService - downloadAndZip - failed processing of: " + awsKey + " due to error: " + e);
throw new RuntimeException(e);
}
}
}