Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/examples/cosmos-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub async fn serve(kolme: Kolme<CosmosBridgeApp>, bind: SocketAddr) -> Result<()
let processor = Processor::new(kolme.clone(), my_secret_key().clone());
set.spawn(processor.run());
let listener = Listener::new(kolme.clone(), my_secret_key().clone());
set.spawn(listener.run(ChainName::Cosmos));
set.spawn(async move { listener.run(ChainName::Cosmos).await.map_err(Into::into) });
let approver = Approver::new(kolme.clone(), my_secret_key().clone());
set.spawn(approver.run());
let submitter = Submitter::new_cosmos(
Expand All @@ -208,7 +208,7 @@ pub async fn serve(kolme: Kolme<CosmosBridgeApp>, bind: SocketAddr) -> Result<()
);
set.spawn(submitter.run());
let api_server = ApiServer::new(kolme);
set.spawn(api_server.run(bind));
set.spawn(async move { api_server.run(bind).await.map_err(Into::into) });

while let Some(res) = set.join_next().await {
match res {
Expand Down
6 changes: 3 additions & 3 deletions packages/examples/kademlia-discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub async fn observer_node(validator_addr: &str) -> Result<()> {
set.spawn(gossip.run());

let api = ApiServer::new(kolme);
set.spawn(api.run(("0.0.0.0", 2005)));
set.spawn(async move { api.run(("0.0.0.0", 2005)).await.map_err(Into::into) });

loop {
tracing::info!("Continuing execution...");
Expand Down Expand Up @@ -241,13 +241,13 @@ pub async fn validators(port: u16, enable_api_server: bool) -> Result<()> {
// event from chain and then constructs a tx which leads to adding
// new mempool entry.
let listener = Listener::new(kolme.clone(), my_secret_key().clone());
set.spawn(listener.run(ChainName::Cosmos));
set.spawn(async move { listener.run(ChainName::Cosmos).await.map_err(Into::into) });
// Approves pending bridge actions.
let approver = Approver::new(kolme.clone(), my_secret_key().clone());
set.spawn(approver.run());
if enable_api_server {
let api_server = ApiServer::new(kolme.clone());
set.spawn(api_server.run(("0.0.0.0", 2002)));
set.spawn(async move { api_server.run(("0.0.0.0", 2002)).await.map_err(Into::into) });
}
let gossip = GossipBuilder::new()
.add_listener(GossipListener {
Expand Down
2 changes: 1 addition & 1 deletion packages/examples/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub async fn api_server(bind: SocketAddr) -> Result<()> {
let gossip = GossipBuilder::new().build(kolme.clone())?;
set.spawn(gossip.run());
let api_server = ApiServer::new(kolme);
set.spawn(api_server.run(bind));
set.spawn(async move { api_server.run(bind).await.map_err(Into::into) });

while let Some(res) = set.join_next().await {
match res {
Expand Down
11 changes: 9 additions & 2 deletions packages/examples/six-sigma/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,10 @@ impl Tasks {
let chain = self.kolme.get_app().chain;
let listener = Listener::new(self.kolme.clone(), my_secret_key().clone());

self.listener = Some(self.set.spawn(listener.run(chain.name())));
self.listener = Some(
self.set
.spawn(async move { listener.run(chain.name()).await.map_err(Into::into) }),
);
}

pub fn spawn_approver(&mut self) {
Expand All @@ -357,7 +360,11 @@ impl Tasks {

pub fn spawn_api_server(&mut self) {
let api_server = ApiServer::new(self.kolme.clone());
self.api_server = Some(self.set.spawn(api_server.run(self.bind)));
let bind = self.bind;
self.api_server = Some(
self.set
.spawn(async move { api_server.run(bind).await.map_err(Into::into) }),
);
}
}

Expand Down
6 changes: 3 additions & 3 deletions packages/examples/solana-cosmos-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,10 @@ pub async fn serve(
set.spawn(processor.run());

let listener = Listener::new(kolme.clone(), my_secret_key().clone());
set.spawn(listener.run(ChainName::Cosmos));
set.spawn(async move { listener.run(ChainName::Cosmos).await.map_err(Into::into) });

let listener = Listener::new(kolme.clone(), my_secret_key().clone());
set.spawn(listener.run(ChainName::Solana));
set.spawn(async move { listener.run(ChainName::Solana).await.map_err(Into::into) });

let approver = Approver::new(kolme.clone(), my_secret_key().clone());
set.spawn(approver.run());
Expand All @@ -192,7 +192,7 @@ pub async fn serve(
set.spawn(submitter.run());

let api_server = ApiServer::new(kolme);
set.spawn(api_server.run(bind));
set.spawn(async move { api_server.run(bind).await.map_err(Into::into) });

while let Some(res) = set.join_next().await {
match res {
Expand Down
10 changes: 9 additions & 1 deletion packages/integration-tests/tests/key-rotation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,15 @@ async fn test_solana_contract_update(self_replace: bool) {
processor.add_secret(new_processor.clone());
set.spawn(processor.run());
set.spawn(Submitter::new_solana(kolme.clone(), submitter).run());
set.spawn(Listener::new(kolme.clone(), listener.clone()).run(ChainName::Solana));
let kolme_cloned = kolme.clone();
let listener_cloned = listener.clone();

set.spawn(async move {
Listener::new(kolme_cloned, listener_cloned)
.run(ChainName::Solana)
.await
.map_err(Into::into)
});
set.spawn(Approver::new(kolme.clone(), approver.clone()).run());

futures::join!(
Expand Down
44 changes: 37 additions & 7 deletions packages/kolme-store/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,59 @@
use merkle_map::{MerkleSerialError, Sha256Hash};

#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum KolmeStoreError {
#[error(transparent)]
Custom(Box<dyn std::error::Error + Send + Sync>),
#[error(transparent)]
Merkle(#[from] MerkleSerialError),
#[error("Custom error: {0}")]
Custom(String),

#[error("Merkle error: {0}")]
Merkle(String),

#[error("Block not found in storage: {height}")]
BlockNotFound { height: u64 },

#[error("KolmeStore::delete_block is not supported by this store: {0}")]
UnsupportedDeleteOperation(&'static str),
UnsupportedDeleteOperation(String),

// kolme#144 - Reports a diverging hash with same height
#[error("Block with height {height} in database with different hash {hash}")]
ConflictingBlockInDb { height: u64, hash: Sha256Hash },

// kolme#144 - Reports a double insert (Block already exists with same hash and insert)
#[error("Block already in database: {height}")]
MatchingBlockAlreadyInserted { height: u64 },

#[error("Transaction is already present in database: {txhash}")]
TxAlreadyInDb { txhash: Sha256Hash },

#[error("get_height_for_tx: invalid height in Fjall store: {details}")]
InvalidHeightInFjall { details: String },

#[error("{0}")]
Other(String),
}

impl KolmeStoreError {
pub fn custom<E: std::error::Error + Send + Sync + 'static>(e: E) -> Self {
Self::Custom(Box::new(e))
Self::Custom(e.to_string())
}
}

impl From<fjall::Error> for KolmeStoreError {
fn from(e: fjall::Error) -> Self {
KolmeStoreError::InvalidHeightInFjall {
details: e.to_string(),
}
}
}

impl From<anyhow::Error> for KolmeStoreError {
fn from(e: anyhow::Error) -> Self {
KolmeStoreError::Other(format!("Anyhow Error: {e}"))
}
}

impl From<MerkleSerialError> for KolmeStoreError {
fn from(e: MerkleSerialError) -> Self {
KolmeStoreError::Other(format!("Merkle Serial Error: {e}"))
}
}
15 changes: 12 additions & 3 deletions packages/kolme-store/src/fjall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ impl KolmeBackingStore for Store {
}

async fn delete_block(&self, _height: u64) -> Result<(), KolmeStoreError> {
Err(KolmeStoreError::UnsupportedDeleteOperation("Fjall"))
Err(KolmeStoreError::UnsupportedDeleteOperation(
"Fjall".to_owned(),
))
}

async fn take_construct_lock(&self) -> Result<crate::KolmeConstructLock, KolmeStoreError> {
Expand All @@ -51,13 +53,20 @@ impl KolmeBackingStore for Store {
self.merkle.clone().load_by_hash(hash).await
}

async fn get_height_for_tx(&self, txhash: Sha256Hash) -> anyhow::Result<Option<u64>> {
async fn get_height_for_tx(
&self,
txhash: Sha256Hash,
) -> core::result::Result<Option<u64>, KolmeStoreError> {
let Some(height) = self.merkle.handle.get(tx_key(txhash))? else {
return Ok(None);
};
let height = match <[u8; 8]>::try_from(&*height) {
Ok(height) => u64::from_be_bytes(height),
Err(e) => anyhow::bail!("get_height_for_tx: invalid height in Fjall store: {e}"),
Err(e) => {
return Err(KolmeStoreError::InvalidHeightInFjall {
details: e.to_string(),
});
}
};
Ok(Some(height))
}
Expand Down
5 changes: 4 additions & 1 deletion packages/kolme-store/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ impl KolmeBackingStore for Store {
Ok(self.0.read().await.blocks.contains_key(&height))
}

async fn get_height_for_tx(&self, txhash: Sha256Hash) -> Result<Option<u64>, anyhow::Error> {
async fn get_height_for_tx(
&self,
txhash: Sha256Hash,
) -> std::result::Result<Option<u64>, KolmeStoreError> {
Ok(self.0.read().await.txhashes.get(&txhash).copied())
}

Expand Down
9 changes: 7 additions & 2 deletions packages/kolme-store/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ impl KolmeBackingStore for Store {
.inspect_err(|err| tracing::error!("{err:?}"))
}
async fn delete_block(&self, _height: u64) -> Result<(), KolmeStoreError> {
Err(KolmeStoreError::UnsupportedDeleteOperation("Postgres"))
Err(KolmeStoreError::UnsupportedDeleteOperation(
"Postgres".to_owned(),
))
}

async fn take_construct_lock(&self) -> Result<KolmeConstructLock, KolmeStoreError> {
Expand Down Expand Up @@ -218,7 +220,10 @@ impl KolmeBackingStore for Store {
let mut merkle = self.new_store();
merkle.load_by_hash(hash).await
}
async fn get_height_for_tx(&self, txhash: Sha256Hash) -> anyhow::Result<Option<u64>> {
async fn get_height_for_tx(
&self,
txhash: Sha256Hash,
) -> std::result::Result<Option<u64>, KolmeStoreError> {
let txhash = txhash.as_array().as_slice();
let height =
sqlx::query_scalar!("SELECT height FROM blocks WHERE txhash=$1 LIMIT 1", txhash)
Expand Down
5 changes: 4 additions & 1 deletion packages/kolme-store/src/trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ pub trait KolmeBackingStore {
&self,
hash: Sha256Hash,
) -> Result<Option<MerkleLayerContents>, MerkleSerialError>;
async fn get_height_for_tx(&self, txhash: Sha256Hash) -> anyhow::Result<Option<u64>>;
async fn get_height_for_tx(
&self,
txhash: Sha256Hash,
) -> core::result::Result<Option<u64>, KolmeStoreError>;

async fn load_latest_block(&self) -> Result<Option<u64>, KolmeStoreError>;
async fn load_block<Block, FrameworkState, AppState>(
Expand Down
7 changes: 7 additions & 0 deletions packages/kolme-test/src/key_rotation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ async fn test_total_replace_inner(testtasks: TestTasks, (): ()) {
processor.add_secret(new_processor.clone());
testtasks.try_spawn_persistent(processor.run());

// The genesis event hasn't completed, which causes this test to fail.
// We need to investigate why this is happening.
// Adding a short delay (sleep) as shown below allows the test to pass.

tracing::info!("Waiting for genesis event...");
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

// Swap out the approver and listener right away. Since there's only one
// key being used, we don't need to do any approving.
let expected_new_set = ValidatorSet {
Expand Down
4 changes: 1 addition & 3 deletions packages/kolme-test/src/max_tx_height.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ async fn max_tx_height_inner(testtasks: TestTasks, (): ()) {
let e: KolmeError = kolme
.sign_propose_await_transaction(&secret, tx_builder)
.await
.unwrap_err()
.downcast()
.unwrap();
.unwrap_err();
match e {
KolmeError::PastMaxHeight {
txhash: _,
Expand Down
11 changes: 7 additions & 4 deletions packages/kolme-test/src/multiple_processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async fn multiple_processors_inner(
(kolmes, all_txhashes, highest_block)
}

async fn check_failed_txs(kolme: Kolme<SampleKolmeApp>) -> Result<()> {
async fn check_failed_txs(kolme: Kolme<SampleKolmeApp>) -> std::result::Result<(), KolmeError> {
let mut recv = kolme.subscribe();
loop {
match recv.recv().await? {
Expand All @@ -146,9 +146,12 @@ async fn check_failed_txs(kolme: Kolme<SampleKolmeApp>) -> Result<()> {
error,
proposed_height,
} = failed.message.as_inner();
anyhow::bail!(
"Error with transaction {txhash} for block {proposed_height}: {error}"
)

return Err(KolmeError::TransactionFailed {
txhash: *txhash,
proposed_height: *proposed_height,
error: error.to_string(),
});
}
Notification::LatestBlock(_) => (),
Notification::EvictMempoolTransaction(_) => (),
Expand Down
2 changes: 1 addition & 1 deletion packages/kolme-test/src/validations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn test_invalid_hashes_inner(testtasks: TestTasks, (): ()) {
kolme: &Kolme<SampleKolmeApp>,
mut block: Block<SampleMessage>,
f: impl FnOnce(&mut Block<SampleMessage>),
) -> anyhow::Result<()> {
) -> std::result::Result<(), KolmeError> {
f(&mut block);
let signed = TaggedJson::new(block)
.unwrap()
Expand Down
27 changes: 23 additions & 4 deletions packages/kolme/src/api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,38 @@ impl<App: KolmeApp> ApiServer<App> {
ApiServer { kolme }
}

pub async fn run<A: tokio::net::ToSocketAddrs>(self, addr: A) -> Result<()> {
pub async fn run<A: tokio::net::ToSocketAddrs>(
self,
addr: A,
) -> std::result::Result<(), KolmeError> {
let cors = CorsLayer::new()
.allow_methods([Method::GET, Method::POST, Method::PUT])
.allow_origin(Any)
.allow_headers([CONTENT_TYPE]);

let app = base_api_router().layer(cors).with_state(self.kolme);

let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
tracing::info!("Starting API server on {:?}", listener.local_addr()?);
let listener =
tokio::net::TcpListener::bind(addr)
.await
.map_err(|e| KolmeError::ApiServerError {
details: format!("TCP bind error: {e}"),
})?;

tracing::info!(
"Starting API server on {:?}",
listener
.local_addr()
.map_err(|e| KolmeError::ApiServerError {
details: format!("Local address error: {e}"),
})
);

axum::serve(listener, app)
.await
.map_err(anyhow::Error::from)
.map_err(|e| KolmeError::ApiServerError {
details: format!("Axum server error: {e}"),
})
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/kolme/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ impl<T: serde::de::DeserializeOwned> MerkleDeserialize for SignedTaggedJson<T> {
}

impl<T> SignedTaggedJson<T> {
pub fn verify_signature(&self) -> Result<PublicKey> {
pub fn verify_signature(&self) -> std::result::Result<PublicKey, KolmeError> {
PublicKey::recover_from_msg(self.message.as_bytes(), &self.signature, self.recovery_id)
.map_err(anyhow::Error::from)
.map_err(KolmeError::from)
}

pub(crate) fn message_hash(&self) -> Sha256Hash {
Expand Down
Loading
Loading