@@ -37,10 +37,12 @@ use rdkafka::message::OwnedMessage;
3737use rdkafka:: producer:: future_producer:: OwnedDeliveryResult ;
3838use rdkafka:: producer:: { DeliveryFuture , FutureRecord } ;
3939use rdkafka:: util:: Timeout ;
40- use serde:: Deserialize ;
4140use serde:: Serialize ;
42- use serde_json:: Value ;
41+ use serde:: { Deserialize , Deserializer } ;
42+ use serde_json:: Deserializer as JsonDeserializer ;
4343
44+ use crate :: framework:: data_model:: model:: DataModel ;
45+ use crate :: utilities:: validate_passthrough:: { DataModelArrayVisitor , DataModelVisitor } ;
4446use lazy_static:: lazy_static;
4547use log:: Level :: { Debug , Trace } ;
4648use std:: collections:: { HashMap , HashSet } ;
@@ -412,12 +414,10 @@ fn route_not_found_response() -> hyper::http::Result<Response<Full<Bytes>>> {
412414async fn send_payload_to_topic (
413415 configured_producer : & ConfiguredProducer ,
414416 topic_name : & str ,
415- payload : Value ,
417+ payload : Vec < u8 > ,
416418 metrics : Arc < Metrics > ,
417419 route : PathBuf ,
418420) -> Result < ( i32 , i64 ) , ( KafkaError , OwnedMessage ) > {
419- let payload = serde_json:: to_vec ( & payload) . unwrap ( ) ;
420-
421421 debug ! ( "Sending payload {:?} to topic: {}" , payload, topic_name) ;
422422
423423 metrics
@@ -447,6 +447,7 @@ async fn to_reader(req: Request<Incoming>) -> bytes::buf::Reader<impl Buf + Size
447447async fn handle_json_req (
448448 configured_producer : & ConfiguredProducer ,
449449 topic_name : & str ,
450+ data_model : & DataModel ,
450451 req : Request < Incoming > ,
451452 metrics : Arc < Metrics > ,
452453 route : PathBuf ,
@@ -456,7 +457,9 @@ async fn handle_json_req(
456457 let url = req. uri ( ) . to_string ( ) ;
457458 let number_of_bytes = req. body ( ) . size_hint ( ) . exact ( ) . unwrap ( ) ;
458459 let body = to_reader ( req) . await ;
459- let parsed: Result < Value , serde_json:: Error > = serde_json:: from_reader ( body) ;
460+
461+ let parsed = JsonDeserializer :: from_reader ( body)
462+ . deserialize_any ( & mut DataModelVisitor :: new ( & data_model. columns ) ) ;
460463
461464 metrics
462465 . send_metric ( MetricsMessage :: PutIngestedBytesCount {
@@ -508,6 +511,7 @@ async fn wait_for_batch_complete(
508511async fn handle_json_array_body (
509512 configured_producer : & ConfiguredProducer ,
510513 topic_name : & str ,
514+ data_model : & DataModel ,
511515 req : Request < Incoming > ,
512516 metrics : Arc < Metrics > ,
513517 route : PathBuf ,
@@ -522,7 +526,9 @@ async fn handle_json_array_body(
522526 "starting to parse json array with length {} for {}" ,
523527 number_of_bytes, topic_name
524528 ) ;
525- let parsed: Result < Vec < Value > , serde_json:: Error > = serde_json:: from_reader ( body) ;
529+ let parsed = JsonDeserializer :: from_reader ( body) . deserialize_seq ( & mut DataModelArrayVisitor {
530+ inner : DataModelVisitor :: new ( & data_model. columns ) ,
531+ } ) ;
526532
527533 debug ! ( "parsed json array for {}" , topic_name) ;
528534 metrics
@@ -541,8 +547,6 @@ async fn handle_json_array_body(
541547 let mut temp_res: Vec < Result < DeliveryFuture , KafkaError > > = Vec :: new ( ) ;
542548
543549 for ( count, payload) in parsed. ok ( ) . unwrap ( ) . into_iter ( ) . enumerate ( ) {
544- let payload = serde_json:: to_vec ( & payload) . unwrap ( ) ;
545-
546550 debug ! ( "Sending payload {:?} to topic: {}" , payload, topic_name) ;
547551 let record = FutureRecord :: to ( topic_name)
548552 . key ( topic_name) // This should probably be generated by the client that pushes data to the API
@@ -644,6 +648,7 @@ async fn ingest_route(
644648 EndpointIngestionFormat :: Json => Ok ( handle_json_req (
645649 & configured_producer,
646650 & route_meta. topic_name ,
651+ & route_meta. data_model ,
647652 req,
648653 metrics,
649654 route,
@@ -652,6 +657,7 @@ async fn ingest_route(
652657 EndpointIngestionFormat :: JsonArray => Ok ( handle_json_array_body (
653658 & configured_producer,
654659 & route_meta. topic_name ,
660+ & route_meta. data_model ,
655661 req,
656662 metrics,
657663 route,
@@ -859,11 +865,15 @@ impl Webserver {
859865 ApiChange :: ApiEndpoint ( Change :: Added ( api_endpoint) ) => {
860866 log:: info!( "Adding route: {:?}" , api_endpoint. path) ;
861867 match api_endpoint. api_type {
862- APIType :: INGRESS { target_topic } => {
868+ APIType :: INGRESS {
869+ target_topic,
870+ data_model,
871+ } => {
863872 route_table. insert (
864873 api_endpoint. path . clone ( ) ,
865874 RouteMeta {
866875 format : api_endpoint. format . clone ( ) ,
876+ data_model : data_model. unwrap ( ) ,
867877 topic_name : target_topic,
868878 } ,
869879 ) ;
@@ -879,14 +889,18 @@ impl Webserver {
879889 }
880890 ApiChange :: ApiEndpoint ( Change :: Updated { before, after } ) => {
881891 match & after. api_type {
882- APIType :: INGRESS { target_topic } => {
892+ APIType :: INGRESS {
893+ target_topic,
894+ data_model,
895+ } => {
883896 log:: info!( "Replacing route: {:?} with {:?}" , before, after) ;
884897
885898 route_table. remove ( & before. path ) ;
886899 route_table. insert (
887900 after. path . clone ( ) ,
888901 RouteMeta {
889902 format : after. format . clone ( ) ,
903+ data_model : data_model. as_ref ( ) . unwrap ( ) . clone ( ) ,
890904 topic_name : target_topic. clone ( ) ,
891905 } ,
892906 ) ;
0 commit comments