Feat/migrate aws sdk v3 (#698)

* chore(cloud-runner): migrate/replace deps aws-sdk v2 to v3

* chore(aws): refactor aws services to support SDK v3

* chore(aws): refactor aws runner to support SDK v3

* chore(aws): update dist

* fix(aws): error handling wrap try/catch to avoid unhandled promise rejections.

* fix(aws): keeping the syntax simpler for arrays
This commit is contained in:
Matheus Costa
2025-04-10 17:48:14 -03:00
committed by GitHub
parent 9d6bdcbdc5
commit 81ed299e10
12 changed files with 79070 additions and 40424 deletions
@@ -1,4 +1,19 @@
import * as AWS from 'aws-sdk';
import {
DescribeTasksCommand,
ECS,
RunTaskCommand,
RunTaskCommandInput,
Task,
waitUntilTasksRunning,
} from '@aws-sdk/client-ecs';
import {
DescribeStreamCommand,
DescribeStreamCommandOutput,
GetRecordsCommand,
GetRecordsCommandOutput,
GetShardIteratorCommand,
Kinesis,
} from '@aws-sdk/client-kinesis';
import CloudRunnerEnvironmentVariable from '../../options/cloud-runner-environment-variable';
import * as core from '@actions/core';
import CloudRunnerAWSTaskDef from './cloud-runner-aws-task-def';
@@ -12,8 +27,8 @@ import CloudRunnerOptions from '../../options/cloud-runner-options';
import GitHub from '../../../github';
class AWSTaskRunner {
public static ECS: AWS.ECS;
public static Kinesis: AWS.Kinesis;
public static ECS: ECS;
public static Kinesis: Kinesis;
private static readonly encodedUnderscore = `$252F`;
static async runTask(
taskDef: CloudRunnerAWSTaskDef,
@@ -60,7 +75,7 @@ class AWSTaskRunner {
throw new Error(`Container Overrides length must be at most 8192`);
}
const task = await AWSTaskRunner.ECS.runTask(runParameters).promise();
const task = await AWSTaskRunner.ECS.send(new RunTaskCommand(runParameters as RunTaskCommandInput));
const taskArn = task.tasks?.[0].taskArn || '';
CloudRunnerLogger.log('Cloud runner job is starting');
await AWSTaskRunner.waitUntilTaskRunning(taskArn, cluster);
@@ -108,7 +123,13 @@ class AWSTaskRunner {
private static async waitUntilTaskRunning(taskArn: string, cluster: string) {
try {
await AWSTaskRunner.ECS.waitFor('tasksRunning', { tasks: [taskArn], cluster }).promise();
await waitUntilTasksRunning(
{
client: AWSTaskRunner.ECS,
maxWaitTime: 120,
},
{ tasks: [taskArn], cluster },
);
} catch (error_) {
const error = error_ as Error;
await new Promise((resolve) => setTimeout(resolve, 3000));
@@ -124,10 +145,7 @@ class AWSTaskRunner {
}
static async describeTasks(clusterName: string, taskArn: string) {
const tasks = await AWSTaskRunner.ECS.describeTasks({
cluster: clusterName,
tasks: [taskArn],
}).promise();
const tasks = await AWSTaskRunner.ECS.send(new DescribeTasksCommand({ cluster: clusterName, tasks: [taskArn] }));
if (tasks.tasks?.[0]) {
return tasks.tasks?.[0];
} else {
@@ -169,9 +187,7 @@ class AWSTaskRunner {
output: string,
shouldCleanup: boolean,
) {
const records = await AWSTaskRunner.Kinesis.getRecords({
ShardIterator: iterator,
}).promise();
const records = await AWSTaskRunner.Kinesis.send(new GetRecordsCommand({ ShardIterator: iterator }));
iterator = records.NextShardIterator || '';
({ shouldReadLogs, output, shouldCleanup } = AWSTaskRunner.logRecords(
records,
@@ -184,7 +200,7 @@ class AWSTaskRunner {
return { iterator, shouldReadLogs, output, shouldCleanup };
}
private static checkStreamingShouldContinue(taskData: AWS.ECS.Task, timestamp: number, shouldReadLogs: boolean) {
private static checkStreamingShouldContinue(taskData: Task, timestamp: number, shouldReadLogs: boolean) {
if (taskData?.lastStatus === 'UNKNOWN') {
CloudRunnerLogger.log('## Cloud runner job unknwon');
}
@@ -204,15 +220,17 @@ class AWSTaskRunner {
}
private static logRecords(
records: AWS.Kinesis.GetRecordsOutput,
records: GetRecordsCommandOutput,
iterator: string,
shouldReadLogs: boolean,
output: string,
shouldCleanup: boolean,
) {
if (records.Records.length > 0 && iterator) {
for (const record of records.Records) {
const json = JSON.parse(zlib.gunzipSync(Buffer.from(record.Data as string, 'base64')).toString('utf8'));
if ((records.Records ?? []).length > 0 && iterator) {
for (const record of records.Records ?? []) {
const json = JSON.parse(
zlib.gunzipSync(Buffer.from(record.Data as unknown as string, 'base64')).toString('utf8'),
);
if (json.messageType === 'DATA_MESSAGE') {
for (const logEvent of json.logEvents) {
({ shouldReadLogs, shouldCleanup, output } = FollowLogStreamService.handleIteration(
@@ -230,19 +248,19 @@ class AWSTaskRunner {
}
private static async getLogStream(kinesisStreamName: string) {
return await AWSTaskRunner.Kinesis.describeStream({
StreamName: kinesisStreamName,
}).promise();
return await AWSTaskRunner.Kinesis.send(new DescribeStreamCommand({ StreamName: kinesisStreamName }));
}
private static async getLogIterator(stream: AWS.Kinesis.DescribeStreamOutput) {
private static async getLogIterator(stream: DescribeStreamCommandOutput) {
return (
(
await AWSTaskRunner.Kinesis.getShardIterator({
ShardIteratorType: 'TRIM_HORIZON',
StreamName: stream.StreamDescription.StreamName,
ShardId: stream.StreamDescription.Shards[0].ShardId,
}).promise()
await AWSTaskRunner.Kinesis.send(
new GetShardIteratorCommand({
ShardIteratorType: 'TRIM_HORIZON',
StreamName: stream.StreamDescription?.StreamName ?? '',
ShardId: stream.StreamDescription?.Shards?.[0]?.ShardId || '',
}),
)
).ShardIterator || ''
);
}