Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 235 additions & 0 deletions lib/domain/Functions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
const crypto = require('crypto');
const semver = require('semver');

const log = require('../support/log');
const { StdoutLogStorage, DefaultLogStorage } = require('./LogStorage');

const Metric = require('./Metric');

const Pipeline = require('./Pipeline');
const ErrorTracker = require('./ErrorTracker');

function codeFileName(namespace, codeId, version) {
if (version === undefined || version === 'latest') {
return `${namespace}/${codeId}.js`;
}
return `${namespace}/${codeId}/${version}.js`;
}

class Functions {
constructor(storage, sandbox, req) {
this.storage = storage;
this.sandbox = sandbox;
this.req = req;
}

async updateVersion(namespace, id, version) {
if (!version) {
return;
}

// Latest versions are saved to namespace
let latest = {};
const ns = await this.storage.getNamespace(namespace) || { namespace };

if (ns.latest) {
latest = ns.latest;
}

if (Object.prototype.hasOwnProperty.call(latest, id)) {
const curVersion = latest[id];
if (semver.gt(version, curVersion)) {
latest[id] = version;
}
} else {
latest[id] = version;
}

ns.latest = latest;
await this.storage.putNamespace(namespace, ns);
}

async create(namespace, id, version, code, env) {
let v = version;
const filename = codeFileName(namespace, id, version);

if (version === 'latest') {
v = null;
}

const invalid = this.sandbox.testSyntaxError(filename, code, {
console: new StdoutLogStorage(namespace, id, v).console,
});
if (invalid) {
this.req.log.error(`Failed to post code: ${invalid.error}`);
return {
status: 400,
body: invalid,
};
}

const hash = crypto.createHash('sha1').update(code).digest('hex');
const data = { id, version, code, hash };

if (env) {
data.env = env;
}

try {
await this.storage.putCode(namespace, id, v, data);

await this.updateVersion(namespace, id, v);

return {
status: 200,
body: data,
};
} catch (err) {
log.error(`[${namespace}:${id}:${version}] ${err}`);
return {
status: 500,
body: { error: err.message },
};
}
}

async run(namespace, id, version) {
let v = version;
const filename = codeFileName(namespace, id, version);
const metric = new Metric('function-run');

const ns = await this.storage.getNamespace(namespace);
if (!ns) {
v = null;
} else if (ns.latest && version === 'latest') {
v = ns.latest[id];
}

const logStorage = new DefaultLogStorage(namespace, id, v, this.req);

let code;
try {
code = await this.storage.getCodeByCache(namespace, id, v, {
preCache: (preCode) => {
preCode.script = this.sandbox.compileCode(filename, preCode.code);
return preCode;
},
});

if (!code) {
const errMsg = v ? `Code '${namespace}/${id}/${v}' was not found` : `Code '${namespace}/${id}' was not found`;
return {
status: 404,
body: { error: errMsg },
};
}
} catch (err) {
return {
status: err.statusCode || 500,
body: { error: err.message },
};
}

try {
const options = {
console: logStorage.console,
env: code.env,
};
const result = await this.sandbox.runScript(code.script, this.req, options);

const spent = metric.observeFunctionRun({ namespace, id, version, status: result.status });
logStorage.flush({
status: result.status,
requestTime: spent,
});
return result;
} catch (err) {
logStorage.console.error(`Failed to run function: ${err}`);
logStorage.console.error(err.stack);
const status = err.statusCode || 500;
const errResult = {
status,
body: { error: err.message },
};

const spent = metric.observeFunctionRun({ namespace, id, version, status });

const logResult = logStorage.flush({
status,
requestTime: spent,
});

const { sentryDSN } = code;

const extra = Object.assign({ body: this.req.body }, logResult || {});
const errTracker = new ErrorTracker({
sentryDSN,
filename,
extra,
tags: { codeHash: code.hash },
code: code.code,
});
errTracker.notify(err);
return errResult;
}
}

async runPipeline(stepsInput) {
const metric = new Metric('pipeline-run');

const stepsPromises = stepsInput.map(async (step) => {
const [namespace, id, version] = step.split('/', 3);
const ns = await this.storage.getNamespace(namespace);

// Return versioned function
if (version !== undefined || version !== 'latest') {
return { namespace, id, version };
}

// Handle latest and unversioned functions
let v = version;
if (!ns) {
v = null;
} else if (ns.latest && version === 'latest') {
v = ns.latest[id];
}
return { namespace, id, version: v };
});

const steps = await Promise.all(stepsPromises);

try {
const codes = await this.storage.getCodesByCache(steps, {
preCache: (code) => {
const filename = codeFileName(code.namespace, code.id, code.version);
code.script = this.sandbox.compileCode(filename, code.code);
return code;
},
});

for (let i = 0; i < codes.length; i += 1) {
if (!codes[i]) {
const { namespace, id, version } = steps[i];
const codeName = version ? `${namespace}/${id}/${version}` : `${namespace}/${id}`;
const e = new Error(`Code '${codeName}' was not found`);
e.statusCode = 404;
throw e;
}
}

const result = await new Pipeline(this.sandbox, this.req, codes).run();
metric.observePipelineRun(result.status);
return result;
} catch (err) {
const result = {
status: err.statusCode || 500,
body: { error: err.message },
};

metric.observePipelineRun(result.status);
return result;
}
}
}

