#java #threadpool #java-threads
Вопрос:
Этот класс проходит путь, и для каждого файла, в который он попадает, он запускает поток в ThreadPoolExecutor, который собирает данные ему объекты в хэш-карту. У меня есть другой поток, который отслеживает хэш-карту, и когда на этой карте содержится 5000 элементов, она сбрасывается в базу данных MySQL, а затем записанные записи удаляются из хэш-карты, и все продолжается в том же духе.
Однако, как только ходок доберется до того места, где он поразил более 2 миллионов файлов, фактическая очистка хэш-карты отстала почти на миллион записей, поэтому я хотел бы иметь возможность приостановить перемещение папки до тех пор, пока дампы данных не догонят, а затем возобновить … повторите полоскание …
Можно ли приостановить этот урок после того, как он начнется? ИЛИ, возможно, есть какой-нибудь способ замедлить это?
public class WalkFilePaths implements Runnable{
public WalkFilePaths(Path rootPath, ThreadPoolExecutor executor) {
this.rootPath = rootPath;
this.executor = executor;
}
private final Path rootPath;
private static ThreadPoolExecutor executor;
private static final FileDataManager fileDataManager = new FileDataManager();
@Override public void run() {
try {
FolderWalker folderWalker = new FolderWalker();
Files.walkFileTree(rootPath,folderWalker);
}
catch (IOException e) {e.printStackTrace();}
}
public static class FolderWalker extends SimpleFileVisitor<Path> {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
if(attrs.isRegularFile()) {
executor.execute(fileDataManager.addFileMap(new FileDataModel(path.toFile(), attrs.creationTime().toInstant(), attrs.lastAccessTime().toInstant())));
}
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException e) {
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) {
return FileVisitResult.CONTINUE;
}
}
}
Ответ №1:
Вы можете управлять процессом с помощью счетного семафора.
Концепция этого проста: вы начинаете с начального значения 5_000 разрешений, выпущенных на вашем семафоре; каждый раз, когда ваш код запрашивает новый файл, ему потребуется одно разрешение. Когда у семафора закончатся разрешения, ваш код будет ждать (блокируя вызов semaphore.acquire()
), пока разрешения не будут выпущены снова. Ваш существующий код release()
должен будет обновляться каждый раз, когда он использует накопленные данные.
Упрощенная реализация вышеупомянутой концепции была бы:
class FileDataManager {
private static final int BATCH_SIZE = 5_000;
private final Semaphore semaphore = new Semaphore(BATCH_SIZE);
private Map<String, String> data = new HashMap<>(BATCH_SIZE);
Runnable addFileMap(FileDataModel fileDataModel) {
try {
//Try to acquire a permit, or wait (blocking call) until a permit is available
semaphore.acquire();
return new Runnable() {
@Override
public void run() {
//Process file...
data.put(fileDataModel.toString(), fileDataModel.toString());
}
};
} catch (InterruptedException ex) {
Logger.getLogger(FileDataManager.class.getName()).log(Level.SEVERE, null, ex);
throw new RuntimeException(ex);
}
}
public int accumulatedFileCount() {
return data.size();
}
public void releasePermits() {
semaphore.release(BATCH_SIZE);
}
}
Другой поток, отслеживающий карту, затем:
//...
if (fileDataManager.accumulatedFileCount() >= 5_000) {
// store data in RDBMS
fileDataManager.releasePermits();
}
//...