From 17466b9663ddefbd2d9fad33f72c3feee084f2b1 Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Fri, 18 Nov 2016 09:53:13 -0800 Subject: [PATCH 01/11] Add a limit for mempool --- heron/common/src/cpp/network/mempool.h | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/heron/common/src/cpp/network/mempool.h b/heron/common/src/cpp/network/mempool.h index a8467dfcbe1..569cf7282bb 100644 --- a/heron/common/src/cpp/network/mempool.h +++ b/heron/common/src/cpp/network/mempool.h @@ -16,6 +16,7 @@ #ifndef MEM_POOL_H #define MEM_POOL_H +#include #include #include #include @@ -68,7 +69,7 @@ class MemPool { template M* acquire(M* m) { std::type_index type = typeid(M); - std::vector& pool = map_[type]; + auto& pool = map_[type]; if (pool.empty()) { return new M(); @@ -81,11 +82,18 @@ class MemPool { template void release(M* ptr) { std::type_index type = typeid(M); - map_[type].push_back(static_cast(ptr)); + auto& pool = map_[type]; + // TODO(cwang): expose this limit via config? + if (pool.size() > 2048) { + auto first = pool.front(); + pool.pop_front(); + delete first; + } + pool.push_back(static_cast(ptr)); } private: - std::unordered_map> map_; + std::unordered_map> map_; }; #endif From ecffa7b34221ff9ee043e863f489786637420e4f Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Mon, 21 Nov 2016 14:20:19 -0800 Subject: [PATCH 02/11] Expose mempool limit via heron config --- .../src/cpp/config/heron-internals-config-reader.cpp | 4 ++++ .../src/cpp/config/heron-internals-config-reader.h | 3 +++ .../src/cpp/config/heron-internals-config-vars.cpp | 2 ++ .../src/cpp/config/heron-internals-config-vars.h | 3 +++ heron/common/src/cpp/network/client.h | 7 +++++-- heron/common/src/cpp/network/mempool.h | 10 +++++++--- heron/common/src/cpp/network/server.h | 6 +++++- heron/config/src/yaml/conf/aurora/heron_internals.yaml | 3 +++ .../config/src/yaml/conf/examples/heron_internals.yaml | 3 +++ heron/config/src/yaml/conf/local/heron_internals.yaml | 3 +++ .../config/src/yaml/conf/localzk/heron_internals.yaml | 3 +++ .../config/src/yaml/conf/marathon/heron_internals.yaml | 3 +++ heron/config/src/yaml/conf/mesos/heron_internals.yaml | 3 +++ heron/config/src/yaml/conf/slurm/heron_internals.yaml | 3 +++ .../src/yaml/conf/test/test_heron_internals.yaml | 3 +++ heron/config/src/yaml/conf/yarn/heron_internals.yaml | 3 +++ heron/stmgr/src/cpp/manager/stmgr-client.cpp | 4 ++++ heron/stmgr/src/cpp/manager/stmgr-server.cpp | 5 +++++ 18 files changed, 65 insertions(+), 6 deletions(-) diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp b/heron/common/src/cpp/config/heron-internals-config-reader.cpp index 5ffe3dd8187..a6ac6df388c 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp @@ -200,6 +200,10 @@ sp_int32 HeronInternalsConfigReader::GetHeronStreammgrCacheDrainSizeMb() { return config_[HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB].as(); } +sp_int32 HeronInternalsConfigReader::GetHeronStreammgrMempoolSize() { + return config_[HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_SIZE].as(); +} + sp_int32 HeronInternalsConfigReader::GetHeronStreammgrXormgrRotatingmapNbuckets() { return config_[HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS].as(); } diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h index c19ea9d702a..46cab088d41 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.h +++ b/heron/common/src/cpp/config/heron-internals-config-reader.h @@ -152,6 +152,9 @@ class HeronInternalsConfigReader : public YamlFileReader { // The sized based threshold in MB for draining the tuple cache sp_int32 GetHeronStreammgrCacheDrainSizeMb(); + // For the size of the memory pool for each type of messages + sp_int32 GetHeronStreammgrMempoolSize(); + // Get the Nbucket value, for efficient acknowledgement sp_int32 GetHeronStreammgrXormgrRotatingmapNbuckets(); diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.cpp b/heron/common/src/cpp/config/heron-internals-config-vars.cpp index fd2ce84cc19..5b5aaa646b3 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-vars.cpp @@ -88,6 +88,8 @@ const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_FREQUENCY_ "heron.streammgr.cache.drain.frequency.ms"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB = "heron.streammgr.cache.drain.size.mb"; +const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_SIZE = + "heron.streammgr.mempool.size"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS = "heron.streammgr.xormgr.rotatingmap.nbuckets"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CLIENT_RECONNECT_INTERVAL_SEC = diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h index 8d4ab540cef..f074a93087d 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.h +++ b/heron/common/src/cpp/config/heron-internals-config-vars.h @@ -140,6 +140,9 @@ class HeronInternalsConfigVars { // The sized based threshold in MB for draining the tuple cache static const sp_string HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB; + // For the size of the memory pool for each type of messages + static const sp_string HERON_STREAMMGR_MEMPOOL_SIZE; + // For efficient acknowledgement static const sp_string HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS; diff --git a/heron/common/src/cpp/network/client.h b/heron/common/src/cpp/network/client.h index 7c640f2b259..fc64c4e4585 100644 --- a/heron/common/src/cpp/network/client.h +++ b/heron/common/src/cpp/network/client.h @@ -174,8 +174,9 @@ class Client : public BaseClient { // Return the underlying EventLoop. EventLoop* getEventLoop() { return eventLoop_; } - // TODO(mfu): - MemPool _heron_message_pool; + void set_pool_size(sp_int32 size) { + _heron_message_pool.set_size(size); + } template void release(M* m) { @@ -214,6 +215,8 @@ class Client : public BaseClient { virtual void StopBackPressureConnectionCb(Connection* _connection); private: + MemPool _heron_message_pool; + //! Imlement methods of BaseClient virtual BaseConnection* CreateConnection(ConnectionEndPoint* endpoint, ConnectionOptions* options, EventLoop* eventLoop); diff --git a/heron/common/src/cpp/network/mempool.h b/heron/common/src/cpp/network/mempool.h index 569cf7282bb..5ebb82cf107 100644 --- a/heron/common/src/cpp/network/mempool.h +++ b/heron/common/src/cpp/network/mempool.h @@ -52,7 +52,7 @@ class BaseMemPool { template class MemPool { public: - MemPool() { + MemPool() : size_(0) { } // TODO(cwang): we have a memory leak here. @@ -66,6 +66,10 @@ class MemPool { map_.clear(); } + void set_size(sp_int32 size) { + size_ = size; + } + template M* acquire(M* m) { std::type_index type = typeid(M); @@ -83,8 +87,7 @@ class MemPool { void release(M* ptr) { std::type_index type = typeid(M); auto& pool = map_[type]; - // TODO(cwang): expose this limit via config? - if (pool.size() > 2048) { + if (pool.size() > size_) { auto first = pool.front(); pool.pop_front(); delete first; @@ -93,6 +96,7 @@ class MemPool { } private: + sp_int32 size_; std::unordered_map> map_; }; diff --git a/heron/common/src/cpp/network/server.h b/heron/common/src/cpp/network/server.h index ed78e03fe1d..3d2b9ae08b5 100644 --- a/heron/common/src/cpp/network/server.h +++ b/heron/common/src/cpp/network/server.h @@ -208,7 +208,9 @@ class Server : public BaseServer { // Called when the connection is closed virtual void HandleConnectionClose_Base(BaseConnection* connection, NetworkErrorCode _status); - MemPool _heron_message_pool; + void set_pool_size(sp_int32 size) { + _heron_message_pool.set_size(size); + } template void release(M* m) { @@ -221,6 +223,8 @@ class Server : public BaseServer { } private: + MemPool _heron_message_pool; + // When a new packet arrives on the connection, this is invoked by the Connection void OnNewPacket(Connection* connection, IncomingPacket* packet); diff --git a/heron/config/src/yaml/conf/aurora/heron_internals.yaml b/heron/config/src/yaml/conf/aurora/heron_internals.yaml index 6abf8f539d0..5f6ffa07491 100644 --- a/heron/config/src/yaml/conf/aurora/heron_internals.yaml +++ b/heron/config/src/yaml/conf/aurora/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# For the size of the memory pool for each type of messages +heron.streammgr.mempool.size: 3072 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/examples/heron_internals.yaml b/heron/config/src/yaml/conf/examples/heron_internals.yaml index 6abf8f539d0..5f6ffa07491 100644 --- a/heron/config/src/yaml/conf/examples/heron_internals.yaml +++ b/heron/config/src/yaml/conf/examples/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# For the size of the memory pool for each type of messages +heron.streammgr.mempool.size: 3072 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/local/heron_internals.yaml b/heron/config/src/yaml/conf/local/heron_internals.yaml index ab44597c87f..d44b69605a0 100644 --- a/heron/config/src/yaml/conf/local/heron_internals.yaml +++ b/heron/config/src/yaml/conf/local/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# For the size of the memory pool for each type of messages +heron.streammgr.mempool.size: 3072 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/localzk/heron_internals.yaml b/heron/config/src/yaml/conf/localzk/heron_internals.yaml index 6abf8f539d0..5f6ffa07491 100644 --- a/heron/config/src/yaml/conf/localzk/heron_internals.yaml +++ b/heron/config/src/yaml/conf/localzk/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# For the size of the memory pool for each type of messages +heron.streammgr.mempool.size: 3072 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/marathon/heron_internals.yaml b/heron/config/src/yaml/conf/marathon/heron_internals.yaml index 6abf8f539d0..5f6ffa07491 100644 --- a/heron/config/src/yaml/conf/marathon/heron_internals.yaml +++ b/heron/config/src/yaml/conf/marathon/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# For the size of the memory pool for each type of messages +heron.streammgr.mempool.size: 3072 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/mesos/heron_internals.yaml b/heron/config/src/yaml/conf/mesos/heron_internals.yaml index 6abf8f539d0..5f6ffa07491 100644 --- a/heron/config/src/yaml/conf/mesos/heron_internals.yaml +++ b/heron/config/src/yaml/conf/mesos/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# For the size of the memory pool for each type of messages +heron.streammgr.mempool.size: 3072 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/slurm/heron_internals.yaml b/heron/config/src/yaml/conf/slurm/heron_internals.yaml index b19ba51fbb1..fc4c4f5f1a1 100644 --- a/heron/config/src/yaml/conf/slurm/heron_internals.yaml +++ b/heron/config/src/yaml/conf/slurm/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# For the size of the memory pool for each type of messages +heron.streammgr.mempool.size: 3072 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/test/test_heron_internals.yaml b/heron/config/src/yaml/conf/test/test_heron_internals.yaml index b0f449b5aae..49f73e2abba 100644 --- a/heron/config/src/yaml/conf/test/test_heron_internals.yaml +++ b/heron/config/src/yaml/conf/test/test_heron_internals.yaml @@ -49,6 +49,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgement heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# For the size of the memory pool for each type of messages +heron.streammgr.mempool.size: 3072 + # The reconnect interval to other stream managers in second for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/yarn/heron_internals.yaml b/heron/config/src/yaml/conf/yarn/heron_internals.yaml index 6abf8f539d0..5f6ffa07491 100644 --- a/heron/config/src/yaml/conf/yarn/heron_internals.yaml +++ b/heron/config/src/yaml/conf/yarn/heron_internals.yaml @@ -61,6 +61,9 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 +# For the size of the memory pool for each type of messages +heron.streammgr.mempool.size: 3072 + # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp index b985cbdf1e2..633876ccd34 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp @@ -72,6 +72,10 @@ StMgrClient::StMgrClient(EventLoop* eventLoop, const NetworkOptions& _options, stmgr_client_metrics_ = new heron::common::MultiCountMetric(); metrics_manager_client_->register_metric("__client_" + other_stmgr_id_, stmgr_client_metrics_); + + sp_int32 pool_size = + config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSize(); + set_pool_size(pool_size); } StMgrClient::~StMgrClient() { diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index fced037edd0..e36da8f1c0a 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -26,6 +26,7 @@ #include "network/network.h" #include "config/helper.h" #include "metrics/metrics.h" +#include "config/heron-internals-config-reader.h" namespace heron { namespace stmgr { @@ -107,6 +108,10 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options, metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT, back_pressure_metric_initiated_); spouts_under_back_pressure_ = false; + + sp_int32 pool_size = + config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSize(); + set_pool_size(pool_size); } StMgrServer::~StMgrServer() { From 0aebc09cd3abf7a933481b3d3ee3282a5af23bfa Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Tue, 22 Nov 2016 16:43:09 -0800 Subject: [PATCH 03/11] Use memory size as the limit --- .../config/heron-internals-config-reader.cpp | 4 ++-- .../cpp/config/heron-internals-config-reader.h | 2 +- .../cpp/config/heron-internals-config-vars.cpp | 4 ++-- .../cpp/config/heron-internals-config-vars.h | 2 +- heron/common/src/cpp/network/client.h | 4 ++-- heron/common/src/cpp/network/mempool.h | 17 +++++++++++++---- heron/common/src/cpp/network/server.h | 4 ++-- .../src/yaml/conf/aurora/heron_internals.yaml | 2 +- .../src/yaml/conf/examples/heron_internals.yaml | 2 +- .../src/yaml/conf/local/heron_internals.yaml | 2 +- .../src/yaml/conf/localzk/heron_internals.yaml | 2 +- .../src/yaml/conf/marathon/heron_internals.yaml | 2 +- .../src/yaml/conf/mesos/heron_internals.yaml | 2 +- .../src/yaml/conf/slurm/heron_internals.yaml | 2 +- .../yaml/conf/test/test_heron_internals.yaml | 2 +- .../src/yaml/conf/yarn/heron_internals.yaml | 2 +- heron/stmgr/src/cpp/manager/stmgr-client.cpp | 6 +++--- heron/stmgr/src/cpp/manager/stmgr-server.cpp | 6 +++--- 18 files changed, 38 insertions(+), 29 deletions(-) diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp b/heron/common/src/cpp/config/heron-internals-config-reader.cpp index a6ac6df388c..231f65bb1ac 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp @@ -200,8 +200,8 @@ sp_int32 HeronInternalsConfigReader::GetHeronStreammgrCacheDrainSizeMb() { return config_[HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB].as(); } -sp_int32 HeronInternalsConfigReader::GetHeronStreammgrMempoolSize() { - return config_[HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_SIZE].as(); +sp_int32 HeronInternalsConfigReader::GetHeronStreammgrMempoolSizeMb() { + return config_[HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_SIZE_MB].as(); } sp_int32 HeronInternalsConfigReader::GetHeronStreammgrXormgrRotatingmapNbuckets() { diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h index 46cab088d41..88259493bf6 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.h +++ b/heron/common/src/cpp/config/heron-internals-config-reader.h @@ -153,7 +153,7 @@ class HeronInternalsConfigReader : public YamlFileReader { sp_int32 GetHeronStreammgrCacheDrainSizeMb(); // For the size of the memory pool for each type of messages - sp_int32 GetHeronStreammgrMempoolSize(); + sp_int32 GetHeronStreammgrMempoolSizeMb(); // Get the Nbucket value, for efficient acknowledgement sp_int32 GetHeronStreammgrXormgrRotatingmapNbuckets(); diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.cpp b/heron/common/src/cpp/config/heron-internals-config-vars.cpp index 5b5aaa646b3..385f4d71be2 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-vars.cpp @@ -88,8 +88,8 @@ const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_FREQUENCY_ "heron.streammgr.cache.drain.frequency.ms"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB = "heron.streammgr.cache.drain.size.mb"; -const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_SIZE = - "heron.streammgr.mempool.size"; +const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_SIZE_MB = + "heron.streammgr.mempool.size.mb"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS = "heron.streammgr.xormgr.rotatingmap.nbuckets"; const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CLIENT_RECONNECT_INTERVAL_SEC = diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h index f074a93087d..ba4d0818c40 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.h +++ b/heron/common/src/cpp/config/heron-internals-config-vars.h @@ -141,7 +141,7 @@ class HeronInternalsConfigVars { static const sp_string HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB; // For the size of the memory pool for each type of messages - static const sp_string HERON_STREAMMGR_MEMPOOL_SIZE; + static const sp_string HERON_STREAMMGR_MEMPOOL_SIZE_MB; // For efficient acknowledgement static const sp_string HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS; diff --git a/heron/common/src/cpp/network/client.h b/heron/common/src/cpp/network/client.h index fc64c4e4585..f9e94d84cf6 100644 --- a/heron/common/src/cpp/network/client.h +++ b/heron/common/src/cpp/network/client.h @@ -174,8 +174,8 @@ class Client : public BaseClient { // Return the underlying EventLoop. EventLoop* getEventLoop() { return eventLoop_; } - void set_pool_size(sp_int32 size) { - _heron_message_pool.set_size(size); + void set_pool_limit(sp_int32 limit) { + _heron_message_pool.set_limit(limit); } template diff --git a/heron/common/src/cpp/network/mempool.h b/heron/common/src/cpp/network/mempool.h index 5ebb82cf107..90357d66da6 100644 --- a/heron/common/src/cpp/network/mempool.h +++ b/heron/common/src/cpp/network/mempool.h @@ -52,7 +52,7 @@ class BaseMemPool { template class MemPool { public: - MemPool() : size_(0) { + MemPool() : size_(0), size_limit_(0) { } // TODO(cwang): we have a memory leak here. @@ -66,8 +66,8 @@ class MemPool { map_.clear(); } - void set_size(sp_int32 size) { - size_ = size; + void set_limit(sp_int32 limit) { + size_limit_ = limit; } template @@ -80,23 +80,32 @@ class MemPool { } B* t = pool.back(); pool.pop_back(); + size_ -= sizeof(M); return static_cast(t); } template void release(M* ptr) { + if (size_limit_ == 0) { + delete ptr; + return; + } + std::type_index type = typeid(M); auto& pool = map_[type]; - if (pool.size() > size_) { + if (size_ > size_limit_) { auto first = pool.front(); pool.pop_front(); + size_ -= sizeof(*first); delete first; } pool.push_back(static_cast(ptr)); + size_ += sizeof(M); } private: sp_int32 size_; + sp_int32 size_limit_; std::unordered_map> map_; }; diff --git a/heron/common/src/cpp/network/server.h b/heron/common/src/cpp/network/server.h index 3d2b9ae08b5..7fc4f38ad6f 100644 --- a/heron/common/src/cpp/network/server.h +++ b/heron/common/src/cpp/network/server.h @@ -208,8 +208,8 @@ class Server : public BaseServer { // Called when the connection is closed virtual void HandleConnectionClose_Base(BaseConnection* connection, NetworkErrorCode _status); - void set_pool_size(sp_int32 size) { - _heron_message_pool.set_size(size); + void set_pool_limit(sp_int32 limit) { + _heron_message_pool.set_limit(limit); } template diff --git a/heron/config/src/yaml/conf/aurora/heron_internals.yaml b/heron/config/src/yaml/conf/aurora/heron_internals.yaml index 5f6ffa07491..0c49fb5aa4b 100644 --- a/heron/config/src/yaml/conf/aurora/heron_internals.yaml +++ b/heron/config/src/yaml/conf/aurora/heron_internals.yaml @@ -62,7 +62,7 @@ heron.streammgr.cache.drain.size.mb: 100 heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # For the size of the memory pool for each type of messages -heron.streammgr.mempool.size: 3072 +heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/examples/heron_internals.yaml b/heron/config/src/yaml/conf/examples/heron_internals.yaml index 5f6ffa07491..0c49fb5aa4b 100644 --- a/heron/config/src/yaml/conf/examples/heron_internals.yaml +++ b/heron/config/src/yaml/conf/examples/heron_internals.yaml @@ -62,7 +62,7 @@ heron.streammgr.cache.drain.size.mb: 100 heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # For the size of the memory pool for each type of messages -heron.streammgr.mempool.size: 3072 +heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/local/heron_internals.yaml b/heron/config/src/yaml/conf/local/heron_internals.yaml index d44b69605a0..e625a970a61 100644 --- a/heron/config/src/yaml/conf/local/heron_internals.yaml +++ b/heron/config/src/yaml/conf/local/heron_internals.yaml @@ -62,7 +62,7 @@ heron.streammgr.cache.drain.size.mb: 100 heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # For the size of the memory pool for each type of messages -heron.streammgr.mempool.size: 3072 +heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/localzk/heron_internals.yaml b/heron/config/src/yaml/conf/localzk/heron_internals.yaml index 5f6ffa07491..0c49fb5aa4b 100644 --- a/heron/config/src/yaml/conf/localzk/heron_internals.yaml +++ b/heron/config/src/yaml/conf/localzk/heron_internals.yaml @@ -62,7 +62,7 @@ heron.streammgr.cache.drain.size.mb: 100 heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # For the size of the memory pool for each type of messages -heron.streammgr.mempool.size: 3072 +heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/marathon/heron_internals.yaml b/heron/config/src/yaml/conf/marathon/heron_internals.yaml index 5f6ffa07491..0c49fb5aa4b 100644 --- a/heron/config/src/yaml/conf/marathon/heron_internals.yaml +++ b/heron/config/src/yaml/conf/marathon/heron_internals.yaml @@ -62,7 +62,7 @@ heron.streammgr.cache.drain.size.mb: 100 heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # For the size of the memory pool for each type of messages -heron.streammgr.mempool.size: 3072 +heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/mesos/heron_internals.yaml b/heron/config/src/yaml/conf/mesos/heron_internals.yaml index 5f6ffa07491..0c49fb5aa4b 100644 --- a/heron/config/src/yaml/conf/mesos/heron_internals.yaml +++ b/heron/config/src/yaml/conf/mesos/heron_internals.yaml @@ -62,7 +62,7 @@ heron.streammgr.cache.drain.size.mb: 100 heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # For the size of the memory pool for each type of messages -heron.streammgr.mempool.size: 3072 +heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/slurm/heron_internals.yaml b/heron/config/src/yaml/conf/slurm/heron_internals.yaml index fc4c4f5f1a1..8e148b6752d 100644 --- a/heron/config/src/yaml/conf/slurm/heron_internals.yaml +++ b/heron/config/src/yaml/conf/slurm/heron_internals.yaml @@ -62,7 +62,7 @@ heron.streammgr.cache.drain.size.mb: 100 heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # For the size of the memory pool for each type of messages -heron.streammgr.mempool.size: 3072 +heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/test/test_heron_internals.yaml b/heron/config/src/yaml/conf/test/test_heron_internals.yaml index 49f73e2abba..1fb1efb946b 100644 --- a/heron/config/src/yaml/conf/test/test_heron_internals.yaml +++ b/heron/config/src/yaml/conf/test/test_heron_internals.yaml @@ -50,7 +50,7 @@ heron.streammgr.cache.drain.size.mb: 100 heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # For the size of the memory pool for each type of messages -heron.streammgr.mempool.size: 3072 +heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in second for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/config/src/yaml/conf/yarn/heron_internals.yaml b/heron/config/src/yaml/conf/yarn/heron_internals.yaml index 5f6ffa07491..0c49fb5aa4b 100644 --- a/heron/config/src/yaml/conf/yarn/heron_internals.yaml +++ b/heron/config/src/yaml/conf/yarn/heron_internals.yaml @@ -62,7 +62,7 @@ heron.streammgr.cache.drain.size.mb: 100 heron.streammgr.xormgr.rotatingmap.nbuckets: 3 # For the size of the memory pool for each type of messages -heron.streammgr.mempool.size: 3072 +heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client heron.streammgr.client.reconnect.interval.sec: 1 diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp index 633876ccd34..d50503c31a5 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp @@ -73,9 +73,9 @@ StMgrClient::StMgrClient(EventLoop* eventLoop, const NetworkOptions& _options, stmgr_client_metrics_ = new heron::common::MultiCountMetric(); metrics_manager_client_->register_metric("__client_" + other_stmgr_id_, stmgr_client_metrics_); - sp_int32 pool_size = - config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSize(); - set_pool_size(pool_size); + sp_int32 pool_limit = + config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSizeMb(); + set_pool_limit(pool_limit * 1024 * 1024); } StMgrClient::~StMgrClient() { diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index e36da8f1c0a..2e3826c2668 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -109,9 +109,9 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options, back_pressure_metric_initiated_); spouts_under_back_pressure_ = false; - sp_int32 pool_size = - config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSize(); - set_pool_size(pool_size); + sp_int32 pool_limit = + config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSizeMb(); + set_pool_limit(pool_limit * 1024 * 1024); } StMgrServer::~StMgrServer() { From eef591a3b107c2fa0129158150a10b3a7b25cf4c Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Wed, 23 Nov 2016 10:27:30 -0800 Subject: [PATCH 04/11] Update comments --- heron/common/src/cpp/config/heron-internals-config-reader.h | 2 +- heron/common/src/cpp/config/heron-internals-config-vars.h | 2 +- heron/config/src/yaml/conf/aurora/heron_internals.yaml | 2 +- heron/config/src/yaml/conf/examples/heron_internals.yaml | 2 +- heron/config/src/yaml/conf/local/heron_internals.yaml | 2 +- heron/config/src/yaml/conf/localzk/heron_internals.yaml | 2 +- heron/config/src/yaml/conf/marathon/heron_internals.yaml | 2 +- heron/config/src/yaml/conf/mesos/heron_internals.yaml | 2 +- heron/config/src/yaml/conf/slurm/heron_internals.yaml | 2 +- heron/config/src/yaml/conf/test/test_heron_internals.yaml | 2 +- heron/config/src/yaml/conf/yarn/heron_internals.yaml | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h index 88259493bf6..dc3ea9ff2a8 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.h +++ b/heron/common/src/cpp/config/heron-internals-config-reader.h @@ -152,7 +152,7 @@ class HeronInternalsConfigReader : public YamlFileReader { // The sized based threshold in MB for draining the tuple cache sp_int32 GetHeronStreammgrCacheDrainSizeMb(); - // For the size of the memory pool for each type of messages + // The max size of the memory pool for each type of messages sp_int32 GetHeronStreammgrMempoolSizeMb(); // Get the Nbucket value, for efficient acknowledgement diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h index ba4d0818c40..682bfb29ec2 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.h +++ b/heron/common/src/cpp/config/heron-internals-config-vars.h @@ -140,7 +140,7 @@ class HeronInternalsConfigVars { // The sized based threshold in MB for draining the tuple cache static const sp_string HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB; - // For the size of the memory pool for each type of messages + // The max size of the memory pool for each type of messages static const sp_string HERON_STREAMMGR_MEMPOOL_SIZE_MB; // For efficient acknowledgement diff --git a/heron/config/src/yaml/conf/aurora/heron_internals.yaml b/heron/config/src/yaml/conf/aurora/heron_internals.yaml index 0c49fb5aa4b..57f11589a07 100644 --- a/heron/config/src/yaml/conf/aurora/heron_internals.yaml +++ b/heron/config/src/yaml/conf/aurora/heron_internals.yaml @@ -61,7 +61,7 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 -# For the size of the memory pool for each type of messages +# The max size of the memory pool for all types of messages heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client diff --git a/heron/config/src/yaml/conf/examples/heron_internals.yaml b/heron/config/src/yaml/conf/examples/heron_internals.yaml index 0c49fb5aa4b..57f11589a07 100644 --- a/heron/config/src/yaml/conf/examples/heron_internals.yaml +++ b/heron/config/src/yaml/conf/examples/heron_internals.yaml @@ -61,7 +61,7 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 -# For the size of the memory pool for each type of messages +# The max size of the memory pool for all types of messages heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client diff --git a/heron/config/src/yaml/conf/local/heron_internals.yaml b/heron/config/src/yaml/conf/local/heron_internals.yaml index e625a970a61..9281ac43855 100644 --- a/heron/config/src/yaml/conf/local/heron_internals.yaml +++ b/heron/config/src/yaml/conf/local/heron_internals.yaml @@ -61,7 +61,7 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 -# For the size of the memory pool for each type of messages +# The max size of the memory pool for all types of messages heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client diff --git a/heron/config/src/yaml/conf/localzk/heron_internals.yaml b/heron/config/src/yaml/conf/localzk/heron_internals.yaml index 0c49fb5aa4b..57f11589a07 100644 --- a/heron/config/src/yaml/conf/localzk/heron_internals.yaml +++ b/heron/config/src/yaml/conf/localzk/heron_internals.yaml @@ -61,7 +61,7 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 -# For the size of the memory pool for each type of messages +# The max size of the memory pool for all types of messages heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client diff --git a/heron/config/src/yaml/conf/marathon/heron_internals.yaml b/heron/config/src/yaml/conf/marathon/heron_internals.yaml index 0c49fb5aa4b..57f11589a07 100644 --- a/heron/config/src/yaml/conf/marathon/heron_internals.yaml +++ b/heron/config/src/yaml/conf/marathon/heron_internals.yaml @@ -61,7 +61,7 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 -# For the size of the memory pool for each type of messages +# The max size of the memory pool for all types of messages heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client diff --git a/heron/config/src/yaml/conf/mesos/heron_internals.yaml b/heron/config/src/yaml/conf/mesos/heron_internals.yaml index 0c49fb5aa4b..57f11589a07 100644 --- a/heron/config/src/yaml/conf/mesos/heron_internals.yaml +++ b/heron/config/src/yaml/conf/mesos/heron_internals.yaml @@ -61,7 +61,7 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 -# For the size of the memory pool for each type of messages +# The max size of the memory pool for all types of messages heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client diff --git a/heron/config/src/yaml/conf/slurm/heron_internals.yaml b/heron/config/src/yaml/conf/slurm/heron_internals.yaml index 8e148b6752d..b3383dd63bd 100644 --- a/heron/config/src/yaml/conf/slurm/heron_internals.yaml +++ b/heron/config/src/yaml/conf/slurm/heron_internals.yaml @@ -61,7 +61,7 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 -# For the size of the memory pool for each type of messages +# The max size of the memory pool for all types of messages heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client diff --git a/heron/config/src/yaml/conf/test/test_heron_internals.yaml b/heron/config/src/yaml/conf/test/test_heron_internals.yaml index 1fb1efb946b..cbe24af4390 100644 --- a/heron/config/src/yaml/conf/test/test_heron_internals.yaml +++ b/heron/config/src/yaml/conf/test/test_heron_internals.yaml @@ -49,7 +49,7 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgement heron.streammgr.xormgr.rotatingmap.nbuckets: 3 -# For the size of the memory pool for each type of messages +# The max size of the memory pool for all types of messages heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in second for stream manager client diff --git a/heron/config/src/yaml/conf/yarn/heron_internals.yaml b/heron/config/src/yaml/conf/yarn/heron_internals.yaml index 0c49fb5aa4b..57f11589a07 100644 --- a/heron/config/src/yaml/conf/yarn/heron_internals.yaml +++ b/heron/config/src/yaml/conf/yarn/heron_internals.yaml @@ -61,7 +61,7 @@ heron.streammgr.cache.drain.size.mb: 100 # For efficient acknowledgements heron.streammgr.xormgr.rotatingmap.nbuckets: 3 -# For the size of the memory pool for each type of messages +# The max size of the memory pool for all types of messages heron.streammgr.mempool.size.mb: 100 # The reconnect interval to other stream managers in secs for stream manager client From b0852b80e42876206c00b42bab413c9be6ccb1eb Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Wed, 23 Nov 2016 10:33:09 -0800 Subject: [PATCH 05/11] Fix size calculation in mempool --- heron/common/src/cpp/network/mempool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heron/common/src/cpp/network/mempool.h b/heron/common/src/cpp/network/mempool.h index 90357d66da6..0e8b8a86857 100644 --- a/heron/common/src/cpp/network/mempool.h +++ b/heron/common/src/cpp/network/mempool.h @@ -96,7 +96,7 @@ class MemPool { if (size_ > size_limit_) { auto first = pool.front(); pool.pop_front(); - size_ -= sizeof(*first); + size_ -= sizeof(M); delete first; } pool.push_back(static_cast(ptr)); From dd09ed8fef49e8eb68e05a79e70d6f68a2025bfe Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Wed, 23 Nov 2016 16:16:33 -0800 Subject: [PATCH 06/11] Update comments --- heron/common/src/cpp/config/heron-internals-config-reader.h | 2 +- heron/common/src/cpp/config/heron-internals-config-vars.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h index dc3ea9ff2a8..4102cf59e81 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.h +++ b/heron/common/src/cpp/config/heron-internals-config-reader.h @@ -152,7 +152,7 @@ class HeronInternalsConfigReader : public YamlFileReader { // The sized based threshold in MB for draining the tuple cache sp_int32 GetHeronStreammgrCacheDrainSizeMb(); - // The max size of the memory pool for each type of messages + // The max size of the memory pool for all types of messages sp_int32 GetHeronStreammgrMempoolSizeMb(); // Get the Nbucket value, for efficient acknowledgement diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h index 682bfb29ec2..6de1eb9eeaa 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.h +++ b/heron/common/src/cpp/config/heron-internals-config-vars.h @@ -140,7 +140,7 @@ class HeronInternalsConfigVars { // The sized based threshold in MB for draining the tuple cache static const sp_string HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB; - // The max size of the memory pool for each type of messages + // The max size of the memory pool for all types of messages static const sp_string HERON_STREAMMGR_MEMPOOL_SIZE_MB; // For efficient acknowledgement From 081aff234cd39d9aab5ed996d0898943356cc91e Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Mon, 28 Nov 2016 13:11:06 -0800 Subject: [PATCH 07/11] Use deque instead of list --- heron/common/src/cpp/network/mempool.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/heron/common/src/cpp/network/mempool.h b/heron/common/src/cpp/network/mempool.h index 0e8b8a86857..e312b72ca86 100644 --- a/heron/common/src/cpp/network/mempool.h +++ b/heron/common/src/cpp/network/mempool.h @@ -16,7 +16,7 @@ #ifndef MEM_POOL_H #define MEM_POOL_H -#include +#include #include #include #include @@ -106,7 +106,7 @@ class MemPool { private: sp_int32 size_; sp_int32 size_limit_; - std::unordered_map> map_; + std::unordered_map> map_; }; #endif From b6969dec51ba7b68c83aebf938343341df3e9dca Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Wed, 30 Nov 2016 13:07:55 -0800 Subject: [PATCH 08/11] Refactor mempool implementation --- heron/common/src/cpp/network/client.h | 4 - heron/common/src/cpp/network/mempool.h | 109 +++++++++---------- heron/common/src/cpp/network/server.h | 4 - heron/stmgr/src/cpp/manager/stmgr-client.cpp | 2 +- heron/stmgr/src/cpp/manager/stmgr-server.cpp | 2 +- heron/stmgr/src/cpp/util/tuple-cache.h | 3 +- 6 files changed, 54 insertions(+), 70 deletions(-) diff --git a/heron/common/src/cpp/network/client.h b/heron/common/src/cpp/network/client.h index f9e94d84cf6..9cd22885c2e 100644 --- a/heron/common/src/cpp/network/client.h +++ b/heron/common/src/cpp/network/client.h @@ -174,10 +174,6 @@ class Client : public BaseClient { // Return the underlying EventLoop. EventLoop* getEventLoop() { return eventLoop_; } - void set_pool_limit(sp_int32 limit) { - _heron_message_pool.set_limit(limit); - } - template void release(M* m) { _heron_message_pool.release(m); diff --git a/heron/common/src/cpp/network/mempool.h b/heron/common/src/cpp/network/mempool.h index e312b72ca86..6f6d2ab1c81 100644 --- a/heron/common/src/cpp/network/mempool.h +++ b/heron/common/src/cpp/network/mempool.h @@ -17,96 +17,87 @@ #define MEM_POOL_H #include -#include -#include #include +#include template class BaseMemPool { public: - template - T* acquire(Args&&... args) { + static void set_limit(sp_int32 limit) { + if (limit_ == 0) + limit_ = limit; + } + + BaseMemPool() { + } + + ~BaseMemPool() { + for (auto& p : pool_) { + delete p; + } + pool_.clear(); + } + + template + S* acquire(S* unused) { if (pool_.empty()) { - return new T(std::forward(args)...); + return new S(); } - T* t = pool_.back(); + S* t = static_cast(pool_.back()); pool_.pop_back(); + size_ -= sizeof(S); return t; } - void release(T* t) { + + template + void release(S* t) { + if (limit_ == 0) { + delete t; + return; + } + if (size_ > limit_) { + auto first = pool_.front(); + pool_.pop_front(); + size_ -= sizeof(S); + delete first; + } pool_.push_back(t); + size_ += sizeof(S); } - BaseMemPool() { - } - ~BaseMemPool() { - for (auto& p : pool_) { - delete p; - } - pool_.clear(); - } + private: - std::vector pool_; + static sp_int32 limit_; + static sp_int32 size_; + std::deque pool_; }; -template +template class MemPool { public: - MemPool() : size_(0), size_limit_(0) { + MemPool() { } - // TODO(cwang): we have a memory leak here. ~MemPool() { - for (auto& m : map_) { - for (auto& n : m.second) { - delete n; - } - m.second.clear(); - } map_.clear(); } - void set_limit(sp_int32 limit) { - size_limit_ = limit; - } - - template - M* acquire(M* m) { - std::type_index type = typeid(M); + template + KeyType* acquire(KeyType* m) { + std::type_index type = typeid(KeyType); auto& pool = map_[type]; - - if (pool.empty()) { - return new M(); - } - B* t = pool.back(); - pool.pop_back(); - size_ -= sizeof(M); - return static_cast(t); + return pool.acquire(m); } - template - void release(M* ptr) { - if (size_limit_ == 0) { - delete ptr; - return; - } - - std::type_index type = typeid(M); + template + void release(KeyType* ptr) { + std::type_index type = typeid(KeyType); auto& pool = map_[type]; - if (size_ > size_limit_) { - auto first = pool.front(); - pool.pop_front(); - size_ -= sizeof(M); - delete first; - } - pool.push_back(static_cast(ptr)); - size_ += sizeof(M); + pool.release(ptr); } private: - sp_int32 size_; - sp_int32 size_limit_; - std::unordered_map> map_; + std::unordered_map> map_; }; #endif diff --git a/heron/common/src/cpp/network/server.h b/heron/common/src/cpp/network/server.h index 7fc4f38ad6f..bbf90f45f3f 100644 --- a/heron/common/src/cpp/network/server.h +++ b/heron/common/src/cpp/network/server.h @@ -208,10 +208,6 @@ class Server : public BaseServer { // Called when the connection is closed virtual void HandleConnectionClose_Base(BaseConnection* connection, NetworkErrorCode _status); - void set_pool_limit(sp_int32 limit) { - _heron_message_pool.set_limit(limit); - } - template void release(M* m) { _heron_message_pool.release(m); diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp index d50503c31a5..3ab4063fc8e 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp @@ -75,7 +75,7 @@ StMgrClient::StMgrClient(EventLoop* eventLoop, const NetworkOptions& _options, sp_int32 pool_limit = config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSizeMb(); - set_pool_limit(pool_limit * 1024 * 1024); + BaseMemPool::set_limit(pool_limit * 1024 * 1024); } StMgrClient::~StMgrClient() { diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index 2e3826c2668..3d63f41d279 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -111,7 +111,7 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options, sp_int32 pool_limit = config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSizeMb(); - set_pool_limit(pool_limit * 1024 * 1024); + BaseMemPool::set_limit(pool_limit * 1024 * 1024); } StMgrServer::~StMgrServer() { diff --git a/heron/stmgr/src/cpp/util/tuple-cache.h b/heron/stmgr/src/cpp/util/tuple-cache.h index adf7e47bbf0..e282a5e527b 100644 --- a/heron/stmgr/src/cpp/util/tuple-cache.h +++ b/heron/stmgr/src/cpp/util/tuple-cache.h @@ -74,7 +74,8 @@ class TupleCache { std::function _drainer); proto::system::HeronTupleSet2* acquire() { - return heron_tuple_set_pool_.acquire(); + proto::system::HeronTupleSet2* unused = nullptr; + return heron_tuple_set_pool_.acquire(unused); } proto::system::HeronTupleSet2* acquire_clean_set() { From 340a163d2b5ef83ae71020c7dc7a80d846d64f30 Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Wed, 30 Nov 2016 14:09:41 -0800 Subject: [PATCH 09/11] Fix link failure for static members --- heron/common/src/cpp/network/BUILD | 1 + heron/common/src/cpp/network/mempool.cpp | 23 +++++++++++++++++++++++ heron/common/src/cpp/network/mempool.h | 1 + 3 files changed, 25 insertions(+) create mode 100644 heron/common/src/cpp/network/mempool.cpp diff --git a/heron/common/src/cpp/network/BUILD b/heron/common/src/cpp/network/BUILD index 222da5d2330..493d37f5475 100644 --- a/heron/common/src/cpp/network/BUILD +++ b/heron/common/src/cpp/network/BUILD @@ -19,6 +19,7 @@ cc_library( "networkoptions.cpp", "packet.cpp", "server.cpp", + "mempool.cpp", "regevent.h", "asyncdns.h", diff --git a/heron/common/src/cpp/network/mempool.cpp b/heron/common/src/cpp/network/mempool.cpp new file mode 100644 index 00000000000..e3bdfafd674 --- /dev/null +++ b/heron/common/src/cpp/network/mempool.cpp @@ -0,0 +1,23 @@ +/* + * Copyright 2016 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "network/mempool.h" + +template<> +sp_int32 BaseMemPool::size_ = 0; + +template<> +sp_int32 BaseMemPool::limit_ = 0; diff --git a/heron/common/src/cpp/network/mempool.h b/heron/common/src/cpp/network/mempool.h index 6f6d2ab1c81..77b166d4857 100644 --- a/heron/common/src/cpp/network/mempool.h +++ b/heron/common/src/cpp/network/mempool.h @@ -19,6 +19,7 @@ #include #include #include +#include "basics/sptypes.h" template class BaseMemPool { From 8d13fbbd82a031da9e706bd71e9f5396bdcab504 Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Wed, 30 Nov 2016 14:36:43 -0800 Subject: [PATCH 10/11] Fix static member initialization --- heron/common/src/cpp/network/mempool.cpp | 5 +++-- heron/stmgr/src/cpp/manager/stmgr-client.cpp | 4 ---- heron/stmgr/src/cpp/manager/stmgr-server.cpp | 4 ---- heron/stmgr/src/cpp/server/stmgr-main.cpp | 4 ++++ heron/stmgr/src/cpp/util/tuple-cache.cpp | 9 +++++++++ 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/heron/common/src/cpp/network/mempool.cpp b/heron/common/src/cpp/network/mempool.cpp index e3bdfafd674..e875539196a 100644 --- a/heron/common/src/cpp/network/mempool.cpp +++ b/heron/common/src/cpp/network/mempool.cpp @@ -15,9 +15,10 @@ */ #include "network/mempool.h" +#include template<> -sp_int32 BaseMemPool::size_ = 0; +sp_int32 BaseMemPool::size_ = 0; template<> -sp_int32 BaseMemPool::limit_ = 0; +sp_int32 BaseMemPool::limit_ = 0; diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp index 3ab4063fc8e..b985cbdf1e2 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp @@ -72,10 +72,6 @@ StMgrClient::StMgrClient(EventLoop* eventLoop, const NetworkOptions& _options, stmgr_client_metrics_ = new heron::common::MultiCountMetric(); metrics_manager_client_->register_metric("__client_" + other_stmgr_id_, stmgr_client_metrics_); - - sp_int32 pool_limit = - config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSizeMb(); - BaseMemPool::set_limit(pool_limit * 1024 * 1024); } StMgrClient::~StMgrClient() { diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp index 3d63f41d279..53d9f933573 100644 --- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp @@ -108,10 +108,6 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options, metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT, back_pressure_metric_initiated_); spouts_under_back_pressure_ = false; - - sp_int32 pool_limit = - config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSizeMb(); - BaseMemPool::set_limit(pool_limit * 1024 * 1024); } StMgrServer::~StMgrServer() { diff --git a/heron/stmgr/src/cpp/server/stmgr-main.cpp b/heron/stmgr/src/cpp/server/stmgr-main.cpp index 79404b15802..e6e6272c556 100644 --- a/heron/stmgr/src/cpp/server/stmgr-main.cpp +++ b/heron/stmgr/src/cpp/server/stmgr-main.cpp @@ -70,6 +70,10 @@ int main(int argc, char* argv[]) { LOG(FATAL) << "Corrupt topology defn file" << std::endl; } + sp_int32 pool_limit = + heron::config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSizeMb(); + BaseMemPool::set_limit(pool_limit * 1024 * 1024); + heron::stmgr::StMgr mgr(&ss, myport, topology_name, topology_id, topology, myid, instances, zkhostportlist, topdir, metricsmgr_port, shell_port); mgr.Init(); diff --git a/heron/stmgr/src/cpp/util/tuple-cache.cpp b/heron/stmgr/src/cpp/util/tuple-cache.cpp index 0773586ebce..47271b4b518 100644 --- a/heron/stmgr/src/cpp/util/tuple-cache.cpp +++ b/heron/stmgr/src/cpp/util/tuple-cache.cpp @@ -28,6 +28,11 @@ namespace heron { namespace stmgr { +template<> +sp_int32 BaseMemPool::size_ = 0; +template<> +sp_int32 BaseMemPool::limit_ = 0; + TupleCache::TupleCache(EventLoop* eventLoop, sp_uint32 _drain_threshold) : eventLoop_(eventLoop), drain_threshold_bytes_(_drain_threshold) { cache_drain_frequency_ms_ = @@ -38,6 +43,10 @@ TupleCache::TupleCache(EventLoop* eventLoop, sp_uint32 _drain_threshold) total_size_ = 0; auto drain_cb = [this](EventLoop::Status status) { this->drain(status); }; eventLoop_->registerTimer(std::move(drain_cb), true, cache_drain_frequency_ms_ * 1000); + + sp_int32 pool_limit = + config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSizeMb(); + BaseMemPool::set_limit(pool_limit * 1024 * 1024); } TupleCache::~TupleCache() { From abea74b6db33e7e47a53488ca4cb278c8143dfc2 Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Wed, 30 Nov 2016 15:00:48 -0800 Subject: [PATCH 11/11] Use google::protobuf::Message as the base type for all --- heron/common/src/cpp/network/mempool.cpp | 2 ++ heron/stmgr/src/cpp/util/tuple-cache.cpp | 9 --------- heron/stmgr/src/cpp/util/tuple-cache.h | 2 +- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/heron/common/src/cpp/network/mempool.cpp b/heron/common/src/cpp/network/mempool.cpp index e875539196a..d8e2718fcc9 100644 --- a/heron/common/src/cpp/network/mempool.cpp +++ b/heron/common/src/cpp/network/mempool.cpp @@ -17,6 +17,8 @@ #include "network/mempool.h" #include +// All types of BaseMemPool should be derived from google::protobuf::Message +// otherwise this would not work. template<> sp_int32 BaseMemPool::size_ = 0; diff --git a/heron/stmgr/src/cpp/util/tuple-cache.cpp b/heron/stmgr/src/cpp/util/tuple-cache.cpp index 47271b4b518..0773586ebce 100644 --- a/heron/stmgr/src/cpp/util/tuple-cache.cpp +++ b/heron/stmgr/src/cpp/util/tuple-cache.cpp @@ -28,11 +28,6 @@ namespace heron { namespace stmgr { -template<> -sp_int32 BaseMemPool::size_ = 0; -template<> -sp_int32 BaseMemPool::limit_ = 0; - TupleCache::TupleCache(EventLoop* eventLoop, sp_uint32 _drain_threshold) : eventLoop_(eventLoop), drain_threshold_bytes_(_drain_threshold) { cache_drain_frequency_ms_ = @@ -43,10 +38,6 @@ TupleCache::TupleCache(EventLoop* eventLoop, sp_uint32 _drain_threshold) total_size_ = 0; auto drain_cb = [this](EventLoop::Status status) { this->drain(status); }; eventLoop_->registerTimer(std::move(drain_cb), true, cache_drain_frequency_ms_ * 1000); - - sp_int32 pool_limit = - config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrMempoolSizeMb(); - BaseMemPool::set_limit(pool_limit * 1024 * 1024); } TupleCache::~TupleCache() { diff --git a/heron/stmgr/src/cpp/util/tuple-cache.h b/heron/stmgr/src/cpp/util/tuple-cache.h index e282a5e527b..81a70fc08a1 100644 --- a/heron/stmgr/src/cpp/util/tuple-cache.h +++ b/heron/stmgr/src/cpp/util/tuple-cache.h @@ -89,7 +89,7 @@ class TupleCache { } private: - BaseMemPool heron_tuple_set_pool_; + BaseMemPool heron_tuple_set_pool_; std::deque tuples_; proto::system::HeronTupleSet2* current_; sp_uint64 current_size_;