2727#include " event_util.h"
2828#include " fmt/format.h"
2929#include " io_util.h"
30+ #include " redis_protocol.h"
3031#include " storage/batch_extractor.h"
3132#include " storage/iterator.h"
3233#include " storage/redis_metadata.h"
@@ -283,7 +284,7 @@ Status SlotMigrator::startMigration() {
283284 // Auth first
284285 std::string pass = srv_->GetConfig ()->requirepass ;
285286 if (!pass.empty ()) {
286- auto s = authOnDstNode (*dst_fd_, pass);
287+ auto s = util::AuthOnDstNode (*dst_fd_, pass);
287288 if (!s.IsOK ()) {
288289 return s.Prefixed (" failed to authenticate on destination node" );
289290 }
@@ -485,21 +486,6 @@ void SlotMigrator::clean() {
485486 SetStopMigrationFlag (false );
486487}
487488
488- Status SlotMigrator::authOnDstNode (int sock_fd, const std::string &password) {
489- std::string cmd = redis::ArrayOfBulkStrings ({" auth" , password});
490- auto s = util::SockSend (sock_fd, cmd);
491- if (!s.IsOK ()) {
492- return s.Prefixed (" failed to send AUTH command" );
493- }
494-
495- s = checkSingleResponse (sock_fd);
496- if (!s.IsOK ()) {
497- return s.Prefixed (" failed to check the response of AUTH command" );
498- }
499-
500- return Status::OK ();
501- }
502-
503489Status SlotMigrator::setImportStatusOnDstNode (int sock_fd, int status) {
504490 if (sock_fd <= 0 ) return {Status::NotOK, " invalid socket descriptor" };
505491
@@ -510,7 +496,7 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) {
510496 return s.Prefixed (" failed to send command to the destination node" );
511497 }
512498
513- s = checkSingleResponse (sock_fd);
499+ s = util::CheckSingleResponse (sock_fd);
514500 if (!s.IsOK ()) {
515501 return s.Prefixed (" failed to check the response from the destination node" );
516502 }
@@ -545,138 +531,6 @@ StatusOr<bool> SlotMigrator::supportedApplyBatchCommandOnDstNode(int sock_fd) {
545531 return false ;
546532}
547533
548- Status SlotMigrator::checkSingleResponse (int sock_fd) { return checkMultipleResponses (sock_fd, 1 ); }
549-
550- // Commands | Response | Instance
551- // ++++++++++++++++++++++++++++++++++++++++
552- // set Redis::Integer :1\r\n
553- // hset Redis::SimpleString +OK\r\n
554- // sadd Redis::Integer
555- // zadd Redis::Integer
556- // siadd Redis::Integer
557- // setbit Redis::Integer
558- // expire Redis::Integer
559- // lpush Redis::Integer
560- // rpush Redis::Integer
561- // ltrim Redis::SimpleString -Err\r\n
562- // linsert Redis::Integer
563- // lset Redis::SimpleString
564- // hdel Redis::Integer
565- // srem Redis::Integer
566- // zrem Redis::Integer
567- // lpop Redis::NilString $-1\r\n
568- // or Redis::BulkString $1\r\n1\r\n
569- // rpop Redis::NilString
570- // or Redis::BulkString
571- // lrem Redis::Integer
572- // sirem Redis::Integer
573- // del Redis::Integer
574- // xadd Redis::BulkString
575- // bitfield Redis::Array *1\r\n:0
576- Status SlotMigrator::checkMultipleResponses (int sock_fd, int total) {
577- if (sock_fd < 0 || total <= 0 ) {
578- return {Status::NotOK, fmt::format (" invalid arguments: sock_fd={}, count={}" , sock_fd, total)};
579- }
580-
581- // Set socket receive timeout first
582- struct timeval tv;
583- tv.tv_sec = 1 ;
584- tv.tv_usec = 0 ;
585- setsockopt (sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof (tv));
586-
587- // Start checking response
588- size_t bulk_or_array_len = 0 ;
589- int cnt = 0 ;
590- parser_state_ = ParserState::ArrayLen;
591- UniqueEvbuf evbuf;
592- while (true ) {
593- // Read response data from socket buffer to the event buffer
594- if (evbuffer_read (evbuf.get (), sock_fd, -1 ) <= 0 ) {
595- return {Status::NotOK, fmt::format (" failed to read response: {}" , strerror (errno))};
596- }
597-
598- // Parse response data in event buffer
599- bool run = true ;
600- while (run) {
601- switch (parser_state_) {
602- // Handle single string response
603- case ParserState::ArrayLen: {
604- UniqueEvbufReadln line (evbuf.get (), EVBUFFER_EOL_CRLF_STRICT);
605- if (!line) {
606- LOG (INFO) << " [migrate] Event buffer is empty, read socket again" ;
607- run = false ;
608- break ;
609- }
610-
611- if (line[0 ] == ' -' ) {
612- return {Status::NotOK, fmt::format (" got invalid response of length {}: {}" , line.length , line.get ())};
613- } else if (line[0 ] == ' $' || line[0 ] == ' *' ) {
614- auto parse_result = ParseInt<uint64_t >(std::string (line.get () + 1 , line.length - 1 ), 10 );
615- if (!parse_result) {
616- return {Status::NotOK, " protocol error: expected integer value" };
617- }
618-
619- bulk_or_array_len = *parse_result;
620- if (bulk_or_array_len <= 0 ) {
621- parser_state_ = ParserState::OneRspEnd;
622- } else if (line[0 ] == ' $' ) {
623- parser_state_ = ParserState::BulkData;
624- } else {
625- parser_state_ = ParserState::ArrayData;
626- }
627- } else if (line[0 ] == ' +' || line[0 ] == ' :' ) {
628- parser_state_ = ParserState::OneRspEnd;
629- } else {
630- return {Status::NotOK, fmt::format (" got unexpected response of length {}: {}" , line.length , line.get ())};
631- }
632-
633- break ;
634- }
635- // Handle bulk string response
636- case ParserState::BulkData: {
637- if (evbuffer_get_length (evbuf.get ()) < bulk_or_array_len + 2 ) {
638- LOG (INFO) << " [migrate] Bulk data in event buffer is not complete, read socket again" ;
639- run = false ;
640- break ;
641- }
642- // TODO(chrisZMF): Check tail '\r\n'
643- evbuffer_drain (evbuf.get (), bulk_or_array_len + 2 );
644- bulk_or_array_len = 0 ;
645- parser_state_ = ParserState::OneRspEnd;
646- break ;
647- }
648- case ParserState::ArrayData: {
649- while (run && bulk_or_array_len > 0 ) {
650- evbuffer_ptr ptr = evbuffer_search_eol (evbuf.get (), nullptr , nullptr , EVBUFFER_EOL_CRLF_STRICT);
651- if (ptr.pos < 0 ) {
652- LOG (INFO) << " [migrate] Array data in event buffer is not complete, read socket again" ;
653- run = false ;
654- break ;
655- }
656- evbuffer_drain (evbuf.get (), ptr.pos + 2 );
657- --bulk_or_array_len;
658- }
659- if (run) {
660- parser_state_ = ParserState::OneRspEnd;
661- }
662- break ;
663- }
664- case ParserState::OneRspEnd: {
665- cnt++;
666- if (cnt >= total) {
667- return Status::OK ();
668- }
669-
670- parser_state_ = ParserState::ArrayLen;
671- break ;
672- }
673- default :
674- break ;
675- }
676- }
677- }
678- }
679-
680534StatusOr<KeyMigrationResult> SlotMigrator::migrateOneKey (const rocksdb::Slice &key,
681535 const rocksdb::Slice &encoded_metadata,
682536 std::string *restore_cmds) {
@@ -1029,7 +883,7 @@ Status SlotMigrator::sendCmdsPipelineIfNeed(std::string *commands, bool need) {
1029883
1030884 last_send_time_ = util::GetTimeStampUS ();
1031885
1032- s = checkMultipleResponses (*dst_fd_, current_pipeline_size_);
886+ s = util::CheckMultipleResponses (*dst_fd_, current_pipeline_size_);
1033887 if (!s.IsOK ()) {
1034888 return s.Prefixed (" wrong response from the destination node" );
1035889 }
0 commit comments