Skip to content

Commit 1a5035b

Browse files
authored
Merge pull request #34 from usherlabs/LABS-1100/Integrate_Events_Webstream
Labs 1100/integrate events webstream
2 parents 6c713e4 + 9263d10 commit 1a5035b

File tree

17 files changed

+240
-224
lines changed

17 files changed

+240
-224
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ jobs:
3232

3333
- name: Download ROOCH and APTOS
3434
run: |
35-
wget https://github.com/rooch-network/rooch/releases/latest/download/rooch-ubuntu-22.04.zip
35+
wget https://github.com/rooch-network/rooch/releases/latest/download/rooch-ubuntu-22.04.zip
3636
wget -qO- "https://aptos.dev/scripts/install_cli.py" | python3
3737
echo "Download completed"
3838

.vscode/settings.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,8 @@
1414
],
1515
"[typescript]": {
1616
"editor.defaultFormatter": "biomejs.biome"
17+
},
18+
"[prisma]": {
19+
"editor.defaultFormatter": "Prisma.prisma"
1720
}
18-
}
21+
}

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ pnpm dev
171171
--args 'string:POST' \
172172
--args 'string:{}' \
173173
--args 'string:{
174-
"model": "gpt-4",
174+
"model": "gpt-4o",
175175
"messages": [{"role": "user", "content": "Say this is a test!"}],
176176
"temperature": 0.7
177177
}' \

docs/APTOS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@ aptos move publish --named-addresses verity=default,verity_test_foreign_module=d
2121
Send a new request transaction to have it indexed. Make sure to replace placeholders with actual values relevant to your setup.
2222

2323
```bash
24-
aptos move run --function-id 0xa2b7160c0dc70548e8105121b075df9ea3b98c0c82294207ca38cb1165b94f59::example_caller::request_data --sender-account default --args 'string:https://api.x.com/2/users/by/username/elonmusk?user.fields=public_metrics' --args 'string:GET' --args 'string:{}' --args 'string:{}' --args 'string:.data.public_metrics.followers_count' --args 'address:6b516ae2eb4aac47ffadd502cf19ce842020f515f1abea3e154cfc053ab3ab9a'
24+
aptos move run --function-id 0xa2b7160c0dc70548e8105121b075df9ea3b98c0c82294207ca38cb1165b94f59::example_caller::request_data --sender-account default --args 'string:https://api.x.com/2/users/by/username/elonmusk?user.fields=public_metrics' --args 'string:GET' --args 'string:{}' --args 'string:{}' --args 'string:.data.public_metrics.followers_count' --args 'address:6b516ae2eb4aac47ffadd502cf19ce842020f515f1abea3e154cfc053ab3ab9a'
2525
```
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
-- CreateTable
2+
CREATE TABLE "SupportedUrl" (
3+
"domain" TEXT NOT NULL,
4+
"supported_path" TEXT[],
5+
"authType" TEXT NOT NULL,
6+
"authKey" TEXT NOT NULL,
7+
"requestRate" BIGINT NOT NULL,
8+
9+
CONSTRAINT "SupportedUrl_pkey" PRIMARY KEY ("domain")
10+
);
11+
12+
-- CreateIndex
13+
CREATE INDEX "SupportedUrl_authKey_idx" ON "SupportedUrl"("authKey");

orchestrator/prisma/schema.prisma

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,34 +10,43 @@ datasource db {
1010
url = env("DATABASE_URL")
1111
}
1212

