mirror of
https://github.com/game-ci/unity-builder.git
synced 2026-05-31 22:06:16 -07:00
Compare commits
5 Commits
feature/in
...
feature/ho
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3cac1845e3 | ||
|
|
41f00bd1f9 | ||
|
|
7c0c4c2072 | ||
|
|
1bb31f3e98 | ||
|
|
49b37f7831 |
@@ -1,5 +1,4 @@
|
||||
{
|
||||
"root": true,
|
||||
"plugins": ["jest", "@typescript-eslint", "prettier", "unicorn"],
|
||||
"extends": ["plugin:unicorn/recommended", "plugin:github/recommended", "plugin:prettier/recommended"],
|
||||
"parser": "@typescript-eslint/parser",
|
||||
|
||||
37
action.yml
37
action.yml
@@ -279,24 +279,35 @@ inputs:
|
||||
description:
|
||||
'[Orchestrator] Specifies the repo for the unity builder. Useful if you forked the repo for testing, features, or
|
||||
fixes.'
|
||||
syncStrategy:
|
||||
description: 'Workspace sync strategy: full, git-delta, direct-input, storage-pull'
|
||||
|
||||
hotRunnerEnabled:
|
||||
description: '[HotRunner] Use persistent hot runner for builds (requires pre-registered runners)'
|
||||
required: false
|
||||
default: 'full'
|
||||
syncInputRef:
|
||||
description: 'URI for direct-input or storage-pull content (storage://remote/path or file path)'
|
||||
default: 'false'
|
||||
hotRunnerTransport:
|
||||
description: '[HotRunner] Transport protocol for hot runner communication: websocket, grpc, named-pipe'
|
||||
required: false
|
||||
syncStorageRemote:
|
||||
description: 'rclone remote name for storage-backed inputs (defaults to rcloneRemote)'
|
||||
default: 'websocket'
|
||||
hotRunnerHost:
|
||||
description: '[HotRunner] Hot runner host address'
|
||||
required: false
|
||||
syncRevertAfter:
|
||||
description: 'Revert overlaid changes after job completion'
|
||||
default: 'localhost'
|
||||
hotRunnerPort:
|
||||
description: '[HotRunner] Hot runner port number'
|
||||
required: false
|
||||
default: '9090'
|
||||
hotRunnerHealthInterval:
|
||||
description: '[HotRunner] Health check interval in seconds'
|
||||
required: false
|
||||
default: '30'
|
||||
hotRunnerMaxIdle:
|
||||
description: '[HotRunner] Maximum idle time in seconds before recycling runner'
|
||||
required: false
|
||||
default: '3600'
|
||||
hotRunnerFallbackToCold:
|
||||
description: '[HotRunner] Fall back to cold build if no hot runner available'
|
||||
required: false
|
||||
default: 'true'
|
||||
syncStatePath:
|
||||
description: 'Path to sync state file for delta tracking'
|
||||
required: false
|
||||
default: '.game-ci/sync-state.json'
|
||||
|
||||
outputs:
|
||||
volume:
|
||||
|
||||
1285
dist/index.js
generated
vendored
1285
dist/index.js
generated
vendored
File diff suppressed because it is too large
Load Diff
2
dist/index.js.map
generated
vendored
2
dist/index.js.map
generated
vendored
File diff suppressed because one or more lines are too long
131
src/index.ts
131
src/index.ts
@@ -3,8 +3,8 @@ import { Action, BuildParameters, Cache, Orchestrator, Docker, ImageTag, Output
|
||||
import { Cli } from './model/cli/cli';
|
||||
import MacBuilder from './model/mac-builder';
|
||||
import PlatformSetup from './model/platform-setup';
|
||||
import { IncrementalSyncService } from './model/orchestrator/services/sync';
|
||||
import { SyncStrategy } from './model/orchestrator/services/sync/sync-state';
|
||||
import { HotRunnerService } from './model/orchestrator/services/hot-runner';
|
||||
import { HotRunnerConfig } from './model/orchestrator/services/hot-runner/hot-runner-types';
|
||||
|
||||
async function runMain() {
|
||||
try {
|
||||
@@ -23,35 +23,46 @@ async function runMain() {
|
||||
|
||||
let exitCode = -1;
|
||||
|
||||
if (buildParameters.providerStrategy === 'local') {
|
||||
core.info('Building locally');
|
||||
// Hot runner path: attempt to use a persistent Unity editor instance
|
||||
if (buildParameters.hotRunnerEnabled) {
|
||||
core.info('[HotRunner] Hot runner mode enabled, attempting hot build...');
|
||||
|
||||
// Apply incremental sync strategy before build
|
||||
const syncStrategy = buildParameters.syncStrategy as SyncStrategy;
|
||||
if (syncStrategy !== 'full') {
|
||||
core.info(`[Sync] Applying sync strategy: ${syncStrategy}`);
|
||||
await applySyncStrategy(buildParameters, workspace);
|
||||
}
|
||||
const hotRunnerConfig: HotRunnerConfig = {
|
||||
enabled: true,
|
||||
transport: buildParameters.hotRunnerTransport,
|
||||
host: buildParameters.hotRunnerHost,
|
||||
port: buildParameters.hotRunnerPort,
|
||||
healthCheckInterval: buildParameters.hotRunnerHealthInterval,
|
||||
maxIdleTime: buildParameters.hotRunnerMaxIdle,
|
||||
maxJobsBeforeRecycle: 0, // no automatic recycle by job count
|
||||
};
|
||||
|
||||
await PlatformSetup.setup(buildParameters, actionFolder);
|
||||
exitCode =
|
||||
process.platform === 'darwin'
|
||||
? await MacBuilder.run(actionFolder)
|
||||
: await Docker.run(baseImage.toString(), {
|
||||
workspace,
|
||||
actionFolder,
|
||||
...buildParameters,
|
||||
});
|
||||
const hotRunnerService = new HotRunnerService();
|
||||
|
||||
// Revert overlays after job completion if configured
|
||||
if (buildParameters.syncRevertAfter && syncStrategy !== 'full') {
|
||||
core.info('[Sync] Reverting overlay changes after job completion');
|
||||
try {
|
||||
await IncrementalSyncService.revertOverlays(workspace, buildParameters.syncStatePath);
|
||||
} catch (revertError) {
|
||||
core.warning(`[Sync] Overlay revert failed: ${(revertError as Error).message}`);
|
||||
try {
|
||||
await hotRunnerService.initialize(hotRunnerConfig);
|
||||
const result = await hotRunnerService.submitBuild(buildParameters, (output) => {
|
||||
core.info(output);
|
||||
});
|
||||
|
||||
exitCode = result.exitCode;
|
||||
core.info(`[HotRunner] Build completed with exit code ${exitCode}`);
|
||||
await hotRunnerService.shutdown();
|
||||
} catch (hotRunnerError) {
|
||||
await hotRunnerService.shutdown();
|
||||
|
||||
if (buildParameters.hotRunnerFallbackToCold) {
|
||||
core.warning(
|
||||
`[HotRunner] Hot runner failed: ${(hotRunnerError as Error).message}. Falling back to cold build.`,
|
||||
);
|
||||
exitCode = await runColdBuild(buildParameters, baseImage, workspace, actionFolder);
|
||||
} else {
|
||||
throw hotRunnerError;
|
||||
}
|
||||
}
|
||||
} else if (buildParameters.providerStrategy === 'local') {
|
||||
core.info('Building locally');
|
||||
exitCode = await runColdBuild(buildParameters, baseImage, workspace, actionFolder);
|
||||
} else {
|
||||
await Orchestrator.run(buildParameters, baseImage.toString());
|
||||
exitCode = 0;
|
||||
@@ -70,57 +81,27 @@ async function runMain() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply the configured sync strategy to the workspace before build.
|
||||
*/
|
||||
async function applySyncStrategy(buildParameters: BuildParameters, workspace: string): Promise<void> {
|
||||
const strategy = buildParameters.syncStrategy as SyncStrategy;
|
||||
const resolvedStrategy = IncrementalSyncService.resolveStrategy(strategy, workspace, buildParameters.syncStatePath);
|
||||
async function runColdBuild(
|
||||
buildParameters: BuildParameters,
|
||||
baseImage: ImageTag,
|
||||
workspace: string,
|
||||
actionFolder: string,
|
||||
): Promise<number> {
|
||||
if (buildParameters.providerStrategy === 'local') {
|
||||
core.info('Building locally');
|
||||
await PlatformSetup.setup(buildParameters, actionFolder);
|
||||
|
||||
if (resolvedStrategy === 'full') {
|
||||
core.info('[Sync] Resolved to full sync (no incremental state available)');
|
||||
return process.platform === 'darwin'
|
||||
? await MacBuilder.run(actionFolder)
|
||||
: await Docker.run(baseImage.toString(), {
|
||||
workspace,
|
||||
actionFolder,
|
||||
...buildParameters,
|
||||
});
|
||||
} else {
|
||||
await Orchestrator.run(buildParameters, baseImage.toString());
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
switch (resolvedStrategy) {
|
||||
case 'git-delta': {
|
||||
const targetReference = buildParameters.gitSha || buildParameters.branch;
|
||||
const changedFiles = await IncrementalSyncService.syncGitDelta(
|
||||
workspace,
|
||||
targetReference,
|
||||
buildParameters.syncStatePath,
|
||||
);
|
||||
core.info(`[Sync] Git delta sync applied: ${changedFiles} file(s) changed`);
|
||||
break;
|
||||
}
|
||||
case 'direct-input': {
|
||||
if (!buildParameters.syncInputRef) {
|
||||
throw new Error('[Sync] direct-input strategy requires syncInputRef to be set');
|
||||
}
|
||||
const overlays = await IncrementalSyncService.applyDirectInput(
|
||||
workspace,
|
||||
buildParameters.syncInputRef,
|
||||
buildParameters.syncStorageRemote || undefined,
|
||||
buildParameters.syncStatePath,
|
||||
);
|
||||
core.info(`[Sync] Direct input applied: ${overlays.length} overlay(s)`);
|
||||
break;
|
||||
}
|
||||
case 'storage-pull': {
|
||||
if (!buildParameters.syncInputRef) {
|
||||
throw new Error('[Sync] storage-pull strategy requires syncInputRef to be set');
|
||||
}
|
||||
const pulledFiles = await IncrementalSyncService.syncStoragePull(workspace, buildParameters.syncInputRef, {
|
||||
rcloneRemote: buildParameters.syncStorageRemote || undefined,
|
||||
syncRevertAfter: buildParameters.syncRevertAfter,
|
||||
statePath: buildParameters.syncStatePath,
|
||||
});
|
||||
core.info(`[Sync] Storage pull complete: ${pulledFiles.length} file(s)`);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
core.warning(`[Sync] Unknown sync strategy: ${resolvedStrategy}`);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -106,11 +106,13 @@ class BuildParameters {
|
||||
public cacheUnityInstallationOnMac!: boolean;
|
||||
public unityHubVersionOnMac!: string;
|
||||
public dockerWorkspacePath!: string;
|
||||
public syncStrategy!: string;
|
||||
public syncInputRef!: string;
|
||||
public syncStorageRemote!: string;
|
||||
public syncRevertAfter!: boolean;
|
||||
public syncStatePath!: string;
|
||||
public hotRunnerEnabled!: boolean;
|
||||
public hotRunnerTransport!: 'websocket' | 'grpc' | 'named-pipe';
|
||||
public hotRunnerHost!: string;
|
||||
public hotRunnerPort!: number;
|
||||
public hotRunnerHealthInterval!: number;
|
||||
public hotRunnerMaxIdle!: number;
|
||||
public hotRunnerFallbackToCold!: boolean;
|
||||
|
||||
public static shouldUseRetainedWorkspaceMode(buildParameters: BuildParameters) {
|
||||
return buildParameters.maxRetainedWorkspaces > 0 && Orchestrator.lockedWorkspace !== ``;
|
||||
@@ -247,11 +249,13 @@ class BuildParameters {
|
||||
cacheUnityInstallationOnMac: Input.cacheUnityInstallationOnMac,
|
||||
unityHubVersionOnMac: Input.unityHubVersionOnMac,
|
||||
dockerWorkspacePath: Input.dockerWorkspacePath,
|
||||
syncStrategy: Input.syncStrategy,
|
||||
syncInputRef: Input.syncInputRef,
|
||||
syncStorageRemote: Input.syncStorageRemote,
|
||||
syncRevertAfter: Input.syncRevertAfter,
|
||||
syncStatePath: Input.syncStatePath,
|
||||
hotRunnerEnabled: Input.hotRunnerEnabled,
|
||||
hotRunnerTransport: Input.hotRunnerTransport,
|
||||
hotRunnerHost: Input.hotRunnerHost,
|
||||
hotRunnerPort: Input.hotRunnerPort,
|
||||
hotRunnerHealthInterval: Input.hotRunnerHealthInterval,
|
||||
hotRunnerMaxIdle: Input.hotRunnerMaxIdle,
|
||||
hotRunnerFallbackToCold: Input.hotRunnerFallbackToCold,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -241,28 +241,6 @@ class Input {
|
||||
return Input.getInput('dockerWorkspacePath') ?? '/github/workspace';
|
||||
}
|
||||
|
||||
static get syncStrategy(): string {
|
||||
return Input.getInput('syncStrategy') ?? 'full';
|
||||
}
|
||||
|
||||
static get syncInputRef(): string {
|
||||
return Input.getInput('syncInputRef') ?? '';
|
||||
}
|
||||
|
||||
static get syncStorageRemote(): string {
|
||||
return Input.getInput('syncStorageRemote') ?? '';
|
||||
}
|
||||
|
||||
static get syncRevertAfter(): boolean {
|
||||
const input = Input.getInput('syncRevertAfter') ?? 'true';
|
||||
|
||||
return input === 'true';
|
||||
}
|
||||
|
||||
static get syncStatePath(): string {
|
||||
return Input.getInput('syncStatePath') ?? '.game-ci/sync-state.json';
|
||||
}
|
||||
|
||||
static get dockerCpuLimit(): string {
|
||||
return Input.getInput('dockerCpuLimit') ?? os.cpus().length.toString();
|
||||
}
|
||||
@@ -304,6 +282,38 @@ class Input {
|
||||
return Input.getInput('skipActivation')?.toLowerCase() ?? 'false';
|
||||
}
|
||||
|
||||
static get hotRunnerEnabled(): boolean {
|
||||
const input = Input.getInput('hotRunnerEnabled') ?? false;
|
||||
|
||||
return input === 'true';
|
||||
}
|
||||
|
||||
static get hotRunnerTransport(): 'websocket' | 'grpc' | 'named-pipe' {
|
||||
return (Input.getInput('hotRunnerTransport') ?? 'websocket') as 'websocket' | 'grpc' | 'named-pipe';
|
||||
}
|
||||
|
||||
static get hotRunnerHost(): string {
|
||||
return Input.getInput('hotRunnerHost') ?? 'localhost';
|
||||
}
|
||||
|
||||
static get hotRunnerPort(): number {
|
||||
return Number.parseInt(Input.getInput('hotRunnerPort') ?? '9090', 10);
|
||||
}
|
||||
|
||||
static get hotRunnerHealthInterval(): number {
|
||||
return Number.parseInt(Input.getInput('hotRunnerHealthInterval') ?? '30', 10);
|
||||
}
|
||||
|
||||
static get hotRunnerMaxIdle(): number {
|
||||
return Number.parseInt(Input.getInput('hotRunnerMaxIdle') ?? '3600', 10);
|
||||
}
|
||||
|
||||
static get hotRunnerFallbackToCold(): boolean {
|
||||
const input = Input.getInput('hotRunnerFallbackToCold') ?? 'true';
|
||||
|
||||
return input === 'true';
|
||||
}
|
||||
|
||||
public static ToEnvVarFormat(input: string) {
|
||||
if (input.toUpperCase() === input) {
|
||||
return input;
|
||||
|
||||
@@ -15,24 +15,15 @@ import BuildParameters from '../../build-parameters';
|
||||
import { Cli } from '../../cli/cli';
|
||||
import OrchestratorOptions from '../options/orchestrator-options';
|
||||
import ResourceTracking from '../services/core/resource-tracking';
|
||||
import { IncrementalSyncService } from '../services/sync';
|
||||
import { SyncStrategy } from '../services/sync/sync-state';
|
||||
|
||||
export class RemoteClient {
|
||||
@CliFunction(`remote-cli-pre-build`, `sets up a repository, usually before a game-ci build`)
|
||||
static async setupRemoteClient() {
|
||||
OrchestratorLogger.log(`bootstrap game ci orchestrator...`);
|
||||
await ResourceTracking.logDiskUsageSnapshot('remote-cli-pre-build (start)');
|
||||
|
||||
const syncStrategy = (Orchestrator.buildParameters.syncStrategy || 'full') as SyncStrategy;
|
||||
|
||||
if (syncStrategy !== 'full') {
|
||||
OrchestratorLogger.log(`[Sync] Using incremental sync strategy: ${syncStrategy}`);
|
||||
await RemoteClient.handleIncrementalSync(syncStrategy);
|
||||
} else if (!(await RemoteClient.handleRetainedWorkspace())) {
|
||||
if (!(await RemoteClient.handleRetainedWorkspace())) {
|
||||
await RemoteClient.bootstrapRepository();
|
||||
}
|
||||
|
||||
await RemoteClient.replaceLargePackageReferencesWithSharedReferences();
|
||||
await RemoteClient.runCustomHookFiles(`before-build`);
|
||||
}
|
||||
@@ -166,20 +157,6 @@ export class RemoteClient {
|
||||
|
||||
await RemoteClient.runCustomHookFiles(`after-build`);
|
||||
|
||||
// Revert sync overlays if configured
|
||||
const syncStrategy = (Orchestrator.buildParameters.syncStrategy || 'full') as SyncStrategy;
|
||||
if (Orchestrator.buildParameters.syncRevertAfter && syncStrategy !== 'full') {
|
||||
try {
|
||||
OrchestratorLogger.log('[Sync] Reverting overlay changes after job completion');
|
||||
await IncrementalSyncService.revertOverlays(
|
||||
OrchestratorFolders.repoPathAbsolute,
|
||||
Orchestrator.buildParameters.syncStatePath,
|
||||
);
|
||||
} catch (revertError: any) {
|
||||
RemoteClientLogger.logWarning(`[Sync] Overlay revert failed: ${revertError.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
// WIP - need to give the pod permissions to create config map
|
||||
await RemoteClientLogger.handleLogManagementPostJob();
|
||||
} catch (error: any) {
|
||||
@@ -252,78 +229,6 @@ export class RemoteClient {
|
||||
RemoteClientLogger.log(JSON.stringify(error, undefined, 4));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incremental sync strategies (git-delta, direct-input, storage-pull).
|
||||
*
|
||||
* For git-delta: requires an existing workspace with sync state; fetches and applies
|
||||
* only changed files.
|
||||
*
|
||||
* For direct-input and storage-pull: requires an existing workspace; applies overlay
|
||||
* content on top.
|
||||
*
|
||||
* Falls back to full bootstrapRepository() if incremental sync cannot proceed.
|
||||
*/
|
||||
private static async handleIncrementalSync(strategy: SyncStrategy): Promise<void> {
|
||||
const buildParameters = Orchestrator.buildParameters;
|
||||
const workspacePath = OrchestratorFolders.repoPathAbsolute;
|
||||
const statePath = buildParameters.syncStatePath;
|
||||
|
||||
// Resolve strategy — may fall back to 'full' if no state exists
|
||||
const resolvedStrategy = IncrementalSyncService.resolveStrategy(strategy, workspacePath, statePath);
|
||||
|
||||
if (resolvedStrategy === 'full') {
|
||||
OrchestratorLogger.log('[Sync] Falling back to full bootstrap');
|
||||
if (!(await RemoteClient.handleRetainedWorkspace())) {
|
||||
await RemoteClient.bootstrapRepository();
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
switch (resolvedStrategy) {
|
||||
case 'git-delta': {
|
||||
const targetReference = buildParameters.gitSha || buildParameters.branch;
|
||||
OrchestratorLogger.log(`[Sync] Git delta sync to ${targetReference}`);
|
||||
const changedFiles = await IncrementalSyncService.syncGitDelta(workspacePath, targetReference, statePath);
|
||||
OrchestratorLogger.log(`[Sync] Git delta complete: ${changedFiles} file(s) updated`);
|
||||
break;
|
||||
}
|
||||
case 'direct-input': {
|
||||
const inputReference = buildParameters.syncInputRef;
|
||||
if (!inputReference) {
|
||||
throw new Error('[Sync] direct-input strategy requires syncInputRef');
|
||||
}
|
||||
OrchestratorLogger.log(`[Sync] Applying direct input: ${inputReference}`);
|
||||
await IncrementalSyncService.applyDirectInput(
|
||||
workspacePath,
|
||||
inputReference,
|
||||
buildParameters.syncStorageRemote || undefined,
|
||||
statePath,
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'storage-pull': {
|
||||
const storageUri = buildParameters.syncInputRef;
|
||||
if (!storageUri) {
|
||||
throw new Error('[Sync] storage-pull strategy requires syncInputRef');
|
||||
}
|
||||
OrchestratorLogger.log(`[Sync] Storage pull from: ${storageUri}`);
|
||||
await IncrementalSyncService.syncStoragePull(workspacePath, storageUri, {
|
||||
rcloneRemote: buildParameters.syncStorageRemote || undefined,
|
||||
syncRevertAfter: buildParameters.syncRevertAfter,
|
||||
statePath,
|
||||
});
|
||||
break;
|
||||
}
|
||||
default:
|
||||
OrchestratorLogger.logWarning(`[Sync] Unknown strategy: ${resolvedStrategy}, falling back to full`);
|
||||
if (!(await RemoteClient.handleRetainedWorkspace())) {
|
||||
await RemoteClient.bootstrapRepository();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static async bootstrapRepository() {
|
||||
await OrchestratorSystem.Run(
|
||||
`mkdir -p ${OrchestratorFolders.ToLinuxFolder(OrchestratorFolders.uniqueOrchestratorJobFolderAbsolute)}`,
|
||||
|
||||
5
src/model/orchestrator/runners/README.md
Normal file
5
src/model/orchestrator/runners/README.md
Normal file
@@ -0,0 +1,5 @@
|
||||
# Hot Runner Protocol
|
||||
|
||||
Extensible runner registration and persistent Unity editor provider protocol.
|
||||
|
||||
See GitHub Issue for full specification.
|
||||
@@ -0,0 +1,159 @@
|
||||
import OrchestratorLogger from '../core/orchestrator-logger';
|
||||
import { HotRunnerRegistry } from './hot-runner-registry';
|
||||
import { HotRunnerJobRequest, HotRunnerJobResult, HotRunnerStatus, HotRunnerTransport } from './hot-runner-types';
|
||||
|
||||
const POLL_INTERVAL_MS = 1000;
|
||||
|
||||
// eslint-disable-next-line no-unused-vars
|
||||
export type OutputCallback = (output: string) => void;
|
||||
|
||||
export class HotRunnerDispatcher {
|
||||
private transports: Map<string, HotRunnerTransport>;
|
||||
|
||||
constructor(transports: Map<string, HotRunnerTransport>) {
|
||||
this.transports = transports;
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch a job to an available hot runner matching the request's build target.
|
||||
* If no runner is immediately available, waits up to the request timeout.
|
||||
* Returns the job result, or throws if no runner becomes available in time.
|
||||
*/
|
||||
async dispatchJob(
|
||||
request: HotRunnerJobRequest,
|
||||
registry: HotRunnerRegistry,
|
||||
unityVersion: string,
|
||||
onOutput?: OutputCallback,
|
||||
): Promise<HotRunnerJobResult> {
|
||||
OrchestratorLogger.log(`[HotRunner] Dispatching job ${request.jobId} (target: ${request.buildTarget})`);
|
||||
|
||||
// Find or wait for an available runner
|
||||
let runner = registry.findAvailableRunner({
|
||||
unityVersion,
|
||||
platform: request.buildTarget,
|
||||
});
|
||||
|
||||
if (!runner) {
|
||||
OrchestratorLogger.log(
|
||||
`[HotRunner] No idle runner available for ${unityVersion}/${request.buildTarget}, waiting...`,
|
||||
);
|
||||
runner = await this.waitForRunner({ unityVersion, platform: request.buildTarget }, request.timeout, registry);
|
||||
}
|
||||
|
||||
// Mark runner as busy
|
||||
registry.updateRunner(runner.id, {
|
||||
state: 'busy',
|
||||
currentJob: request.jobId,
|
||||
});
|
||||
|
||||
const transport = this.transports.get(runner.id);
|
||||
if (!transport) {
|
||||
registry.updateRunner(runner.id, { state: 'idle', currentJob: undefined });
|
||||
throw new Error(`[HotRunner] No transport available for runner ${runner.id}`);
|
||||
}
|
||||
|
||||
OrchestratorLogger.log(`[HotRunner] Sending job ${request.jobId} to runner ${runner.id}`);
|
||||
|
||||
const startTime = Date.now();
|
||||
|
||||
try {
|
||||
const result = await this.executeWithTimeout(transport, request);
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
OrchestratorLogger.log(
|
||||
`[HotRunner] Job ${request.jobId} completed on runner ${runner.id} in ${duration}ms (exit: ${result.exitCode})`,
|
||||
);
|
||||
|
||||
if (onOutput && result.output) {
|
||||
onOutput(result.output);
|
||||
}
|
||||
|
||||
// Mark runner as idle and increment job count
|
||||
const currentStatus = registry.getRunner(runner.id);
|
||||
registry.updateRunner(runner.id, {
|
||||
state: 'idle',
|
||||
currentJob: undefined,
|
||||
lastJobCompleted: request.jobId,
|
||||
jobsCompleted: (currentStatus?.jobsCompleted ?? 0) + 1,
|
||||
});
|
||||
|
||||
return result;
|
||||
} catch (error: any) {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Job ${request.jobId} failed on runner ${runner.id}: ${error.message}`);
|
||||
|
||||
// Mark runner as idle despite failure -- the health monitor will recycle if needed
|
||||
registry.updateRunner(runner.id, {
|
||||
state: 'idle',
|
||||
currentJob: undefined,
|
||||
});
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for an available runner matching the requirements.
|
||||
* Polls the registry at a fixed interval until one becomes available or timeout expires.
|
||||
*/
|
||||
async waitForRunner(
|
||||
requirements: { unityVersion: string; platform: string },
|
||||
timeoutMs: number,
|
||||
registry: HotRunnerRegistry,
|
||||
): Promise<HotRunnerStatus> {
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
|
||||
while (Date.now() < deadline) {
|
||||
const runner = registry.findAvailableRunner(requirements);
|
||||
if (runner) {
|
||||
OrchestratorLogger.log(`[HotRunner] Runner ${runner.id} became available`);
|
||||
|
||||
return runner;
|
||||
}
|
||||
|
||||
await this.sleep(Math.min(POLL_INTERVAL_MS, deadline - Date.now()));
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
`[HotRunner] Timed out waiting for available runner (${requirements.unityVersion}/${requirements.platform}) after ${timeoutMs}ms`,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a job on a transport with a timeout guard.
|
||||
* On timeout, disconnects the transport to release the connection
|
||||
* and prevent the orphaned sendJob promise from holding resources.
|
||||
*/
|
||||
private async executeWithTimeout(
|
||||
transport: HotRunnerTransport,
|
||||
request: HotRunnerJobRequest,
|
||||
): Promise<HotRunnerJobResult> {
|
||||
const TIMEOUT_SENTINEL = Symbol('timeout');
|
||||
|
||||
const timeoutPromise = new Promise<typeof TIMEOUT_SENTINEL>((resolve) => {
|
||||
setTimeout(() => {
|
||||
resolve(TIMEOUT_SENTINEL);
|
||||
}, request.timeout);
|
||||
});
|
||||
|
||||
const result = await Promise.race([transport.sendJob(request), timeoutPromise]);
|
||||
|
||||
if (result === TIMEOUT_SENTINEL) {
|
||||
// Disconnect the transport to clean up the orphaned sendJob call
|
||||
try {
|
||||
await transport.disconnect();
|
||||
} catch (disconnectError: any) {
|
||||
OrchestratorLogger.logWarning(
|
||||
`[HotRunner] Error disconnecting transport after timeout for job ${request.jobId}: ${disconnectError.message}`,
|
||||
);
|
||||
}
|
||||
|
||||
throw new Error(`[HotRunner] Job ${request.jobId} timed out after ${request.timeout}ms`);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,186 @@
|
||||
import OrchestratorLogger from '../core/orchestrator-logger';
|
||||
import { HotRunnerRegistry } from './hot-runner-registry';
|
||||
import { HotRunnerTransport } from './hot-runner-types';
|
||||
|
||||
export class HotRunnerHealthMonitor {
|
||||
private intervalHandle: ReturnType<typeof setInterval> | undefined;
|
||||
private registry: HotRunnerRegistry | undefined;
|
||||
private transports: Map<string, HotRunnerTransport> = new Map();
|
||||
|
||||
/**
|
||||
* Start periodic health monitoring for all registered runners.
|
||||
*/
|
||||
startMonitoring(registry: HotRunnerRegistry, interval: number, transports: Map<string, HotRunnerTransport>): void {
|
||||
if (this.intervalHandle) {
|
||||
this.stopMonitoring();
|
||||
}
|
||||
|
||||
this.registry = registry;
|
||||
this.transports = transports;
|
||||
|
||||
OrchestratorLogger.log(`[HotRunner] Starting health monitoring (interval: ${interval}s)`);
|
||||
|
||||
this.intervalHandle = setInterval(() => {
|
||||
this.runHealthChecks().catch((error: any) => {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Health check cycle failed: ${error.message}`);
|
||||
});
|
||||
}, interval * 1000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop periodic health monitoring.
|
||||
*/
|
||||
stopMonitoring(): void {
|
||||
if (this.intervalHandle) {
|
||||
clearInterval(this.intervalHandle);
|
||||
this.intervalHandle = undefined;
|
||||
OrchestratorLogger.log(`[HotRunner] Health monitoring stopped`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check health of a specific runner by ID. Returns true if healthy.
|
||||
*/
|
||||
async checkHealth(runnerId: string): Promise<boolean> {
|
||||
if (!this.registry) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const transport = this.transports.get(runnerId);
|
||||
if (!transport) {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] No transport for runner ${runnerId}`);
|
||||
this.registry.updateRunner(runnerId, {
|
||||
state: 'unhealthy',
|
||||
lastHealthCheck: new Date().toISOString(),
|
||||
});
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
const healthy = await transport.healthCheck();
|
||||
if (healthy) {
|
||||
const status = await transport.getStatus();
|
||||
this.registry.updateRunner(runnerId, {
|
||||
lastHealthCheck: new Date().toISOString(),
|
||||
memoryUsageMB: status.memoryUsageMB,
|
||||
uptime: status.uptime,
|
||||
libraryHash: status.libraryHash,
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Runner ${runnerId} health check returned false`);
|
||||
this.registry.updateRunner(runnerId, {
|
||||
state: 'unhealthy',
|
||||
lastHealthCheck: new Date().toISOString(),
|
||||
});
|
||||
|
||||
return false;
|
||||
} catch (error: any) {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Runner ${runnerId} health check failed: ${error.message}`);
|
||||
this.registry.updateRunner(runnerId, {
|
||||
state: 'unhealthy',
|
||||
lastHealthCheck: new Date().toISOString(),
|
||||
});
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark an unhealthy runner for cleanup and disconnect its transport.
|
||||
*/
|
||||
async recycleUnhealthyRunner(runnerId: string): Promise<void> {
|
||||
if (!this.registry) {
|
||||
return;
|
||||
}
|
||||
|
||||
OrchestratorLogger.log(`[HotRunner] Recycling unhealthy runner ${runnerId}`);
|
||||
this.registry.updateRunner(runnerId, { state: 'stopping' });
|
||||
|
||||
const transport = this.transports.get(runnerId);
|
||||
if (transport) {
|
||||
try {
|
||||
await transport.disconnect();
|
||||
} catch (error: any) {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Error disconnecting runner ${runnerId}: ${error.message}`);
|
||||
}
|
||||
this.transports.delete(runnerId);
|
||||
}
|
||||
|
||||
this.registry.unregisterRunner(runnerId);
|
||||
OrchestratorLogger.log(`[HotRunner] Runner ${runnerId} recycled and removed`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Recycle a runner that has been idle longer than the maximum idle time.
|
||||
*/
|
||||
async recycleIdleRunner(runnerId: string, maxIdleTime: number): Promise<void> {
|
||||
if (!this.registry) {
|
||||
return;
|
||||
}
|
||||
|
||||
const runner = this.registry.getRunner(runnerId);
|
||||
if (!runner || runner.state !== 'idle') {
|
||||
return;
|
||||
}
|
||||
|
||||
const lastCheckTime = new Date(runner.lastHealthCheck).getTime();
|
||||
const now = Date.now();
|
||||
const idleSeconds = (now - lastCheckTime) / 1000;
|
||||
|
||||
if (idleSeconds >= maxIdleTime) {
|
||||
OrchestratorLogger.log(
|
||||
`[HotRunner] Runner ${runnerId} idle for ${Math.floor(idleSeconds)}s (max: ${maxIdleTime}s), recycling`,
|
||||
);
|
||||
await this.recycleUnhealthyRunner(runnerId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run health checks and idle-recycle checks for all registered runners.
|
||||
*/
|
||||
private async runHealthChecks(): Promise<void> {
|
||||
if (!this.registry) {
|
||||
return;
|
||||
}
|
||||
|
||||
const runners = this.registry.listRunners();
|
||||
|
||||
for (const runner of runners) {
|
||||
if (runner.state === 'stopping') {
|
||||
continue;
|
||||
}
|
||||
|
||||
const healthy = await this.checkHealth(runner.id);
|
||||
|
||||
if (!healthy && runner.state !== 'starting') {
|
||||
await this.recycleUnhealthyRunner(runner.id);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check for idle timeout
|
||||
const config = this.registry.getConfig(runner.id);
|
||||
if (config && runner.state === 'idle') {
|
||||
await this.recycleIdleRunner(runner.id, config.maxIdleTime);
|
||||
}
|
||||
|
||||
// Check for max jobs before recycle
|
||||
if (config && config.maxJobsBeforeRecycle > 0 && runner.jobsCompleted >= config.maxJobsBeforeRecycle) {
|
||||
OrchestratorLogger.log(
|
||||
`[HotRunner] Runner ${runner.id} reached max jobs (${runner.jobsCompleted}/${config.maxJobsBeforeRecycle}), recycling`,
|
||||
);
|
||||
await this.recycleUnhealthyRunner(runner.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether health monitoring is currently active.
|
||||
*/
|
||||
get isMonitoring(): boolean {
|
||||
return this.intervalHandle !== undefined;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,315 @@
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import { customAlphabet } from 'nanoid';
|
||||
import OrchestratorLogger from '../core/orchestrator-logger';
|
||||
import { HotRunnerConfig, HotRunnerStatus } from './hot-runner-types';
|
||||
|
||||
const generateId = customAlphabet('abcdefghijklmnopqrstuvwxyz0123456789', 12);
|
||||
|
||||
const PERSISTENCE_FILENAME = 'hot-runners.json';
|
||||
|
||||
const VALID_RUNNER_STATES: ReadonlySet<string> = new Set(['idle', 'busy', 'starting', 'stopping', 'unhealthy']);
|
||||
|
||||
export interface HotRunnerFilter {
|
||||
platform?: string;
|
||||
state?: string;
|
||||
unityVersion?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate that a restored runner entry has all required fields with correct types.
|
||||
* Returns true if the entry is a valid HotRunnerStatus, false otherwise.
|
||||
*/
|
||||
function isValidRunnerStatus(entry: unknown): entry is HotRunnerStatus {
|
||||
if (typeof entry !== 'object' || entry === null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const record = entry as Record<string, unknown>;
|
||||
|
||||
return (
|
||||
typeof record.id === 'string' &&
|
||||
record.id.length > 0 &&
|
||||
typeof record.state === 'string' &&
|
||||
VALID_RUNNER_STATES.has(record.state) &&
|
||||
typeof record.unityVersion === 'string' &&
|
||||
typeof record.platform === 'string' &&
|
||||
typeof record.uptime === 'number' &&
|
||||
typeof record.jobsCompleted === 'number' &&
|
||||
typeof record.lastHealthCheck === 'string' &&
|
||||
typeof record.memoryUsageMB === 'number'
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate that a restored config entry has all required fields with correct types.
|
||||
* Returns true if the entry is a valid HotRunnerConfig, false otherwise.
|
||||
*/
|
||||
function isValidRunnerConfig(entry: unknown): entry is HotRunnerConfig {
|
||||
if (typeof entry !== 'object' || entry === null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const record = entry as Record<string, unknown>;
|
||||
|
||||
return (
|
||||
typeof record.enabled === 'boolean' &&
|
||||
typeof record.transport === 'string' &&
|
||||
['websocket', 'grpc', 'named-pipe'].includes(record.transport) &&
|
||||
typeof record.host === 'string' &&
|
||||
typeof record.port === 'number' &&
|
||||
typeof record.healthCheckInterval === 'number' &&
|
||||
typeof record.maxIdleTime === 'number' &&
|
||||
typeof record.maxJobsBeforeRecycle === 'number'
|
||||
);
|
||||
}
|
||||
|
||||
export class HotRunnerRegistry {
|
||||
private runners: Map<string, HotRunnerStatus> = new Map();
|
||||
private configs: Map<string, HotRunnerConfig> = new Map();
|
||||
private persistencePath: string;
|
||||
|
||||
constructor(persistenceDirectory?: string) {
|
||||
this.persistencePath = persistenceDirectory ? path.join(persistenceDirectory, PERSISTENCE_FILENAME) : '';
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a new hot runner. Returns the generated runner ID.
|
||||
*/
|
||||
registerRunner(config: HotRunnerConfig): string {
|
||||
const id = `hr-${generateId()}`;
|
||||
|
||||
const status: HotRunnerStatus = {
|
||||
id,
|
||||
state: 'starting',
|
||||
unityVersion: config.unityVersion ?? 'unknown',
|
||||
platform: config.platform ?? 'unknown',
|
||||
uptime: 0,
|
||||
jobsCompleted: 0,
|
||||
lastHealthCheck: new Date().toISOString(),
|
||||
memoryUsageMB: 0,
|
||||
};
|
||||
|
||||
this.runners.set(id, status);
|
||||
this.configs.set(id, config);
|
||||
OrchestratorLogger.log(`[HotRunner] Registered runner ${id} (${status.unityVersion}/${status.platform})`);
|
||||
|
||||
this.persist();
|
||||
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a runner from the registry.
|
||||
*/
|
||||
unregisterRunner(id: string): void {
|
||||
const existed = this.runners.delete(id);
|
||||
this.configs.delete(id);
|
||||
|
||||
if (existed) {
|
||||
OrchestratorLogger.log(`[HotRunner] Unregistered runner ${id}`);
|
||||
this.persist();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a runner's current status by ID.
|
||||
*/
|
||||
getRunner(id: string): HotRunnerStatus | undefined {
|
||||
return this.runners.get(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a runner's config by ID.
|
||||
*/
|
||||
getConfig(id: string): HotRunnerConfig | undefined {
|
||||
return this.configs.get(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* List all runners, optionally filtered by platform, state, or Unity version.
|
||||
*/
|
||||
listRunners(filter?: HotRunnerFilter): HotRunnerStatus[] {
|
||||
let results = [...this.runners.values()];
|
||||
|
||||
if (filter?.platform) {
|
||||
results = results.filter((runner) => runner.platform === filter.platform);
|
||||
}
|
||||
|
||||
if (filter?.state) {
|
||||
results = results.filter((runner) => runner.state === filter.state);
|
||||
}
|
||||
|
||||
if (filter?.unityVersion) {
|
||||
results = results.filter((runner) => runner.unityVersion === filter.unityVersion);
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find an idle runner matching the given Unity version and platform requirements.
|
||||
*/
|
||||
findAvailableRunner(requirements: { unityVersion: string; platform: string }): HotRunnerStatus | undefined {
|
||||
return this.listRunners({
|
||||
state: 'idle',
|
||||
unityVersion: requirements.unityVersion,
|
||||
platform: requirements.platform,
|
||||
})[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* Update a runner's status fields. Merges partial updates into existing status.
|
||||
*/
|
||||
updateRunner(id: string, update: Partial<HotRunnerStatus>): void {
|
||||
const existing = this.runners.get(id);
|
||||
if (!existing) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.runners.set(id, { ...existing, ...update, id });
|
||||
this.persist();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the total number of registered runners.
|
||||
*/
|
||||
get size(): number {
|
||||
return this.runners.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate all runners in the registry and reset invalid ones to 'unhealthy'.
|
||||
* Returns the number of runners that were repaired.
|
||||
*/
|
||||
validateAndRepair(): number {
|
||||
let repaired = 0;
|
||||
|
||||
for (const [id, status] of this.runners) {
|
||||
// Cast to unknown to bypass the type guard narrowing to 'never',
|
||||
// since the Map is typed as HotRunnerStatus but entries may have
|
||||
// been corrupted via direct deserialization or unsafe casts.
|
||||
const entry = status as unknown as Record<string, unknown>;
|
||||
|
||||
if (!isValidRunnerStatus(entry)) {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Runner ${id} has invalid state, marking as unhealthy`);
|
||||
this.runners.set(id, {
|
||||
id,
|
||||
state: 'unhealthy',
|
||||
unityVersion: typeof entry.unityVersion === 'string' ? entry.unityVersion : 'unknown',
|
||||
platform: typeof entry.platform === 'string' ? entry.platform : 'unknown',
|
||||
uptime: typeof entry.uptime === 'number' ? entry.uptime : 0,
|
||||
jobsCompleted: typeof entry.jobsCompleted === 'number' ? entry.jobsCompleted : 0,
|
||||
lastHealthCheck: typeof entry.lastHealthCheck === 'string' ? entry.lastHealthCheck : new Date().toISOString(),
|
||||
memoryUsageMB: typeof entry.memoryUsageMB === 'number' ? entry.memoryUsageMB : 0,
|
||||
});
|
||||
repaired++;
|
||||
}
|
||||
}
|
||||
|
||||
if (repaired > 0) {
|
||||
this.persist();
|
||||
}
|
||||
|
||||
return repaired;
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist current registry state to disk for crash recovery.
|
||||
* Validates data before writing to prevent persisting corrupt state.
|
||||
*/
|
||||
private persist(): void {
|
||||
if (!this.persistencePath) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Validate data before persisting
|
||||
for (const [id, status] of this.runners) {
|
||||
if (!isValidRunnerStatus(status)) {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Skipping persistence -- runner ${id} has invalid state`);
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const data = {
|
||||
runners: Object.fromEntries(this.runners),
|
||||
configs: Object.fromEntries(this.configs),
|
||||
};
|
||||
const directory = path.dirname(this.persistencePath);
|
||||
if (!fs.existsSync(directory)) {
|
||||
fs.mkdirSync(directory, { recursive: true });
|
||||
}
|
||||
|
||||
fs.writeFileSync(this.persistencePath, JSON.stringify(data, undefined, 2));
|
||||
} catch (error: any) {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Failed to persist registry: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load registry state from disk. Returns the number of runners restored.
|
||||
* Validates each restored entry and discards corrupt entries with warnings.
|
||||
* If the persistence file itself is corrupt (invalid JSON), starts with
|
||||
* an empty registry.
|
||||
*/
|
||||
loadFromDisk(): number {
|
||||
if (!this.persistencePath || !fs.existsSync(this.persistencePath)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let data: any;
|
||||
|
||||
try {
|
||||
const raw = fs.readFileSync(this.persistencePath, 'utf8');
|
||||
data = JSON.parse(raw);
|
||||
} catch (error: any) {
|
||||
OrchestratorLogger.logWarning(
|
||||
`[HotRunner] Persistence file is corrupt, starting with empty registry: ${error.message}`,
|
||||
);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (typeof data !== 'object' || data === null) {
|
||||
OrchestratorLogger.logWarning('[HotRunner] Persistence file has invalid structure, starting with empty registry');
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
let discarded = 0;
|
||||
|
||||
if (data.runners && typeof data.runners === 'object') {
|
||||
for (const [id, status] of Object.entries(data.runners)) {
|
||||
if (isValidRunnerStatus(status)) {
|
||||
this.runners.set(id, status);
|
||||
} else {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Discarding invalid runner entry '${id}' from persistence file`);
|
||||
discarded++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (data.configs && typeof data.configs === 'object') {
|
||||
for (const [id, config] of Object.entries(data.configs)) {
|
||||
// Only restore configs for runners that were successfully restored
|
||||
if (this.runners.has(id)) {
|
||||
if (isValidRunnerConfig(config)) {
|
||||
this.configs.set(id, config);
|
||||
} else {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Discarding invalid config entry '${id}' from persistence file`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (discarded > 0) {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Discarded ${discarded} invalid runner(s) from persistence file`);
|
||||
}
|
||||
|
||||
OrchestratorLogger.log(`[HotRunner] Restored ${this.runners.size} runner(s) from disk`);
|
||||
|
||||
return this.runners.size;
|
||||
}
|
||||
}
|
||||
166
src/model/orchestrator/services/hot-runner/hot-runner-service.ts
Normal file
166
src/model/orchestrator/services/hot-runner/hot-runner-service.ts
Normal file
@@ -0,0 +1,166 @@
|
||||
import OrchestratorLogger from '../core/orchestrator-logger';
|
||||
import { HotRunnerRegistry } from './hot-runner-registry';
|
||||
import { HotRunnerHealthMonitor } from './hot-runner-health-monitor';
|
||||
import { HotRunnerDispatcher, OutputCallback } from './hot-runner-dispatcher';
|
||||
import {
|
||||
HotRunnerConfig,
|
||||
HotRunnerJobRequest,
|
||||
HotRunnerJobResult,
|
||||
HotRunnerStatus,
|
||||
HotRunnerTransport,
|
||||
} from './hot-runner-types';
|
||||
import BuildParameters from '../../../build-parameters';
|
||||
|
||||
export class HotRunnerService {
|
||||
private registry: HotRunnerRegistry;
|
||||
private healthMonitor: HotRunnerHealthMonitor;
|
||||
private dispatcher: HotRunnerDispatcher;
|
||||
private transports: Map<string, HotRunnerTransport> = new Map();
|
||||
private config: HotRunnerConfig | undefined;
|
||||
|
||||
constructor(persistenceDirectory?: string) {
|
||||
this.registry = new HotRunnerRegistry(persistenceDirectory);
|
||||
this.healthMonitor = new HotRunnerHealthMonitor();
|
||||
this.dispatcher = new HotRunnerDispatcher(this.transports);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the hot runner service: load persisted state, start health monitoring.
|
||||
*/
|
||||
async initialize(config: HotRunnerConfig): Promise<void> {
|
||||
this.config = config;
|
||||
|
||||
OrchestratorLogger.log(
|
||||
`[HotRunner] Initializing service (transport: ${config.transport}, ${config.host}:${config.port})`,
|
||||
);
|
||||
|
||||
// Attempt to restore previously registered runners from disk
|
||||
const restored = this.registry.loadFromDisk();
|
||||
if (restored > 0) {
|
||||
OrchestratorLogger.log(`[HotRunner] Restored ${restored} runner(s) from persistence`);
|
||||
}
|
||||
|
||||
// Start health monitoring
|
||||
this.healthMonitor.startMonitoring(this.registry, config.healthCheckInterval, this.transports);
|
||||
|
||||
OrchestratorLogger.log(`[HotRunner] Service initialized`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a runner with a transport implementation.
|
||||
* Returns the runner ID.
|
||||
*/
|
||||
registerRunner(config: HotRunnerConfig, transport: HotRunnerTransport): string {
|
||||
const id = this.registry.registerRunner(config);
|
||||
this.transports.set(id, transport);
|
||||
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit a build job to an available hot runner.
|
||||
* Converts BuildParameters to a HotRunnerJobRequest and dispatches.
|
||||
*/
|
||||
async submitBuild(params: BuildParameters, onOutput?: OutputCallback): Promise<HotRunnerJobResult> {
|
||||
const request: HotRunnerJobRequest = {
|
||||
jobId: params.buildGuid || `build-${Date.now()}`,
|
||||
buildMethod: params.buildMethod || undefined,
|
||||
buildTarget: params.targetPlatform,
|
||||
buildPath: params.buildPath,
|
||||
customParameters: params.customParameters ? this.parseCustomParameters(params.customParameters) : undefined,
|
||||
timeout: 30 * 60 * 1000, // 30 minutes default
|
||||
};
|
||||
|
||||
OrchestratorLogger.log(`[HotRunner] Submitting build: ${request.jobId} (target: ${request.buildTarget})`);
|
||||
|
||||
return this.dispatcher.dispatchJob(request, this.registry, params.editorVersion, onOutput);
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit a test job to an available hot runner.
|
||||
* Converts BuildParameters and optional suite config to a test-mode HotRunnerJobRequest.
|
||||
*/
|
||||
async submitTest(
|
||||
params: BuildParameters,
|
||||
suiteConfig?: { testMode?: 'editmode' | 'playmode'; testSuitePath?: string },
|
||||
onOutput?: OutputCallback,
|
||||
): Promise<HotRunnerJobResult> {
|
||||
const request: HotRunnerJobRequest = {
|
||||
jobId: params.buildGuid || `test-${Date.now()}`,
|
||||
buildTarget: params.targetPlatform,
|
||||
customParameters: params.customParameters ? this.parseCustomParameters(params.customParameters) : undefined,
|
||||
timeout: 30 * 60 * 1000, // 30 minutes default
|
||||
testMode: suiteConfig?.testMode ?? 'editmode',
|
||||
testSuitePath: suiteConfig?.testSuitePath,
|
||||
};
|
||||
|
||||
OrchestratorLogger.log(`[HotRunner] Submitting test: ${request.jobId} (mode: ${request.testMode})`);
|
||||
|
||||
return this.dispatcher.dispatchJob(request, this.registry, params.editorVersion, onOutput);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shut down the service: stop health monitoring, disconnect all transports,
|
||||
* and unregister all runners.
|
||||
*/
|
||||
async shutdown(): Promise<void> {
|
||||
OrchestratorLogger.log(`[HotRunner] Shutting down service`);
|
||||
|
||||
this.healthMonitor.stopMonitoring();
|
||||
|
||||
const disconnectPromises: Promise<void>[] = [];
|
||||
for (const [id, transport] of this.transports.entries()) {
|
||||
disconnectPromises.push(
|
||||
transport.disconnect().catch((error: any) => {
|
||||
OrchestratorLogger.logWarning(`[HotRunner] Error disconnecting runner ${id}: ${error.message}`);
|
||||
}),
|
||||
);
|
||||
}
|
||||
await Promise.all(disconnectPromises);
|
||||
|
||||
this.transports.clear();
|
||||
|
||||
OrchestratorLogger.log(`[HotRunner] Service shut down`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the status of all registered runners.
|
||||
*/
|
||||
getStatus(): HotRunnerStatus[] {
|
||||
return this.registry.listRunners();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the underlying registry (for testing or advanced use).
|
||||
*/
|
||||
getRegistry(): HotRunnerRegistry {
|
||||
return this.registry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a space-separated custom parameters string into a key-value map.
|
||||
* Handles `-key value` and `-key=value` formats.
|
||||
*/
|
||||
private parseCustomParameters(raw: string): Record<string, string> {
|
||||
const result: Record<string, string> = {};
|
||||
const parts = raw.trim().split(/\s+/);
|
||||
|
||||
for (let i = 0; i < parts.length; i++) {
|
||||
const part = parts[i];
|
||||
if (part.startsWith('-')) {
|
||||
const key = part.replace(/^-+/, '');
|
||||
if (key.includes('=')) {
|
||||
const [k, ...v] = key.split('=');
|
||||
result[k] = v.join('=');
|
||||
} else if (i + 1 < parts.length && !parts[i + 1].startsWith('-')) {
|
||||
result[key] = parts[i + 1];
|
||||
i++;
|
||||
} else {
|
||||
result[key] = 'true';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
export interface HotRunnerConfig {
|
||||
enabled: boolean;
|
||||
transport: 'websocket' | 'grpc' | 'named-pipe';
|
||||
host: string;
|
||||
port: number;
|
||||
healthCheckInterval: number; // seconds
|
||||
maxIdleTime: number; // seconds before recycling
|
||||
maxJobsBeforeRecycle: number;
|
||||
unityVersion?: string;
|
||||
platform?: string;
|
||||
}
|
||||
|
||||
export interface HotRunnerStatus {
|
||||
id: string;
|
||||
state: 'idle' | 'busy' | 'starting' | 'stopping' | 'unhealthy';
|
||||
unityVersion: string;
|
||||
platform: string;
|
||||
currentJob?: string;
|
||||
lastJobCompleted?: string;
|
||||
uptime: number;
|
||||
jobsCompleted: number;
|
||||
lastHealthCheck: string;
|
||||
memoryUsageMB: number;
|
||||
libraryHash?: string;
|
||||
}
|
||||
|
||||
export interface HotRunnerJobRequest {
|
||||
jobId: string;
|
||||
buildMethod?: string;
|
||||
buildTarget: string;
|
||||
buildPath?: string;
|
||||
customParameters?: Record<string, string>;
|
||||
timeout: number;
|
||||
testMode?: 'editmode' | 'playmode';
|
||||
testSuitePath?: string;
|
||||
}
|
||||
|
||||
export interface HotRunnerJobResult {
|
||||
jobId: string;
|
||||
success: boolean;
|
||||
exitCode: number;
|
||||
duration: number;
|
||||
output: string;
|
||||
artifacts?: string[];
|
||||
testResults?: string; // path to test result file
|
||||
}
|
||||
|
||||
export interface HotRunnerTransport {
|
||||
connect(config: HotRunnerConfig): Promise<void>;
|
||||
disconnect(): Promise<void>;
|
||||
sendJob(request: HotRunnerJobRequest): Promise<HotRunnerJobResult>;
|
||||
getStatus(): Promise<HotRunnerStatus>;
|
||||
healthCheck(): Promise<boolean>;
|
||||
}
|
||||
740
src/model/orchestrator/services/hot-runner/hot-runner.test.ts
Normal file
740
src/model/orchestrator/services/hot-runner/hot-runner.test.ts
Normal file
@@ -0,0 +1,740 @@
|
||||
import fs from 'node:fs';
|
||||
import { HotRunnerRegistry } from './hot-runner-registry';
|
||||
import { HotRunnerHealthMonitor } from './hot-runner-health-monitor';
|
||||
import { HotRunnerDispatcher } from './hot-runner-dispatcher';
|
||||
import { HotRunnerService } from './hot-runner-service';
|
||||
import {
|
||||
HotRunnerConfig,
|
||||
HotRunnerJobRequest,
|
||||
HotRunnerJobResult,
|
||||
HotRunnerStatus,
|
||||
HotRunnerTransport,
|
||||
} from './hot-runner-types';
|
||||
|
||||
// Mock dependencies
|
||||
jest.mock('node:fs');
|
||||
jest.mock('../core/orchestrator-logger');
|
||||
|
||||
const mockFs = fs as jest.Mocked<typeof fs>;
|
||||
|
||||
function createMockConfig(overrides?: Partial<HotRunnerConfig>): HotRunnerConfig {
|
||||
return {
|
||||
enabled: true,
|
||||
transport: 'websocket',
|
||||
host: 'localhost',
|
||||
port: 9090,
|
||||
healthCheckInterval: 30,
|
||||
maxIdleTime: 3600,
|
||||
maxJobsBeforeRecycle: 100,
|
||||
unityVersion: '2022.3.0f1',
|
||||
platform: 'StandaloneWindows64',
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function createMockTransport(overrides?: Partial<HotRunnerTransport>): HotRunnerTransport {
|
||||
return {
|
||||
// eslint-disable-next-line unicorn/no-useless-undefined
|
||||
connect: jest.fn().mockResolvedValue(undefined),
|
||||
// eslint-disable-next-line unicorn/no-useless-undefined
|
||||
disconnect: jest.fn().mockResolvedValue(undefined),
|
||||
sendJob: jest.fn().mockResolvedValue({
|
||||
jobId: 'test-job',
|
||||
success: true,
|
||||
exitCode: 0,
|
||||
duration: 5000,
|
||||
output: 'Build succeeded',
|
||||
artifacts: ['build/output.exe'],
|
||||
} as HotRunnerJobResult),
|
||||
getStatus: jest.fn().mockResolvedValue({
|
||||
id: 'mock-runner',
|
||||
state: 'idle',
|
||||
unityVersion: '2022.3.0f1',
|
||||
platform: 'StandaloneWindows64',
|
||||
uptime: 3600,
|
||||
jobsCompleted: 5,
|
||||
lastHealthCheck: new Date().toISOString(),
|
||||
memoryUsageMB: 1024,
|
||||
} as HotRunnerStatus),
|
||||
healthCheck: jest.fn().mockResolvedValue(true),
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function createMockJobRequest(overrides?: Partial<HotRunnerJobRequest>): HotRunnerJobRequest {
|
||||
return {
|
||||
jobId: 'job-001',
|
||||
buildTarget: 'StandaloneWindows64',
|
||||
timeout: 60000,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
// --- Registry Tests ---
|
||||
|
||||
describe('HotRunnerRegistry', () => {
|
||||
let registry: HotRunnerRegistry;
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
registry = new HotRunnerRegistry();
|
||||
});
|
||||
|
||||
it('should register a runner and return an ID', () => {
|
||||
const config = createMockConfig();
|
||||
const id = registry.registerRunner(config);
|
||||
|
||||
expect(id).toMatch(/^hr-/);
|
||||
expect(registry.size).toBe(1);
|
||||
});
|
||||
|
||||
it('should retrieve a registered runner by ID', () => {
|
||||
const config = createMockConfig();
|
||||
const id = registry.registerRunner(config);
|
||||
const runner = registry.getRunner(id);
|
||||
|
||||
expect(runner).toBeDefined();
|
||||
expect(runner!.id).toBe(id);
|
||||
expect(runner!.state).toBe('starting');
|
||||
expect(runner!.unityVersion).toBe('2022.3.0f1');
|
||||
expect(runner!.platform).toBe('StandaloneWindows64');
|
||||
});
|
||||
|
||||
it('should return undefined for unknown runner ID', () => {
|
||||
const runner = registry.getRunner('nonexistent');
|
||||
expect(runner).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should unregister a runner', () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
expect(registry.size).toBe(1);
|
||||
|
||||
registry.unregisterRunner(id);
|
||||
expect(registry.size).toBe(0);
|
||||
expect(registry.getRunner(id)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should handle unregistering a nonexistent runner gracefully', () => {
|
||||
registry.unregisterRunner('nonexistent');
|
||||
expect(registry.size).toBe(0);
|
||||
});
|
||||
|
||||
it('should list all runners without filter', () => {
|
||||
registry.registerRunner(createMockConfig({ platform: 'StandaloneWindows64' }));
|
||||
registry.registerRunner(createMockConfig({ platform: 'StandaloneLinux64' }));
|
||||
registry.registerRunner(createMockConfig({ platform: 'StandaloneOSX' }));
|
||||
|
||||
const all = registry.listRunners();
|
||||
expect(all).toHaveLength(3);
|
||||
});
|
||||
|
||||
it('should filter runners by platform', () => {
|
||||
registry.registerRunner(createMockConfig({ platform: 'StandaloneWindows64' }));
|
||||
registry.registerRunner(createMockConfig({ platform: 'StandaloneLinux64' }));
|
||||
registry.registerRunner(createMockConfig({ platform: 'StandaloneWindows64' }));
|
||||
|
||||
const windows = registry.listRunners({ platform: 'StandaloneWindows64' });
|
||||
expect(windows).toHaveLength(2);
|
||||
|
||||
const linux = registry.listRunners({ platform: 'StandaloneLinux64' });
|
||||
expect(linux).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('should filter runners by state', () => {
|
||||
const id1 = registry.registerRunner(createMockConfig());
|
||||
registry.registerRunner(createMockConfig());
|
||||
|
||||
registry.updateRunner(id1, { state: 'idle' });
|
||||
|
||||
// second runner remains in 'starting' state
|
||||
|
||||
const idle = registry.listRunners({ state: 'idle' });
|
||||
expect(idle).toHaveLength(1);
|
||||
expect(idle[0].id).toBe(id1);
|
||||
});
|
||||
|
||||
it('should filter runners by Unity version', () => {
|
||||
registry.registerRunner(createMockConfig({ unityVersion: '2022.3.0f1' }));
|
||||
registry.registerRunner(createMockConfig({ unityVersion: '2023.1.0f1' }));
|
||||
registry.registerRunner(createMockConfig({ unityVersion: '2022.3.0f1' }));
|
||||
|
||||
const v2022 = registry.listRunners({ unityVersion: '2022.3.0f1' });
|
||||
expect(v2022).toHaveLength(2);
|
||||
});
|
||||
|
||||
it('should find an available idle runner matching requirements', () => {
|
||||
const id1 = registry.registerRunner(
|
||||
createMockConfig({ unityVersion: '2022.3.0f1', platform: 'StandaloneWindows64' }),
|
||||
);
|
||||
registry.updateRunner(id1, { state: 'idle' });
|
||||
|
||||
const id2 = registry.registerRunner(
|
||||
createMockConfig({ unityVersion: '2023.1.0f1', platform: 'StandaloneLinux64' }),
|
||||
);
|
||||
registry.updateRunner(id2, { state: 'idle' });
|
||||
|
||||
const found = registry.findAvailableRunner({
|
||||
unityVersion: '2022.3.0f1',
|
||||
platform: 'StandaloneWindows64',
|
||||
});
|
||||
|
||||
expect(found).toBeDefined();
|
||||
expect(found!.id).toBe(id1);
|
||||
});
|
||||
|
||||
it('should return undefined when no runner matches requirements', () => {
|
||||
const id = registry.registerRunner(
|
||||
createMockConfig({ unityVersion: '2022.3.0f1', platform: 'StandaloneWindows64' }),
|
||||
);
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
const found = registry.findAvailableRunner({
|
||||
unityVersion: '2023.1.0f1',
|
||||
platform: 'StandaloneLinux64',
|
||||
});
|
||||
|
||||
expect(found).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should update runner status fields', () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle', memoryUsageMB: 2048 });
|
||||
|
||||
const runner = registry.getRunner(id);
|
||||
expect(runner!.state).toBe('idle');
|
||||
expect(runner!.memoryUsageMB).toBe(2048);
|
||||
|
||||
// ID should not be overridden by the update
|
||||
expect(runner!.id).toBe(id);
|
||||
});
|
||||
|
||||
it('should persist and load registry from disk', () => {
|
||||
const persistenceRegistry = new HotRunnerRegistry('/tmp/test');
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.writeFileSync.mockImplementation(() => {});
|
||||
mockFs.mkdirSync.mockImplementation(() => '' as any);
|
||||
|
||||
const id = persistenceRegistry.registerRunner(createMockConfig());
|
||||
|
||||
// Verify writeFileSync was called for persistence
|
||||
expect(mockFs.writeFileSync).toHaveBeenCalled();
|
||||
const writtenData = JSON.parse((mockFs.writeFileSync as jest.Mock).mock.calls[0][1] as string);
|
||||
expect(writtenData.runners).toBeDefined();
|
||||
expect(writtenData.runners[id]).toBeDefined();
|
||||
});
|
||||
|
||||
it('should load runners from disk on loadFromDisk', () => {
|
||||
const persistenceRegistry = new HotRunnerRegistry('/tmp/test');
|
||||
const storedData = {
|
||||
runners: {
|
||||
'hr-restored': {
|
||||
id: 'hr-restored',
|
||||
state: 'idle',
|
||||
unityVersion: '2022.3.0f1',
|
||||
platform: 'StandaloneWindows64',
|
||||
uptime: 100,
|
||||
jobsCompleted: 3,
|
||||
lastHealthCheck: new Date().toISOString(),
|
||||
memoryUsageMB: 512,
|
||||
},
|
||||
},
|
||||
configs: {
|
||||
'hr-restored': createMockConfig(),
|
||||
},
|
||||
};
|
||||
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue(JSON.stringify(storedData));
|
||||
|
||||
const count = persistenceRegistry.loadFromDisk();
|
||||
expect(count).toBe(1);
|
||||
expect(persistenceRegistry.getRunner('hr-restored')).toBeDefined();
|
||||
});
|
||||
|
||||
it('should discard invalid runner entries when loading from disk', () => {
|
||||
const persistenceRegistry = new HotRunnerRegistry('/tmp/test');
|
||||
const storedData = {
|
||||
runners: {
|
||||
'hr-valid': {
|
||||
id: 'hr-valid',
|
||||
state: 'idle',
|
||||
unityVersion: '2022.3.0f1',
|
||||
platform: 'StandaloneWindows64',
|
||||
uptime: 100,
|
||||
jobsCompleted: 3,
|
||||
lastHealthCheck: new Date().toISOString(),
|
||||
memoryUsageMB: 512,
|
||||
},
|
||||
'hr-invalid': {
|
||||
// Missing required fields like state, unityVersion
|
||||
id: 'hr-invalid',
|
||||
},
|
||||
'hr-bad-state': {
|
||||
id: 'hr-bad-state',
|
||||
state: 'nonexistent-state',
|
||||
unityVersion: '2022.3.0f1',
|
||||
platform: 'StandaloneWindows64',
|
||||
uptime: 0,
|
||||
jobsCompleted: 0,
|
||||
lastHealthCheck: new Date().toISOString(),
|
||||
memoryUsageMB: 0,
|
||||
},
|
||||
},
|
||||
configs: {
|
||||
'hr-valid': createMockConfig(),
|
||||
'hr-invalid': createMockConfig(),
|
||||
'hr-bad-state': createMockConfig(),
|
||||
},
|
||||
};
|
||||
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue(JSON.stringify(storedData));
|
||||
|
||||
const count = persistenceRegistry.loadFromDisk();
|
||||
expect(count).toBe(1);
|
||||
expect(persistenceRegistry.getRunner('hr-valid')).toBeDefined();
|
||||
expect(persistenceRegistry.getRunner('hr-invalid')).toBeUndefined();
|
||||
expect(persistenceRegistry.getRunner('hr-bad-state')).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should handle corrupt JSON persistence file gracefully', () => {
|
||||
const persistenceRegistry = new HotRunnerRegistry('/tmp/test');
|
||||
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue('{ invalid json !!!');
|
||||
|
||||
const count = persistenceRegistry.loadFromDisk();
|
||||
expect(count).toBe(0);
|
||||
expect(persistenceRegistry.size).toBe(0);
|
||||
});
|
||||
|
||||
it('should handle persistence file with invalid top-level structure', () => {
|
||||
const persistenceRegistry = new HotRunnerRegistry('/tmp/test');
|
||||
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue('"just a string"');
|
||||
|
||||
const count = persistenceRegistry.loadFromDisk();
|
||||
expect(count).toBe(0);
|
||||
});
|
||||
|
||||
it('should handle persistence file with null runners', () => {
|
||||
const persistenceRegistry = new HotRunnerRegistry('/tmp/test');
|
||||
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue('{"runners": null, "configs": null}');
|
||||
|
||||
const count = persistenceRegistry.loadFromDisk();
|
||||
expect(count).toBe(0);
|
||||
});
|
||||
|
||||
it('should validate and repair invalid runners', () => {
|
||||
const persistenceRegistry = new HotRunnerRegistry('/tmp/test');
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.writeFileSync.mockImplementation(() => {});
|
||||
mockFs.mkdirSync.mockImplementation(() => '' as any);
|
||||
|
||||
// Register a valid runner first
|
||||
const id = persistenceRegistry.registerRunner(createMockConfig());
|
||||
persistenceRegistry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
// Manually corrupt the runner's state by setting an invalid state
|
||||
// (we access via the public API -- updateRunner with a cast)
|
||||
persistenceRegistry.updateRunner(id, { state: 'invalid-state' as any });
|
||||
|
||||
const repaired = persistenceRegistry.validateAndRepair();
|
||||
expect(repaired).toBe(1);
|
||||
|
||||
const runner = persistenceRegistry.getRunner(id);
|
||||
expect(runner!.state).toBe('unhealthy');
|
||||
});
|
||||
|
||||
it('should not discard configs for valid runners when loading from disk', () => {
|
||||
const persistenceRegistry = new HotRunnerRegistry('/tmp/test');
|
||||
const storedData = {
|
||||
runners: {
|
||||
'hr-valid': {
|
||||
id: 'hr-valid',
|
||||
state: 'idle',
|
||||
unityVersion: '2022.3.0f1',
|
||||
platform: 'StandaloneWindows64',
|
||||
uptime: 100,
|
||||
jobsCompleted: 3,
|
||||
lastHealthCheck: new Date().toISOString(),
|
||||
memoryUsageMB: 512,
|
||||
},
|
||||
},
|
||||
configs: {
|
||||
'hr-valid': createMockConfig(),
|
||||
},
|
||||
};
|
||||
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue(JSON.stringify(storedData));
|
||||
|
||||
persistenceRegistry.loadFromDisk();
|
||||
expect(persistenceRegistry.getConfig('hr-valid')).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
// --- Health Monitor Tests ---
|
||||
|
||||
describe('HotRunnerHealthMonitor', () => {
|
||||
let monitor: HotRunnerHealthMonitor;
|
||||
let registry: HotRunnerRegistry;
|
||||
let transports: Map<string, HotRunnerTransport>;
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
jest.useFakeTimers();
|
||||
monitor = new HotRunnerHealthMonitor();
|
||||
registry = new HotRunnerRegistry();
|
||||
transports = new Map();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
monitor.stopMonitoring();
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
it('should start and stop monitoring', () => {
|
||||
monitor.startMonitoring(registry, 30, transports);
|
||||
expect(monitor.isMonitoring).toBe(true);
|
||||
|
||||
monitor.stopMonitoring();
|
||||
expect(monitor.isMonitoring).toBe(false);
|
||||
});
|
||||
|
||||
it('should report healthy when transport health check passes', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
const transport = createMockTransport();
|
||||
transports.set(id, transport);
|
||||
monitor.startMonitoring(registry, 30, transports);
|
||||
|
||||
const healthy = await monitor.checkHealth(id);
|
||||
expect(healthy).toBe(true);
|
||||
expect(transport.healthCheck).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should mark runner as unhealthy when health check fails', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
const transport = createMockTransport({
|
||||
healthCheck: jest.fn().mockResolvedValue(false),
|
||||
});
|
||||
transports.set(id, transport);
|
||||
monitor.startMonitoring(registry, 30, transports);
|
||||
|
||||
const healthy = await monitor.checkHealth(id);
|
||||
expect(healthy).toBe(false);
|
||||
|
||||
const runner = registry.getRunner(id);
|
||||
expect(runner!.state).toBe('unhealthy');
|
||||
});
|
||||
|
||||
it('should mark runner as unhealthy when health check throws', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
const transport = createMockTransport({
|
||||
healthCheck: jest.fn().mockRejectedValue(new Error('Connection refused')),
|
||||
});
|
||||
transports.set(id, transport);
|
||||
monitor.startMonitoring(registry, 30, transports);
|
||||
|
||||
const healthy = await monitor.checkHealth(id);
|
||||
expect(healthy).toBe(false);
|
||||
});
|
||||
|
||||
it('should recycle unhealthy runner and remove from registry', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
const transport = createMockTransport();
|
||||
transports.set(id, transport);
|
||||
monitor.startMonitoring(registry, 30, transports);
|
||||
|
||||
await monitor.recycleUnhealthyRunner(id);
|
||||
|
||||
expect(registry.getRunner(id)).toBeUndefined();
|
||||
expect(transport.disconnect).toHaveBeenCalled();
|
||||
expect(transports.has(id)).toBe(false);
|
||||
});
|
||||
|
||||
it('should recycle idle runner when max idle time exceeded', async () => {
|
||||
const id = registry.registerRunner(createMockConfig({ maxIdleTime: 60 }));
|
||||
|
||||
// Set lastHealthCheck to 120 seconds ago
|
||||
const oldDate = new Date(Date.now() - 120 * 1000).toISOString();
|
||||
registry.updateRunner(id, { state: 'idle', lastHealthCheck: oldDate });
|
||||
|
||||
const transport = createMockTransport();
|
||||
transports.set(id, transport);
|
||||
monitor.startMonitoring(registry, 30, transports);
|
||||
|
||||
await monitor.recycleIdleRunner(id, 60);
|
||||
|
||||
expect(registry.getRunner(id)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should not recycle idle runner when within max idle time', async () => {
|
||||
const id = registry.registerRunner(createMockConfig({ maxIdleTime: 3600 }));
|
||||
registry.updateRunner(id, {
|
||||
state: 'idle',
|
||||
lastHealthCheck: new Date().toISOString(),
|
||||
});
|
||||
|
||||
const transport = createMockTransport();
|
||||
transports.set(id, transport);
|
||||
monitor.startMonitoring(registry, 30, transports);
|
||||
|
||||
await monitor.recycleIdleRunner(id, 3600);
|
||||
|
||||
// Runner should still exist
|
||||
expect(registry.getRunner(id)).toBeDefined();
|
||||
});
|
||||
|
||||
it('should return false when no transport exists for runner', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
|
||||
// Do not set any transport for this runner
|
||||
monitor.startMonitoring(registry, 30, transports);
|
||||
|
||||
const healthy = await monitor.checkHealth(id);
|
||||
expect(healthy).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
// --- Dispatcher Tests ---
|
||||
|
||||
describe('HotRunnerDispatcher', () => {
|
||||
let registry: HotRunnerRegistry;
|
||||
let transports: Map<string, HotRunnerTransport>;
|
||||
let dispatcher: HotRunnerDispatcher;
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
registry = new HotRunnerRegistry();
|
||||
transports = new Map();
|
||||
dispatcher = new HotRunnerDispatcher(transports);
|
||||
});
|
||||
|
||||
it('should dispatch a job to an available runner', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
const transport = createMockTransport();
|
||||
transports.set(id, transport);
|
||||
|
||||
const request = createMockJobRequest();
|
||||
const result = await dispatcher.dispatchJob(request, registry, '2022.3.0f1');
|
||||
|
||||
expect(result.success).toBe(true);
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(transport.sendJob).toHaveBeenCalledWith(request);
|
||||
});
|
||||
|
||||
it('should mark runner as busy during job execution', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
const statesDuringJob: string[] = [];
|
||||
const transport = createMockTransport({
|
||||
sendJob: jest.fn().mockImplementation(async () => {
|
||||
const runner = registry.getRunner(id);
|
||||
if (runner) statesDuringJob.push(runner.state);
|
||||
|
||||
return {
|
||||
jobId: 'job-001',
|
||||
success: true,
|
||||
exitCode: 0,
|
||||
duration: 1000,
|
||||
output: 'ok',
|
||||
};
|
||||
}),
|
||||
});
|
||||
transports.set(id, transport);
|
||||
|
||||
await dispatcher.dispatchJob(createMockJobRequest(), registry, '2022.3.0f1');
|
||||
|
||||
expect(statesDuringJob).toContain('busy');
|
||||
|
||||
// After completion, should be idle again
|
||||
const runner = registry.getRunner(id);
|
||||
expect(runner!.state).toBe('idle');
|
||||
});
|
||||
|
||||
it('should increment jobsCompleted after successful dispatch', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle', jobsCompleted: 5 });
|
||||
|
||||
const transport = createMockTransport();
|
||||
transports.set(id, transport);
|
||||
|
||||
await dispatcher.dispatchJob(createMockJobRequest(), registry, '2022.3.0f1');
|
||||
|
||||
const runner = registry.getRunner(id);
|
||||
expect(runner!.jobsCompleted).toBe(6);
|
||||
});
|
||||
|
||||
it('should throw when no runner is available and wait times out', async () => {
|
||||
// No runners registered at all
|
||||
const request = createMockJobRequest({ timeout: 100 });
|
||||
|
||||
await expect(dispatcher.dispatchJob(request, registry, '2022.3.0f1')).rejects.toThrow(/Timed out waiting/);
|
||||
});
|
||||
|
||||
it('should throw when runner has no transport', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
// No transport set for this runner
|
||||
|
||||
const request = createMockJobRequest();
|
||||
|
||||
await expect(dispatcher.dispatchJob(request, registry, '2022.3.0f1')).rejects.toThrow(/No transport available/);
|
||||
});
|
||||
|
||||
it('should handle job failure and return runner to idle', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
const transport = createMockTransport({
|
||||
sendJob: jest.fn().mockRejectedValue(new Error('Unity crashed')),
|
||||
});
|
||||
transports.set(id, transport);
|
||||
|
||||
await expect(dispatcher.dispatchJob(createMockJobRequest(), registry, '2022.3.0f1')).rejects.toThrow(
|
||||
'Unity crashed',
|
||||
);
|
||||
|
||||
// Runner should be back to idle despite failure
|
||||
const runner = registry.getRunner(id);
|
||||
expect(runner!.state).toBe('idle');
|
||||
});
|
||||
|
||||
it('should handle job timeout', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
const transport = createMockTransport({
|
||||
sendJob: jest.fn().mockImplementation(
|
||||
() => new Promise((resolve) => setTimeout(resolve, 60000)), // never resolves within timeout
|
||||
),
|
||||
});
|
||||
transports.set(id, transport);
|
||||
|
||||
const request = createMockJobRequest({ timeout: 50 });
|
||||
|
||||
await expect(dispatcher.dispatchJob(request, registry, '2022.3.0f1')).rejects.toThrow(/timed out/);
|
||||
});
|
||||
|
||||
it('should disconnect transport on job timeout', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
const transport = createMockTransport({
|
||||
sendJob: jest.fn().mockImplementation(
|
||||
() => new Promise((resolve) => setTimeout(resolve, 60000)), // never resolves within timeout
|
||||
),
|
||||
});
|
||||
transports.set(id, transport);
|
||||
|
||||
const request = createMockJobRequest({ timeout: 50 });
|
||||
|
||||
await expect(dispatcher.dispatchJob(request, registry, '2022.3.0f1')).rejects.toThrow(/timed out/);
|
||||
|
||||
// Transport should have been disconnected to clean up orphaned connection
|
||||
expect(transport.disconnect).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should call output callback with job output', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
|
||||
const transport = createMockTransport();
|
||||
transports.set(id, transport);
|
||||
|
||||
const outputCallback = jest.fn();
|
||||
await dispatcher.dispatchJob(createMockJobRequest(), registry, '2022.3.0f1', outputCallback);
|
||||
|
||||
expect(outputCallback).toHaveBeenCalledWith('Build succeeded');
|
||||
});
|
||||
|
||||
it('should wait for runner to become available', async () => {
|
||||
const id = registry.registerRunner(createMockConfig());
|
||||
|
||||
// Runner starts in 'starting' state, not idle
|
||||
|
||||
const transport = createMockTransport();
|
||||
transports.set(id, transport);
|
||||
|
||||
// Simulate runner becoming idle after a short delay
|
||||
setTimeout(() => {
|
||||
registry.updateRunner(id, { state: 'idle' });
|
||||
}, 50);
|
||||
|
||||
const request = createMockJobRequest({ timeout: 5000 });
|
||||
const result = await dispatcher.dispatchJob(request, registry, '2022.3.0f1');
|
||||
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
// --- Service Integration Tests ---
|
||||
|
||||
describe('HotRunnerService', () => {
|
||||
let service: HotRunnerService;
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
mockFs.existsSync.mockReturnValue(false);
|
||||
service = new HotRunnerService();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await service.shutdown();
|
||||
});
|
||||
|
||||
it('should initialize and shut down cleanly', async () => {
|
||||
const config = createMockConfig();
|
||||
await service.initialize(config);
|
||||
|
||||
const status = service.getStatus();
|
||||
expect(status).toEqual([]);
|
||||
|
||||
await service.shutdown();
|
||||
});
|
||||
|
||||
it('should register a runner with transport', async () => {
|
||||
await service.initialize(createMockConfig());
|
||||
|
||||
const transport = createMockTransport();
|
||||
const id = service.registerRunner(createMockConfig(), transport);
|
||||
|
||||
expect(id).toMatch(/^hr-/);
|
||||
expect(service.getStatus()).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('should disconnect all transports on shutdown', async () => {
|
||||
await service.initialize(createMockConfig());
|
||||
|
||||
const transport1 = createMockTransport();
|
||||
const transport2 = createMockTransport();
|
||||
service.registerRunner(createMockConfig(), transport1);
|
||||
service.registerRunner(createMockConfig(), transport2);
|
||||
|
||||
await service.shutdown();
|
||||
|
||||
expect(transport1.disconnect).toHaveBeenCalled();
|
||||
expect(transport2.disconnect).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should expose the underlying registry', async () => {
|
||||
await service.initialize(createMockConfig());
|
||||
const registry = service.getRegistry();
|
||||
|
||||
expect(registry).toBeInstanceOf(HotRunnerRegistry);
|
||||
});
|
||||
});
|
||||
11
src/model/orchestrator/services/hot-runner/index.ts
Normal file
11
src/model/orchestrator/services/hot-runner/index.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
export { HotRunnerService } from './hot-runner-service';
|
||||
export { HotRunnerRegistry } from './hot-runner-registry';
|
||||
export { HotRunnerHealthMonitor } from './hot-runner-health-monitor';
|
||||
export { HotRunnerDispatcher } from './hot-runner-dispatcher';
|
||||
export type {
|
||||
HotRunnerConfig,
|
||||
HotRunnerStatus,
|
||||
HotRunnerJobRequest,
|
||||
HotRunnerJobResult,
|
||||
HotRunnerTransport,
|
||||
} from './hot-runner-types';
|
||||
@@ -1,315 +0,0 @@
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import { OrchestratorSystem } from '../core/orchestrator-system';
|
||||
import OrchestratorLogger from '../core/orchestrator-logger';
|
||||
import { SyncState, SyncStrategy } from './sync-state';
|
||||
import { SyncStateManager } from './sync-state-manager';
|
||||
|
||||
/**
|
||||
* Service for incremental workspace synchronization.
|
||||
*
|
||||
* Supports multiple sync strategies:
|
||||
* - full: Traditional clone + cache restore (default)
|
||||
* - git-delta: Fetch and apply only changed files since last sync
|
||||
* - direct-input: Apply file changes passed as job input (no git push required)
|
||||
* - storage-pull: Fetch changed files from rclone-backed generic storage
|
||||
*/
|
||||
export class IncrementalSyncService {
|
||||
/**
|
||||
* Load sync state from the workspace.
|
||||
*/
|
||||
static loadSyncState(workspacePath: string, statePath?: string): SyncState | undefined {
|
||||
return SyncStateManager.loadState(workspacePath, statePath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Save sync state to the workspace.
|
||||
*/
|
||||
static saveSyncState(workspacePath: string, state: SyncState, statePath?: string): void {
|
||||
SyncStateManager.saveState(workspacePath, state, statePath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the appropriate sync strategy based on workspace state and configuration.
|
||||
*/
|
||||
static resolveStrategy(requestedStrategy: SyncStrategy, workspacePath: string, statePath?: string): SyncStrategy {
|
||||
if (requestedStrategy === 'full') {
|
||||
return 'full';
|
||||
}
|
||||
|
||||
// git-delta requires an existing sync state
|
||||
if (requestedStrategy === 'git-delta') {
|
||||
const state = SyncStateManager.loadState(workspacePath, statePath);
|
||||
if (!state) {
|
||||
OrchestratorLogger.log('[Sync] No sync state found, falling back to full sync');
|
||||
|
||||
return 'full';
|
||||
}
|
||||
|
||||
return 'git-delta';
|
||||
}
|
||||
|
||||
return requestedStrategy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a git-delta sync: fetch latest and apply only changed files.
|
||||
*
|
||||
* @param workspacePath - Path to the git workspace
|
||||
* @param targetReference - Git ref to sync to (commit SHA, branch, tag)
|
||||
* @param statePath - Optional custom path for sync state file
|
||||
* @returns Number of files changed
|
||||
*/
|
||||
static async syncGitDelta(workspacePath: string, targetReference: string, statePath?: string): Promise<number> {
|
||||
const state = SyncStateManager.loadState(workspacePath, statePath);
|
||||
if (!state) {
|
||||
throw new Error('Cannot git-delta sync without existing sync state');
|
||||
}
|
||||
|
||||
OrchestratorLogger.log(`[Sync] Git delta: ${state.lastSyncCommit.slice(0, 8)} -> ${targetReference.slice(0, 8)}`);
|
||||
|
||||
// Fetch latest
|
||||
await OrchestratorSystem.Run(`git -C "${workspacePath}" fetch origin`, true);
|
||||
|
||||
// Get list of changed files
|
||||
const diffOutput = await OrchestratorSystem.Run(
|
||||
`git -C "${workspacePath}" diff --name-only ${state.lastSyncCommit}..${targetReference}`,
|
||||
true,
|
||||
);
|
||||
|
||||
const changedFiles = diffOutput.split('\n').filter(Boolean);
|
||||
OrchestratorLogger.log(`[Sync] ${changedFiles.length} file(s) changed`);
|
||||
|
||||
if (changedFiles.length > 0) {
|
||||
// Checkout target ref
|
||||
await OrchestratorSystem.Run(`git -C "${workspacePath}" checkout ${targetReference}`, true);
|
||||
}
|
||||
|
||||
// Update sync state
|
||||
const newState: SyncState = {
|
||||
lastSyncCommit: targetReference,
|
||||
lastSyncTimestamp: new Date().toISOString(),
|
||||
workspaceHash: SyncStateManager.calculateWorkspaceHash(workspacePath),
|
||||
pendingOverlays: state.pendingOverlays,
|
||||
};
|
||||
SyncStateManager.saveState(workspacePath, newState, statePath);
|
||||
|
||||
return changedFiles.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply a direct input overlay from a local archive or storage URI.
|
||||
*
|
||||
* For storage URIs (storage://remote:bucket/path), the archive is fetched via rclone.
|
||||
* For local paths, the archive is extracted directly.
|
||||
*
|
||||
* @param workspacePath - Path to the workspace
|
||||
* @param inputReference - Local path or storage:// URI to the input archive
|
||||
* @param rcloneRemote - rclone remote name for storage:// URIs (optional, uses URI-embedded remote)
|
||||
* @param statePath - Optional custom path for sync state file
|
||||
* @returns List of overlay paths applied
|
||||
*/
|
||||
static async applyDirectInput(
|
||||
workspacePath: string,
|
||||
inputReference: string,
|
||||
rcloneRemote?: string,
|
||||
statePath?: string,
|
||||
): Promise<string[]> {
|
||||
let localArchive = inputReference;
|
||||
|
||||
// If storage URI, fetch via rclone first
|
||||
if (inputReference.startsWith('storage://')) {
|
||||
const parsed = IncrementalSyncService.parseStorageUri(inputReference);
|
||||
const remote = rcloneRemote || parsed.remote;
|
||||
const remotePath = parsed.path;
|
||||
|
||||
localArchive = path.join(workspacePath, '.game-ci-input-overlay.tar');
|
||||
OrchestratorLogger.log(`[Sync] Fetching input from storage: ${inputReference}`);
|
||||
|
||||
await IncrementalSyncService.executeRcloneCopy(remote, remotePath, path.dirname(localArchive));
|
||||
}
|
||||
|
||||
if (!fs.existsSync(localArchive)) {
|
||||
throw new Error(`Input archive not found: ${localArchive}`);
|
||||
}
|
||||
|
||||
OrchestratorLogger.log(`[Sync] Applying direct input overlay from ${localArchive}`);
|
||||
|
||||
// Extract overlay
|
||||
await OrchestratorSystem.Run(`tar -xf "${localArchive}" -C "${workspacePath}"`, true);
|
||||
|
||||
// Track overlay in sync state
|
||||
const state = SyncStateManager.loadState(workspacePath, statePath) || {
|
||||
lastSyncCommit: '',
|
||||
lastSyncTimestamp: new Date().toISOString(),
|
||||
pendingOverlays: [],
|
||||
};
|
||||
|
||||
state.pendingOverlays.push(localArchive);
|
||||
SyncStateManager.saveState(workspacePath, state, statePath);
|
||||
|
||||
return [localArchive];
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a storage-pull sync: pull changed files from an rclone remote.
|
||||
*
|
||||
* This strategy fetches content from a remote storage backend (S3, GCS, Azure, etc.)
|
||||
* and overlays it onto the workspace. Supports two modes:
|
||||
* - overlay: extract on top of existing workspace (default)
|
||||
* - clean: fresh git checkout, then apply overlay
|
||||
*
|
||||
* @param workspacePath - Path to the workspace
|
||||
* @param storageUri - storage://remote:bucket/path URI pointing to remote content
|
||||
* @param options - Configuration for the storage-pull operation
|
||||
* @returns List of files pulled from storage
|
||||
*/
|
||||
static async syncStoragePull(
|
||||
workspacePath: string,
|
||||
storageUri: string,
|
||||
options: {
|
||||
rcloneRemote?: string;
|
||||
cleanMode?: boolean;
|
||||
syncRevertAfter?: boolean;
|
||||
statePath?: string;
|
||||
} = {},
|
||||
): Promise<string[]> {
|
||||
if (!storageUri.startsWith('storage://')) {
|
||||
throw new Error(`Invalid storage URI: ${storageUri}. Must start with storage://`);
|
||||
}
|
||||
|
||||
// Verify rclone is available
|
||||
try {
|
||||
await OrchestratorSystem.Run('rclone version', true, true);
|
||||
} catch {
|
||||
throw new Error('rclone binary not found. Install rclone to use storage-pull sync strategy.');
|
||||
}
|
||||
|
||||
const parsed = IncrementalSyncService.parseStorageUri(storageUri);
|
||||
const remote = options.rcloneRemote || parsed.remote;
|
||||
const remotePath = parsed.path;
|
||||
|
||||
OrchestratorLogger.log(`[Sync] Storage pull: ${remote}:${remotePath} -> ${workspacePath}`);
|
||||
|
||||
// Clean mode: reset workspace to clean git state before applying overlay
|
||||
if (options.cleanMode) {
|
||||
OrchestratorLogger.log('[Sync] Clean mode: resetting workspace to HEAD');
|
||||
await OrchestratorSystem.Run(`git -C "${workspacePath}" checkout -- .`, true);
|
||||
await OrchestratorSystem.Run(`git -C "${workspacePath}" clean -fd`, true);
|
||||
}
|
||||
|
||||
// Pull from remote storage directly into workspace
|
||||
const rcloneSource = `${remote}:${remotePath}`;
|
||||
await OrchestratorSystem.Run(`rclone copy "${rcloneSource}" "${workspacePath}" --transfers 8 --checkers 16`, true);
|
||||
|
||||
// List what was pulled for tracking
|
||||
let pulledFiles: string[] = [];
|
||||
try {
|
||||
const lsOutput = await OrchestratorSystem.Run(`rclone ls "${rcloneSource}"`, true, true);
|
||||
pulledFiles = lsOutput
|
||||
.split('\n')
|
||||
.filter(Boolean)
|
||||
.map((line) => {
|
||||
// rclone ls outputs: " <size> <path>"
|
||||
const trimmed = line.trim();
|
||||
const spaceIndex = trimmed.indexOf(' ');
|
||||
|
||||
return spaceIndex >= 0 ? trimmed.slice(spaceIndex + 1).trim() : trimmed;
|
||||
})
|
||||
.filter(Boolean);
|
||||
} catch {
|
||||
OrchestratorLogger.logWarning('[Sync] Could not list pulled files from remote');
|
||||
}
|
||||
|
||||
OrchestratorLogger.log(`[Sync] Pulled ${pulledFiles.length} file(s) from storage`);
|
||||
|
||||
// Update sync state with overlay tracking
|
||||
const state = SyncStateManager.loadState(workspacePath, options.statePath) || {
|
||||
lastSyncCommit: '',
|
||||
lastSyncTimestamp: new Date().toISOString(),
|
||||
pendingOverlays: [],
|
||||
};
|
||||
|
||||
state.pendingOverlays.push(storageUri);
|
||||
state.lastSyncTimestamp = new Date().toISOString();
|
||||
state.workspaceHash = SyncStateManager.calculateWorkspaceHash(workspacePath);
|
||||
SyncStateManager.saveState(workspacePath, state, options.statePath);
|
||||
|
||||
return pulledFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a storage:// URI into remote and path components.
|
||||
*
|
||||
* Supported formats:
|
||||
* - storage://remote:bucket/path (explicit remote with colon separator)
|
||||
* - storage://remote/path (remote name is first path segment)
|
||||
*
|
||||
* @param uri - The storage:// URI to parse
|
||||
* @returns Object with remote name and path
|
||||
*/
|
||||
static parseStorageUri(uri: string): { remote: string; path: string } {
|
||||
if (!uri.startsWith('storage://')) {
|
||||
throw new Error(`Invalid storage URI: ${uri}. Must start with storage://`);
|
||||
}
|
||||
|
||||
const stripped = uri.replace('storage://', '');
|
||||
|
||||
// Check for explicit remote:path format (e.g., "myremote:bucket/path")
|
||||
const colonIndex = stripped.indexOf(':');
|
||||
if (colonIndex > 0) {
|
||||
return {
|
||||
remote: stripped.slice(0, colonIndex),
|
||||
path: stripped.slice(colonIndex + 1),
|
||||
};
|
||||
}
|
||||
|
||||
// Fallback: first segment is remote name (e.g., "myremote/bucket/path")
|
||||
const slashIndex = stripped.indexOf('/');
|
||||
if (slashIndex > 0) {
|
||||
return {
|
||||
remote: stripped.slice(0, slashIndex),
|
||||
path: stripped.slice(slashIndex + 1),
|
||||
};
|
||||
}
|
||||
|
||||
// Just a remote name with no path
|
||||
return {
|
||||
remote: stripped,
|
||||
path: '',
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute rclone copy with standard flags.
|
||||
*/
|
||||
private static async executeRcloneCopy(remote: string, remotePath: string, destinationPath: string): Promise<void> {
|
||||
await OrchestratorSystem.Run(
|
||||
`rclone copy "${remote}:${remotePath}" "${destinationPath}" --transfers 8 --checkers 16`,
|
||||
true,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Revert pending overlays by restoring git state.
|
||||
*/
|
||||
static async revertOverlays(workspacePath: string, statePath?: string): Promise<void> {
|
||||
const state = SyncStateManager.loadState(workspacePath, statePath);
|
||||
if (!state || state.pendingOverlays.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
OrchestratorLogger.log(`[Sync] Reverting ${state.pendingOverlays.length} overlay(s)`);
|
||||
|
||||
await OrchestratorSystem.Run(`git -C "${workspacePath}" checkout -- .`, true);
|
||||
|
||||
// Clean untracked files from overlays
|
||||
await OrchestratorSystem.Run(`git -C "${workspacePath}" clean -fd`, true);
|
||||
|
||||
state.pendingOverlays = [];
|
||||
state.workspaceHash = SyncStateManager.calculateWorkspaceHash(workspacePath);
|
||||
SyncStateManager.saveState(workspacePath, state, statePath);
|
||||
|
||||
OrchestratorLogger.log('[Sync] Overlays reverted');
|
||||
}
|
||||
}
|
||||
@@ -1,544 +0,0 @@
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import { IncrementalSyncService } from './incremental-sync-service';
|
||||
import { SyncStateManager } from './sync-state-manager';
|
||||
import { SyncState } from './sync-state';
|
||||
import { OrchestratorSystem } from '../core/orchestrator-system';
|
||||
import OrchestratorLogger from '../core/orchestrator-logger';
|
||||
|
||||
// Mock dependencies
|
||||
jest.mock('node:fs');
|
||||
jest.mock('../core/orchestrator-system');
|
||||
jest.mock('../core/orchestrator-logger');
|
||||
|
||||
const mockFs = fs as jest.Mocked<typeof fs>;
|
||||
const mockSystem = OrchestratorSystem as jest.Mocked<typeof OrchestratorSystem>;
|
||||
const mockLogger = OrchestratorLogger as jest.Mocked<typeof OrchestratorLogger>;
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllMocks();
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe('IncrementalSyncService', () => {
|
||||
const workspacePath = '/workspace/project';
|
||||
|
||||
describe('parseStorageUri', () => {
|
||||
it('parses storage://remote:bucket/path format', () => {
|
||||
const result = IncrementalSyncService.parseStorageUri('storage://myremote:mybucket/some/path');
|
||||
expect(result).toEqual({ remote: 'myremote', path: 'mybucket/some/path' });
|
||||
});
|
||||
|
||||
it('parses storage://remote/path format', () => {
|
||||
const result = IncrementalSyncService.parseStorageUri('storage://myremote/mybucket/path');
|
||||
expect(result).toEqual({ remote: 'myremote', path: 'mybucket/path' });
|
||||
});
|
||||
|
||||
it('parses storage://remote:bucket with no sub-path', () => {
|
||||
const result = IncrementalSyncService.parseStorageUri('storage://myremote:mybucket');
|
||||
expect(result).toEqual({ remote: 'myremote', path: 'mybucket' });
|
||||
});
|
||||
|
||||
it('handles remote-only URI without path', () => {
|
||||
const result = IncrementalSyncService.parseStorageUri('storage://myremote');
|
||||
expect(result).toEqual({ remote: 'myremote', path: '' });
|
||||
});
|
||||
|
||||
it('throws on invalid URI without storage:// prefix', () => {
|
||||
expect(() => IncrementalSyncService.parseStorageUri('http://example.com')).toThrow('Invalid storage URI');
|
||||
});
|
||||
|
||||
it('throws on empty URI', () => {
|
||||
expect(() => IncrementalSyncService.parseStorageUri('')).toThrow('Invalid storage URI');
|
||||
});
|
||||
});
|
||||
|
||||
describe('resolveStrategy', () => {
|
||||
it('returns full when full is requested', () => {
|
||||
const result = IncrementalSyncService.resolveStrategy('full', workspacePath);
|
||||
expect(result).toBe('full');
|
||||
});
|
||||
|
||||
it('returns git-delta when sync state exists', () => {
|
||||
const state: SyncState = {
|
||||
lastSyncCommit: 'abc123',
|
||||
lastSyncTimestamp: new Date().toISOString(),
|
||||
pendingOverlays: [],
|
||||
};
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue(JSON.stringify(state));
|
||||
|
||||
const result = IncrementalSyncService.resolveStrategy('git-delta', workspacePath);
|
||||
expect(result).toBe('git-delta');
|
||||
});
|
||||
|
||||
it('falls back to full when git-delta requested but no sync state', () => {
|
||||
mockFs.existsSync.mockReturnValue(false);
|
||||
|
||||
const result = IncrementalSyncService.resolveStrategy('git-delta', workspacePath);
|
||||
expect(result).toBe('full');
|
||||
});
|
||||
|
||||
it('returns direct-input as-is', () => {
|
||||
const result = IncrementalSyncService.resolveStrategy('direct-input', workspacePath);
|
||||
expect(result).toBe('direct-input');
|
||||
});
|
||||
|
||||
it('returns storage-pull as-is', () => {
|
||||
const result = IncrementalSyncService.resolveStrategy('storage-pull', workspacePath);
|
||||
expect(result).toBe('storage-pull');
|
||||
});
|
||||
});
|
||||
|
||||
describe('syncGitDelta', () => {
|
||||
const targetReference = 'def456789';
|
||||
|
||||
beforeEach(() => {
|
||||
const state: SyncState = {
|
||||
lastSyncCommit: 'abc123456',
|
||||
lastSyncTimestamp: new Date().toISOString(),
|
||||
pendingOverlays: [],
|
||||
};
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue(JSON.stringify(state));
|
||||
});
|
||||
|
||||
it('fetches and checks out changed files', async () => {
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // git fetch
|
||||
mockSystem.Run.mockResolvedValueOnce('file1.txt\nfile2.cs\n'); // git diff
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // git checkout
|
||||
|
||||
const result = await IncrementalSyncService.syncGitDelta(workspacePath, targetReference);
|
||||
|
||||
expect(result).toBe(2);
|
||||
expect(mockSystem.Run).toHaveBeenCalledWith(
|
||||
expect.stringContaining('git -C "/workspace/project" fetch origin'),
|
||||
true,
|
||||
);
|
||||
expect(mockSystem.Run).toHaveBeenCalledWith(
|
||||
expect.stringContaining('diff --name-only abc123456..def456789'),
|
||||
true,
|
||||
);
|
||||
expect(mockSystem.Run).toHaveBeenCalledWith(expect.stringContaining('checkout def456789'), true);
|
||||
});
|
||||
|
||||
it('skips checkout when no files changed', async () => {
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // git fetch
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // git diff (empty)
|
||||
|
||||
const result = await IncrementalSyncService.syncGitDelta(workspacePath, targetReference);
|
||||
|
||||
expect(result).toBe(0);
|
||||
|
||||
// Should only have fetch + diff calls, no checkout
|
||||
expect(mockSystem.Run).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('throws when no sync state exists', async () => {
|
||||
mockFs.existsSync.mockReturnValue(false);
|
||||
|
||||
await expect(IncrementalSyncService.syncGitDelta(workspacePath, targetReference)).rejects.toThrow(
|
||||
'Cannot git-delta sync without existing sync state',
|
||||
);
|
||||
});
|
||||
|
||||
it('saves updated sync state after delta sync', async () => {
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // git fetch
|
||||
mockSystem.Run.mockResolvedValueOnce('file1.txt\n'); // git diff
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // git checkout
|
||||
|
||||
await IncrementalSyncService.syncGitDelta(workspacePath, targetReference);
|
||||
|
||||
expect(mockFs.writeFileSync).toHaveBeenCalled();
|
||||
const writeCall = mockFs.writeFileSync.mock.calls[0];
|
||||
const savedState = JSON.parse(writeCall[1] as string) as SyncState;
|
||||
expect(savedState.lastSyncCommit).toBe(targetReference);
|
||||
});
|
||||
});
|
||||
|
||||
describe('applyDirectInput', () => {
|
||||
it('extracts a local archive to workspace', async () => {
|
||||
const archivePath = '/tmp/overlay.tar';
|
||||
mockFs.existsSync.mockImplementation((p: fs.PathLike) => {
|
||||
if (p === archivePath) return true;
|
||||
|
||||
// State file path does not exist
|
||||
return false;
|
||||
});
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // tar extract
|
||||
|
||||
const result = await IncrementalSyncService.applyDirectInput(workspacePath, archivePath);
|
||||
|
||||
expect(result).toEqual([archivePath]);
|
||||
expect(mockSystem.Run).toHaveBeenCalledWith(expect.stringContaining('tar -xf "/tmp/overlay.tar"'), true);
|
||||
});
|
||||
|
||||
it('fetches archive from storage URI via rclone then extracts', async () => {
|
||||
const storageUri = 'storage://s3remote:builds/overlay.tar';
|
||||
|
||||
mockFs.existsSync.mockImplementation((p: fs.PathLike) => {
|
||||
const pathString = p.toString();
|
||||
if (pathString.includes('.game-ci-input-overlay.tar')) return true;
|
||||
|
||||
return false;
|
||||
});
|
||||
mockSystem.Run.mockResolvedValue(''); // rclone copy + tar extract
|
||||
|
||||
const result = await IncrementalSyncService.applyDirectInput(workspacePath, storageUri);
|
||||
|
||||
expect(result.length).toBe(1);
|
||||
expect(mockSystem.Run).toHaveBeenCalledWith(
|
||||
expect.stringContaining('rclone copy "s3remote:builds/overlay.tar"'),
|
||||
true,
|
||||
);
|
||||
});
|
||||
|
||||
it('throws when local archive does not exist', async () => {
|
||||
mockFs.existsSync.mockReturnValue(false);
|
||||
|
||||
await expect(IncrementalSyncService.applyDirectInput(workspacePath, '/missing/archive.tar')).rejects.toThrow(
|
||||
'Input archive not found',
|
||||
);
|
||||
});
|
||||
|
||||
it('tracks overlay in sync state', async () => {
|
||||
const archivePath = '/tmp/overlay.tar';
|
||||
mockFs.existsSync.mockImplementation((p: fs.PathLike) => {
|
||||
if (p === archivePath) return true;
|
||||
|
||||
return false;
|
||||
});
|
||||
mockSystem.Run.mockResolvedValueOnce('');
|
||||
|
||||
await IncrementalSyncService.applyDirectInput(workspacePath, archivePath);
|
||||
|
||||
expect(mockFs.writeFileSync).toHaveBeenCalled();
|
||||
const writeCall = mockFs.writeFileSync.mock.calls[0];
|
||||
const savedState = JSON.parse(writeCall[1] as string) as SyncState;
|
||||
expect(savedState.pendingOverlays).toContain(archivePath);
|
||||
});
|
||||
});
|
||||
|
||||
describe('syncStoragePull', () => {
|
||||
const storageUri = 'storage://s3:game-builds/latest';
|
||||
|
||||
beforeEach(() => {
|
||||
mockFs.existsSync.mockReturnValue(false); // no existing state
|
||||
});
|
||||
|
||||
it('pulls files from rclone remote into workspace', async () => {
|
||||
mockSystem.Run.mockResolvedValueOnce('rclone v1.60.0'); // version check
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // rclone copy
|
||||
mockSystem.Run.mockResolvedValueOnce(' 1234 file1.txt\n 5678 dir/file2.cs\n'); // rclone ls
|
||||
|
||||
const result = await IncrementalSyncService.syncStoragePull(workspacePath, storageUri);
|
||||
|
||||
expect(result).toEqual(['file1.txt', 'dir/file2.cs']);
|
||||
expect(mockSystem.Run).toHaveBeenCalledWith(
|
||||
expect.stringContaining('rclone copy "s3:game-builds/latest" "/workspace/project"'),
|
||||
true,
|
||||
);
|
||||
});
|
||||
|
||||
it('uses custom rclone remote when provided', async () => {
|
||||
mockSystem.Run.mockResolvedValueOnce('rclone v1.60.0'); // version
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // rclone copy
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // rclone ls
|
||||
|
||||
await IncrementalSyncService.syncStoragePull(workspacePath, storageUri, {
|
||||
rcloneRemote: 'custom-remote',
|
||||
});
|
||||
|
||||
expect(mockSystem.Run).toHaveBeenCalledWith(
|
||||
expect.stringContaining('rclone copy "custom-remote:game-builds/latest"'),
|
||||
true,
|
||||
);
|
||||
});
|
||||
|
||||
it('resets workspace in clean mode before pull', async () => {
|
||||
mockSystem.Run.mockResolvedValueOnce('rclone v1.60.0'); // version
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // git checkout -- .
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // git clean -fd
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // rclone copy
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // rclone ls
|
||||
|
||||
await IncrementalSyncService.syncStoragePull(workspacePath, storageUri, { cleanMode: true });
|
||||
|
||||
expect(mockSystem.Run).toHaveBeenCalledWith(
|
||||
expect.stringContaining('git -C "/workspace/project" checkout -- .'),
|
||||
true,
|
||||
);
|
||||
expect(mockSystem.Run).toHaveBeenCalledWith(
|
||||
expect.stringContaining('git -C "/workspace/project" clean -fd'),
|
||||
true,
|
||||
);
|
||||
});
|
||||
|
||||
it('throws on invalid storage URI', async () => {
|
||||
await expect(IncrementalSyncService.syncStoragePull(workspacePath, 'http://example.com')).rejects.toThrow(
|
||||
'Invalid storage URI',
|
||||
);
|
||||
});
|
||||
|
||||
it('throws when rclone binary is not available', async () => {
|
||||
mockSystem.Run.mockRejectedValueOnce(new Error('command not found: rclone'));
|
||||
|
||||
await expect(IncrementalSyncService.syncStoragePull(workspacePath, storageUri)).rejects.toThrow(
|
||||
'rclone binary not found',
|
||||
);
|
||||
});
|
||||
|
||||
it('saves sync state with overlay tracking', async () => {
|
||||
mockSystem.Run.mockResolvedValueOnce('rclone v1.60.0'); // version
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // rclone copy
|
||||
mockSystem.Run.mockResolvedValueOnce(' 100 a.txt\n'); // rclone ls
|
||||
|
||||
await IncrementalSyncService.syncStoragePull(workspacePath, storageUri);
|
||||
|
||||
expect(mockFs.writeFileSync).toHaveBeenCalled();
|
||||
const writeCall = mockFs.writeFileSync.mock.calls[0];
|
||||
const savedState = JSON.parse(writeCall[1] as string) as SyncState;
|
||||
expect(savedState.pendingOverlays).toContain(storageUri);
|
||||
});
|
||||
|
||||
it('handles rclone ls failure gracefully', async () => {
|
||||
mockSystem.Run.mockResolvedValueOnce('rclone v1.60.0'); // version
|
||||
mockSystem.Run.mockResolvedValueOnce(''); // rclone copy
|
||||
mockSystem.Run.mockRejectedValueOnce(new Error('ls failed')); // rclone ls fails
|
||||
|
||||
const result = await IncrementalSyncService.syncStoragePull(workspacePath, storageUri);
|
||||
|
||||
expect(result).toEqual([]);
|
||||
expect(mockLogger.logWarning).toHaveBeenCalledWith(expect.stringContaining('Could not list pulled files'));
|
||||
});
|
||||
});
|
||||
|
||||
describe('revertOverlays', () => {
|
||||
it('reverts git state and cleans untracked files', async () => {
|
||||
const state: SyncState = {
|
||||
lastSyncCommit: 'abc123',
|
||||
lastSyncTimestamp: new Date().toISOString(),
|
||||
pendingOverlays: ['/tmp/overlay.tar', 'storage://s3:builds/content'],
|
||||
};
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue(JSON.stringify(state));
|
||||
mockSystem.Run.mockResolvedValue('');
|
||||
|
||||
await IncrementalSyncService.revertOverlays(workspacePath);
|
||||
|
||||
expect(mockSystem.Run).toHaveBeenCalledWith(
|
||||
expect.stringContaining('git -C "/workspace/project" checkout -- .'),
|
||||
true,
|
||||
);
|
||||
expect(mockSystem.Run).toHaveBeenCalledWith(
|
||||
expect.stringContaining('git -C "/workspace/project" clean -fd'),
|
||||
true,
|
||||
);
|
||||
});
|
||||
|
||||
it('clears pending overlays in saved state', async () => {
|
||||
const state: SyncState = {
|
||||
lastSyncCommit: 'abc123',
|
||||
lastSyncTimestamp: new Date().toISOString(),
|
||||
pendingOverlays: ['/tmp/overlay.tar'],
|
||||
};
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue(JSON.stringify(state));
|
||||
mockSystem.Run.mockResolvedValue('');
|
||||
|
||||
await IncrementalSyncService.revertOverlays(workspacePath);
|
||||
|
||||
expect(mockFs.writeFileSync).toHaveBeenCalled();
|
||||
const writeCall = mockFs.writeFileSync.mock.calls[0];
|
||||
const savedState = JSON.parse(writeCall[1] as string) as SyncState;
|
||||
expect(savedState.pendingOverlays).toEqual([]);
|
||||
});
|
||||
|
||||
it('does nothing when no overlays are pending', async () => {
|
||||
const state: SyncState = {
|
||||
lastSyncCommit: 'abc123',
|
||||
lastSyncTimestamp: new Date().toISOString(),
|
||||
pendingOverlays: [],
|
||||
};
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue(JSON.stringify(state));
|
||||
|
||||
await IncrementalSyncService.revertOverlays(workspacePath);
|
||||
|
||||
expect(mockSystem.Run).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('does nothing when no sync state exists', async () => {
|
||||
mockFs.existsSync.mockReturnValue(false);
|
||||
|
||||
await IncrementalSyncService.revertOverlays(workspacePath);
|
||||
|
||||
expect(mockSystem.Run).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('SyncStateManager', () => {
|
||||
const workspacePath = '/workspace/project';
|
||||
|
||||
describe('loadState', () => {
|
||||
it('returns parsed state from default path', () => {
|
||||
const state: SyncState = {
|
||||
lastSyncCommit: 'abc123',
|
||||
lastSyncTimestamp: '2026-01-01T00:00:00.000Z',
|
||||
pendingOverlays: [],
|
||||
};
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue(JSON.stringify(state));
|
||||
|
||||
const result = SyncStateManager.loadState(workspacePath);
|
||||
|
||||
expect(result).toEqual(state);
|
||||
expect(mockFs.readFileSync).toHaveBeenCalledWith(path.join(workspacePath, '.game-ci/sync-state.json'), 'utf8');
|
||||
});
|
||||
|
||||
it('uses custom state path when provided', () => {
|
||||
const state: SyncState = {
|
||||
lastSyncCommit: 'abc123',
|
||||
lastSyncTimestamp: '2026-01-01T00:00:00.000Z',
|
||||
pendingOverlays: [],
|
||||
};
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue(JSON.stringify(state));
|
||||
|
||||
SyncStateManager.loadState(workspacePath, 'custom/state.json');
|
||||
|
||||
expect(mockFs.readFileSync).toHaveBeenCalledWith(path.join(workspacePath, 'custom/state.json'), 'utf8');
|
||||
});
|
||||
|
||||
it('returns undefined when state file does not exist', () => {
|
||||
mockFs.existsSync.mockReturnValue(false);
|
||||
|
||||
const result = SyncStateManager.loadState(workspacePath);
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
|
||||
it('returns undefined and logs warning on malformed JSON', () => {
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockReturnValue('not-valid-json{{{');
|
||||
|
||||
const result = SyncStateManager.loadState(workspacePath);
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
expect(mockLogger.logWarning).toHaveBeenCalledWith(expect.stringContaining('Failed to load sync state'));
|
||||
});
|
||||
});
|
||||
|
||||
describe('saveState', () => {
|
||||
it('writes state to default path with pretty JSON', () => {
|
||||
const state: SyncState = {
|
||||
lastSyncCommit: 'abc123',
|
||||
lastSyncTimestamp: '2026-01-01T00:00:00.000Z',
|
||||
pendingOverlays: ['overlay1'],
|
||||
};
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
|
||||
SyncStateManager.saveState(workspacePath, state);
|
||||
|
||||
expect(mockFs.writeFileSync).toHaveBeenCalledWith(
|
||||
path.join(workspacePath, '.game-ci/sync-state.json'),
|
||||
JSON.stringify(state, undefined, 2),
|
||||
'utf8',
|
||||
);
|
||||
});
|
||||
|
||||
it('creates parent directories if they do not exist', () => {
|
||||
const state: SyncState = {
|
||||
lastSyncCommit: 'abc123',
|
||||
lastSyncTimestamp: '2026-01-01T00:00:00.000Z',
|
||||
pendingOverlays: [],
|
||||
};
|
||||
mockFs.existsSync.mockReturnValue(false);
|
||||
|
||||
SyncStateManager.saveState(workspacePath, state);
|
||||
|
||||
expect(mockFs.mkdirSync).toHaveBeenCalledWith(expect.stringContaining('.game-ci'), { recursive: true });
|
||||
});
|
||||
|
||||
it('logs warning on write failure instead of throwing', () => {
|
||||
const state: SyncState = {
|
||||
lastSyncCommit: 'abc123',
|
||||
lastSyncTimestamp: '2026-01-01T00:00:00.000Z',
|
||||
pendingOverlays: [],
|
||||
};
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.writeFileSync.mockImplementation(() => {
|
||||
throw new Error('Permission denied');
|
||||
});
|
||||
|
||||
// Should not throw
|
||||
SyncStateManager.saveState(workspacePath, state);
|
||||
|
||||
expect(mockLogger.logWarning).toHaveBeenCalledWith(expect.stringContaining('Failed to save sync state'));
|
||||
});
|
||||
});
|
||||
|
||||
describe('calculateWorkspaceHash', () => {
|
||||
it('produces a deterministic SHA-256 hash from workspace files', () => {
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockImplementation((filePath: fs.PathOrFileDescriptor) => {
|
||||
const p = filePath.toString();
|
||||
if (p.includes('ProjectVersion.txt')) return 'm_EditorVersion: 2022.3.10f1';
|
||||
if (p.includes('manifest.json')) return '{"dependencies":{}}';
|
||||
if (p.includes('packages-lock.json')) return '{"dependencies":{}}';
|
||||
if (p.includes('csc.rsp')) return '-nullable+';
|
||||
|
||||
return '';
|
||||
});
|
||||
|
||||
const hash1 = SyncStateManager.calculateWorkspaceHash(workspacePath);
|
||||
const hash2 = SyncStateManager.calculateWorkspaceHash(workspacePath);
|
||||
|
||||
expect(hash1).toBe(hash2);
|
||||
expect(hash1).toHaveLength(64); // SHA-256 hex digest
|
||||
});
|
||||
|
||||
it('produces different hashes for different workspace content', () => {
|
||||
let callCount = 0;
|
||||
mockFs.existsSync.mockReturnValue(true);
|
||||
mockFs.readFileSync.mockImplementation(() => {
|
||||
callCount++;
|
||||
|
||||
return callCount <= 4 ? 'content-v1' : 'content-v2';
|
||||
});
|
||||
|
||||
const hash1 = SyncStateManager.calculateWorkspaceHash(workspacePath);
|
||||
const hash2 = SyncStateManager.calculateWorkspaceHash(workspacePath);
|
||||
|
||||
expect(hash1).not.toBe(hash2);
|
||||
});
|
||||
|
||||
it('includes missing file markers in hash for absent files', () => {
|
||||
mockFs.existsSync.mockReturnValue(false);
|
||||
|
||||
const hash = SyncStateManager.calculateWorkspaceHash(workspacePath);
|
||||
|
||||
expect(hash).toHaveLength(64);
|
||||
});
|
||||
});
|
||||
|
||||
describe('hasDrifted', () => {
|
||||
it('returns false when workspace hash matches', () => {
|
||||
mockFs.existsSync.mockReturnValue(false);
|
||||
const savedHash = SyncStateManager.calculateWorkspaceHash(workspacePath);
|
||||
|
||||
const result = SyncStateManager.hasDrifted(workspacePath, savedHash);
|
||||
|
||||
expect(result).toBe(false);
|
||||
});
|
||||
|
||||
it('returns true when workspace hash differs', () => {
|
||||
mockFs.existsSync.mockReturnValue(false);
|
||||
|
||||
const result = SyncStateManager.hasDrifted(workspacePath, 'some-old-hash-that-will-not-match');
|
||||
|
||||
expect(result).toBe(true);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,3 +0,0 @@
|
||||
export { SyncState, SyncStrategy } from './sync-state';
|
||||
export { IncrementalSyncService } from './incremental-sync-service';
|
||||
export { SyncStateManager } from './sync-state-manager';
|
||||
@@ -1,125 +0,0 @@
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import crypto from 'node:crypto';
|
||||
import OrchestratorLogger from '../core/orchestrator-logger';
|
||||
import { SyncState } from './sync-state';
|
||||
|
||||
/**
|
||||
* Manages persistent sync state for incremental workspace updates.
|
||||
*
|
||||
* The sync state tracks what has been synced to a workspace, enabling
|
||||
* delta-based updates instead of full clones. State is stored as a JSON
|
||||
* file in the workspace (default: .game-ci/sync-state.json).
|
||||
*/
|
||||
export class SyncStateManager {
|
||||
static readonly DEFAULT_STATE_PATH = '.game-ci/sync-state.json';
|
||||
|
||||
/**
|
||||
* Key workspace files whose content is hashed for drift detection.
|
||||
* Changes to any of these files indicate the workspace may have been
|
||||
* modified outside of the sync system.
|
||||
*/
|
||||
private static readonly WORKSPACE_HASH_FILES = [
|
||||
'ProjectSettings/ProjectVersion.txt',
|
||||
'Packages/manifest.json',
|
||||
'Packages/packages-lock.json',
|
||||
'Assets/csc.rsp',
|
||||
];
|
||||
|
||||
/**
|
||||
* Load sync state from the workspace.
|
||||
*
|
||||
* @param workspacePath - Root path of the workspace
|
||||
* @param statePath - Relative path to the state file (default: .game-ci/sync-state.json)
|
||||
* @returns The loaded sync state, or undefined if no state exists or parsing fails
|
||||
*/
|
||||
static loadState(workspacePath: string, statePath?: string): SyncState | undefined {
|
||||
const resolvedPath = path.join(workspacePath, statePath || SyncStateManager.DEFAULT_STATE_PATH);
|
||||
|
||||
if (!fs.existsSync(resolvedPath)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const content = fs.readFileSync(resolvedPath, 'utf8');
|
||||
|
||||
return JSON.parse(content) as SyncState;
|
||||
} catch {
|
||||
OrchestratorLogger.logWarning(`[SyncState] Failed to load sync state from ${resolvedPath}`);
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save sync state to the workspace.
|
||||
*
|
||||
* Creates parent directories if they do not exist.
|
||||
*
|
||||
* @param workspacePath - Root path of the workspace
|
||||
* @param state - The sync state to persist
|
||||
* @param statePath - Relative path to the state file (default: .game-ci/sync-state.json)
|
||||
*/
|
||||
static saveState(workspacePath: string, state: SyncState, statePath?: string): void {
|
||||
const resolvedPath = path.join(workspacePath, statePath || SyncStateManager.DEFAULT_STATE_PATH);
|
||||
|
||||
try {
|
||||
const directory = path.dirname(resolvedPath);
|
||||
if (!fs.existsSync(directory)) {
|
||||
fs.mkdirSync(directory, { recursive: true });
|
||||
}
|
||||
|
||||
fs.writeFileSync(resolvedPath, JSON.stringify(state, undefined, 2), 'utf8');
|
||||
OrchestratorLogger.log(
|
||||
`[SyncState] State saved: commit=${state.lastSyncCommit}, overlays=${state.pendingOverlays.length}`,
|
||||
);
|
||||
} catch (error: any) {
|
||||
OrchestratorLogger.logWarning(`[SyncState] Failed to save sync state: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate a SHA-256 hash of key workspace files for drift detection.
|
||||
*
|
||||
* Hashes the content of known workspace files (ProjectVersion.txt,
|
||||
* manifest.json, etc.) to produce a fingerprint. If the hash changes
|
||||
* between syncs, the workspace may have been modified externally.
|
||||
*
|
||||
* Files that do not exist are skipped (their absence is part of the hash).
|
||||
*
|
||||
* @param workspacePath - Root path of the workspace
|
||||
* @returns Hex-encoded SHA-256 hash string
|
||||
*/
|
||||
static calculateWorkspaceHash(workspacePath: string): string {
|
||||
const hash = crypto.createHash('sha256');
|
||||
|
||||
for (const relativePath of SyncStateManager.WORKSPACE_HASH_FILES) {
|
||||
const filePath = path.join(workspacePath, relativePath);
|
||||
try {
|
||||
if (fs.existsSync(filePath)) {
|
||||
const content = fs.readFileSync(filePath, 'utf8');
|
||||
hash.update(`${relativePath}:${content}`);
|
||||
} else {
|
||||
hash.update(`${relativePath}:__missing__`);
|
||||
}
|
||||
} catch {
|
||||
hash.update(`${relativePath}:__error__`);
|
||||
}
|
||||
}
|
||||
|
||||
return hash.digest('hex');
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the workspace has drifted from a previously saved hash.
|
||||
*
|
||||
* @param workspacePath - Root path of the workspace
|
||||
* @param savedHash - The previously saved workspace hash to compare against
|
||||
* @returns true if the current workspace hash differs from the saved hash
|
||||
*/
|
||||
static hasDrifted(workspacePath: string, savedHash: string): boolean {
|
||||
const currentHash = SyncStateManager.calculateWorkspaceHash(workspacePath);
|
||||
|
||||
return currentHash !== savedHash;
|
||||
}
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
/**
|
||||
* Persistent sync state for incremental workspace updates.
|
||||
* Stored on the runner to track what has already been synced.
|
||||
*/
|
||||
export interface SyncState {
|
||||
/** Last successfully synced git commit SHA */
|
||||
lastSyncCommit: string;
|
||||
|
||||
/** ISO 8601 timestamp of last sync */
|
||||
lastSyncTimestamp: string;
|
||||
|
||||
/** SHA-256 hash of workspace state (optional) */
|
||||
workspaceHash?: string;
|
||||
|
||||
/** List of overlay paths that haven't been reverted */
|
||||
pendingOverlays: string[];
|
||||
}
|
||||
|
||||
export type SyncStrategy = 'full' | 'git-delta' | 'direct-input' | 'storage-pull';
|
||||
Reference in New Issue
Block a user