使用线程同时对多个文件进行哈希 (sha1)

发布于 2024-12-13 17:31:31 字数 1518 浏览 3 评论 0原文

我有N个大文件(不少于250M)要哈希。这些文件位于 P 物理驱动器上。

我想用最大 K 个活动线程同时对它们进行哈希处理,但每个物理驱动器不能对超过 M 个文件进行哈希处理,因为它会减慢整个过程(我运行了一个测试,解析 61 个文件,并且使用 8 个线程,它比有 1 个线程;文件几乎都在同一个磁盘上)。

我想知道对此的最佳方法是什么:

  • 我可以使用 Executors.newFixedThreadPool(K)
  • 然后我将使用计数器提交任务以确定是否应该添加新任务。

我的代码是:

int K = 8;
int M = 1;
Queue<Path> queue = null; // get the files to hash
final ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(K);
final ConcurrentMap<FileStore, Integer> counter = new ConcurrentHashMap<>();
final ConcurrentMap<FileStore, Integer> maxCounter = new ConcurrentHashMap<>();
for (FileStore store : FileSystems.getDefault().getFileStores()) {
  counter.put(store, 0);
  maxCounter.put(store, M);
}
List<Future<Result>> result = new ArrayList<>();
while (!queue.isEmpty()) {
  final Path current = queue.poll();
  final FileStore store = Files.getFileStore(current);
  if (counter.get(store) < maxCounter.get(store)) {
    result.add(newFixedThreadPool.submit(new Callable<Result>() {

      @Override
      public Entry<Path, String> call() throws Exception {
        counter.put(store, counter.get(store) + 1);
        String hash = null; // Hash the file
        counter.put(store, counter.get(store) - 1);
        return new Result(path, hash);
      }

    }));
  } else queue.offer(current);
}

抛开潜在的非线程安全操作(就像我如何玩计数器),有没有更好的方法来实现我的目标?

我还认为这里的循环可能有点太多了,因为它可能会占用很多进程(几乎就像无限循环)。

I have N big files (no less than 250M) to hash. Those files are on P physical drives.

I'd like to hash them concurrently with maximum K active threads but I can not hash more than M files per physical drives because it slows down the whole process (I ran a test, parsing 61 files, and with 8 threads it was slower than with 1 thread; the file were almost all on the same disk).

I am wondering what would be the best approach to this :

  • I could use a Executors.newFixedThreadPool(K)
  • then I would submit the task using a counter to determine if I should add a new task.

My code would be :

int K = 8;
int M = 1;
Queue<Path> queue = null; // get the files to hash
final ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(K);
final ConcurrentMap<FileStore, Integer> counter = new ConcurrentHashMap<>();
final ConcurrentMap<FileStore, Integer> maxCounter = new ConcurrentHashMap<>();
for (FileStore store : FileSystems.getDefault().getFileStores()) {
  counter.put(store, 0);
  maxCounter.put(store, M);
}
List<Future<Result>> result = new ArrayList<>();
while (!queue.isEmpty()) {
  final Path current = queue.poll();
  final FileStore store = Files.getFileStore(current);
  if (counter.get(store) < maxCounter.get(store)) {
    result.add(newFixedThreadPool.submit(new Callable<Result>() {

      @Override
      public Entry<Path, String> call() throws Exception {
        counter.put(store, counter.get(store) + 1);
        String hash = null; // Hash the file
        counter.put(store, counter.get(store) - 1);
        return new Result(path, hash);
      }

    }));
  } else queue.offer(current);
}

Tossing aside the potential non thread safe operation (like how I play with counter), is there a better way to achieve my goal ?

I also think the loop here might be a little too much, as it may take up a lot of process (almost like an infinite loop).

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

嘴硬脾气大 2024-12-20 17:31:31

如果驱动器硬件配置在编译时未知,并且可能会被更改/升级,则很容易为每个驱动器使用线程池并使线程计数可由用户配置。我不熟悉“newFixedThreadPool” - 线程计数是一个可以在运行时更改以优化性能的属性吗?

If the drive hardware configuration is not known at compile time, and may be chaged/upgraded, it's tempting to use a thread pool per drive and make the thread counts user-configurable. I am not famililar with 'newFixedThreadPool' - is the thread count a property that can be changed at run time to optimize performance?

第几種人 2024-12-20 17:31:31

经过很长时间,我找到了一个解决方案来满足我的需求:我使用了 ExecutorService ,而不是整数计数器或 AtomicInteger 或其他任何东西,并且每个提交的任务都使用信号量 共享一个驱动器的每个文件。

喜欢:

ConcurrentMap<FileStore, Semaphore> map = new ConcurrentHashMap<>();
ExecutorService es = Executors.newFixedThreadPool(10);
for (Path path : listFile()) {
  final FileStore store = Files.getFileStore(path);
  final Semaphore semaphore = map.computeIfAbsent(store, key -> new Semaphore(getAllocatedCredits(store)));
  final int cost = computeCost(path);
  es.submit(() -> {
    semaphore.acquire(cost);
    try {
      ... some work ...
    } finally {
      semaphore.release(cost);
    }
  });
}


int getAllocatedCredits(FileStore store) {return 2;}
int computeCost(Path path) {return 1;}

请注意 Java 8 的帮助,尤其是在 computeIfAbsentsubmit 中。

After much time, I've found a solution to achieve my need: instead of integer counter, or AtomicInteger or whatever, I've used an ExecutorService and each submitted task use a Semaphore shared across each file of one drive.

Like:

ConcurrentMap<FileStore, Semaphore> map = new ConcurrentHashMap<>();
ExecutorService es = Executors.newFixedThreadPool(10);
for (Path path : listFile()) {
  final FileStore store = Files.getFileStore(path);
  final Semaphore semaphore = map.computeIfAbsent(store, key -> new Semaphore(getAllocatedCredits(store)));
  final int cost = computeCost(path);
  es.submit(() -> {
    semaphore.acquire(cost);
    try {
      ... some work ...
    } finally {
      semaphore.release(cost);
    }
  });
}


int getAllocatedCredits(FileStore store) {return 2;}
int computeCost(Path path) {return 1;}

Notice the help of Java 8, especially in computeIfAbsent and submit.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文