-
Notifications
You must be signed in to change notification settings - Fork 589
Rewrite stream manager back pressure algorithm #1785
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ cc_library( | |
| "sptest.h", | ||
| "sptypes.h", | ||
| "strutils.h", | ||
| "spmultimap.h" | ||
| ], | ||
| hdrs = [ | ||
| "basics.h", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| /* | ||
| * Copyright 2017 Twitter, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #ifndef __SP_MULTIMAP_H | ||
| #define __SP_MULTIMAP_H | ||
|
|
||
| #include <functional> | ||
| #include <map> | ||
| #include <memory> | ||
| #include <utility> | ||
|
|
||
/**
 * A std::multimap extended with the ability to erase by (key, value) pair.
 *
 * Every erase overload of std::multimap stays available. The additional
 * overload removes only those entries stored under the given key whose mapped
 * value compares equal (operator==) to the given value.
 *
 * Template parameters mirror those of std::multimap. They deliberately avoid
 * the `_Upper` spelling, which is reserved for the implementation.
 */
template <typename Key, typename T, typename Compare = std::less<Key>,
          typename Alloc = std::allocator<std::pair<const Key, T>>>
class sp_multimap : public std::multimap<Key, T, Compare, Alloc> {
  // Shorthand for the underlying container type.
  using base_type = std::multimap<Key, T, Compare, Alloc>;

 public:
  using value_type = typename base_type::value_type;
  using size_type = typename base_type::size_type;

  // Keep the inherited overloads visible next to the one declared below.
  using base_type::erase;

  /**
   * Erases every entry whose key is equivalent to v.first and whose mapped
   * value equals v.second.
   *
   * @param v the (key, value) pair to remove
   * @return the number of entries that were removed
   */
  size_type erase(const value_type& v) {
    const auto range = this->equal_range(v.first);
    size_type num = 0;

    for (auto it = range.first; it != range.second;) {
      if (it->second == v.second) {
        // erase() hands back the iterator following the removed element, so
        // the walk continues safely; range.second is not invalidated because
        // it never refers to an erased element.
        it = base_type::erase(it);
        ++num;
      } else {
        ++it;
      }
    }

    return num;
  }
};
|
|
||
| #endif |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,6 +77,7 @@ void StMgrClientMgr::NewPhysicalPlan(const proto::system::PhysicalPlan* _pplan) | |
| // This stmgr has actually moved to a different host/port | ||
| clients_[s.id()]->Quit(); // this will delete itself. | ||
| clients_[s.id()] = CreateClient(s.id(), s.host_name(), s.data_port()); | ||
| instance_stats_[s.id()].clear(); | ||
| } else { | ||
| // This stmgr has remained the same. Don't do anything | ||
| } | ||
|
|
@@ -101,13 +102,10 @@ void StMgrClientMgr::NewPhysicalPlan(const proto::system::PhysicalPlan* _pplan) | |
| LOG(INFO) << "Stmgr " << *iter << " no longer required"; | ||
| clients_[*iter]->Quit(); // This will delete itself. | ||
| clients_.erase(*iter); | ||
| instance_stats_.erase(*iter); | ||
| } | ||
| } | ||
|
|
||
| bool StMgrClientMgr::DidAnnounceBackPressure() { | ||
| return stream_manager_->DidAnnounceBackPressure(); | ||
| } | ||
|
|
||
| StMgrClient* StMgrClientMgr::CreateClient(const sp_string& _other_stmgr_id, | ||
| const sp_string& _hostname, sp_int32 _port) { | ||
| stmgr_clientmgr_metrics_->scope(METRIC_STMGR_NEW_CONNECTIONS)->incr(); | ||
|
|
@@ -125,11 +123,26 @@ StMgrClient* StMgrClientMgr::CreateClient(const sp_string& _other_stmgr_id, | |
| return client; | ||
| } | ||
|
|
||
| sp_int32 StMgrClientMgr::FindBusiestTaskOnStmgr(const sp_string& _stmgr_id) { | ||
| sp_int32 task_id; | ||
| sp_int64 max = 0; | ||
| for (auto iter = instance_stats_[_stmgr_id].begin(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if instance_stats_[_stmgr_id] does not exist? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If instance_stats_[_stmgr_id] does not exist that means we don't have any traffic to that stmgr, therefore impossible to trigger a back pressure. |
||
| iter!= instance_stats_[_stmgr_id].end(); | ||
| iter++) { | ||
| if (iter->second > max) { | ||
| task_id = iter->first; | ||
| max = iter->second; | ||
| } | ||
| } | ||
| return task_id; | ||
| } | ||
|
|
||
| void StMgrClientMgr::SendTupleStreamMessage(sp_int32 _task_id, const sp_string& _stmgr_id, | ||
| const proto::system::HeronTupleSet2& _msg) { | ||
| auto iter = clients_.find(_stmgr_id); | ||
| CHECK(iter != clients_.end()); | ||
|
|
||
| instance_stats_[_stmgr_id][_task_id] += _msg.GetCachedSize(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where are you clearing this? Shouldn't this be cleared on a regular basis? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What makes you think a += operator is clearing it?? I am totally confused... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops, replied too fast. Good point, we need to clear it after a backpressure is triggered. |
||
| // Acquire the message | ||
| proto::stmgr::TupleStreamMessage2* out = nullptr; | ||
| out = clients_[_stmgr_id]->acquire(out); | ||
|
|
@@ -151,15 +164,15 @@ void StMgrClientMgr::StopBackPressureOnServer(const sp_string& _other_stmgr_id) | |
| stream_manager_->StopBackPressureOnServer(_other_stmgr_id); | ||
| } | ||
|
|
||
| void StMgrClientMgr::SendStartBackPressureToOtherStMgrs() { | ||
| void StMgrClientMgr::SendStartBackPressureToOtherStMgrs(const sp_int32 _task_id) { | ||
| for (auto iter = clients_.begin(); iter != clients_.end(); ++iter) { | ||
| iter->second->SendStartBackPressureMessage(); | ||
| iter->second->SendStartBackPressureMessage(_task_id); | ||
| } | ||
| } | ||
|
|
||
| void StMgrClientMgr::SendStopBackPressureToOtherStMgrs() { | ||
| void StMgrClientMgr::SendStopBackPressureToOtherStMgrs(const sp_int32 _task_id) { | ||
| for (auto iter = clients_.begin(); iter != clients_.end(); ++iter) { | ||
| iter->second->SendStopBackPressureMessage(); | ||
| iter->second->SendStopBackPressureMessage(_task_id); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,9 +53,9 @@ class StMgrClientMgr { | |
| void StopBackPressureOnServer(const sp_string& _other_stmgr_id); | ||
| // Used by the server to tell the client to send the back pressure related | ||
| // messages | ||
| void SendStartBackPressureToOtherStMgrs(); | ||
| void SendStopBackPressureToOtherStMgrs(); | ||
| bool DidAnnounceBackPressure(); | ||
| void SendStartBackPressureToOtherStMgrs(const sp_int32 _task_id); | ||
| void SendStopBackPressureToOtherStMgrs(const sp_int32 _task_id); | ||
| sp_int32 FindBusiestTaskOnStmgr(const sp_string& _stmgr_id); | ||
|
|
||
| private: | ||
| StMgrClient* CreateClient(const sp_string& _other_stmgr_id, const sp_string& _host_name, | ||
|
|
@@ -76,6 +76,9 @@ class StMgrClientMgr { | |
|
|
||
| sp_int64 high_watermark_; | ||
| sp_int64 low_watermark_; | ||
|
|
||
| // Counters for remote instance traffic, this is used for back pressure | ||
|
||
| std::unordered_map<sp_string, std::unordered_map<sp_int32, sp_int64>> instance_stats_; | ||
| }; | ||
|
|
||
| } // namespace stmgr | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it failed the unit test test_back_pressure_stmgr_reconnect.
when the stmgr X in backpressure disconnects and reconnects, stmgr X is supposed to receive backpressure notice according to test_back_pressure_stmgr_reconnect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
follow up question:
How do you maintain state (e.g. backpressure_starters_) after the stmgr restarts/reconnects?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@huijunw That should be handled in HandleConnectionClose().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@huijunw I already noticed the test case failure, will fix it.