fix(hot-runner): validate persisted registry state and add dispatcher safeguards

Validate runner entries when loading from hot-runners.json. Discard
corrupted entries with warnings. Add validateAndRepair() method for
runtime recovery. Validate data before persisting to prevent writing
corrupt state. Handle corrupt persistence files (invalid JSON)
gracefully. Rewrite executeWithTimeout using Promise.race to clean up
transport connections on timeout. Fix pre-existing ESLint violations
in dispatcher and test files.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
frostebite
2026-03-05 13:00:47 +00:00
parent 1bb31f3e98
commit 7c0c4c2072
5 changed files with 465 additions and 64 deletions
Generated Vendored
+136 -27
View File
@@ -9796,23 +9796,28 @@ class HotRunnerDispatcher {
}
/**
* Execute a job on a transport with a timeout guard.
* On timeout, disconnects the transport to release the connection
* and prevent the orphaned sendJob promise from holding resources.
*/
async executeWithTimeout(transport, request) {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
reject(new Error(`[HotRunner] Job ${request.jobId} timed out after ${request.timeout}ms`));
const TIMEOUT_SENTINEL = Symbol('timeout');
const timeoutPromise = new Promise((resolve) => {
setTimeout(() => {
resolve(TIMEOUT_SENTINEL);
}, request.timeout);
transport
.sendJob(request)
.then((result) => {
clearTimeout(timer);
resolve(result);
})
.catch((error) => {
clearTimeout(timer);
reject(error);
});
});
const result = await Promise.race([transport.sendJob(request), timeoutPromise]);
if (result === TIMEOUT_SENTINEL) {
// Disconnect the transport to clean up the orphaned sendJob call
try {
await transport.disconnect();
}
catch (disconnectError) {
orchestrator_logger_1.default.logWarning(`[HotRunner] Error disconnecting transport after timeout for job ${request.jobId}: ${disconnectError.message}`);
}
throw new Error(`[HotRunner] Job ${request.jobId} timed out after ${request.timeout}ms`);
}
return result;
}
sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
@@ -10006,6 +10011,45 @@ const nanoid_1 = __nccwpck_require__(17592);
const orchestrator_logger_1 = __importDefault(__nccwpck_require__(32549));
const generateId = (0, nanoid_1.customAlphabet)('abcdefghijklmnopqrstuvwxyz0123456789', 12);
const PERSISTENCE_FILENAME = 'hot-runners.json';
const VALID_RUNNER_STATES = new Set(['idle', 'busy', 'starting', 'stopping', 'unhealthy']);
/**
 * Validate that a restored runner entry has all required fields with correct types.
 *
 * Numeric fields must be finite numbers. NaN and +/-Infinity have type 'number',
 * but JSON.stringify serializes them as `null`, so accepting them here would let
 * an entry pass validation before being written and then fail validation when it
 * is read back.
 *
 * @param {unknown} entry - Candidate value, e.g. parsed from the persistence file.
 * @returns {boolean} true if the entry is a valid HotRunnerStatus, false otherwise.
 */
function isValidRunnerStatus(entry) {
    if (typeof entry !== 'object' || entry === null) {
        return false;
    }
    const record = entry;
    return (typeof record.id === 'string' &&
        record.id.length > 0 &&
        typeof record.state === 'string' &&
        VALID_RUNNER_STATES.has(record.state) &&
        typeof record.unityVersion === 'string' &&
        typeof record.platform === 'string' &&
        // Number.isFinite is false for non-numbers, NaN and +/-Infinity.
        Number.isFinite(record.uptime) &&
        Number.isFinite(record.jobsCompleted) &&
        typeof record.lastHealthCheck === 'string' &&
        Number.isFinite(record.memoryUsageMB));
}
/**
 * Check whether a restored config entry has every required field with the
 * expected type.
 *
 * @param {unknown} entry - Candidate value, e.g. parsed from the persistence file.
 * @returns {boolean} true if the entry is a valid HotRunnerConfig, false otherwise.
 */
