Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 52 additions & 61 deletions app/http.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use OpenRuntimes\Executor\Usage;
use Swoole\Process;
use Swoole\Runtime;
use Swoole\Table;
use Swoole\Timer;
use Utopia\CLI\Console;
use Utopia\Logger\Log;
Expand Down Expand Up @@ -87,39 +86,22 @@
* Create a Swoole table to store runtime information
*/
$register->set('activeRuntimes', function () {
$table = new Table(1024);

$table->column('id', Table::TYPE_STRING, 256);
$table->column('created', Table::TYPE_FLOAT);
$table->column('updated', Table::TYPE_FLOAT);
$table->column('name', Table::TYPE_STRING, 256);
$table->column('hostname', Table::TYPE_STRING, 256);
$table->column('status', Table::TYPE_STRING, 128);
$table->column('key', Table::TYPE_STRING, 256);
$table->create();

return $table;
$state = [];

return $state;
});

/**
* Create a Swoole table of usage stats (separate for host and containers)
*/
$register->set('statsContainers', function () {
$table = new Table(1024);

$table->column('usage', Table::TYPE_FLOAT, 8);
$table->create();

return $table;
$state = []; // has usage float key
return $state;
});

$register->set('statsHost', function () {
$table = new Table(1024);

$table->column('usage', Table::TYPE_FLOAT, 8);
$table->create();

return $table;
$state = []; // has usage float key
return $state;
});

/** Set Resources */
Expand Down Expand Up @@ -256,7 +238,10 @@ function getStorageDevice(string $root): Device
}
}

function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration): void
/**
* @param array<string, mixed> $activeRuntimes
*/
function removeAllRuntimes(array $activeRuntimes, Orchestration $orchestration): void
{
Console::log('Cleaning up containers...');

Expand All @@ -275,8 +260,8 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):

$activeRuntimeId = $container->getLabels()['openruntimes-runtime-id'];