13-
model Events{
14-
id String @id @default(cuid())
15-
eventHandleId String
16-
eventSeq BigInt
17-
eventType String
18-
eventData String
19-
eventIndex String
20-
decoded_event_data String //JSON String
21-
chain String @default("ROOCH-testnet")
22-
23-
status Int
24-
retries Int
25-
response String?
26-
indexedAt DateTime @default(now())
27-
updateAt DateTime @updatedAt
28-
29-
@@index([eventHandleId, eventSeq,chain])
13+
model Events {
14+
id String @id @default(cuid())
15+
eventHandleId String
16+
eventSeq BigInt
17+
eventType String
18+
eventData String
19+
eventIndex String
20+
decoded_event_data String //JSON String
21+
chain String @default("ROOCH-testnet")
22+
23+
status Int
24+
retries Int
25+
response String?
26+
indexedAt DateTime @default(now())
27+
updateAt DateTime @updatedAt
28+
29+
@@index([eventHandleId, eventSeq, chain])
3030
}
3131

32-
model Keeper{
33-
id String @id @default(cuid())
34-
chain String @default("ROOCH-testnet")
35-
module String
36-
privateKey String
32+
model Keeper {
33+
id String @id @default(cuid())
34+
chain String @default("ROOCH-testnet")
35+
module String
36+
privateKey String
3737
38-
createdAt DateTime @default(now())
39-
updateAt DateTime @updatedAt
38+
createdAt DateTime @default(now())
39+
updateAt DateTime @updatedAt
4040
4141
@@unique([chain, module])
42+
}
43+
44+
model SupportedUrl {
45+
domain String @id
46+
supported_path String[]
47+
authType String
48+
authKey String // BEARER, OAUTH1.0, OAUTH2.0
49+
requestRate BigInt
4250
43-
}
51+
@@index([authKey])
52+
}

orchestrator/src/env.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const baseConfig = {
88
// Rooch
99
roochChainId: (process.env.ROOCH_CHAIN_ID
1010
? process.env.ROOCH_CHAIN_ID.split(",")
11-
: ["testnet", "pre-mainnet"]) as RoochNetwork[],
11+
: ["testnet", "mainnet"]) as RoochNetwork[],
1212
roochPrivateKey: process.env.ROOCH_PRIVATE_KEY ?? "",
1313
roochOracleAddress: process.env.ROOCH_ORACLE_ADDRESS ?? "",
1414
roochIndexerCron: process.env.ROOCH_INDEXER_CRON,
@@ -25,6 +25,7 @@ const baseConfig = {
2525
// Integrations
2626
xBearerToken: process.env.X_BEARER_TOKEN ?? "",
2727
openAIToken: process.env.OPEN_AI_TOKEN ?? "",
28+
azureToken: process.env.AZURE_TOKEN ?? "",
2829
};
2930

3031
interface IEnvVars {
@@ -43,6 +44,7 @@ interface IEnvVars {
4344
batchSize: number;
4445
xBearerToken: string;
4546
openAIToken: string;
47+
azureToken: string;
4648
}
4749

4850
const envVarsSchema = Joi.object({
@@ -93,6 +95,7 @@ const envVarsSchema = Joi.object({
9395
// Integrations
9496
xBearerToken: Joi.string().allow("").required(),
9597
openAIToken: Joi.string().allow("").required(),
98+
azureToken: Joi.string().allow("").required(),
9699

97100
// Common
98101
sentryDSN: Joi.string().allow("", null),
@@ -117,6 +120,7 @@ export default {
117120
integrations: {
118121
xBearerToken: envVars.xBearerToken,
119122
openAIToken: envVars.openAIToken,
123+
azureToken: envVars.azureToken,
120124
},
121125
rooch: {
122126
chainId: envVars.roochChainId,

orchestrator/src/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import { log } from "./logger";
1010
// Start cron job to check for new events from Rooch Oracles
1111

1212
if (env.rooch.privateKey && env.rooch.chainId.length > 0 && env.rooch.oracleAddress && env.chains.includes("ROOCH")) {
13-
// https://www.npmjs.com/package/cron#cronjob-class
14-
1513
env.rooch.chainId.map((chain) => {
1614
const rooch = new RoochIndexer(env.rooch.privateKey, chain, env.rooch.oracleAddress);
1715
let running = false;

orchestrator/src/indexer/base.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { log } from "@/logger";
22
import { type ProcessedRequestAdded, RequestStatus } from "@/types";
33

4-
import { instance as openAIInstance } from "@/integrations/openAI";
5-
import { instance as xTwitterInstance } from "@/integrations/xtwitter";
4+
import { azureInstance, openAIInstance } from "@/integrations/openAI";
5+
import { xTwitterInstance } from "@/integrations/xtwitter";
66

7-
import type { BasicBearerAPIHandler } from "@/integrations/base";
7+
import { type BasicBearerAPIHandler, dynamicInstanceManager } from "@/integrations/base";
88
import prismaClient from "../../prisma";
99

1010
// Abstract base class
@@ -50,6 +50,12 @@ export abstract class Indexer {
5050
if (openAIInstance.isApprovedPath(url)) {
5151
return openAIInstance;
5252
}
53+
if (azureInstance.isApprovedPath(url)) {
54+
return azureInstance;
55+
}
56+
if (!dynamicInstanceManager.loading) {
57+
return dynamicInstanceManager.getInstance(url.host) || null;
58+
}
5359
return null;
5460
}
5561

@@ -94,7 +100,7 @@ export abstract class Indexer {
94100
if (handler) {
95101
return handler.submitRequest(data);
96102
}
97-
return { status: 406, message: "URL Not supported" };
103+
return { status: 406, message: "URL NOT supported" };
98104
} catch {
99105
return { status: 406, message: "Invalid URL" };
100106
}

orchestrator/src/indexer/rooch.ts

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
11
import env from "@/env";
2-
import { instance as xTwitterInstance } from "@/integrations/xtwitter";
2+
import { xTwitterInstance } from "@/integrations/xtwitter";
33
import { log } from "@/logger";
44
import type { IEvent, IRequestAdded, JsonRpcResponse, ProcessedRequestAdded, RoochNetwork } from "@/types";
55
import { decodeNotifyValueFull } from "@/util";
6-
import { Args, RoochClient, Secp256k1Keypair, Transaction, getRoochNodeUrl } from "@roochnetwork/rooch-sdk";
6+
import {
7+
Args,
8+
RoochClient,
9+
RoochWebSocketTransport,
10+
Secp256k1Keypair,
11+
Transaction,
12+
getRoochNodeUrl,
13+
} from "@roochnetwork/rooch-sdk";
714
import axios from "axios";
815
import prismaClient from "../../prisma";
916
import { Indexer } from "./base";
1017

1118
export default class RoochIndexer extends Indexer {
1219
private keyPair: Secp256k1Keypair;
20+
private client: RoochClient;
1321

1422
constructor(
1523
private privateKey: string,
@@ -18,6 +26,18 @@ export default class RoochIndexer extends Indexer {
1826
) {
1927
super(oracleAddress, Secp256k1Keypair.fromSecretKey(privateKey).getRoochAddress().toHexAddress());
2028
this.keyPair = Secp256k1Keypair.fromSecretKey(this.privateKey);
29+
const wsTransport = new RoochWebSocketTransport({
30+
url: getRoochNodeUrl(this.chainId),
31+
reconnectDelay: 1000, // Delay between reconnection attempts (default: 1000ms)
32+
maxReconnectAttempts: 5, // Maximum number of reconnection attempts (default: 5)
33+
requestTimeout: 30000, // Request timeout (default: 30000ms)
34+
connectionReadyTimeout: 5000, // Connection ready timeout (default: 5000ms)
35+
});
36+
37+
// Create client with WebSocket transport
38+
this.client = new RoochClient({
39+
transport: wsTransport,
40+
});
2141
log.info(`Rooch Indexer initialized`);
2242
log.info(`Chain ID: ${this.getChainId()} \n\t\tOrchestrator Oracle Node Address: ${this.orchestrator}`);
2343
}
@@ -35,7 +55,7 @@ export default class RoochIndexer extends Indexer {
3555
*/
3656
async sendUnfulfilledRequests() {
3757
// Initialize the Rooch client with the current node URL
38-
const client = new RoochClient({ url: this.getRoochNodeUrl() });
58+
// const _client = new RoochClient({ url: this.getRoochNodeUrl() });
3959

4060
// Initialize cursor object for pagination
4161
const cursor = {
@@ -53,7 +73,7 @@ export default class RoochIndexer extends Indexer {
5373
while (cursor.isNextPage) {
5474
try {
5575
// Query for object states with pagination
56-
const query = await client.queryObjectStates({
76+
const query = await this.client.queryObjectStates({
5777
filter: {
5878
object_type: `${this.oracleAddress}::oracles::Request`,
5979
},
@@ -135,7 +155,7 @@ export default class RoochIndexer extends Indexer {
135155
}
136156

137157
getRoochNodeUrl() {
138-
return this.chainId === "pre-mainnet" ? "https://main-seed.rooch.network" : getRoochNodeUrl(this.chainId);
158+
return getRoochNodeUrl(this.chainId);
139159
}
140160

141161
/**
@@ -216,11 +236,7 @@ export default class RoochIndexer extends Indexer {
216236
}
217237

218238
async isPreviouslyExecuted(data: ProcessedRequestAdded<any>) {
219-
const client = new RoochClient({
220-
url: this.getRoochNodeUrl(),
221-
});
222-
223-
const view = await client.executeViewFunction({
239+
const view = await this.client.executeViewFunction({
224240
target: `${this.oracleAddress}::oracles::get_response_status`,
225241
args: [Args.objectId(data.request_id)],
226242
});
@@ -240,11 +256,7 @@ export default class RoochIndexer extends Indexer {
240256
* @returns {Promise<any>} - The receipt of the transaction.
241257
*/
242258
async sendFulfillment(data: ProcessedRequestAdded<any>, status: number, result: string) {
243-
const client = new RoochClient({
244-
url: this.getRoochNodeUrl(),
245-
});
246-
247-
const view = await client.executeViewFunction({
259+
const view = await this.client.executeViewFunction({
248260
target: `${this.oracleAddress}::oracles::get_response_status`,
249261
args: [Args.objectId(data.request_id)],
250262
});
@@ -282,7 +294,7 @@ export default class RoochIndexer extends Indexer {
282294

283295
tx.setMaxGas(1000000000);
284296

285-
const receipt = await client.signAndExecuteTransaction({
297+
const receipt = await this.client.signAndExecuteTransaction({
286298
transaction: tx,
287299
signer: this.keyPair,
288300
});
@@ -298,7 +310,7 @@ export default class RoochIndexer extends Indexer {
298310
});
299311
try {
300312
if ((data.notify?.length ?? 0) > 66) {
301-
const module_abi = await client.getModuleAbi({
313+
const module_abi = await this.client.getModuleAbi({
302314
moduleAddr: notify_module[0] ?? "",
303315
moduleName: notify_module[1] ?? "",
304316
});
@@ -312,7 +324,7 @@ export default class RoochIndexer extends Indexer {
312324
args: function_abi?.params?.length === 0 ? [] : [Args.objectId(data.request_id)],
313325
});
314326
tx.setMaxGas(2_0000_0000);
315-
const notification_receipt = await client.signAndExecuteTransaction({
327+
const notification_receipt = await this.client.signAndExecuteTransaction({
316328
transaction: tx,
317329
signer: Secp256k1Keypair.fromSecretKey(keeper_key.privateKey),
318330
});

0 commit comments

Comments
 (0)