function isValidRunnerConfig(entry) {
    if (entry === null || typeof entry !== 'object') {
        return false;
    }
    const candidate = entry;
    if (typeof candidate.enabled !== 'boolean') {
        return false;
    }
    if (typeof candidate.transport !== 'string') {
        return false;
    }
    // Only these transport identifiers are accepted.
    switch (candidate.transport) {
        case 'websocket':
        case 'grpc':
        case 'named-pipe':
            break;
        default:
            return false;
    }
    if (typeof candidate.host !== 'string') {
        return false;
    }
    // The remaining fields only need to be of type 'number'.
    const numericFields = ['port', 'healthCheckInterval', 'maxIdleTime', 'maxJobsBeforeRecycle'];
    return numericFields.every((field) => typeof candidate[field] === 'number');
}
class HotRunnerRegistry {
constructor(persistenceDirectory) {
this.runners = new Map();
@@ -10099,14 +10143,53 @@ class HotRunnerRegistry {
/**
 * Number of entries currently held in this registry's `runners` map.
 */
get size() {
return this.runners.size;
}
/**
* Validate all runners in the registry and reset invalid ones to 'unhealthy'.
* Returns the number of runners that were repaired.
*/
validateAndRepair() {
let repaired = 0;
for (const [id, status] of this.runners) {
// Cast to unknown to bypass the type guard narrowing to 'never',
// since the Map is typed as HotRunnerStatus but entries may have
// been corrupted via direct deserialization or unsafe casts.
const entry = status;
if (!isValidRunnerStatus(entry)) {
orchestrator_logger_1.default.logWarning(`[HotRunner] Runner ${id} has invalid state, marking as unhealthy`);
this.runners.set(id, {
id,
state: 'unhealthy',
unityVersion: typeof entry.unityVersion === 'string' ? entry.unityVersion : 'unknown',
platform: typeof entry.platform === 'string' ? entry.platform : 'unknown',
uptime: typeof entry.uptime === 'number' ? entry.uptime : 0,
jobsCompleted: typeof entry.jobsCompleted === 'number' ? entry.jobsCompleted : 0,
lastHealthCheck: typeof entry.lastHealthCheck === 'string' ? entry.lastHealthCheck : new Date().toISOString(),
memoryUsageMB: typeof entry.memoryUsageMB === 'number' ? entry.memoryUsageMB : 0,
});
repaired++;
}
}
if (repaired > 0) {
this.persist();
}
return repaired;
}
/**
* Persist current registry state to disk for crash recovery.
* Validates data before writing to prevent persisting corrupt state.
*/
persist() {
if (!this.persistencePath) {
return;
}
try {
// Validate data before persisting
for (const [id, status] of this.runners) {
if (!isValidRunnerStatus(status)) {
orchestrator_logger_1.default.logWarning(`[HotRunner] Skipping persistence -- runner ${id} has invalid state`);
return;
}
}
const data = {
runners: Object.fromEntries(this.runners),
configs: Object.fromEntries(this.configs),
@@ -10123,31 +10206,57 @@ class HotRunnerRegistry {
}
/**
* Load registry state from disk. Returns the number of runners restored.
* Validates each restored entry and discards corrupt entries with warnings.
* If the persistence file itself is corrupt (invalid JSON), starts with
* an empty registry.
*/
loadFromDisk() {
if (!this.persistencePath || !node_fs_1.default.existsSync(this.persistencePath)) {
return 0;
}
let data;
try {
const raw = node_fs_1.default.readFileSync(this.persistencePath, 'utf8');
const data = JSON.parse(raw);
if (data.runners) {
for (const [id, status] of Object.entries(data.runners)) {
this.runners.set(id, status);
}
}
if (data.configs) {
for (const [id, config] of Object.entries(data.configs)) {
this.configs.set(id, config);
}
}
orchestrator_logger_1.default.log(`[HotRunner] Restored ${this.runners.size} runner(s) from disk`);
return this.runners.size;
data = JSON.parse(raw);
}
catch (error) {
orchestrator_logger_1.default.logWarning(`[HotRunner] Failed to load registry from disk: ${error.message}`);
orchestrator_logger_1.default.logWarning(`[HotRunner] Persistence file is corrupt, starting with empty registry: ${error.message}`);
return 0;
}
if (typeof data !== 'object' || data === null) {
orchestrator_logger_1.default.logWarning('[HotRunner] Persistence file has invalid structure, starting with empty registry');
return 0;
}
let discarded = 0;
if (data.runners && typeof data.runners === 'object') {
for (const [id, status] of Object.entries(data.runners)) {
if (isValidRunnerStatus(status)) {
this.runners.set(id, status);
}
else {
orchestrator_logger_1.default.logWarning(`[HotRunner] Discarding invalid runner entry '${id}' from persistence file`);
discarded++;
}
}
}
if (data.configs && typeof data.configs === 'object') {
for (const [id, config] of Object.entries(data.configs)) {
// Only restore configs for runners that were successfully restored
if (this.runners.has(id)) {
if (isValidRunnerConfig(config)) {
this.configs.set(id, config);
}
else {
orchestrator_logger_1.default.logWarning(`[HotRunner] Discarding invalid config entry '${id}' from persistence file`);
}
}
}
}
if (discarded > 0) {
orchestrator_logger_1.default.logWarning(`[HotRunner] Discarded ${discarded} invalid runner(s) from persistence file`);
}
orchestrator_logger_1.default.log(`[HotRunner] Restored ${this.runners.size} runner(s) from disk`);
return this.runners.size;
}
}
exports.HotRunnerRegistry = HotRunnerRegistry;
Generated Vendored
+1 -1
View File
File diff suppressed because one or more lines are too long