30 changes: 25 additions & 5 deletions schema.graphql
@@ -149,6 +149,7 @@ type Dataset implements Ressource @entity {
checksum: Bytes!
timestamp: BigInt! # last transfer
usages: [Deal!]! @derivedFrom(field: "dataset")
bulkUsages: [BulkSlice!]! @derivedFrom(field: "datasets")
orders: [DatasetOrder!]! @derivedFrom(field: "dataset")
transfers: [DatasetTransfer!]! @derivedFrom(field: "dataset")
}
@@ -246,7 +247,7 @@ type AppOrder @entity {

type DatasetOrder @entity {
id: ID!
dataset: Dataset!
dataset: Dataset # nullable: off-chain bulk datasetorder could reference non-existing dataset
Member Author:
I had to make this field nullable because file handlers can't access entities created by on-chain handlers. When indexing a datasetorder from a bulk (file handler), we cannot verify that the dataset exists on-chain or exists as an entity.
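
For illustration, a load-or-create helper compatible with the relaxed nullability could look like the sketch below. fetchDatasetorder only appears in the imports of src/Modules/Bulk.ts, so its real implementation (its location in src/utils, and the defaults it sets for any other required fields) is an assumption here.

import { BigDecimal } from '@graphprotocol/graph-ts';
import { DatasetOrder } from '../generated/schema';

// Hypothetical load-or-create sketch: `dataset` is intentionally left unset,
// which is only legal now that the schema field is nullable. Any other
// required fields of the real entity would need defaults here as well.
export function fetchDatasetorder(id: string): DatasetOrder {
  let order = DatasetOrder.load(id);
  if (order == null) {
    order = new DatasetOrder(id);
    order.datasetprice = BigDecimal.fromString('0');
  }
  return order;
}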

datasetprice: BigDecimal!
volume: BigInt
tag: Bytes
@@ -333,11 +334,29 @@ type Deal @entity {
schedulerRewardRatio: BigInt!
sponsor: Account!
timestamp: BigInt! # creation
apporder: AppOrder # todo: not available if not broadcasted
datasetorder: DatasetOrder # todo: not available if not broadcasted
workerpoolorder: WorkerpoolOrder # todo: not available if not broadcasted
requestorder: RequestOrder # todo: not available if not broadcasted
apporder: AppOrder
datasetorder: DatasetOrder
workerpoolorder: WorkerpoolOrder
requestorder: RequestOrder
events: [DealEvent!]! @derivedFrom(field: "deal")
bulk: Bulk
}

type Bulk @entity(immutable: true) {
id: ID!
hash: String!
slices: [BulkSlice!]! @derivedFrom(field: "bulk")
deal: Deal! @derivedFrom(field: "bulk")
}

type BulkSlice @entity(immutable: true) {
id: ID!
bulk: Bulk!
hash: String!
task: Task # nullable: task may not be initialized at the time of BulkSlice creation
index: BigInt!
datasets: [Dataset!]!
datasetOrders: [DatasetOrder!]!
}
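
The id fields above are plain strings built in the mappings. Below is a minimal sketch of the ID helpers used in src/Modules/Bulk.ts (createBulkSliceID and createBulkOrderID); the concatenation format is an assumption, since the real code in src/utils is not shown in this diff.

import { BigInt } from '@graphprotocol/graph-ts';

// Hypothetical ID builders: deterministic, and distinct from on-chain order
// hashes so that bulk-derived DatasetOrder entities cannot collide with
// orders indexed from chain events.
export function createBulkSliceID(dealId: string, index: BigInt): string {
  return dealId + '-' + index.toString();
}

export function createBulkOrderID(taskId: string, index: BigInt): string {
  return 'bulk-' + taskId + '-' + index.toString();
}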

enum TaskStatus {
@@ -366,6 +385,7 @@ type Task @entity {
rewards: [Reward!]! @derivedFrom(field: "task")
seizes: [Seize!]! @derivedFrom(field: "task")
events: [TaskEvent!]! @derivedFrom(field: "task")
bulkSlice: BulkSlice @derivedFrom(field: "task")
}

enum ContributionStatus {
180 changes: 180 additions & 0 deletions src/Modules/Bulk.ts
@@ -0,0 +1,180 @@
import {
Address,
BigInt,
Bytes,
dataSource,
DataSourceContext,
DataSourceTemplate,
json,
JSONValueKind,
} from '@graphprotocol/graph-ts';
import { Bulk, BulkSlice } from '../../generated/schema';
import {
computeTaskId,
CONTEXT_BOT_FIRST,
CONTEXT_BOT_SIZE,
CONTEXT_BULK,
CONTEXT_DEAL,
CONTEXT_INDEX,
createBulkOrderID,
createBulkSliceID,
fetchDatasetorder,
isAddressString,
isBytes32String,
isHexString,
isIntegerString,
toRLC,
} from '../utils';

export function handleBulk(content: Bytes): void {
const hash = dataSource.stringParam();
const context = dataSource.context();
Comment on lines +30 to +31
Contributor:
How is this information loaded when handleBulk runs?

Contributor:
The difference with the content is like having metadata attached to the actual content of the IPFS file? And this metadata is set up in the matchOrders handler function?

const dealId = context.getString(CONTEXT_DEAL);
const botFirst = context.getBigInt(CONTEXT_BOT_FIRST);
const botSize = context.getBigInt(CONTEXT_BOT_SIZE);

const bulkId = dealId;
let bulk = Bulk.load(bulkId);
if (bulk != null) {
// immutable bulk already exists nothing to do
return;
}
bulk = new Bulk(bulkId);
bulk.hash = hash;

const jsonContent = json.try_fromBytes(content);
if (jsonContent.isOk && jsonContent.value.kind == JSONValueKind.ARRAY) {
const contentArray = jsonContent.value.toArray();

for (let i = 0; i < contentArray.length; i++) {
const entry = contentArray[i];
const index = BigInt.fromI32(i);
if (
// exclude slice out of deal bot range
index >= botFirst &&
index < botFirst.plus(botSize) &&
entry.kind == JSONValueKind.STRING
) {
const sliceCid = entry.toString();
let sliceContext = new DataSourceContext();
sliceContext.setString(CONTEXT_BULK, bulkId);
sliceContext.setString(CONTEXT_DEAL, dealId);
sliceContext.setBigInt(CONTEXT_INDEX, index);
DataSourceTemplate.createWithContext('BulkSlice', [sliceCid], sliceContext);
}
}
}

bulk.save();
}

export function handleBulkSlice(content: Bytes): void {
const hash = dataSource.stringParam();
const context = dataSource.context();
const bulk = context.getString(CONTEXT_BULK);
const dealId = context.getString(CONTEXT_DEAL);
const index = context.getBigInt(CONTEXT_INDEX);
const taskId = computeTaskId(dealId, index);

if (taskId !== null) {
const bulkSliceId = createBulkSliceID(dealId, index);
let bulkSlice = BulkSlice.load(bulkSliceId);
if (bulkSlice != null) {
// immutable bulk slice already exists nothing to do
return;
}
bulkSlice = new BulkSlice(bulkSliceId);
bulkSlice.task = taskId;
bulkSlice.hash = hash;
bulkSlice.bulk = bulk;
bulkSlice.index = index;
bulkSlice.datasets = new Array<string>();
bulkSlice.datasetOrders = new Array<string>();

const jsonContent = json.try_fromBytes(content);
if (jsonContent.isOk && jsonContent.value.kind == JSONValueKind.ARRAY) {
const datasetOrderArray = jsonContent.value.toArray();

for (let i = 0; i < datasetOrderArray.length; i++) {
const datasetOrder = datasetOrderArray[i];
if (datasetOrder.kind == JSONValueKind.OBJECT) {
const orderObj = datasetOrder.toObject();

const datasetEntry = orderObj.getEntry('dataset');
const datasetPriceEntry = orderObj.getEntry('datasetprice');
const volumeEntry = orderObj.getEntry('volume');
const tagEntry = orderObj.getEntry('tag');
const apprestrictEntry = orderObj.getEntry('apprestrict');
const workerpoolrestrictEntry = orderObj.getEntry('workerpoolrestrict');
const requesterrestrictEntry = orderObj.getEntry('requesterrestrict');
const saltEntry = orderObj.getEntry('salt');
const signEntry = orderObj.getEntry('sign');
// check that all entries are present and valid
if (
datasetEntry != null &&
datasetEntry.value.kind == JSONValueKind.STRING &&
isAddressString(datasetEntry.value.toString().toLowerCase()) &&
datasetPriceEntry != null &&
datasetPriceEntry.value.kind == JSONValueKind.STRING &&
isIntegerString(datasetPriceEntry.value.toString()) &&
volumeEntry != null &&
volumeEntry.value.kind == JSONValueKind.STRING &&
isIntegerString(volumeEntry.value.toString()) &&
tagEntry != null &&
tagEntry.value.kind == JSONValueKind.STRING &&
isBytes32String(tagEntry.value.toString()) &&
apprestrictEntry != null &&
apprestrictEntry.value.kind == JSONValueKind.STRING &&
isAddressString(apprestrictEntry.value.toString().toLowerCase()) &&
workerpoolrestrictEntry != null &&
workerpoolrestrictEntry.value.kind == JSONValueKind.STRING &&
isAddressString(workerpoolrestrictEntry.value.toString().toLowerCase()) &&
requesterrestrictEntry != null &&
requesterrestrictEntry.value.kind == JSONValueKind.STRING &&
isAddressString(requesterrestrictEntry.value.toString().toLowerCase()) &&
saltEntry != null &&
saltEntry.value.kind == JSONValueKind.STRING &&
isBytes32String(saltEntry.value.toString()) &&
signEntry != null &&
signEntry.value.kind == JSONValueKind.STRING &&
isHexString(signEntry.value.toString())
) {
// datasetOrderId cannot be orderHash as it could collide with on-chain indexed order
const datasetOrderId = createBulkOrderID(taskId, BigInt.fromI32(i));

const datasetAddress = datasetEntry.value.toString().toLowerCase();

let datasetOrder = fetchDatasetorder(datasetOrderId);
datasetOrder.dataset = datasetAddress;
datasetOrder.datasetprice = toRLC(
BigInt.fromString(datasetPriceEntry.value.toString()),
);
datasetOrder.volume = BigInt.fromString(volumeEntry.value.toString());
datasetOrder.tag = Bytes.fromHexString(tagEntry.value.toString());
datasetOrder.apprestrict = Address.fromString(
apprestrictEntry.value.toString().toLowerCase(),
);
datasetOrder.workerpoolrestrict = Address.fromString(
workerpoolrestrictEntry.value.toString().toLowerCase(),
);
datasetOrder.requesterrestrict = Address.fromString(
requesterrestrictEntry.value.toString().toLowerCase(),
);
datasetOrder.salt = Bytes.fromHexString(saltEntry.value.toString());
datasetOrder.sign = Bytes.fromHexString(signEntry.value.toString());
datasetOrder.save();

let datasetOrders = bulkSlice.datasetOrders;
datasetOrders.push(datasetOrderId);
bulkSlice.datasetOrders = datasetOrders;

let datasets = bulkSlice.datasets;
datasets.push(datasetAddress);
bulkSlice.datasets = datasets;
}
Contributor:
Should we warn here in case the dataset order is invalid?
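
One way to surface it, sketched here rather than proposed as part of the diff, is graph-ts logging; this assumes log is added to the imports from '@graphprotocol/graph-ts':

} else {
  // Hypothetical else-branch on the validity check above: record the skipped
  // entry instead of dropping it silently.
  log.warning('Skipping invalid datasetorder entry {} in bulk slice {}', [
    i.toString(),
    bulkSliceId,
  ]);
}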

}
}
}
bulkSlice.save();
}
}
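
The string validators imported at the top of this file (isAddressString, isBytes32String, isHexString, isIntegerString) are not part of the diff. The sketch below shows plausible implementations matching how they are used above (address = '0x' plus 40 hex chars, bytes32 = '0x' plus 64 hex chars, integer = decimal digits only); the real code in src/utils may differ.

// Hypothetical validators, written without RegExp since AssemblyScript has no
// built-in regular expressions.
const HEX_DIGITS = '0123456789abcdef';

function isHexBody(value: string, expectedLength: i32): boolean {
  const s = value.toLowerCase();
  if (!s.startsWith('0x') || s.length <= 2) return false;
  if (expectedLength > 0 && s.length != expectedLength) return false;
  for (let i = 2; i < s.length; i++) {
    if (!HEX_DIGITS.includes(s.charAt(i))) return false;
  }
  return true;
}

export function isHexString(s: string): boolean {
  return isHexBody(s, 0);
}

export function isAddressString(s: string): boolean {
  return isHexBody(s, 42); // '0x' + 20 bytes
}

export function isBytes32String(s: string): boolean {
  return isHexBody(s, 66); // '0x' + 32 bytes
}

export function isIntegerString(s: string): boolean {
  if (s.length == 0) return false;
  for (let i = 0; i < s.length; i++) {
    const c = s.charCodeAt(i);
    if (c < 48 || c > 57) return false; // outside '0'..'9'
  }
  return true;
}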
43 changes: 38 additions & 5 deletions src/Modules/IexecPoco.ts
@@ -1,7 +1,14 @@
// SPDX-FileCopyrightText: 2020-2025 IEXEC BLOCKCHAIN TECH <[email protected]>
// SPDX-License-Identifier: Apache-2.0

import { Address, BigInt, dataSource } from '@graphprotocol/graph-ts';
import {
Address,
BigInt,
dataSource,
DataSourceContext,
DataSourceTemplate,
json,
} from '@graphprotocol/graph-ts';
const chainName = dataSource.network();

import {
@@ -34,6 +41,9 @@ import {
} from '../../generated/schema';

import {
CONTEXT_BOT_FIRST,
CONTEXT_BOT_SIZE,
CONTEXT_DEAL,
createContributionID,
createEventID,
fetchAccount,
@@ -94,17 +104,40 @@ export function handleOrdersMatched(event: OrdersMatchedEvent): void {
deal.timestamp = event.block.timestamp;
deal.save();

// if no dataset, check if params include a bulk_cid reference
if (deal.dataset == Address.zero().toHex()) {
const params = json.try_fromString(viewedDeal.params);
if (params.isOk) {
const bulkCid = params.value.toObject().getEntry('bulk_cid');
if (bulkCid) {
// the same bulk may be used by many deals => we use dealid as bulk ID to avoid collisions
const bulkId = event.params.dealid.toHex();
let context = new DataSourceContext();
// Pass onchain data that will be needed in file handlers
context.setString(CONTEXT_DEAL, deal.id);
context.setBigInt(CONTEXT_BOT_FIRST, deal.botFirst);
context.setBigInt(CONTEXT_BOT_SIZE, deal.botSize);
DataSourceTemplate.createWithContext('Bulk', [bulkCid.value.toString()], context);
// bulk may not be indexed, this is not an issue, the model will prune it
deal.bulk = bulkId;
deal.save();
}
}
}
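// Illustration (not part of the diff): for this branch to trigger, the deal
// params are expected to be a JSON object carrying the bulk reference, e.g.
// '{"bulk_cid":"Qm..."}'. Only the `bulk_cid` key is read here; the CID value
// and any other keys are assumptions.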

const dataset = deal.dataset;

let apporder = fetchApporder(event.params.appHash.toHex());
apporder.app = deal.app;
apporder.appprice = deal.appPrice;
apporder.save();

let datasetorder = fetchDatasetorder(event.params.datasetHash.toHex());
if (dataset) datasetorder.dataset = dataset;
datasetorder.datasetprice = deal.datasetPrice;
datasetorder.save();
if (dataset != Address.zero().toHex()) {
let datasetorder = fetchDatasetorder(event.params.datasetHash.toHex());
if (dataset) datasetorder.dataset = dataset;
datasetorder.datasetprice = deal.datasetPrice;
datasetorder.save();
}
let workerpoolorder = fetchWorkerpoolorder(event.params.workerpoolHash.toHex());
workerpoolorder.workerpool = deal.workerpool;