2222
2323#include < ctime>
2424
25+ #include " check_response.h"
2526#include " command_parser.h"
2627#include " commander.h"
2728#include " commands/scan_base.h"
@@ -1257,8 +1258,17 @@ class CommandDump : public Commander {
12571258 return Status::OK ();
12581259 }
12591260
1261+ std::string dump_result;
1262+ auto s = DumpKey (ctx, srv, conn, key, dump_result);
1263+ if (!s.IsOK ()) return s;
1264+ *output = redis::BulkString (dump_result);
1265+ return Status::OK ();
1266+ }
1267+ static Status DumpKey (engine::Context &ctx, Server *srv, Connection *conn, std::string &key,
1268+ std::string &dump_result) {
12601269 RedisType type = kRedisNone ;
1261- db_status = redis.Type (ctx, key, &type);
1270+ redis::Database redis (srv->storage , conn->GetNamespace ());
1271+ rocksdb::Status db_status = redis.Type (ctx, key, &type);
12621272 if (!db_status.ok ()) return {Status::RedisExecErr, db_status.ToString ()};
12631273
12641274 std::string result;
@@ -1267,7 +1277,7 @@ class CommandDump : public Commander {
12671277 auto s = rdb.Dump (key, type);
12681278 if (!s.IsOK ()) return s;
12691279 CHECK (dynamic_cast <RdbStringStream *>(rdb.GetStream ().get ()) != nullptr );
1270- *output = redis::BulkString ( static_cast <RdbStringStream *>(rdb.GetStream ().get ())->GetInput () );
1280+ dump_result = static_cast <RdbStringStream *>(rdb.GetStream ().get ())->GetInput ();
12711281 return Status::OK ();
12721282 }
12731283};
@@ -1370,6 +1380,86 @@ class CommandPollUpdates : public Commander {
13701380 Format format_ = Format::Raw;
13711381};
13721382
1383+ class CommandMigrate : public Commander {
1384+ public:
1385+ Status Parse (const std::vector<std::string> &args) override {
1386+ if (args.size () != 6 ) {
1387+ return {Status::RedisExecErr, errWrongNumOfArguments};
1388+ }
1389+ CommandParser parser (args, 1 );
1390+ host_ = GET_OR_RET (parser.TakeStr ());
1391+ port_ = GET_OR_RET (parser.TakeInt <uint32_t >());
1392+ key_ = GET_OR_RET (parser.TakeStr ());
1393+ db_ = GET_OR_RET (parser.TakeInt <int >());
1394+ timeout_ = GET_OR_RET (parser.TakeInt <uint32_t >());
1395+ return Status::OK ();
1396+ }
1397+
1398+ Status Execute (engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
1399+ redis::Database redis (srv->storage , conn->GetNamespace ());
1400+ int count = 0 ;
1401+ auto db_status = redis.Exists (ctx, {key_}, &count);
1402+ if (!db_status.ok ()) {
1403+ return {Status::RedisExecErr, db_status.ToString ()};
1404+ }
1405+ if (count == 0 ) {
1406+ *output = conn->NilString ();
1407+ return Status::OK ();
1408+ }
1409+
1410+ std::string dump_result;
1411+ auto status = CommandDump::DumpKey (ctx, srv, conn, key_, dump_result);
1412+ if (!status.OK ()) return status;
1413+ auto result = util::SockConnect (host_, port_, 0 , timeout_);
1414+ if (!result.IsOK ()) {
1415+ return {Status::RedisExecErr, " failed to connect to the destination node" };
1416+ }
1417+ UniqueFD dst_fd;
1418+ dst_fd.Reset (*result);
1419+
1420+ status = restoreOnDstNode (redis, ctx, *dst_fd, key_, dump_result);
1421+ if (!status.IsOK ()) {
1422+ return {Status::RedisExecErr, status.Msg ()};
1423+ }
1424+ auto redis_status = redis.Del (ctx, key_);
1425+ if (!redis_status.ok ()) {
1426+ return {Status::RedisExecErr, redis_status.ToString ()};
1427+ }
1428+
1429+ *output = redis::SimpleString (" OK" );
1430+ return Status::OK ();
1431+ }
1432+
1433+ private:
1434+ static Status restoreOnDstNode (redis::Database &redis, engine::Context &ctx, int dst_fd, std::string &key,
1435+ std::string &dump_result) {
1436+ std::string restore_cmd;
1437+ uint64_t timestamp = 0 ;
1438+ auto s = redis.GetExpireTime (ctx, key, ×tamp);
1439+ if (!s.ok () || s.IsExpired ()) {
1440+ return {Status::RedisExecErr, " failed to get expire time" };
1441+ }
1442+ restore_cmd += redis::ArrayOfBulkStrings ({" restore" , key, std::to_string (timestamp), dump_result});
1443+ auto sock_status = util::SockSend (dst_fd, restore_cmd);
1444+ if (!sock_status.IsOK ()) {
1445+ return {Status::RedisExecErr, fmt::format (" failed to send restore command to destination node" )};
1446+ }
1447+
1448+ sock_status = util::CheckSingleResponse (dst_fd);
1449+ if (!sock_status.IsOK ()) {
1450+ return {Status::RedisExecErr, sock_status.Msg ()};
1451+ }
1452+
1453+ return Status::OK ();
1454+ }
1455+
1456+ std::string host_;
1457+ uint32_t port_;
1458+ std::string key_;
1459+ int db_;
1460+ int timeout_;
1461+ };
1462+
13731463REDIS_REGISTER_COMMANDS (Server, MakeCmdAttr<CommandAuth>(" auth" , 2 , " read-only ok-loading auth" , NO_KEY),
13741464 MakeCmdAttr<CommandPing>(" ping" , -1 , " read-only" , NO_KEY),
13751465 MakeCmdAttr<CommandSelect>(" select" , 2 , " read-only" , NO_KEY),
@@ -1410,5 +1500,6 @@ REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only o
14101500 MakeCmdAttr<CommandReset>(" reset" , 1 , " ok-loading bypass-multi no-script" , NO_KEY),
14111501 MakeCmdAttr<CommandApplyBatch>(" applybatch" , -2 , " write no-multi" , NO_KEY),
14121502 MakeCmdAttr<CommandDump>(" dump" , 2 , " read-only" , 1 , 1 , 1 ),
1413- MakeCmdAttr<CommandPollUpdates>(" pollupdates" , -2 , " read-only admin" , NO_KEY), )
1503+ MakeCmdAttr<CommandPollUpdates>(" pollupdates" , -2 , " read-only admin" , NO_KEY),
1504+ MakeCmdAttr<CommandMigrate>(" migrate" , -6 , " write" , 1 , 1 , 1 ))
14141505} // namespace redis
0 commit comments