module.exports = Functions;
8 changes: 4 additions & 4 deletions lib/domain/LogStorage.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ function extractHTTPFields(req) {

class StdoutLogStorage {
/* eslint class-methods-use-this: ["error", { "exceptMethods": ["flush"] }] */
constructor(namespace, id) {
this.console = new PrefixLog(`namespace:${namespace}, id:${id}`);
constructor(namespace, id, version) {
this.console = new PrefixLog(`namespace:${namespace}, id:${id}, version:${version}`);
}

flush() {}
}

class GelfLogStorage {
constructor(namespace, id, req) {
constructor(namespace, id, version, req) {
this.stream = new MemoryStream(config.log.maxFullMessage);
this.console = new PrefixLog(null, this.stream, this.stream);
this.file = `${namespace}/${id}.js`;
this.file = `${namespace}/${id}/${version}.js`;
this.namespace = namespace;
this.fields = extractHTTPFields(req);
this.gelfClients = GelfLogStorage.prepareGelfClients();
Expand Down
10 changes: 5 additions & 5 deletions lib/domain/Metric.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const functionRunHistogram = new prometheusClient.Histogram({
name: 'backstage_functions_function_run_duration_seconds',
help: 'How many time spent to run a function in seconds',
buckets,
labelNames: ['namespace', 'id'],
labelNames: ['namespace', 'id', 'version'],
});

const functionOverviewRunHistogram = new prometheusClient.Histogram({
Expand Down Expand Up @@ -61,7 +61,7 @@ const functionOverviewRunCounter = new prometheusClient.Counter({
const functionRunCounter = new prometheusClient.Counter({
name: 'backstage_functions_function_run_total',
help: 'What is the status code of a function',
labelNames: ['namespace', 'id', 'status'],
labelNames: ['namespace', 'id', 'version', 'status'],
});

const functionPipelineCounter = new prometheusClient.Counter({
Expand All @@ -81,15 +81,15 @@ class Metric {
this.start = Date.now();
}

observeFunctionRun({ namespace, id, status }) {
observeFunctionRun({ namespace, id, version, status }) {
const spent = (Date.now() - this.start) / 1000;
const normalizedStatusCode = normalizeStatusCode(status);

functionOverviewRunHistogram.observe(spent);
functionOverviewRunCounter.labels(normalizedStatusCode).inc();

functionRunHistogram.labels(namespace, id).observe(spent);
functionRunCounter.labels(namespace, id, normalizedStatusCode).inc();
functionRunHistogram.labels(namespace, id, version).observe(spent);
functionRunCounter.labels(namespace, id, version, normalizedStatusCode).inc();

return spent;
}
Expand Down
11 changes: 7 additions & 4 deletions lib/domain/Pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ const Metric = require('./Metric');
const { DefaultLogStorage } = require('./LogStorage');
const ErrorTracker = require('./ErrorTracker');

function codeFileName(namespace, codeId) {
return `${namespace}/${codeId}.js`;
function codeFileName(namespace, codeId, version) {
return `${namespace}/${codeId}/${version}.js`;
}

class Pipeline {
Expand All @@ -25,8 +25,8 @@ class Pipeline {
this.steps = nextSteps;

const metric = new Metric('function-run');
const filename = codeFileName(step.namespace, step.id);
const logStorage = new DefaultLogStorage(step.namespace, step.id, this.req);
const filename = codeFileName(step.namespace, step.id, step.version);
const logStorage = new DefaultLogStorage(step.namespace, step.id, step.version, this.req);

const options = {
console: logStorage.console,
Expand All @@ -38,6 +38,7 @@ class Pipeline {
const spent = metric.observeFunctionRun({
namespace: step.namespace,
id: step.id,
version: step.version,
status: result.status,
});

Expand All @@ -59,6 +60,7 @@ class Pipeline {
} else {
result.body.namespace = step.namespace;
result.body.functionId = step.id;
result.body.version = step.version;

return result;
}
Expand All @@ -74,6 +76,7 @@ class Pipeline {
const spent = metric.observeFunctionRun({
namespace: step.namespace,
id: step.id,
version: step.version,
status,
});

Expand Down
Loading