Skip to content
Open
2 changes: 1 addition & 1 deletion packages/baseai/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,4 @@
"langbase.com",
"generative AI"
]
}
}
38 changes: 22 additions & 16 deletions packages/baseai/src/deploy/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
handleError,
handleInvalidConfig,
listMemoryDocuments,
uploadDocumentsToMemory,
uploadDocumentsToMemory
} from '.';
import path from 'path';
import fs from 'fs/promises';
Expand All @@ -19,7 +19,10 @@ import {
} from '@/utils/memory/load-memory-files';
import type { MemoryI } from 'types/memory';
import { compareDocumentLists } from '@/utils/memory/compare-docs-list';
import { retrieveAuthentication, type Account } from '@/utils/retrieve-credentials';
import {
retrieveAuthentication,
type Account
} from '@/utils/retrieve-credentials';

type Spinner = ReturnType<typeof p.spinner>;

Expand Down Expand Up @@ -114,11 +117,18 @@ async function deployDocument({
process.exit(1);
}

// Fetch the existing documents
const prodDocs = await listMemoryDocuments({
account,
memoryName
});

await handleSingleDocDeploy({
memory: memoryObject,
account,
document,
overwrite
overwrite,
prodDocs
});

spinner.stop(
Expand All @@ -139,33 +149,29 @@ export async function handleSingleDocDeploy({
memory,
account,
document,
overwrite
overwrite,
prodDocs
}: {
memory: MemoryI;
account: Account;
document: MemoryDocumentI;
overwrite: boolean;
prodDocs: string[];
}) {
p.log.info(
`Checking "${memory.name}" memory for document "${document.name}".`
);

// Fetch the existing documents
const prodDocs = await listMemoryDocuments({
account,
memoryName: memory.name
});

// If overwrite is present, deploy.
if (overwrite) {
await uploadDocumentsToMemory({
account,
documents: [document],
name: memory.name
});
p.log.success(
`Document "${document.name}" uploaded to memory "${memory.name}".`
);
// p.log.success(
// `Document "${document.name}" uploaded to memory "${memory.name}".`
// );
return;
}

Expand All @@ -185,9 +191,9 @@ export async function handleSingleDocDeploy({
documents: [document],
name: memory.name
});
p.log.success(
`Document "${document.name}" uploaded to memory "${memory.name}".`
);
// p.log.success(
// `Document "${document.name}" uploaded to memory "${memory.name}".`
// );
return;
}

Expand Down
138 changes: 94 additions & 44 deletions packages/baseai/src/deploy/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -577,12 +577,6 @@ export async function upsertMemory({
p.log.info(
`Memory "${memory.name}" already exists. Updating changed documents.`
);
await handleGitSyncMemoryDeploy({
memory,
account,
documents,
overwrite
});

if (docsToDelete?.length > 0) {
await deleteDocumentsFromMemory({
Expand All @@ -592,6 +586,13 @@ export async function upsertMemory({
});
}

await handleGitSyncMemoryDeploy({
memory,
account,
documents,
overwrite
});

await updateDeployedCommitHash(memory.name);

p.log.info(
Expand Down Expand Up @@ -643,24 +644,43 @@ export async function uploadDocumentsToMemory({
name: string;
account: Account;
}) {
for (const doc of documents) {
try {
p.log.message(`Uploading document: ${doc.name} ....`);
await new Promise(resolve => setTimeout(resolve, 800)); // To avoid rate limiting
const signedUrl = await getSignedUploadUrl({
documentName: doc.name,
memoryName: name,
account,
meta: doc.meta
});
const BATCH_SIZE = 5; // Number of concurrent uploads
const RATE_LIMIT_DELAY = 1500; // 1.5 second delay between requests

// Process documents in batches to avoid rate limiting
for (let i = 0; i < documents.length; i += BATCH_SIZE) {
const batch = documents.slice(i, i + BATCH_SIZE);

const batchUploadPromises = batch.map(async (doc, index) => {
try {
// Stagger requests within batch
await new Promise(resolve =>
setTimeout(resolve, index * RATE_LIMIT_DELAY)
);

const uploadResponse = await uploadDocument(signedUrl, doc.blob);
dlog(`Upload response status: ${uploadResponse.status}`);
// p.log.message(`Uploading document: ${doc.name} ....`);
const signedUrl = await getSignedUploadUrl({
documentName: doc.name,
memoryName: name,
account,
meta: doc.meta
});

p.log.message(`Uploaded document: ${doc.name}`);
} catch (error) {
throw error;
}
const uploadResponse = await uploadDocument(
signedUrl,
doc.blob
);
dlog(`Upload response status: ${uploadResponse.status}`);

p.log.message(`Uploaded document: ${doc.name}`);
} catch (error: any) {
throw new Error(
`Failed to upload ${doc.name}: ${error.message ?? error}`
);
}
});

await Promise.all(batchUploadPromises);
}
}

Expand All @@ -673,25 +693,37 @@ export async function deleteDocumentsFromMemory({
name: string;
account: Account;
}) {
p.log.info(`Deleting documents from memory: ${name}`);
const BATCH_SIZE = 5; // Number of concurrent uploads
const RATE_LIMIT_DELAY = 1500; // 1.5 second delay between requests

for (const doc of documents) {
try {
p.log.message(`Deleting document: ${doc} ....`);
await new Promise(resolve => setTimeout(resolve, 800)); // To avoid rate limiting
p.log.info(`Deleting ${documents.length} documents from memory: ${name}`);

const deleteResponse = await deleteDocument({
documentName: doc,
memoryName: name,
account
});
for (let i = 0; i < documents.length; i += BATCH_SIZE) {
const batch = documents.slice(i, i + BATCH_SIZE);
const batchPromises = batch.map(async (doc, index) => {
try {
await new Promise(resolve =>
setTimeout(resolve, index * RATE_LIMIT_DELAY)
);

dlog(`Delete response status: ${deleteResponse.status}`);
// p.log.message(`Deleting document: ${doc}`);
const deleteResponse = await deleteDocument({
documentName: doc,
memoryName: name,
account
});

p.log.message(`Deleted document: ${doc}`);
} catch (error) {
throw error;
}
dlog(`Delete response status: ${deleteResponse.status}`);
p.log.message(`Deleted document: ${doc}`);
return deleteResponse;
} catch (error: any) {
throw new Error(
`Failed to delete ${doc}: ${error.message ?? error}`
);
}
});

await Promise.all(batchPromises);
}
p.log.info(`Deleted documents from memory: ${name}`);
}
Expand Down Expand Up @@ -1091,14 +1123,32 @@ export async function handleGitSyncMemoryDeploy({
documents: MemoryDocumentI[];
overwrite: boolean;
}) {
for (const doc in documents) {
await new Promise(resolve => setTimeout(resolve, 800)); // To avoid rate limiting
await handleSingleDocDeploy({
memory,
account,
document: documents[doc],
overwrite: true // TODO: Implement overwrite for git-sync memories
const BATCH_SIZE = 5;
const RATE_LIMIT_DELAY = 1500;

// Fetch existing documents once
const prodDocs = await listMemoryDocuments({
account,
memoryName: memory.name
});

// Process in batches
for (let i = 0; i < documents.length; i += BATCH_SIZE) {
const batch = documents.slice(i, i + BATCH_SIZE);
const batchPromises = batch.map(async (doc, index) => {
await new Promise(resolve =>
setTimeout(resolve, index * RATE_LIMIT_DELAY)
);
return handleSingleDocDeploy({
memory,
account,
document: doc,
overwrite: true,
prodDocs
});
});

await Promise.all(batchPromises);
}
}

Expand Down
2 changes: 0 additions & 2 deletions packages/baseai/src/utils/memory/load-memory-files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ export const loadMemoryFilesFromCustomDir = async ({
process.exit(1);
}

console.log('Reading documents in memory...');

// Get all files that match the glob patterns and are tracked by git
let allFiles: string[];
try {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,4 @@
"langbase.com",
"generative AI"
]
}
}