diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs
index e33d71a6..af3df2b4 100644
--- a/src/bin/oura/daemon.rs
+++ b/src/bin/oura/daemon.rs
@@ -44,6 +44,9 @@ use oura::sinks::aws_lambda::Config as AwsLambdaConfig;
 #[cfg(feature = "aws")]
 use oura::sinks::aws_s3::Config as AwsS3Config;

+#[cfg(feature = "aws")]
+use oura::sinks::aws_s3_sqs::Config as AwsS3SqsConfig;
+
 #[cfg(feature = "redissink")]
 use oura::sinks::redis::Config as RedisConfig;

@@ -130,6 +133,9 @@ enum Sink {
     #[cfg(feature = "aws")]
     AwsS3(AwsS3Config),

+    #[cfg(feature = "aws")]
+    AwsS3Sqs(AwsS3SqsConfig),
+
     #[cfg(feature = "redissink")]
     Redis(RedisConfig),

@@ -170,6 +176,9 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> BootstrapResult {
         #[cfg(feature = "aws")]
         Sink::AwsS3(c) => WithUtils::new(c, utils).bootstrap(input),

+        #[cfg(feature = "aws")]
+        Sink::AwsS3Sqs(c) => WithUtils::new(c, utils).bootstrap(input),
+
         #[cfg(feature = "redissink")]
         Sink::Redis(c) => WithUtils::new(c, utils).bootstrap(input),

diff --git a/src/sinks/aws_s3_sqs/combined_client.rs b/src/sinks/aws_s3_sqs/combined_client.rs
new file mode 100644
index 00000000..1a6e92c4
--- /dev/null
+++ b/src/sinks/aws_s3_sqs/combined_client.rs
@@ -0,0 +1,241 @@
+use super::Config;
+use crate::model::BlockRecord;
+use crate::sinks::aws_s3_sqs::{ContentType, Naming};
+use crate::Error;
+use aws_sdk_s3::types::ByteStream as S3ByteStream;
+use aws_sdk_s3::Client as S3Client;
+use aws_sdk_s3::Region as S3Region;
+use aws_sdk_s3::RetryConfig as S3RetryConfig;
+use aws_sdk_sqs::Client as SqsClient;
+use aws_sdk_sqs::Region as SqsRegion;
+use aws_sdk_sqs::RetryConfig as SqsRetryConfig;
+use serde::{Deserialize, Serialize};
+use serde_json::json;
+
+const DEFAULT_MAX_RETRIES: u32 = 5;
+
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
+struct SqsMessage {
+    s3_key: String,
+    block_hash: String,
+    previous_hash: String,
+    block_number: u64,
+    slot: u64,
+    tip: u64,
+}
+
+impl From<&ContentType> for String {
+    fn from(other: &ContentType) -> Self {
+        match other {
+            ContentType::Cbor => "application/cbor".to_string(),
+            ContentType::CborHex => "text/plain".to_string(),
+            ContentType::Json => "application/json".to_string(),
+        }
+    }
+}
+
+pub(super) struct CombinedClient {
+    s3: S3Client,
+    sqs: SqsClient,
+    config: Config,
+    naming: Naming,
+    content_type: ContentType,
+    sqs_group_id: String,
+    s3_prefix: String,
+}
+
+impl CombinedClient {
+    pub fn new(config: &Config) -> Result<Self, Error> {
+        let s3 = setup_s3_client(config)?;
+        let sqs = setup_sqs_client(config)?;
+        let naming = config.s3_naming.clone().unwrap_or(Naming::Hash);
+        let content_type = config.s3_content.clone().unwrap_or(ContentType::Cbor);
+        let group_id = config
+            .sqs_group_id
+            .clone()
+            .unwrap_or_else(|| "oura-sink".to_string());
+        let s3_prefix = config.s3_prefix.clone().unwrap_or_default();
+        Ok(CombinedClient {
+            s3,
+            sqs,
+            config: config.clone(),
+            naming,
+            content_type,
+            sqs_group_id: group_id,
+            s3_prefix,
+        })
+    }
+
+    pub async fn send_block(
+        &self,
+        record: &BlockRecord,
+        tip: u64,
+    ) -> Result<(), Error> {
+        let key = self.get_s3_key(record);
+        self.send_s3_object(&key, record).await?;
+        self.send_sqs_message(&key, record, tip).await?;
+        Ok(())
+    }
+
+    async fn send_s3_object(&self, key: &str, record: &BlockRecord) -> Result<(), Error> {
+        let content_type: String = String::from(&self.content_type);
+        let content = encode_block(&self.content_type, record);
+        let req = self
+            .s3
+            .put_object()
+            .bucket(&self.config.s3_bucket)
+            .key(key)
+            .body(content)
+            .metadata("era", record.era.to_string())
+            .metadata("issuer_vkey", &record.issuer_vkey)
+            .metadata("tx_count", record.tx_count.to_string())
+            .metadata("slot", record.slot.to_string())
+            .metadata("hash", &record.hash)
+            .metadata("number", record.number.to_string())
+            .metadata("previous_hash", &record.previous_hash)
+            .content_type(content_type);
+
+        let res = req.send().await?;
+
+        log::trace!("S3 put response: {:?}", res);
+
+        Ok(())
+    }
+
+    async fn send_sqs_message(
+        &self,
+        key: &str,
+        record: &BlockRecord,
+        tip: u64,
+    ) -> Result<(), Error> {
+        let message = SqsMessage {
+            s3_key: key.to_string(),
+            block_hash: record.hash.to_string(),
+            previous_hash: record.previous_hash.to_string(),
+            block_number: record.number,
+            slot: record.slot,
+            tip,
+        };
+
+        let body = json!(message).to_string();
+
+        let mut req = self
+            .sqs
+            .send_message()
+            .queue_url(&self.config.sqs_queue_url)
+            .message_body(body);
+
+        if self.config.sqs_fifo.unwrap_or_default() {
+            req = req
+                .message_group_id(&self.sqs_group_id)
+                .message_deduplication_id(key);
+        }
+
+        let res = req.send().await?;
+
+        log::trace!("SQS send response: {:?}", res);
+
+        Ok(())
+    }
+
+    fn get_s3_key(&self, record: &BlockRecord) -> String {
+        define_obj_key(&self.s3_prefix, &self.naming, record)
+    }
+}
+
+fn encode_block(content_type: &ContentType, record: &BlockRecord) -> S3ByteStream {
+    let hex = match record.cbor_hex.as_ref() {
+        Some(x) => x,
+        None => {
+            log::error!(
+                "found block record without CBOR, please enable CBOR in source mapper options"
+            );
+            panic!()
+        }
+    };
+
+    match content_type {
+        ContentType::Cbor => {
+            let cbor = hex::decode(hex).expect("valid hex value");
+            S3ByteStream::from(cbor)
+        }
+        ContentType::CborHex => S3ByteStream::from(hex.as_bytes().to_vec()),
+        ContentType::Json => {
+            let json = json!(record).to_string().as_bytes().to_vec();
+            S3ByteStream::from(json)
+        }
+    }
+}
+
+fn setup_s3_client(config: &Config) -> Result<S3Client, Error> {
+    let explicit_region = config.s3_region.to_owned();
+
+    let aws_config = tokio::runtime::Builder::new_current_thread()
+        .build()?
+        .block_on(
+            aws_config::from_env()
+                .region(S3Region::new(explicit_region))
+                .load(),
+        );
+
+    let retry_config = S3RetryConfig::new()
+        .with_max_attempts(config.s3_max_retries.unwrap_or(DEFAULT_MAX_RETRIES));
+
+    let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
+        .retry_config(retry_config)
+        .build();
+
+    Ok(S3Client::from_conf(s3_config))
+}
+
+fn setup_sqs_client(config: &Config) -> Result<SqsClient, Error> {
+    let explicit_region = config.sqs_region.to_owned();
+
+    let aws_config = tokio::runtime::Builder::new_current_thread()
+        .build()?
+        .block_on(
+            aws_config::from_env()
+                .region(SqsRegion::new(explicit_region))
+                .load(),
+        );
+
+    let retry_config = SqsRetryConfig::new()
+        .with_max_attempts(config.sqs_max_retries.unwrap_or(DEFAULT_MAX_RETRIES));
+
+    let sqs_config = aws_sdk_sqs::config::Builder::from(&aws_config)
+        .retry_config(retry_config)
+        .build();
+
+    Ok(SqsClient::from_conf(sqs_config))
+}
+
+fn define_obj_key(prefix: &str, policy: &Naming, record: &BlockRecord) -> String {
+    match policy {
+        Naming::Hash => format!("{}{}", prefix, record.hash),
+        Naming::SlotHash => format!("{}{}.{}", prefix, record.slot, record.hash),
+        Naming::BlockHash => format!("{}{}.{}", prefix, record.number, record.hash),
+        Naming::BlockNumber => format!("{}{}", prefix, record.number),
+        Naming::EpochHash => format!(
+            "{}{}.{}",
+            prefix,
+            record.epoch.unwrap_or_default(),
+            record.hash
+        ),
+        Naming::EpochSlotHash => format!(
+            "{}{}.{}.{}",
+            prefix,
+            record.epoch.unwrap_or_default(),
+            record.slot,
+            record.hash
+        ),
+        Naming::EpochBlockHash => {
+            format!(
+                "{}{}.{}.{}",
+                prefix,
+                record.epoch.unwrap_or_default(),
+                record.number,
+                record.hash
+            )
+        }
+    }
+}
diff --git a/src/sinks/aws_s3_sqs/config.rs b/src/sinks/aws_s3_sqs/config.rs
new file mode 100644
index 00000000..a3fbb06a
--- /dev/null
+++ b/src/sinks/aws_s3_sqs/config.rs
@@ -0,0 +1,35 @@
+use serde::Deserialize;
+
+#[derive(Deserialize, Debug, Clone)]
+pub enum Naming {
+    Hash,
+    SlotHash,
+    BlockHash,
+    BlockNumber,
+    EpochHash,
+    EpochSlotHash,
+    EpochBlockHash,
+}
+
+#[derive(Deserialize, Debug, Clone)]
+pub enum ContentType {
+    Cbor,
+    CborHex,
+    Json,
+}
+
+#[derive(Default, Debug, Deserialize, Clone)]
+pub struct Config {
+    pub s3_region: String,
+    pub s3_bucket: String,
+    pub s3_prefix: Option<String>,
+    pub s3_naming: Option<Naming>,
+    pub s3_content: Option<ContentType>,
+    pub s3_max_retries: Option<u32>,
+
+    pub sqs_region: String,
+    pub sqs_queue_url: String,
+    pub sqs_fifo: Option<bool>,
+    pub sqs_group_id: Option<String>,
+    pub sqs_max_retries: Option<u32>,
+}
diff --git a/src/sinks/aws_s3_sqs/mod.rs b/src/sinks/aws_s3_sqs/mod.rs
new file mode 100644
index 00000000..14075616
--- /dev/null
+++ b/src/sinks/aws_s3_sqs/mod.rs
@@ -0,0 +1,7 @@
+mod combined_client;
+mod config;
+mod run;
+mod setup;
+
+pub use self::config::*;
+pub use setup::*;
diff --git a/src/sinks/aws_s3_sqs/run.rs b/src/sinks/aws_s3_sqs/run.rs
new file mode 100644
index 00000000..48a5c5b5
--- /dev/null
+++ b/src/sinks/aws_s3_sqs/run.rs
@@ -0,0 +1,39 @@
+use std::sync::Arc;
+
+use crate::sinks::aws_s3_sqs::combined_client::CombinedClient;
+use crate::{model::EventData, pipelining::StageReceiver, utils::Utils, Error};
+
+pub(super) fn writer_loop(
+    input: StageReceiver,
+    client: CombinedClient,
+    utils: Arc<Utils>,
+) -> Result<(), Error> {
+    let client = Arc::new(client);
+
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_time()
+        .enable_io()
+        .build()?;
+
+    for event in input.iter() {
+        if let EventData::Block(record) = &event.data {
+            let client = client.clone();
+            let tip = utils.tip.load(std::sync::atomic::Ordering::SeqCst);
+
+            let result = rt.block_on(async move { client.send_block(record, tip).await });
+
+            match result {
+                Ok(_) => {
+                    // notify the pipeline where we are
+                    utils.track_sink_progress(&event);
+                }
+                Err(err) => {
+                    log::error!("unrecoverable error sending block to S3 and SQS: {:?}", err);
+                    return Err(err);
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/src/sinks/aws_s3_sqs/setup.rs b/src/sinks/aws_s3_sqs/setup.rs
new file mode 100644
index 00000000..e3e27ae0
--- /dev/null
+++ b/src/sinks/aws_s3_sqs/setup.rs
@@ -0,0 +1,21 @@
+use super::config::Config;
+use crate::sinks::aws_s3_sqs::combined_client::CombinedClient;
+use crate::{
+    pipelining::{BootstrapResult, SinkProvider, StageReceiver},
+    utils::WithUtils,
+};
+
+use super::run::writer_loop;
+
+impl SinkProvider for WithUtils<Config> {
+    fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
+        let client = CombinedClient::new(&self.inner)?;
+        let utils = self.utils.clone();
+
+        let handle = std::thread::spawn(move || {
+            writer_loop(input, client, utils).expect("writer loop failed")
+        });
+
+        Ok(handle)
+    }
+}
diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs
index 5eec177a..84ba24fd 100644
--- a/src/sinks/mod.rs
+++ b/src/sinks/mod.rs
@@ -27,6 +27,9 @@ pub mod aws_lambda;
 #[cfg(feature = "aws")]
 pub mod aws_s3;

+#[cfg(feature = "aws")]
+pub mod aws_s3_sqs;
+
 #[cfg(feature = "redissink")]
 pub mod redis;

diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs
index efa696ee..3bce5c6f 100644
--- a/src/sources/n2c/run.rs
+++ b/src/sources/n2c/run.rs
@@ -70,6 +70,8 @@ impl ChainObserver {
         let ready = self.chain_buffer.pop_with_depth(self.min_depth);
         log::debug!("found {} points with required min depth", ready.len());

+        self.event_writer.utils.tip.store(tip.1, std::sync::atomic::Ordering::SeqCst);
+
         // find confirmed block in memory and send down the pipeline
         for point in ready {
             let block = self
diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs
index a3956d8e..7f357bb9 100644
--- a/src/sources/n2n/run.rs
+++ b/src/sources/n2n/run.rs
@@ -72,6 +72,8 @@ impl ChainObserver {
         let ready = self.chain_buffer.pop_with_depth(self.min_depth);
         log::debug!("found {} points with required min depth", ready.len());

+        self.event_writer.utils.tip.store(tip.1, std::sync::atomic::Ordering::SeqCst);
+
         // request download of blocks for confirmed points
         for point in ready {
             log::debug!("requesting block fetch for point {:?}", point);
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index fe841b7f..2c4584cf 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -6,6 +6,7 @@
 //! pointer.

 use std::sync::Arc;
+use std::sync::atomic::AtomicU64;

 use pallas::network::miniprotocols::{Point, MAINNET_MAGIC, TESTNET_MAGIC};

@@ -169,6 +170,7 @@ pub struct Utils {
     pub(crate) time: Option<time::NaiveProvider>,
     pub(crate) cursor: Option<cursor::Provider>,
     pub(crate) metrics: Option<metrics::Provider>,
+    pub(crate) tip: AtomicU64,
 }

 // TODO: refactor this using the builder pattern
@@ -179,6 +181,7 @@ impl Utils {
             well_known,
             cursor: None,
             metrics: None,
+            tip: 0.into(),
         }
     }
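For reference, a daemon configuration enabling the new sink could look like the sketch below. The field names and defaults come from the `Config` struct and `CombinedClient::new` in this diff; the `type = "AwsS3Sqs"` tag follows the variant name added to the `Sink` enum in `daemon.rs`, and every concrete value (region, bucket, queue URL, prefix) is a placeholder for illustration, not part of this change.

```toml
# Hypothetical daemon.toml fragment; requires a build with the "aws" feature.
[sink]
type = "AwsS3Sqs"

s3_region = "us-east-1"       # placeholder
s3_bucket = "my-block-bucket" # placeholder
s3_prefix = "mainnet/"        # optional, defaults to ""
s3_naming = "SlotHash"        # optional, defaults to "Hash"
s3_content = "Cbor"           # optional, defaults to "Cbor"; block CBOR must be enabled in the mapper
s3_max_retries = 5            # optional, defaults to 5

sqs_region = "us-east-1"      # placeholder
sqs_queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/blocks.fifo" # placeholder
sqs_fifo = true               # optional; when true the sink sets group and deduplication ids
sqs_group_id = "oura-sink"    # optional, defaults to "oura-sink"
sqs_max_retries = 5           # optional, defaults to 5
```

With `sqs_fifo = true`, the S3 object key doubles as the message deduplication id, so a redelivered block with the same key collapses into a single FIFO message.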
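Given the `SqsMessage` struct and the `json!(message)` serialization above, each queue message body is a JSON object of this shape (all values below are illustrative, with hashes truncated; the key assumes the "SlotHash" naming and "mainnet/" prefix from the config sketch):

```json
{
  "s3_key": "mainnet/4492799.aa83acbf...",
  "block_hash": "aa83acbf...",
  "previous_hash": "23fd6a38...",
  "block_number": 4490510,
  "slot": 4492799,
  "tip": 4510000
}
```

The `tip` field carries the chain tip height that the source stages now record through the new `Utils::tip` atomic, letting a consumer estimate how far a given block sits from the tip.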