Skip to content

Commit a8e3952

Browse files
authored
sync topic to topic in core V2 (#1684)
1 parent f18f839 commit a8e3952

File tree

7 files changed

+167
-5
lines changed

7 files changed

+167
-5
lines changed

apps/framework-cli/src/cli/display.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,15 @@ pub fn show_changes(infra_plan: &InfraPlan) {
323323
.processes_changes
324324
.iter()
325325
.for_each(|change| match change {
326+
ProcessChange::TopicToTopicSyncProcess(Change::Added(infra)) => {
327+
infra_added(&infra.expanded_display());
328+
}
329+
ProcessChange::TopicToTopicSyncProcess(Change::Removed(infra)) => {
330+
infra_removed(&infra.short_display());
331+
}
332+
ProcessChange::TopicToTopicSyncProcess(Change::Updated { before, after: _ }) => {
333+
infra_updated(&before.expanded_display());
334+
}
326335
ProcessChange::TopicToTableSyncProcess(Change::Added(infra)) => {
327336
infra_added(&infra.expanded_display());
328337
}

apps/framework-cli/src/framework/core/infrastructure.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub mod function_process;
66
pub mod olap_process;
77
pub mod table;
88
pub mod topic;
9-
pub mod topic_to_table_sync_process;
9+
pub mod topic_sync_process;
1010
pub mod view;
1111

1212
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use serde::{Deserialize, Serialize};
22

3-
use crate::framework::core::infrastructure_map::PrimitiveSignature;
4-
53
use super::{
64
table::{Column, Table},
75
topic::Topic,
86
};
7+
use crate::framework::core::infrastructure_map::{PrimitiveSignature, PrimitiveTypes};
8+
use crate::framework::streaming::model::StreamingFunction;
99

1010
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1111
pub struct TopicToTableSyncProcess {
@@ -18,6 +18,14 @@ pub struct TopicToTableSyncProcess {
1818
pub source_primitive: PrimitiveSignature,
1919
}
2020

21+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
22+
pub struct TopicToTopicSyncProcess {
23+
pub source_topic_id: String,
24+
pub target_topic_id: String,
25+
26+
pub source_primitive: PrimitiveSignature,
27+
}
28+
2129
impl TopicToTableSyncProcess {
2230
pub fn new(topic: &Topic, table: &Table) -> Self {
2331
if topic.version != table.version {
@@ -57,3 +65,36 @@ impl TopicToTableSyncProcess {
5765
)
5866
}
5967
}
68+
69+
impl TopicToTopicSyncProcess {
70+
pub fn from_migration_function(function: &StreamingFunction) -> Self {
71+
let source_topic = Topic::from_data_model(&function.source_data_model);
72+
let (source_for_func, _) = Topic::from_migration_function(function);
73+
TopicToTopicSyncProcess {
74+
source_topic_id: source_topic.id(),
75+
target_topic_id: source_for_func.id(),
76+
source_primitive: PrimitiveSignature {
77+
name: function.id(),
78+
primitive_type: PrimitiveTypes::Function,
79+
},
80+
}
81+
}
82+
83+
pub fn id(&self) -> String {
84+
self.target_topic_id.to_string()
85+
}
86+
87+
pub fn expanded_display(&self) -> String {
88+
format!(
89+
"Topic to Topic Sync Process: {} -> {}",
90+
self.source_topic_id, self.target_topic_id
91+
)
92+
}
93+
94+
pub fn short_display(&self) -> String {
95+
format!(
96+
"Topic to Topic Sync Process: {} -> {}",
97+
self.source_topic_id, self.target_topic_id
98+
)
99+
}
100+
}

apps/framework-cli/src/framework/core/infrastructure_map.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use super::infrastructure::function_process::FunctionProcess;
44
use super::infrastructure::olap_process::OlapProcess;
55
use super::infrastructure::table::Table;
66
use super::infrastructure::topic::Topic;
7-
use super::infrastructure::topic_to_table_sync_process::TopicToTableSyncProcess;
7+
use super::infrastructure::topic_sync_process::{TopicToTableSyncProcess, TopicToTopicSyncProcess};
88
use super::infrastructure::view::View;
99
use super::primitive_map::PrimitiveMap;
1010
use crate::framework::controller::{InitialDataLoad, InitialDataLoadStatus};
@@ -60,6 +60,7 @@ pub enum ApiChange {
6060
#[derive(Debug, Clone)]
6161
pub enum ProcessChange {
6262
TopicToTableSyncProcess(Change<TopicToTableSyncProcess>),
63+
TopicToTopicSyncProcess(Change<TopicToTopicSyncProcess>),
6364
FunctionProcess(Change<FunctionProcess>),
6465
OlapProcess(Change<OlapProcess>),
6566
ConsumptionApiWebServer(Change<ConsumptionApiWebServer>),
@@ -101,6 +102,8 @@ pub struct InfrastructureMap {
101102
pub views: HashMap<String, View>,
102103

103104
pub topic_to_table_sync_processes: HashMap<String, TopicToTableSyncProcess>,
105+
#[serde(default = "HashMap::new")]
106+
pub topic_to_topic_sync_processes: HashMap<String, TopicToTopicSyncProcess>,
104107
pub function_processes: HashMap<String, FunctionProcess>,
105108

106109
// TODO change to a hashmap of processes when we have several
@@ -120,6 +123,7 @@ impl InfrastructureMap {
120123
let mut topics = HashMap::new();
121124
let mut api_endpoints = HashMap::new();
122125
let mut topic_to_table_sync_processes = HashMap::new();
126+
let mut topic_to_topic_sync_processes = HashMap::new();
123127
let mut function_processes = HashMap::new();
124128
let mut initial_data_loads = HashMap::new();
125129

@@ -208,6 +212,9 @@ impl InfrastructureMap {
208212
);
209213
topic_to_table_sync_processes.insert(sync_process.id(), sync_process);
210214

215+
let topic_sync = TopicToTopicSyncProcess::from_migration_function(function);
216+
topic_to_topic_sync_processes.insert(topic_sync.id(), topic_sync);
217+
211218
initial_data_loads.insert(
212219
function_process.id(),
213220
InitialDataLoad {
@@ -240,6 +247,7 @@ impl InfrastructureMap {
240247
topics,
241248
api_endpoints,
242249
topic_to_table_sync_processes,
250+
topic_to_topic_sync_processes,
243251
tables,
244252
views,
245253
function_processes,
@@ -421,6 +429,47 @@ impl InfrastructureMap {
421429
}
422430
}
423431

432+
// =================================================================
433+
// Topic to Topic Sync Processes
434+
// =================================================================
435+
436+
for (id, topic_to_topic_sync_process) in &self.topic_to_topic_sync_processes {
437+
if let Some(target_topic_to_topic_sync_process) =
438+
target_map.topic_to_topic_sync_processes.get(id)
439+
{
440+
if topic_to_topic_sync_process != target_topic_to_topic_sync_process {
441+
changes
442+
.processes_changes
443+
.push(ProcessChange::TopicToTopicSyncProcess(Change::<
444+
TopicToTopicSyncProcess,
445+
>::Updated {
446+
before: topic_to_topic_sync_process.clone(),
447+
after: target_topic_to_topic_sync_process.clone(),
448+
}));
449+
}
450+
} else {
451+
changes
452+
.processes_changes
453+
.push(ProcessChange::TopicToTopicSyncProcess(Change::<
454+
TopicToTopicSyncProcess,
455+
>::Removed(
456+
topic_to_topic_sync_process.clone(),
457+
)));
458+
}
459+
}
460+
461+
for (id, topic_to_topic_sync_process) in &target_map.topic_to_topic_sync_processes {
462+
if !self.topic_to_topic_sync_processes.contains_key(id) {
463+
changes
464+
.processes_changes
465+
.push(ProcessChange::TopicToTopicSyncProcess(Change::<
466+
TopicToTopicSyncProcess,
467+
>::Added(
468+
topic_to_topic_sync_process.clone(),
469+
)));
470+
}
471+
}
472+
424473
// =================================================================
425474
// Function Processes
426475
// =================================================================
@@ -574,6 +623,17 @@ impl InfrastructureMap {
574623
})
575624
.collect();
576625

626+
let mut topic_to_topic_process_changes: Vec<ProcessChange> = self
627+
.topic_to_topic_sync_processes
628+
.values()
629+
.map(|topic_to_table_sync_process| {
630+
ProcessChange::TopicToTopicSyncProcess(Change::<TopicToTopicSyncProcess>::Added(
631+
topic_to_table_sync_process.clone(),
632+
))
633+
})
634+
.collect();
635+
topic_to_table_process_changes.append(&mut topic_to_topic_process_changes);
636+
577637
let mut function_process_changes: Vec<ProcessChange> = self
578638
.function_processes
579639
.values()

apps/framework-cli/src/infrastructure/olap/clickhouse_alt_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ pub async fn retrieve_current_state(
305305
#[derive(Debug, thiserror::Error)]
306306
#[non_exhaustive]
307307
pub enum StateStorageError {
308-
#[error("Failed to serialize the state")]
308+
#[error("Failed to (de)serialize the state")]
309309
SerdeError(#[from] serde_json::Error),
310310
#[error("Clickhouse error")]
311311
ClickhouseError(#[from] clickhouse_rs::errors::Error),

apps/framework-cli/src/infrastructure/processes.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,28 @@ pub async fn execute_changes(
7474
metrics.clone(),
7575
);
7676
}
77+
ProcessChange::TopicToTopicSyncProcess(Change::Added(sync)) => {
78+
log::info!("Starting sync process: {:?}", sync.id());
79+
syncing_registry.start_topic_to_topic(
80+
sync.source_topic_id.clone(),
81+
sync.target_topic_id.clone(),
82+
metrics.clone(),
83+
);
84+
}
85+
ProcessChange::TopicToTopicSyncProcess(Change::Removed(sync)) => {
86+
log::info!("Stopping sync process: {:?}", sync.id());
87+
syncing_registry.stop_topic_to_topic(&sync.target_topic_id)
88+
}
89+
// TopicToTopicSyncProcess Updated seems impossible
90+
ProcessChange::TopicToTopicSyncProcess(Change::Updated { before, after }) => {
91+
log::info!("Replacing Sync process: {:?} by {:?}", before, after);
92+
syncing_registry.stop_topic_to_topic(&before.target_topic_id);
93+
syncing_registry.start_topic_to_topic(
94+
after.source_topic_id.clone(),
95+
after.target_topic_id.clone(),
96+
metrics.clone(),
97+
);
98+
}
7799
ProcessChange::FunctionProcess(Change::Added(function_process)) => {
78100
log::info!("Starting Function process: {:?}", function_process.id());
79101
process_registry.functions.start(function_process)?;

apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,36 @@ impl SyncingProcessesRegistry {
204204
process.abort();
205205
}
206206
}
207+
208+
pub fn start_topic_to_topic(
209+
&mut self,
210+
source_topic_name: String,
211+
target_topic_name: String,
212+
metrics: Arc<Metrics>,
213+
) {
214+
info!(
215+
"<DCM> Starting syncing process from topic: {} to topic: {}",
216+
source_topic_name, target_topic_name
217+
);
218+
let key = target_topic_name.clone();
219+
220+
if let Some(process) = self.to_topic_registry.remove(&key) {
221+
process.abort();
222+
}
223+
224+
self.insert_topic_sync(spawn_kafka_to_kafka_process(
225+
self.kafka_config.clone(),
226+
source_topic_name,
227+
target_topic_name,
228+
metrics.clone(),
229+
));
230+
}
231+
232+
pub fn stop_topic_to_topic(&mut self, target_topic_name: &str) {
233+
if let Some(process) = self.to_table_registry.remove(target_topic_name) {
234+
process.abort();
235+
}
236+
}
207237
}
208238

209239
type FnSyncProcess =

0 commit comments

Comments
 (0)