if (!$activeRuntimes->exists($activeRuntimeId)) {
$activeRuntimes->del($activeRuntimeId);
if (!\array_key_exists($activeRuntimeId, $activeRuntimes)) {
unset($activeRuntimes[$activeRuntimeId]);
}

Console::success('Removed container ' . $container->getName());
Expand Down Expand Up @@ -387,16 +372,16 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
->inject('activeRuntimes')
->inject('response')
->inject('log')
->action(function (string $runtimeId, string $image, string $entrypoint, string $source, string $destination, array $variables, string $runtimeEntrypoint, string $command, int $timeout, bool $remove, int $cpus, int $memory, string $version, Orchestration $orchestration, Table $activeRuntimes, Response $response, Log $log) {
->action(function (string $runtimeId, string $image, string $entrypoint, string $source, string $destination, array $variables, string $runtimeEntrypoint, string $command, int $timeout, bool $remove, int $cpus, int $memory, string $version, Orchestration $orchestration, array $activeRuntimes, Response $response, Log $log) {
$activeRuntimeId = $runtimeId; // Used with Swoole table (key)
$runtimeId = System::getHostname() . '-' . $runtimeId; // Used in Docker (name)

$runtimeHostname = \uniqid();

$log->addTag('runtimeId', $activeRuntimeId);

if ($activeRuntimes->exists($activeRuntimeId)) {
if ($activeRuntimes->get($activeRuntimeId)['status'] == 'pending') {
if (\array_key_exists($activeRuntimeId, $activeRuntimes)) {
if ($activeRuntimes['activeRuntimeId']['status'] == 'pending') {
throw new \Exception('A runtime with the same ID is already being created. Attempt a execution soon.', 500);
}

Expand All @@ -410,15 +395,15 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):

$secret = \bin2hex(\random_bytes(16));

$activeRuntimes->set($activeRuntimeId, [
$activeRuntimes[$activeRuntimeId] = [
'id' => $containerId,
'name' => $activeRuntimeId,
'hostname' => $runtimeHostname,
'created' => $startTime,
'updated' => $startTime,
'status' => 'pending',
'key' => $secret,
]);
];

/**
* Temporary file paths in the executor
Expand Down Expand Up @@ -563,15 +548,15 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
'duration' => $duration,
]);

$activeRuntimes->set($activeRuntimeId, [
$activeRuntimes[$activeRuntimeId] = [
'id' => $containerId,
'name' => $activeRuntimeId,
'hostname' => $runtimeHostname,
'created' => $startTime,
'updated' => \microtime(true),
'status' => 'Up ' . \round($duration, 2) . 's',
'key' => $secret,
]);
];
} catch (Throwable $th) {
$error = $th->getMessage() . $output;

Expand Down Expand Up @@ -604,7 +589,7 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
} catch (Throwable $th) {
}

$activeRuntimes->del($activeRuntimeId);
unset($activeRuntimes[$activeRuntimeId]);

throw new Exception($error, 500);
}
Expand All @@ -621,7 +606,7 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
} catch (Throwable $th) {
}

$activeRuntimes->del($activeRuntimeId);
unset($activeRuntimes[$activeRuntimeId]);
}

$response
Expand All @@ -633,7 +618,7 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
->desc("List currently active runtimes")
->inject('activeRuntimes')
->inject('response')
->action(function (Table $activeRuntimes, Response $response) {
->action(function (array $activeRuntimes, Response $response) {
$runtimes = [];

foreach ($activeRuntimes as $runtime) {
Expand All @@ -651,16 +636,16 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
->inject('activeRuntimes')
->inject('response')
->inject('log')
->action(function (string $runtimeId, Table $activeRuntimes, Response $response, Log $log) {
->action(function (string $runtimeId, array $activeRuntimes, Response $response, Log $log) {
$activeRuntimeId = $runtimeId; // Used with Swoole table (key)

$log->addTag('runtimeId', $activeRuntimeId);

if (!$activeRuntimes->exists($activeRuntimeId)) {
if (!\array_key_exists($activeRuntimeId, $activeRuntimes)) {
throw new Exception('Runtime not found', 404);
}

$runtime = $activeRuntimes->get($activeRuntimeId);
$runtime = $activeRuntimes[$activeRuntimeId];

$response
->setStatusCode(Response::STATUS_CODE_OK)
Expand All @@ -674,18 +659,19 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
->inject('activeRuntimes')
->inject('response')
->inject('log')
->action(function (string $runtimeId, Orchestration $orchestration, Table $activeRuntimes, Response $response, Log $log) {
->action(function (string $runtimeId, Orchestration $orchestration, array $activeRuntimes, Response $response, Log $log) {
$activeRuntimeId = $runtimeId; // Used with Swoole table (key)
$runtimeId = System::getHostname() . '-' . $runtimeId; // Used in Docker (name)

$log->addTag('runtimeId', $activeRuntimeId);

if (!$activeRuntimes->exists($activeRuntimeId)) {

if (!\array_key_exists($activeRuntimeId, $activeRuntimes)) {
throw new Exception('Runtime not found', 404);
}

$orchestration->remove($runtimeId, true);
$activeRuntimes->del($activeRuntimeId);
unset($activeRuntimes[$activeRuntimeId]);

$response
->setStatusCode(Response::STATUS_CODE_OK)
Expand Down Expand Up @@ -714,7 +700,7 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
->inject('response')
->inject('log')
->action(
function (string $runtimeId, ?string $payload, string $path, string $method, array $headers, int $timeout, string $image, string $source, string $entrypoint, array $variables, int $cpus, int $memory, string $version, string $runtimeEntrypoint, Table $activeRuntimes, Response $response, Log $log) {
function (string $runtimeId, ?string $payload, string $path, string $method, array $headers, int $timeout, string $image, string $source, string $entrypoint, array $variables, int $cpus, int $memory, string $version, string $runtimeEntrypoint, array $activeRuntimes, Response $response, Log $log) {
if (empty($payload)) {
$payload = '';
}
Expand All @@ -731,7 +717,7 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
$coldStartDuration = 0;

// Prepare runtime
if (!$activeRuntimes->exists($activeRuntimeId)) {
if (!\array_key_exists($activeRuntimeId, $activeRuntimes)) {
if (empty($image) || empty($source) || empty($entrypoint)) {
throw new Exception('Runtime not found. Please start it first or provide runtime-related parameters.', 401);
}
Expand Down Expand Up @@ -816,13 +802,13 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
}

// Update swoole table
$runtime = $activeRuntimes->get($activeRuntimeId) ?? [];
$runtime = $activeRuntimes[$activeRuntimeId] ?? [];
$runtime['updated'] = \time();
$activeRuntimes->set($activeRuntimeId, $runtime);
$activeRuntimes[$activeRuntimeId] = $runtime;

// Ensure runtime started
for ($i = 0; $i < 10; $i++) {
if ($activeRuntimes->get($activeRuntimeId)['status'] !== 'pending') {
if ($activeRuntimes[$activeRuntimeId]['status'] !== 'pending') {
break;
}

Expand All @@ -834,7 +820,7 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
}

// Ensure we have secret
$runtime = $activeRuntimes->get($activeRuntimeId);
$runtime = $activeRuntimes[$activeRuntimeId];
$hostname = $runtime['hostname'];
$secret = $runtime['key'];
if (empty($secret)) {
Expand Down Expand Up @@ -1056,9 +1042,9 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
}

// Update swoole table
$runtime = $activeRuntimes->get($activeRuntimeId);
$runtime = $activeRuntimes[$activeRuntimeId];
$runtime['updated'] = \microtime(true);
$activeRuntimes->set($activeRuntimeId, $runtime);
$activeRuntimes[$activeRuntimeId] = $runtime;

// Finish request
$response
Expand All @@ -1073,13 +1059,13 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
->inject('statsHost')
->inject('statsContainers')
->inject('response')
->action(function (Table $statsHost, Table $statsContainers, Response $response) {
->action(function (array $statsHost, array $statsContainers, Response $response) {
$output = [
'status' => 'pass',
'runtimes' => []
];

$hostUsage = $statsHost->get('host', 'usage') ?? null;
$hostUsage = ($statsHost['host'] ?? [])['usage'] ?? null;
$output['usage'] = $hostUsage;

foreach ($statsContainers as $hostname => $stat) {
Expand Down Expand Up @@ -1213,7 +1199,7 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
} catch (\Throwable $th) {
Console::error('Inactive Runtime deletion failed: ' . $th->getMessage());
} finally {
$activeRuntimes->del($activeRuntimeId);
unset($activeRuntimes[$activeRuntimeId]);
}
});
}
Expand Down Expand Up @@ -1249,43 +1235,48 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
* Get usage stats every X seconds to update swoole table
*/
Console::info('Starting stats interval...');
function getStats(Table $statsHost, Table $statsContainers, Orchestration $orchestration, bool $recursive = false): void

/**
* @param array<string, mixed> $statsHost
* @param array<string, mixed> $statsContainers
*/
function getStats(array $statsHost, array $statsContainers, Orchestration $orchestration, bool $recursive = false): void
{
// Get usage stats
$usage = new Usage($orchestration);
$usage->run();

// Update host usage stats
if ($usage->getHostUsage() !== null) {
$oldStat = $statsHost->get('host', 'usage') ?? null;
$oldStat = ($statsHost['host'] ?? [])['usage'] ?? null;

if ($oldStat === null) {
$stat = $usage->getHostUsage();
} else {
$stat = ($oldStat + $usage->getHostUsage()) / 2;
}

$statsHost->set('host', ['usage' => $stat]);
$statsHost['host'] = ['usage' => $stat];
}

// Update runtime usage stats
foreach ($usage->getRuntimesUsage() as $runtime => $usageStat) {
$oldStat = $statsContainers->get($runtime, 'usage') ?? null;
$oldStat = ($statsContainers[$runtime] ?? [])['usage'] ?? null;

if ($oldStat === null) {
$stat = $usageStat;
} else {
$stat = ($oldStat + $usageStat) / 2;
}

$statsContainers->set($runtime, ['usage' => $stat]);
$statsContainers[$runtime] = ['usage' => $stat];
}

// Delete gone runtimes
$runtimes = \array_keys($usage->getRuntimesUsage());
foreach ($statsContainers as $hostname => $stat) {
if (!(\in_array($hostname, $runtimes))) {
$statsContainers->delete($hostname);
unset($statsContainers[$hostname]);
}
}

Expand Down