diff --git a/core/accessor_store.hpp b/core/accessor_store.hpp index 47cd338..edd02e2 100644 --- a/core/accessor_store.hpp +++ b/core/accessor_store.hpp @@ -38,18 +38,15 @@ class AccessorStore { template static std::vector>* create_accessor(size_t channel_id, size_t local_id, size_t num_local_threads) { - // double-checked locking + std::lock_guard lock(accessors_map_mutex); if (accessors_map.find(channel_id) == accessors_map.end()) { - std::lock_guard lock(accessors_map_mutex); - if (accessors_map.find(channel_id) == accessors_map.end()) { - AccessorSet* accessor_set = new AccessorSet(); - accessor_set->data.resize(num_local_threads); - for (auto& i : accessor_set->data) { - i.init(num_local_threads); - } - AccessorStore::num_local_threads.insert(std::make_pair(channel_id, num_local_threads)); - accessors_map.insert(std::make_pair(channel_id, accessor_set)); + AccessorSet* accessor_set = new AccessorSet(); + accessor_set->data.resize(num_local_threads); + for (auto& i : accessor_set->data) { + i.init(num_local_threads); } + AccessorStore::num_local_threads.insert({channel_id, num_local_threads}); + accessors_map.insert({channel_id, accessor_set}); } auto& data = dynamic_cast*>(accessors_map[channel_id])->data; // data[local_id].init(); diff --git a/core/channel/broadcast_channel.hpp b/core/channel/broadcast_channel.hpp index cf44ed8..b9c0a2c 100644 --- a/core/channel/broadcast_channel.hpp +++ b/core/channel/broadcast_channel.hpp @@ -34,17 +34,13 @@ using base::BinStream; template class BroadcastChannel : public ChannelBase { public: - explicit BroadcastChannel(ChannelSource* src) : src_ptr_(src) { - // TODO(yuzhen): Should be careful, maybe need to deregister every time? - src_ptr_->register_outchannel(channel_id_, this); - } + BroadcastChannel() = default; ~BroadcastChannel() override { // Make sure to invoke inc_progress_ before destructor if (need_leave_accessor_) leave_accessor(); AccessorStore::remove_accessor(channel_id_); - src_ptr_->deregister_outchannel(channel_id_); } BroadcastChannel(const BroadcastChannel&) = delete; @@ -53,7 +49,7 @@ class BroadcastChannel : public ChannelBase { BroadcastChannel(BroadcastChannel&&) = default; BroadcastChannel& operator=(BroadcastChannel&&) = default; - void customized_setup() override { + void buffer_accessor_setup() { broadcast_buffer_.resize(worker_info_->get_largest_tid() + 1); accessor_ = AccessorStore::create_accessor>( channel_id_, local_id_, worker_info_->get_num_local_workers()); @@ -92,17 +88,9 @@ class BroadcastChannel : public ChannelBase { void set_clear_dict(bool clear) { clear_dict_each_progress_ = clear; } - void prepare() override {} - - void in(BinStream& bin) override {} - - void out() override { - flush(); - prepare_broadcast(); - } + std::unordered_map& get_local_dict() { return (*accessor_)[local_id_].storage(); } - /// This method is only useful without list_execute - void flush() { + void send() override { this->inc_progress(); int start = global_id_; for (int i = 0; i < broadcast_buffer_.size(); ++i) { @@ -116,17 +104,17 @@ class BroadcastChannel : public ChannelBase { this->worker_info_->get_pids()); } - /// This method is only useful without list_execute - void prepare_broadcast() { + void recv() override { // Check whether need to leave accessor_ (last round's accessor_) if (need_leave_accessor_) leave_accessor(); need_leave_accessor_ = true; - auto& local_dict = (*accessor_)[local_id_].storage(); while (mailbox_->poll(channel_id_, progress_)) { auto bin = mailbox_->recv(channel_id_, progress_); - process_bin(bin, local_dict); + if (bin_stream_processor_ != nullptr) { + bin_stream_processor_(&bin); + } } (*accessor_)[local_id_].commit(); } @@ -144,17 +132,6 @@ class BroadcastChannel : public ChannelBase { } } - void process_bin(BinStream& bin, std::unordered_map& local_dict) { - while (bin.size() != 0) { - KeyT key; - ValueT value; - bin >> key >> value; - local_dict[key] = value; - } - } - - ChannelSource* src_ptr_; - bool clear_dict_each_progress_ = false; bool need_leave_accessor_ = false; std::vector broadcast_buffer_; diff --git a/core/channel/broadcast_channel_unittest.cpp b/core/channel/broadcast_channel_unittest.cpp index 8dc58b2..d5d1beb 100644 --- a/core/channel/broadcast_channel_unittest.cpp +++ b/core/channel/broadcast_channel_unittest.cpp @@ -35,10 +35,19 @@ class Obj { }; // Create broadcast without setting -template -BroadcastChannel create_broadcast_channel(ChannelSource& src_list) { - BroadcastChannel broadcast_channel(&src_list); - return broadcast_channel; +template +BroadcastChannel create_broadcast_channel() { + auto ch = BroadcastChannel(); + ch.set_bin_stream_processor([&](base::BinStream* bin_stream) { + auto& local_dict = ch.get_local_dict(); + while (bin_stream->size() != 0) { + KeyT key; + MsgT value; + *bin_stream >> key >> value; + local_dict[key] = value; + } + }); + return ch; } TEST_F(TestBroadcastChannel, Create) { @@ -64,8 +73,10 @@ TEST_F(TestBroadcastChannel, Create) { ObjList src_list; // BroadcastChannel - auto broadcast_channel = create_broadcast_channel(src_list); + auto broadcast_channel = create_broadcast_channel(); + broadcast_channel.setup(0, 0, workerinfo, &mailbox); + broadcast_channel.buffer_accessor_setup(); } TEST_F(TestBroadcastChannel, Broadcast) { @@ -91,34 +102,36 @@ TEST_F(TestBroadcastChannel, Broadcast) { ObjList src_list; // BroadcastChannel - auto broadcast_channel = create_broadcast_channel(src_list); + auto broadcast_channel = create_broadcast_channel(); + broadcast_channel.setup(0, 0, workerinfo, &mailbox); + broadcast_channel.buffer_accessor_setup(); // broadcast // Round 1 broadcast_channel.broadcast(23, "abc"); broadcast_channel.broadcast(45, "bbb"); - broadcast_channel.flush(); + broadcast_channel.out(); - broadcast_channel.prepare_broadcast(); + broadcast_channel.in(); EXPECT_EQ(broadcast_channel.get(23), "abc"); EXPECT_EQ(broadcast_channel.get(45), "bbb"); // Round 2 broadcast_channel.broadcast(23, "a"); broadcast_channel.broadcast(45, "b"); - broadcast_channel.flush(); + broadcast_channel.out(); - broadcast_channel.prepare_broadcast(); + broadcast_channel.in(); EXPECT_EQ(broadcast_channel.get(23), "a"); EXPECT_EQ(broadcast_channel.get(45), "b"); // Round 3 broadcast_channel.broadcast(23, "c"); broadcast_channel.broadcast(45, "d"); - broadcast_channel.flush(); + broadcast_channel.out(); - broadcast_channel.prepare_broadcast(); + broadcast_channel.in(); EXPECT_EQ(broadcast_channel.get(23), "c"); EXPECT_EQ(broadcast_channel.get(45), "d"); } @@ -146,28 +159,30 @@ TEST_F(TestBroadcastChannel, BroadcastClearDict) { ObjList src_list; // BroadcastChannel - auto broadcast_channel = create_broadcast_channel(src_list); + auto broadcast_channel = create_broadcast_channel(); + broadcast_channel.setup(0, 0, workerinfo, &mailbox); + broadcast_channel.buffer_accessor_setup(); // broadcast // Round 1 broadcast_channel.broadcast(23, "abc"); - broadcast_channel.flush(); + broadcast_channel.out(); - broadcast_channel.prepare_broadcast(); + broadcast_channel.in(); EXPECT_EQ(broadcast_channel.get(23), "abc"); // Round 2 - broadcast_channel.flush(); - broadcast_channel.prepare_broadcast(); + broadcast_channel.out(); + broadcast_channel.in(); EXPECT_EQ(broadcast_channel.get(23), "abc"); // Last round result remain valid // set clear dict broadcast_channel.set_clear_dict(true); // Round 3 - broadcast_channel.flush(); - broadcast_channel.prepare_broadcast(); + broadcast_channel.out(); + broadcast_channel.in(); EXPECT_EQ(broadcast_channel.find(23), false); // Last round result is invalid } @@ -201,16 +216,17 @@ TEST_F(TestBroadcastChannel, MultiThread) { // ObjList Setup ObjList src_list; - // BroacastChannel - auto broadcast_channel = create_broadcast_channel(src_list); + // BroadcastChannel + auto broadcast_channel = create_broadcast_channel(); broadcast_channel.setup(0, 0, workerinfo, &mailbox_0); + broadcast_channel.buffer_accessor_setup(); // broadcast // Round 1 broadcast_channel.broadcast(23, "abc"); - broadcast_channel.flush(); + broadcast_channel.out(); - broadcast_channel.prepare_broadcast(); + broadcast_channel.in(); EXPECT_EQ(broadcast_channel.get(23), "abc"); EXPECT_EQ(broadcast_channel.get(12), "ddd"); }); @@ -218,16 +234,17 @@ TEST_F(TestBroadcastChannel, MultiThread) { // ObjList Setup ObjList src_list; - // BroacastChannel - auto broadcast_channel = create_broadcast_channel(src_list); + // BroadcastChannel + auto broadcast_channel = create_broadcast_channel(); broadcast_channel.setup(1, 1, workerinfo, &mailbox_1); + broadcast_channel.buffer_accessor_setup(); // broadcast // Round 1 broadcast_channel.broadcast(12, "ddd"); - broadcast_channel.flush(); + broadcast_channel.out(); - broadcast_channel.prepare_broadcast(); + broadcast_channel.in(); EXPECT_EQ(broadcast_channel.get(23), "abc"); EXPECT_EQ(broadcast_channel.get(12), "ddd"); }); diff --git a/core/channel/channel_base.cpp b/core/channel/channel_base.cpp index 033e3fe..638914c 100644 --- a/core/channel/channel_base.cpp +++ b/core/channel/channel_base.cpp @@ -22,12 +22,15 @@ namespace husky { thread_local int ChannelBase::max_channel_id_ = 0; -ChannelBase::ChannelBase() : channel_id_(max_channel_id_), progress_(0) { - max_channel_id_ += 1; -} +ChannelBase::ChannelBase() : channel_id_(max_channel_id_), progress_(0) { max_channel_id_ += 1; } -void ChannelBase::inc_progress() { - progress_ += 1; +void ChannelBase::setup(size_t local_id, size_t global_id, const WorkerInfo& worker_info, LocalMailbox* mailbox) { + set_local_id(local_id); + set_global_id(global_id); + set_worker_info(worker_info); + set_mailbox(mailbox); } +void ChannelBase::inc_progress() { progress_ += 1; } + } // namespace husky diff --git a/core/channel/channel_base.hpp b/core/channel/channel_base.hpp index 2ad526b..1c241fa 100644 --- a/core/channel/channel_base.hpp +++ b/core/channel/channel_base.hpp @@ -43,6 +43,9 @@ class ChannelBase { virtual void set_worker_info(const WorkerInfo& worker_info) { worker_info_.reset(new WorkerInfo(worker_info)); } void set_mailbox(LocalMailbox* mailbox) { mailbox_ = mailbox; } + // Setup API for unit test + void setup(size_t local_id, size_t global_id, const WorkerInfo& worker_info, LocalMailbox* mailbox); + // Top-level APIs virtual void in() { diff --git a/core/channel/channel_store.hpp b/core/channel/channel_store.hpp index 2b84571..b6f0da0 100644 --- a/core/channel/channel_store.hpp +++ b/core/channel/channel_store.hpp @@ -110,19 +110,41 @@ class ChannelStore : public ChannelStoreBase { // Create MigrateChannel template - static MigrateChannel& create_migrate_channel(ObjList& src_list, ObjList& dst_list, - const std::string& name = "") { - auto& ch = ChannelStoreBase::create_migrate_channel(src_list, dst_list, name); - setup(ch); + static auto* create_migrate_channel(ObjList* src_list, ObjList* dst_list, + const std::string& name = "") { + auto* ch = ChannelStoreBase::create_migrate_channel(*src_list, *dst_list, name); + common_setup(ch); + ch->set_obj_list(src_list); + ch->buffer_setup(); + ch->set_bin_stream_processor([=](base::BinStream* bin_stream) { + while (bin_stream->size() != 0) { + ObjT obj; + *bin_stream >> obj; + auto idx = dst_list->add_object(std::move(obj)); + dst_list->process_attribute(*bin_stream, idx); + } + if (dst_list->get_num_del() * 2 > dst_list->get_vector_size()) + dst_list->deletion_finalize(); + }); return ch; } // Create BroadcastChannel template - static BroadcastChannel& create_broadcast_channel(ChannelSource& src_list, - const std::string& name = "") { - auto& ch = ChannelStoreBase::create_broadcast_channel(src_list, name); - setup(ch); + static BroadcastChannel& create_broadcast_channel(const std::string& name = "") { + auto* ch = ChannelStoreBase::create_broadcast_channel(name); + common_setup(ch); + ch->buffer_accessor_setup(); + auto& local_dict = ch->get_local_dict(); + ch->set_bin_stream_processor([=](base::BinStream* bin_stream) { + auto& local_dict = ch->get_local_dict(); + while (bin_stream->size() != 0) { + KeyT key; + MsgT value; + *bin_stream >> key >> value; + local_dict[key] = value; + } + }); return ch; } diff --git a/core/channel/channel_store_base.hpp b/core/channel/channel_store_base.hpp index a9b5649..c024c5c 100644 --- a/core/channel/channel_store_base.hpp +++ b/core/channel/channel_store_base.hpp @@ -1,3 +1,4 @@ + // Copyright 2016 Husky Team // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -94,34 +95,43 @@ class ChannelStoreBase { // Create MigrateChannel template - static MigrateChannel& create_migrate_channel(ObjList& src_list, ObjList& dst_list, - const std::string& name = "") { + static auto* create_migrate_channel(ObjList& src_list, ObjList& dst_list, + const std::string& name = "") { std::string channel_name = name.empty() ? channel_name_prefix + std::to_string(default_channel_id++) : name; - ASSERT_MSG(channel_map.find(channel_name) == channel_map.end(), - "ChannelStoreBase::create_channel: Channel name already exists"); - auto* migrate_channel = new MigrateChannel(&src_list, &dst_list); + if (channel_map.find(channel_name) != channel_map.end()) + throw base::HuskyException("ChannelStoreBase::create_channel: Channel name already exists"); + auto* migrate_channel = new MigrateChannel(); channel_map.insert({channel_name, migrate_channel}); - return *migrate_channel; + return migrate_channel; } template - static MigrateChannel& get_migrate_channel(const std::string& name = "") { - ASSERT_MSG(channel_map.find(name) != channel_map.end(), - "ChannelStoreBase::get_channel: Channel name doesn't exist"); + static auto& get_migrate_channel(const std::string& name = "") { + if (channel_map.find(name) != channel_map.end()) + throw base::HuskyException("ChannelStoreBase::get_channel: Channel name doesn't exist"); auto* channel = channel_map[name]; - return *dynamic_cast*>(channel); + return *static_cast*>(channel); } // Create BroadcastChannel template - static BroadcastChannel& create_broadcast_channel(ChannelSource& src_list, - const std::string& name = "") { + static auto* create_broadcast_channel(const std::string& name = "") { std::string channel_name = name.empty() ? channel_name_prefix + std::to_string(default_channel_id++) : name; - ASSERT_MSG(channel_map.find(channel_name) == channel_map.end(), - "ChannelStoreBase::create_channel: Channel name already exists"); - auto* broadcast_channel = new BroadcastChannel(&src_list); + if (channel_map.find(channel_name) != channel_map.end()) { + throw base::HuskyException("ChannelStoreBase::create_channel: Channel name already exists"); + } + auto* broadcast_channel = new BroadcastChannel(); channel_map.insert({channel_name, broadcast_channel}); - return *broadcast_channel; + return broadcast_channel; + } + + template + static auto& get_broadcast_channel(const std::string& name = "") { + if (channel_map.find(name) == channel_map.end()) { + throw base::HuskyException("ChannelStoreBase::get_channel: Channel name doesn't exist"); + } + auto* channel = channel_map[name]; + return *static_cast*>(channel); } // Create AsyncPushChannel @@ -164,14 +174,6 @@ class ChannelStoreBase { return *dynamic_cast*>(channel); } - template - static BroadcastChannel& get_broadcast_channel(const std::string& name = "") { - ASSERT_MSG(channel_map.find(name) != channel_map.end(), - "ChannelStoreBase::get_channel: Channel name doesn't exist"); - auto* channel = channel_map[name]; - return *dynamic_cast*>(channel); - } - static void drop_channel(const std::string& name) { ASSERT_MSG(channel_map.find(name) != channel_map.end(), "ChannelStoreBase::drop_channel: Channel name doesn't exist"); diff --git a/core/channel/migrate_channel.hpp b/core/channel/migrate_channel.hpp index a6ccd48..8a2ba16 100644 --- a/core/channel/migrate_channel.hpp +++ b/core/channel/migrate_channel.hpp @@ -29,17 +29,9 @@ namespace husky { using base::BinStream; template -class MigrateChannel : public ObjList2ObjListChannel { +class MigrateChannel : public ChannelBase { public: - MigrateChannel(ObjList* src, ObjList* dst) : ObjList2ObjListChannel(src, dst) { - this->src_ptr_->register_outchannel(this->channel_id_, this); - this->dst_ptr_->register_inchannel(this->channel_id_, this); - } - - ~MigrateChannel() override { - this->src_ptr_->deregister_outchannel(this->channel_id_); - this->dst_ptr_->deregister_inchannel(this->channel_id_); - } + MigrateChannel() = default; MigrateChannel(const MigrateChannel&) = delete; MigrateChannel& operator=(const MigrateChannel&) = delete; @@ -47,22 +39,17 @@ class MigrateChannel : public ObjList2ObjListChannel { MigrateChannel(MigrateChannel&&) = default; MigrateChannel& operator=(MigrateChannel&&) = default; - void customized_setup() override { migrate_buffer_.resize(this->worker_info_->get_largest_tid() + 1); } + void buffer_setup() { migrate_buffer_.resize(this->worker_info_->get_largest_tid() + 1); } void migrate(ObjT& obj, int dst_thread_id) { - auto idx = this->src_ptr_->delete_object(&obj); + auto idx = this->obj_list_ptr_->delete_object(&obj); migrate_buffer_[dst_thread_id] << obj; - this->src_ptr_->migrate_attribute(migrate_buffer_[dst_thread_id], idx); + this->obj_list_ptr_->migrate_attribute(migrate_buffer_[dst_thread_id], idx); } - void prepare() override {} + void set_obj_list(ObjList* obj_list_ptr) { obj_list_ptr_ = obj_list_ptr; } - void in(BinStream& bin) override { process_bin(bin); } - - void out() override { flush(); } - - /// This method is only useful without list_execute - void flush() { + void send() override { this->inc_progress(); int start = this->global_id_; for (int i = 0; i < migrate_buffer_.size(); ++i) { @@ -76,33 +63,8 @@ class MigrateChannel : public ObjList2ObjListChannel { this->worker_info_->get_pids()); } - /// This method is only useful without list_execute - void prepare_immigrants() { - if (!this->is_flushed()) - return; - // process immigrants - while (this->mailbox_->poll(this->channel_id_, this->progress_)) { - auto bin_push = this->mailbox_->recv(this->channel_id_, this->progress_); - process_bin(bin_push); - } - // TODO(yuzhen): Should I put sort here or other place - // object insertion finalize - // dst_ptr->sort(); - this->reset_flushed(); - } - protected: - void process_bin(BinStream& bin_push) { - while (bin_push.size() != 0) { - ObjT obj; - bin_push >> obj; - auto idx = this->dst_ptr_->add_object(std::move(obj)); - this->dst_ptr_->process_attribute(bin_push, idx); - } - if (this->dst_ptr_->get_num_del() * 2 > this->dst_ptr_->get_vector_size()) - this->dst_ptr_->deletion_finalize(); - } - + ObjList* obj_list_ptr_; std::vector migrate_buffer_; }; diff --git a/core/channel/migrate_channel_unittest.cpp b/core/channel/migrate_channel_unittest.cpp index 2e69ad5..4fd5fbd 100644 --- a/core/channel/migrate_channel_unittest.cpp +++ b/core/channel/migrate_channel_unittest.cpp @@ -57,9 +57,20 @@ class Attr { // Create MigrateChannel without setting, for setup template -MigrateChannel create_migrate_channel(ObjList& src_list, ObjList& dst_list) { - MigrateChannel migrate_channel(&src_list, &dst_list); - return migrate_channel; +MigrateChannel create_migrate_channel(ObjList* src_list, ObjList* dst_list) { + auto ch = MigrateChannel(); + ch.set_obj_list(src_list); + ch.set_bin_stream_processor([=](base::BinStream* bin_stream) { + while (bin_stream->size() != 0) { + ObjT obj; + *bin_stream >> obj; + auto idx = dst_list->add_object(std::move(obj)); + dst_list->process_attribute(*bin_stream, idx); + } + if (dst_list->get_num_del() * 2 > dst_list->get_vector_size()) + dst_list->deletion_finalize(); + }); + return ch; } TEST_F(TestMigrateChannel, Create) { @@ -86,8 +97,9 @@ TEST_F(TestMigrateChannel, Create) { ObjList dst_list; // MigrateChannel - auto migrate_channel = create_migrate_channel(src_list, dst_list); + auto migrate_channel = create_migrate_channel(&src_list, &dst_list); migrate_channel.setup(0, 0, workerinfo, &mailbox); + migrate_channel.buffer_setup(); } TEST_F(TestMigrateChannel, MigrateOther) { @@ -118,14 +130,15 @@ TEST_F(TestMigrateChannel, MigrateOther) { src_list.add_object(Obj(57)); // MigrateChannel - auto migrate_channel = create_migrate_channel(src_list, dst_list); + auto migrate_channel = create_migrate_channel(&src_list, &dst_list); migrate_channel.setup(0, 0, workerinfo, &mailbox); + migrate_channel.buffer_setup(); // migrate Obj* p = src_list.find(18); migrate_channel.migrate(*p, 0); // migrate Obj(18) to thread 0 - migrate_channel.flush(); + migrate_channel.out(); // migration done - migrate_channel.prepare_immigrants(); + migrate_channel.in(); Obj& obj = dst_list.get_data()[0]; EXPECT_EQ(obj.id(), 18); @@ -171,14 +184,15 @@ TEST_F(TestMigrateChannel, MigrateItself) { src_attr.set(idx, Attr("57")); // MigrateChannel - auto migrate_channel = create_migrate_channel(src_list, dst_list); + auto migrate_channel = create_migrate_channel(&src_list, &dst_list); migrate_channel.setup(0, 0, workerinfo, &mailbox); + migrate_channel.buffer_setup(); // migrate Obj* p = src_list.find(18); migrate_channel.migrate(*p, 0); // migrate Obj(18) to thread 0 - migrate_channel.flush(); + migrate_channel.out(); // migration done - migrate_channel.prepare_immigrants(); + migrate_channel.in(); Obj& obj = dst_list.get_data()[0]; auto& dst_int = dst_list.get_attrlist("int"); auto& dst_attr = dst_list.get_attrlist("attr"); @@ -229,14 +243,15 @@ TEST_F(TestMigrateChannel, MigrateOtherIncProgress) { // MigrateChannel // Round 1 - auto migrate_channel = create_migrate_channel(src_list, dst_list); + auto migrate_channel = create_migrate_channel(&src_list, &dst_list); migrate_channel.setup(0, 0, workerinfo, &mailbox); + migrate_channel.buffer_setup(); // migrate Obj* p = src_list.find(18); migrate_channel.migrate(*p, 0); // migrate Obj(18) to thread 0 - migrate_channel.flush(); + migrate_channel.out(); // migration done - migrate_channel.prepare_immigrants(); + migrate_channel.in(); Obj& obj = dst_list.get_data()[0]; auto& dst_int = dst_list.get_attrlist("int"); auto& dst_attr = dst_list.get_attrlist("attr"); @@ -251,9 +266,9 @@ TEST_F(TestMigrateChannel, MigrateOtherIncProgress) { // migrate p = src_list.find(100); migrate_channel.migrate(*p, 0); // migrate Obj(100) to thread 0 - migrate_channel.flush(); + migrate_channel.out(); // migration done - migrate_channel.prepare_immigrants(); + migrate_channel.in(); EXPECT_EQ(dst_list.get_size(), 2); EXPECT_EQ(src_list.get_size(), 1); diff --git a/core/executor.hpp b/core/executor.hpp index 53ff89d..4a54274 100644 --- a/core/executor.hpp +++ b/core/executor.hpp @@ -68,8 +68,8 @@ void balance(ObjList& obj_list, Algo algo) { } obj_list.deletion_finalize(); - migrate_channel.flush(); - migrate_channel.prepare_immigrants(); + migrate_channel.out(); + migrate_channel.in(); obj_list.sort(); ChannelStore::drop_channel("tmp_balance_broadcast"); @@ -93,8 +93,8 @@ void globalize(ObjList& obj_list) { migrate_channel.migrate(obj, dst_thread_id); } obj_list.deletion_finalize(); - migrate_channel.flush(); - migrate_channel.prepare_immigrants(); + migrate_channel.out(); + migrate_channel.in(); obj_list.sort(); ChannelStore::drop_channel("tmp_globalize");