package eu.svjatoslav.alyverkko_cli.commands.mail_correspondant;
import eu.svjatoslav.alyverkko_cli.*;
+import eu.svjatoslav.alyverkko_cli.configuration.ConfigurationHelper;
import eu.svjatoslav.alyverkko_cli.model.Model;
import eu.svjatoslav.alyverkko_cli.model.ModelLibrary;
import eu.svjatoslav.commons.cli_helper.parameter_parser.Parser;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.*;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import static eu.svjatoslav.alyverkko_cli.Main.configuration;
-import static eu.svjatoslav.alyverkko_cli.configuration.ConfigurationHelper.getConfigurationFile;
-import static eu.svjatoslav.alyverkko_cli.configuration.ConfigurationHelper.loadConfiguration;
import static eu.svjatoslav.commons.file.IOHelper.getFileContentsAsString;
import static eu.svjatoslav.commons.file.IOHelper.saveToFile;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+
/**
- * <p>Monitors a designated mail directory for text files containing "TOCOMPUTE:" markers.
- * When new or modified files are detected, it processes them with appropriate AI models
- * and appends the results using a standardized format.</p>
- * <p>The processing pipeline includes:
- * <ol>
- * <li>Initial scan of existing files</li>
- * <li>WatchService registration for real-time monitoring</li>
- * <li>Query construction from file metadata</li>
- * <li>AI response formatting and appending</li>
- * </ol>
- * </p>
- * <p>This implementation uses a sleep delay (1 second) after detecting file changes to allow complete writes before
- * processing. The response format follows org-mode conventions for easy reading and further processing.</p>
+ * The MailCorrespondentCommand continuously monitors a specified mail
+ * directory for new or modified text files, checks if they have a
+ * "TOCOMPUTE:" marker, and if so, adds them to a priority queue to be
+ * processed in priority order. Once processed, results are appended to
+ * the same file.
+ * <p>
+ * Usage:
+ * <pre>
+ * alyverkko-cli mail
+ * </pre>
*/
-
public class MailCorrespondentCommand implements Command {
/**
*/
File mailDir;
+ /**
+ * Priority queue of tasks to process, sorted by priority and a
+ * random tiebreaker.
+ */
+ private final PriorityQueue<TaskQueueEntry> taskQueue;
+
+ public MailCorrespondentCommand() {
+ Comparator<TaskQueueEntry> comparator = (a, b) -> {
+ int priorityCompare = Integer.compare(b.priority, a.priority);
+ if (priorityCompare != 0) {
+ return priorityCompare;
+ }
+ return a.tiebreaker.compareTo(b.tiebreaker);
+ };
+ this.taskQueue = new PriorityQueue<>(comparator);
+ }
+
/**
* @return the name of this command, i.e., "mail".
*/
/**
* Executes the "mail" command, loading configuration, starting a
- * WatchService on the mail directory, and running an infinite loop
- * that processes newly discovered tasks.
+ * WatchService on the mail directory, adding existing files to the
+ * task queue, and processing tasks in priority order.
*
* @param cliArguments the command-line arguments following the "mail" subcommand.
* @throws IOException if reading/writing tasks fails.
return;
}
- configuration = loadConfiguration(getConfigurationFile(configFileOption));
+ configuration = ConfigurationHelper.loadConfiguration(ConfigurationHelper.getConfigurationFile(configFileOption));
if (configuration == null) {
System.out.println("Failed to load configuration file");
return;
// Set up directory watch service
initializeFileWatcher();
- // Process any existing files that might already be in the directory
+ // Add all existing mail files to the queue
initialMailScanAndReply();
System.out.println("Mail correspondent running. Press CTRL+c to terminate.");
- // Main loop: watch for file events
+ // Main loop: process tasks from the queue in priority order
while (true) {
- WatchKey key;
- try {
- key = directoryWatcher.take();
- } catch (InterruptedException e) {
- System.out.println("Interrupted while waiting for file system events. Exiting.");
- break;
- }
+ // Process the highest priority task if available
+ if (!taskQueue.isEmpty()) processTask(taskQueue.poll());
- System.out.println("Detected filesystem event.");
+ // Check for filesystem events
+ WatchKey key = directoryWatcher.poll();
// Sleep briefly to allow the file to be fully written
Thread.sleep(1000);
- processDetectedFilesystemEvents(key);
+ if (key != null) {
+ System.out.println("Detected filesystem events in mail directory. Processing ... ");
+ processDetectedFilesystemEvents(key);
+ }
if (!key.reset()) {
break;
/**
* Performs an initial scan of existing files in the mail directory,
- * processing those that need AI inference (i.e., that start with "TOCOMPUTE:").
+ * adding those that need processing to the task queue.
*
- * @throws IOException if reading files fails.
- * @throws InterruptedException if the thread is interrupted.
+ * @throws IOException if file scanning fails.
*/
- private void initialMailScanAndReply() throws IOException, InterruptedException {
+ private void initialMailScanAndReply() throws IOException {
File[] files = mailDir.listFiles();
- if (files == null) return;
+ if (files == null) {
+ return;
+ }
for (File file : files) {
- processMailIfNeeded(file);
+ considerFileForQueuing(file.toPath());
}
}
return false;
}
+ // Ensure the file exists
+ if (!file.exists()) {
+ return false;
+ }
+
// Check if it's a regular file
if (!file.isFile()) {
return false;
* @throws IOException if file reading fails.
*/
private static boolean fileHasToComputeMarker(File file) throws IOException {
+ String firstLine = getFirstLine(file);
+ return firstLine != null && firstLine.startsWith("TOCOMPUTE:");
+ }
+
+ private static String getFirstLine(File file) throws IOException {
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
- String firstLine = reader.readLine();
- return firstLine != null && firstLine.startsWith("TOCOMPUTE:");
+ return reader.readLine();
}
}
saveAiResponseToFile(file, mailQuery, aiGeneratedResponse);
}
- private static void saveAiResponseToFile(File file, MailQuery mailQuery, String aiGeneratedResponse) throws IOException {
+ private static void saveAiResponseToFile(File file, MailQuery mailQuery, String aiResponse) throws IOException {
// Build new content
StringBuilder resultFileContent = new StringBuilder();
// Append the AI response block
resultFileContent
.append("* ASSISTANT:\n")
- .append(aiGeneratedResponse)
+ .append(aiResponse)
.append("\n");
// Write the combined result back to the same file
saveToFile(file, resultFileContent.toString());
}
+
+ /**
+ * Processes a task by reading the file, building the MailQuery,
+ * running the AI query, and saving the response.
+ *
+ * @param entry the task entry containing the file path and priority.
+ */
+ private void processTask(TaskQueueEntry entry) throws IOException {
+ Path filePath = entry.getFilePath();
+ File file = filePath.toFile();
+
+ if (!isMailProcessingNeeded(file)) {
+ System.out.println("Ignoring file: " + filePath.getFileName() + " (does not need processing now)");
+ return;
+ }
+
+ try {
+ MailQuery mailQuery = buildMailQueryFromFile(file);
+ AiTask aiTask = new AiTask(mailQuery);
+ String aiGeneratedResponse = aiTask.runAiQuery();
+
+ saveAiResponseToFile(file, mailQuery, aiGeneratedResponse);
+ } catch (IOException | InterruptedException | RuntimeException e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ /**
+ * Builds a string for the first line of the output file indicating
+ * that processing is done.
+ *
+ * @param mailQuery the query that was processed.
+ * @return a string for the first line.
+ */
private static String getDoneLine(MailQuery mailQuery) {
- return "DONE: prompt=" + mailQuery.systemPromptName + " model="+ mailQuery.model.alias + " duration=" + getDuration(mailQuery.startTime, mailQuery.endTime) + "\n";
+ return "DONE: prompt=" + mailQuery.systemPromptName + " model=" + mailQuery.model.alias + " duration=" + getDuration(mailQuery.startTime, mailQuery.endTime) + "\n";
}
+
private static String getDuration(long startTime, long endTime) {
long durationMillis = endTime - startTime;
return durationHours + "h";
}
+ /**
+ * Builds a MailQuery object from the contents of a file.
+ *
+ * @param file the file to read.
+ * @return the constructed MailQuery.
+ * @throws IOException if reading the file fails.
+ */
private MailQuery buildMailQueryFromFile(File file) throws IOException {
MailQuery result = new MailQuery();
- // Read the mail content
String inputFileContent = getFileContentsAsString(file);
-
- // Split into first line and user prompt
int firstNewLineIndex = inputFileContent.indexOf('\n');
if (firstNewLineIndex == -1) {
- throw new IllegalArgumentException("Input file is only one line long. Content: " + inputFileContent);
+ throw new IllegalArgumentException("Input file is only one line long.");
}
- // The First line should start with "TOCOMPUTE:" and contain settings
String firstLine = inputFileContent.substring(0, firstNewLineIndex);
Map<String, String> fileProcessingSettings = parseSettings(firstLine);
}
result.model = modelOptional.get();
+ // Set priority
+ String priorityStr = fileProcessingSettings.get("priority");
+ result.priority = 0;
+ if (priorityStr != null) {
+ try {
+ result.priority = Integer.parseInt(priorityStr);
+ } catch (NumberFormatException e) {
+ System.err.println("Invalid priority in file: " + priorityStr);
+ }
+ }
+
return result;
}
+
/**
* Parses the "TOCOMPUTE:" line, which should look like:
* <pre>TOCOMPUTE: key1=value1 key2=value2 ...</pre>
}
/**
- * Handles the filesystem events from the WatchService (e.g. file creation
- * or modification), then processes those files if necessary.
+ * Handles the filesystem events from the WatchService (e.g., file
+ * creation or modification), then adds the file to the task queue
+ * if it needs processing.
*
* @param key the watch key containing the events.
- * @throws IOException if file reading/writing fails.
- * @throws InterruptedException if the AI process is interrupted.
+ * @throws IOException if file processing fails.
*/
- private void processDetectedFilesystemEvents(WatchKey key) throws IOException, InterruptedException {
+ private void processDetectedFilesystemEvents(WatchKey key) throws IOException {
for (WatchEvent<?> event : key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
- // Skip OVERFLOW event
+ // Skip OVERFLOW events
if (kind == StandardWatchEventKinds.OVERFLOW) {
continue;
}
Path filename = ((WatchEvent<Path>) event).context();
System.out.println("Event: " + kind + " for file: " + filename);
- // Process the file
+ // Add to task queue if needed
if (kind == ENTRY_CREATE || kind == ENTRY_MODIFY) {
- File file = mailDir.toPath().resolve(filename).toFile();
- processMailIfNeeded(file);
+ Path filePath = mailDir.toPath().resolve(filename);
+ considerFileForQueuing(filePath);
}
}
}
this.directoryWatcher = FileSystems.getDefault().newWatchService();
Paths.get(mailDir.getAbsolutePath()).register(directoryWatcher, ENTRY_CREATE, ENTRY_MODIFY);
}
+
+ /**
+ * Adds a file to the task queue if it needs processing.
+ *
+ * @param filePath the path to the file to check.
+ * @throws IOException if reading the first line fails.
+ */
+ private void considerFileForQueuing(Path filePath) throws IOException {
+ File file = filePath.toFile();
+ if (!isMailProcessingNeeded(file)) return;
+
+ String firstLine = getFirstLine(file);
+ Map<String, String> settings = parseSettings(firstLine);
+ int priority = 0;
+ String priorityStr = settings.get("priority");
+ if (priorityStr != null) {
+ try {
+ priority = Integer.parseInt(priorityStr);
+ } catch (NumberFormatException e) {
+ System.err.println("Invalid priority in file " + filePath.getFileName() + ": " + priorityStr);
+ }
+ }
+ taskQueue.offer(new TaskQueueEntry(filePath, priority));
+ }
+
+ /**
+ * A static nested class representing a task in the queue, with a
+ * priority and a tiebreaker for sorting.
+ */
+ private static class TaskQueueEntry implements Comparable<TaskQueueEntry> {
+ private final Path filePath;
+ private final int priority;
+ private final String tiebreaker;
+
+ public TaskQueueEntry(Path filePath, int priority) {
+ this.filePath = filePath;
+ this.priority = priority;
+ this.tiebreaker = UUID.randomUUID().toString();
+ }
+
+ public Path getFilePath() {
+ return filePath;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public String getTiebreaker() {
+ return tiebreaker;
+ }
+
+ @Override
+ public int compareTo(TaskQueueEntry other) {
+ int priorityCompare = Integer.compare(this.priority, other.priority);
+ if (priorityCompare != 0) {
+ return -priorityCompare; // higher priority first
+ }
+ return this.tiebreaker.compareTo(other.tiebreaker);
+ }
+ }
+
}