diff --git a/examples/bot.ts b/examples/bot.ts new file mode 100644 index 00000000..466b9a83 --- /dev/null +++ b/examples/bot.ts @@ -0,0 +1,43 @@ +import { + AutojoinRoomsMixin, + LogLevel, + LogService, + MessageEvent, + RichConsoleLogger, + SimpleFsStorageProvider +} from "../src"; +import { SyncV3MatrixClient } from "../src/syncv3/SyncV3MatrixClient"; + +LogService.setLogger(new RichConsoleLogger()); +LogService.setLevel(LogLevel.TRACE); +LogService.muteModule("Metrics"); +LogService.trace = LogService.debug; + +let creds = null; +try { + creds = require("../../examples/storage/bot.creds.json"); +} catch (e) { + // ignore +} + +const homeserverUrl = creds?.['homeserverUrl'] ?? "http://localhost:8008"; +const accessToken = creds?.['accessToken'] ?? 'YOUR_TOKEN'; +const storage = new SimpleFsStorageProvider("./examples/storage/bot.json"); + +const client = new SyncV3MatrixClient(homeserverUrl, accessToken, storage); +AutojoinRoomsMixin.setupOnClient(client); + +(async function() { + // client.joinRoom('!dIJnfuNqTABKFEGJVf:localhost'); + client.on("room.message", async (roomId: string, event: any) => { + const message = new MessageEvent(event); + if (message.sender === await client.getUserId() || message.messageType === "m.notice") return; + + if (message.textBody.startsWith("!hello ")) { + await client.replyText(roomId, event, message.textBody.substring("!hello ".length).trim() || "Hello!"); + } + }); + + await client.start(); + LogService.info("index", "Client started! Running as " + (await client.getUserId())); +})(); diff --git a/package.json b/package.json index 98eb2f85..63563bcd 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "lint": "eslint 'src/**/*.ts'", "test": "ts-mocha --project ./tsconfig.json test/*Test.ts test/**/*.ts", "build:examples": "tsc -p tsconfig-examples.json", + "example:bot": "yarn build:examples && node lib/examples/bot.js", "example:appservice": "yarn build:examples && node lib/examples/appservice.js", "example:login_register": "yarn build:examples && node lib/examples/login_register.js", "example:encryption_bot": "yarn build:examples && node lib/examples/encryption_bot.js", diff --git a/src/MatrixClient.ts b/src/MatrixClient.ts index 8ca7119a..44c1bbb1 100644 --- a/src/MatrixClient.ts +++ b/src/MatrixClient.ts @@ -81,7 +81,7 @@ export class MatrixClient extends EventEmitter { private joinStrategy: IJoinRoomStrategy = null; private eventProcessors: { [eventType: string]: IPreprocessor[] } = {}; private filterId = 0; - private stopSyncing = false; + protected stopSyncing = false; private metricsInstance: Metrics = new Metrics(); private unstableApisInstance = new UnstableApis(this); diff --git a/src/syncv3/SyncV3MatrixClient.ts b/src/syncv3/SyncV3MatrixClient.ts new file mode 100644 index 00000000..0bdfe7dc --- /dev/null +++ b/src/syncv3/SyncV3MatrixClient.ts @@ -0,0 +1,104 @@ +import { MatrixClient } from "../MatrixClient"; +import { extractRequestError, LogService } from "../logging/LogService"; +import { IStorageProvider } from "../storage/IStorageProvider"; +import { SyncV3Response } from "./models"; +import { IV3List, ListBehaviour, SortBehaviour, V3List } from "./V3List"; + +/** + * A MatrixClient class which attempts to use Sync V3 instead of the normal sync protocol + * on the server. + * + * This class is considered UNSTABLE and may be removed at any time. This class will + * not check for server support before attempting to use it. + * + * @category Unstable: Sync V3 + */ +export class SyncV3MatrixClient extends MatrixClient { + private lastPos: string; + private lists: V3List[] = []; + + /** + * Creates a new matrix client + * @param {string} homeserverUrl The homeserver's client-server API URL + * @param {string} accessToken The access token for the homeserver + * @param {IStorageProvider} storage The storage provider to use. + * @param {SortBehaviour[]} sortBehaviours The list sorting behaviour to use. + */ + public constructor(homeserverUrl: string, accessToken: string, storage: IStorageProvider, sortBehaviours: SortBehaviour[] = [SortBehaviour.Name]) { + super(homeserverUrl, accessToken, storage); + + this.lists = [ + new V3List(ListBehaviour.JoinedOnly, sortBehaviours), + new V3List(ListBehaviour.InvitedOnly, sortBehaviours), + new V3List(ListBehaviour.DMsOnly, sortBehaviours), + ]; + } + + public get joinedList(): IV3List { + return this.lists[0]; + } + + public get invitedList(): IV3List { + return this.lists[1]; + } + + public get directMessagesList(): IV3List { + return this.lists[2]; + } + + protected async startSyncInternal(): Promise { + this.lastPos = await this.storageProvider.getSyncToken(); + for (let i = 0; i < this.lists.length; i++) { + const value = JSON.parse((await this.storageProvider.readValue(`sync_v3_list_${i}`)) || "{}"); + await this.lists[i].processOperations(this, value.count ?? 0, value.ops ?? []); + } + this.loopSync(); + } + + private loopSync() { + const promiseWhile = async () => { + if (this.stopSyncing) { + LogService.info("MatrixClientLite", "Client stop requested - stopping sync"); + return; + } + + try { + const response = await this.doSyncV3(); + await this.processSyncV3(response); + this.lastPos = response.pos; + await this.storageProvider.setSyncToken(this.lastPos); + } catch (e) { + LogService.error("MatrixClientLite", "Error handling sync " + extractRequestError(e)); + const backoffTime = 5000 + Math.random() * (15000 - 5000); // TODO: de-hardcode values. SYNC_BACKOFF_MIN_MS SYNC_BACKOFF_MAX_MS + LogService.info("MatrixClientLite", `Backing off for ${backoffTime}ms`); + await new Promise((r) => setTimeout(r, backoffTime)); + } + + return promiseWhile(); + }; + + promiseWhile(); // start loop async + } + + private async processSyncV3(sync: SyncV3Response): Promise { + for (let i = 0; i < this.lists.length; i++) { + const ops = sync.ops.filter(o => o.list === i); + await this.lists[i].processOperations(this, sync.counts[i], ops); + await this.storageProvider.storeValue(`sync_v3_list_${i}`, JSON.stringify({ + ops: this.lists[i].lossySerialized, + count: this.lists[i].totalRoomCount, + })); + } + } + + private async doSyncV3(): Promise { + const userId = await this.getUserId(); + const qs = {}; + if (this.lastPos) qs['pos'] = this.lastPos; + return this.doRequest("POST", "/_matrix/client/unstable/org.matrix.msc3575/sync", qs, { + lists: this.lists.map(l => l.getDefinitionFor(userId)), + + // TODO: Support extensions for crypto and such + }); + } +} diff --git a/src/syncv3/V3List.ts b/src/syncv3/V3List.ts new file mode 100644 index 00000000..a149b14b --- /dev/null +++ b/src/syncv3/V3List.ts @@ -0,0 +1,179 @@ +import { IV3Room, V3Room } from "./V3Room"; +import { Operation, OpSync, SyncV3Operation } from "./operations"; +import { SyncV3List, SyncV3OperationRoom, SyncV3Room } from "./models"; +import { MatrixClient } from "../MatrixClient"; + +/** + * @category Unstable: Sync V3 + */ +export enum ListBehaviour { + JoinedOnly, + InvitedOnly, + DMsOnly, +} + +/** + * @category Unstable: Sync V3 + */ +export enum SortBehaviour { + NotificationCount = "by_notification_count", + Recency = "by_recency", + Name = "by_name", +} + +/** + * @category Unstable: Sync V3 + */ +export const CURRENT_USER_STATE_KEY = "@CURRENT"; + +/** + * @category Unstable: Sync V3 + */ +export type RequiredStateTuple = [string, string]; + +// Unclear if it's even worth changing this to be smaller/bigger, but let's start here. +const WINDOW_SIZE = 1000; + +const TIMELINE_LIMIT = 20; // make configurable? + +/** + * @category Unstable: Sync V3 + */ +export interface IV3List { + totalRoomCount: number; + orderedKnownRooms: IV3Room[]; + unorderedAllRooms: IV3Room[]; +} + +/** + * @category Unstable: Sync V3 + */ +export class V3List implements IV3List { + private roomsByRange = new Map(); // key is JSON-serialized range + private roomMap = new Map(); // key is room ID + private totalCount = 0; + + public constructor( + public readonly behaviour: ListBehaviour, + public readonly sort: SortBehaviour[], + ) { + } + + public get lossySerialized(): Omit[] { + return Array.from(this.roomsByRange.entries()).map(([serializedRange, rooms]) => ({ + op: Operation.Sync, + range: JSON.parse(serializedRange), + rooms: rooms.filter(r => !!r).map(r => r.lossySerialized), + })); + } + + public get totalRoomCount(): number { + return this.totalCount; + } + + public get orderedKnownRooms(): V3Room[] { + // "Just because you can chain functions, doesn't mean you should" ~ CS Instructor + return Array.from(this.roomsByRange.entries()) + .map(([serializedRange, rooms]) => [JSON.parse(serializedRange)[0], rooms] as [number, V3Room[]]) + .sort((a, b) => a[0] - b[0]) + .map(e => e[1]) + .reduce((p, c) => [...p, ...c], []); + } + + public get unorderedAllRooms(): V3Room[] { + return Array.from(this.roomMap.values()); + } + + private findRange(index: number): [number, number] { + return [ + Math.floor(index / WINDOW_SIZE) * WINDOW_SIZE, + Math.floor((index + WINDOW_SIZE) / WINDOW_SIZE) * WINDOW_SIZE, + ]; + } + + private findIndexInRange(sourceIndex: number, range: [number, number]): number { + return sourceIndex - range[0]; + } + + public getDefinitionFor(currentUserId: string): SyncV3List { + const ranges: [number, number][] = []; + for (let i = 0; i <= Math.ceil(this.totalCount / WINDOW_SIZE); i++) { + ranges.push([i * WINDOW_SIZE, (i + 1) * WINDOW_SIZE]); + } + return { + filters: { + is_dm: this.behaviour === ListBehaviour.DMsOnly, + is_invite: this.behaviour === ListBehaviour.InvitedOnly, + }, + sort: this.sort, + required_state: V3Room.REQUIRED_STATE.map(t => [t[0], t[1] === CURRENT_USER_STATE_KEY ? currentUserId : t[1]]), + timeline_limit: TIMELINE_LIMIT, + rooms: ranges, + }; + } + + private async getOrCreateRoom(client: MatrixClient, room: SyncV3OperationRoom): Promise { + if (!this.roomMap.get(room.room_id)) { + this.roomMap.set(room.room_id, new V3Room(room.room_id)); + } + const mapped = this.roomMap.get(room.room_id); + if (room.required_state) await mapped.updateState(client, room.required_state); + if (room.timeline) await mapped.updateTimeline(client, room.timeline); + return mapped; + } + + private getOrCreateRangeFromIndex(index: number): {range: [number, number], inRangeIdx: number, rooms: V3Room[]} { + const range = this.findRange(index); + const inRangeIdx = this.findIndexInRange(index, range); + let rooms = this.roomsByRange.get(JSON.stringify(range)); + if (!rooms) { + rooms = new Array(range[1] - range[0]).fill(null); + this.roomsByRange.set(JSON.stringify(range), rooms); + } + return {range, inRangeIdx, rooms}; + } + + public async processOperations(client: MatrixClient, totalCount: number, ops: SyncV3Operation[]): Promise { + if (totalCount) this.totalCount = totalCount; + for (const op of ops) { + switch(op.op) { + case Operation.Delete: { + const range = this.findRange(op.index); + const inRangeIdx = this.findIndexInRange(op.index, range); + const rooms = this.roomsByRange.get(JSON.stringify(range)); + if (rooms) { + const [deletedRoom] = rooms.splice(inRangeIdx, 1); + this.roomMap.delete(deletedRoom.roomId); + } + break; + } + case Operation.Insert: { + const info = this.getOrCreateRangeFromIndex(op.index); + info.rooms[info.inRangeIdx] = await this.getOrCreateRoom(client, op.room); + break; + } + case Operation.Update: { + const info = this.getOrCreateRangeFromIndex(op.index); + const room = info.rooms[info.inRangeIdx]; + if (!room) throw new Error("Failed to handle update operation: unknown room at index. Op: " + JSON.stringify(op)); + if (op.room.required_state) await room.updateState(client, op.room.required_state); + if (op.room.timeline) await room.updateTimeline(client, op.room.timeline); + break; + } + case Operation.Sync: { + const rooms = new Array(op.range[1] - op.range[0]).fill(null) + for (let i = 0; i < op.rooms.length; i++) { + if (!op.rooms[i]) continue; + rooms[i] = await this.getOrCreateRoom(client, op.rooms[i]); + } + this.roomsByRange.set(JSON.stringify(op.range), rooms); + break; + } + case Operation.Invalidate: { + this.roomsByRange.delete(JSON.stringify(op.range)); + break; + } + } + } + } +} diff --git a/src/syncv3/V3Room.ts b/src/syncv3/V3Room.ts new file mode 100644 index 00000000..0abf5aff --- /dev/null +++ b/src/syncv3/V3Room.ts @@ -0,0 +1,92 @@ +import { RoomNameEvent } from "../models/events/RoomNameEvent"; +import { RoomTopicEvent } from "../models/events/RoomTopicEvent"; +import { EncryptionEvent, EncryptionEventContent } from "../models/events/EncryptionEvent"; +import { CURRENT_USER_STATE_KEY, RequiredStateTuple } from "./V3List"; +import { MembershipEvent } from "../models/events/MembershipEvent"; +import { SyncV3Event, SyncV3Room } from "./models"; +import { MatrixClient } from "../MatrixClient"; + +/** + * @category Unstable: Sync V3 + */ +export interface IV3Room { + name: string | undefined; + topic: string | undefined; + encryptionConfig: EncryptionEventContent | undefined; + membership: MembershipEvent; +} + +/** + * @category Unstable: Sync V3 + */ +export class V3Room implements IV3Room { + private nameEvent: RoomNameEvent; + private topicEvent: RoomTopicEvent; + private encryptionEvent: EncryptionEvent; + private myMembership: MembershipEvent; + + // TODO: Include other event types as "required"? + + public constructor(public readonly roomId: string) { + } + + public get lossySerialized(): SyncV3Room & {room_id: string} { + return { + room_id: this.roomId, + timeline: [], + required_state: [ + this.nameEvent, this.topicEvent, this.encryptionEvent, this.myMembership, + ].filter(e => !!e).map(e => e.raw), + name: this.nameEvent?.name, + + // We don't know these values, but also don't really care + highlight_count: 0, + notification_count: 0, + }; + } + + public get name(): string | undefined { + return this.nameEvent?.name ?? undefined; + } + + public get topic(): string | undefined { + return this.topicEvent?.topic ?? undefined; + } + + public get encryptionConfig(): EncryptionEventContent | undefined { + if (!this.encryptionEvent?.algorithm) return undefined; + return this.encryptionEvent?.content ?? undefined; + } + + public get membership(): MembershipEvent { + return this.myMembership; + } + + public async updateState(client: MatrixClient, state: (SyncV3Event & {state_key: string})[]): Promise { + // TODO: Send events through pre-processor + + for (const event of state) { + if (event['type'] === "m.room.name" && event['state_key'] === '') this.nameEvent = new RoomNameEvent(event); + if (event['type'] === "m.room.topic" && event['state_key'] === '') this.topicEvent = new RoomTopicEvent(event); + if (event['type'] === "m.room.encryption" && event['state_key'] === '') this.encryptionEvent = new EncryptionEvent(event); + if (event['type'] === "m.room.member" && event['state_key'] === (await client.getUserId())) this.myMembership = new MembershipEvent(event); + } + } + + public async updateTimeline(client: MatrixClient, timeline: SyncV3Event[]): Promise { + // TODO: Send events through pre-processor + + // TODO: Process membership + // TODO: Process events + console.log('@@ TIMELINE', timeline); + } + + public static get REQUIRED_STATE(): RequiredStateTuple[] { + return [ + ["m.room.name", ""], + ["m.room.topic", ""], + ["m.room.encryption", ""], + ["m.room.member", CURRENT_USER_STATE_KEY], + ]; + } +} diff --git a/src/syncv3/models.ts b/src/syncv3/models.ts new file mode 100644 index 00000000..2209c532 --- /dev/null +++ b/src/syncv3/models.ts @@ -0,0 +1,71 @@ +import { SyncV3Operation } from "./operations"; +import { RequiredStateTuple, SortBehaviour } from "./V3List"; + +/** + * @category Unstable: Sync V3 + */ +export interface SyncV3Event { + // Normal MatrixEvent, but we don't have types for that yet. + + event_id: string; + type: string; + sender: string; + state_key?: string; + content: Record; + origin_server_ts: number; + unsigned: Record; +} + +/** + * @category Unstable: Sync V3 + */ +export interface SyncV3Room { + name: string; + required_state: (SyncV3Event & {state_key: string})[]; + timeline: SyncV3Event[]; + notification_count: number; + highlight_count: number; +} + +/** + * @category Unstable: Sync V3 + */ +export interface SyncV3OperationRoom extends Pick { + room_id: string; +} + +/** + * @category Unstable: Sync V3 + */ +export interface SyncV3Response { + ops: SyncV3Operation[]; + initial?: boolean; + room_subscriptions: { + [roomId: string]: SyncV3Room; + }; + counts: number[]; // number of joined rooms per list + extensions: {}; // TODO + pos: string; +} + +/** + * @category Unstable: Sync V3 + */ +export interface SyncV3List { + rooms: [number, number][]; + sort: SortBehaviour[]; + required_state: RequiredStateTuple[]; + timeline_limit: number; + filters?: SyncV3ListFilter; +} + +/** + * @category Unstable: Sync V3 + */ +export interface SyncV3ListFilter { + is_dm?: boolean; + spaces?: string[]; // IDs + is_encrypted?: boolean; + is_invite?: boolean; +} + diff --git a/src/syncv3/operations.ts b/src/syncv3/operations.ts new file mode 100644 index 00000000..36c3afbe --- /dev/null +++ b/src/syncv3/operations.ts @@ -0,0 +1,70 @@ +import { SyncV3OperationRoom, SyncV3Room } from "./models"; + +/** + * @category Unstable: Sync V3 + */ +export enum Operation { + Sync = "SYNC", + Update = "UPDATE", + Insert = "INSERT", + Delete = "DELETE", + Invalidate = "INVALIDATE", +} + +/** + * @category Unstable: Sync V3 + */ +export type OpSync = { + op: Operation.Sync; + list: number; + range: [number, number]; + rooms: (SyncV3Room & {room_id: string})[]; +} + +/** + * @category Unstable: Sync V3 + */ +export type OpUpdate = { + op: Operation.Update; + list: number; + index: number; + room: Partial; // changed fields only +} + +/** + * @category Unstable: Sync V3 + */ +export type OpDelete = { + op: Operation.Delete; + list: number; + index: number; +} + +/** + * @category Unstable: Sync V3 + */ +export type OpInsert = { + op: Operation.Insert; + list: number; + index: number; + room: SyncV3OperationRoom; +} + +/** + * @category Unstable: Sync V3 + */ +export type OpInvalidate = { + op: Operation.Invalidate; + list: number; + range: [number, number]; +} + +/** + * @category Unstable: Sync V3 + */ +export type SyncV3Operation = + | OpSync + | OpInvalidate + | OpInsert + | OpUpdate + | OpDelete;