From 806e7f932a58b64c5926007181218150d9186ce7 Mon Sep 17 00:00:00 2001 From: Svjatoslav Agejenko Date: Wed, 21 Jan 2026 23:34:38 +0200 Subject: [PATCH] Refactor task queue implementation to use `Map` for priority management and simplify processing logic --- .../task_processor/TaskPriorityQueue.java | 66 ++++++------------- .../commands/task_processor/TaskProcess.java | 5 ++ .../task_processor/TaskProcessorCommand.java | 59 +++-------------- 3 files changed, 35 insertions(+), 95 deletions(-) diff --git a/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskPriorityQueue.java b/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskPriorityQueue.java index 23d12df..e64bdb1 100644 --- a/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskPriorityQueue.java +++ b/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskPriorityQueue.java @@ -1,8 +1,7 @@ package eu.svjatoslav.alyverkko_cli.commands.task_processor; +import java.nio.file.Path; import java.util.*; -import java.util.function.Predicate; -import eu.svjatoslav.alyverkko_cli.commands.task_processor.TaskProcessorCommand.TaskQueueEntry; /** @@ -10,53 +9,30 @@ import eu.svjatoslav.alyverkko_cli.commands.task_processor.TaskProcessorCommand. * Uses a TreeSet for efficient insertion, polling, and iteration. */ public class TaskPriorityQueue { - private final TreeSet tasks; - - /** - * Constructs a new empty task priority queue. - */ - public TaskPriorityQueue() { - tasks = new TreeSet<>(); - } - - /** - * Adds a task to the priority queue. - * - * @param task the task to add (must not be null) - */ - public void add(TaskQueueEntry task) { - tasks.add(task); - } - - /** - * Removes and returns the highest priority task from the queue. - * - * @return the highest priority task, or null if the queue is empty - */ - public TaskQueueEntry poll() { - if (tasks.isEmpty()) { - return null; + // Path to priority + private final Map tasks = new HashMap<>(); + + public Path poll() { + if (tasks.isEmpty()) return null; + + Integer highestPriority = null; + Path filePath = null; + for (Map.Entry entry : tasks.entrySet()) { + if (highestPriority == null || highestPriority < (Integer) entry.getValue()) { + highestPriority = (Integer) entry.getValue(); + filePath = (Path) entry.getKey(); + } } - TaskQueueEntry task = tasks.first(); - tasks.remove(task); - return task; + + tasks.remove(filePath); + return filePath; } - /** - * Removes tasks matching the given predicate. - * - * @param predicate the condition for removal (must not be null) - */ - public void removeIf(Predicate predicate) { - tasks.removeIf(predicate); + public void add(Path filePath, int priority) { + tasks.put(filePath, priority); } - /** - * Returns all tasks in the queue in order of priority (highest priority first). - * - * @return a new list containing all tasks in priority order - */ - public List getAllTasks() { - return new ArrayList<>(tasks); + public void remove(Path filePath) { + tasks.remove(filePath); } } diff --git a/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskProcess.java b/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskProcess.java index 32f18b9..9235d3e 100644 --- a/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskProcess.java +++ b/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskProcess.java @@ -221,6 +221,11 @@ public class TaskProcess { args.add("--ctx-size " + task.model.getContextSizeTokens()); args.add("--batch-size 512"); + + // Maps AI model from filesystem to RAM without preloading it in advance. + // Reduces RAM usage and speeds up startup. + args.add("--mmap"); + args.add("--single-turn"); args.add("-n -1"); args.add("--file " + inputFile); diff --git a/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskProcessorCommand.java b/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskProcessorCommand.java index 0653afa..9fbaa51 100644 --- a/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskProcessorCommand.java +++ b/src/main/java/eu/svjatoslav/alyverkko_cli/commands/task_processor/TaskProcessorCommand.java @@ -108,7 +108,7 @@ public class TaskProcessorCommand implements Command { // Main loop: process tasks from the queue in priority order while (true) { // Process the highest priority task if available - TaskQueueEntry nextTask = taskQueue.poll(); + Path nextTask = taskQueue.poll(); if (nextTask != null) processTask(nextTask); // Check for filesystem events @@ -224,12 +224,11 @@ public class TaskProcessorCommand implements Command { * * @param entry the task entry containing the file path and priority. */ - private void processTask(TaskQueueEntry entry) throws IOException { - Path filePath = entry.getFilePath(); - File file = filePath.toFile(); + private void processTask(Path taskPath) throws IOException { + File file = taskPath.toFile(); if (!isMailProcessingNeeded(file)) { - System.out.println("Ignoring file: " + filePath.getFileName() + " (does not need processing now)"); + System.out.println("Ignoring file: " + taskPath.getFileName() + " (does not need processing now)"); return; } @@ -331,7 +330,6 @@ public class TaskProcessorCommand implements Command { return result; } - /** * Parses the "TOCOMPUTE:" line, which should look like: *
TOCOMPUTE: key1=value1 key2=value2 ...
@@ -351,15 +349,13 @@ public class TaskProcessorCommand implements Command { // Example format: "TOCOMPUTE: prompt=writer model=mistral" String[] parts = toComputeLine.substring("TOCOMPUTE: ".length()).split("\\s+"); - Map settings = new HashMap<>(); + Map result = new HashMap<>(); for (String part : parts) { String[] keyValue = part.split("="); - if (keyValue.length == 2) { - settings.put(keyValue[0], keyValue[1]); - } + if (keyValue.length == 2) result.put(keyValue[0], keyValue[1]); } - return settings; + return result; } /** @@ -471,44 +467,7 @@ public class TaskProcessorCommand implements Command { System.err.println("Invalid priority in file " + filePath.getFileName() + ": " + priorityStr); } } - taskQueue.add(new TaskQueueEntry(filePath, priority)); - } - - /** - * A static nested class representing a task in the queue, with a - * priority and a tiebreaker for sorting. - */ - public static class TaskQueueEntry implements Comparable { - private final Path filePath; - private final int priority; - private final String tiebreaker; - - public TaskQueueEntry(Path filePath, int priority) { - this.filePath = filePath; - this.priority = priority; - this.tiebreaker = UUID.randomUUID().toString(); - } - - public Path getFilePath() { - return filePath; - } - - public int getPriority() { - return priority; - } - - public String getTiebreaker() { - return tiebreaker; - } - - @Override - public int compareTo(TaskQueueEntry other) { - int priorityCompare = Integer.compare(this.priority, other.priority); - if (priorityCompare != 0) { - return -priorityCompare; // higher priority first - } - return this.tiebreaker.compareTo(other.tiebreaker); - } + taskQueue.add(filePath, priority); } /** @@ -517,7 +476,7 @@ public class TaskProcessorCommand implements Command { * @param filePath the file path to match and remove from the queue. */ private void removeTasksForFile(Path filePath) { - taskQueue.removeIf(entry -> entry.getFilePath().equals(filePath)); + taskQueue.remove(filePath); } } -- 2.20.1