Codebase list rabit / 03ec889
New upstream snapshot. Debian Janitor 2 years ago
12 changed file(s) with 86 addition(s) and 74 deletion(s). Raw diff Collapse all Expand all
00 # Rabit: Reliable Allreduce and Broadcast Interface
11 [![Build Status](https://travis-ci.org/dmlc/rabit.svg?branch=master)](https://travis-ci.org/dmlc/rabit)
22 [![Documentation Status](https://readthedocs.org/projects/rabit/badge/?version=latest)](http://rabit.readthedocs.org/)
3
4 ## Recent developments of Rabit have been moved into [dmlc/xgboost](https://github.com/dmlc/xgboost). See discussion in [dmlc/xgboost#5995](https://github.com/dmlc/xgboost/issues/5995).
35
46 rabit is a lightweight library that provides a fault-tolerant interface for Allreduce and Broadcast. It is designed to support easy implementations of distributed machine learning programs, many of which fall naturally under the Allreduce abstraction. The goal of rabit is to support ***portable***, ***scalable*** and ***reliable*** distributed machine learning programs.
57
0 rabit (0.0~git20201105.f307ace-1) UNRELEASED; urgency=low
1
2 * New upstream snapshot.
3
4 -- Debian Janitor <janitor@jelmer.uk> Thu, 19 Aug 2021 08:20:21 -0000
5
06 rabit (0.0~git20200628.74bf00a-2) unstable; urgency=medium
17
28 * Stop tracking C++ symbols.
6464 /*! \brief error message buffer length */
6565 const int kPrintBuffer = 1 << 12;
6666
67 /*! \brief we may want to keep the process alive when there are multiple workers
68 * co-located in the same process */
69 extern bool STOP_PROCESS_ON_ERROR;
70
7167 /* \brief Case-insensitive string comparison */
7268 inline int CompareStringsCaseInsensitive(const char* s1, const char* s2) {
7369 #ifdef _MSC_VER
8884 * \param msg error message
8985 */
9086 inline void HandleAssertError(const char *msg) {
91 if (STOP_PROCESS_ON_ERROR) {
92 fprintf(stderr, "AssertError:%s, shutting down process\n", msg);
93 exit(-1);
94 } else {
95 fprintf(stderr, "AssertError:%s, rabit is configured to keep process running\n", msg);
96 throw dmlc::Error(msg);
97 }
87 fprintf(stderr,
88 "AssertError:%s, rabit is configured to keep process running\n", msg);
89 throw dmlc::Error(msg);
9890 }
9991 /*!
10092 * \brief handling of Check error, caused by inappropriate input
10193 * \param msg error message
10294 */
10395 inline void HandleCheckError(const char *msg) {
104 if (STOP_PROCESS_ON_ERROR) {
105 fprintf(stderr, "%s, shutting down process\n", msg);
106 exit(-1);
107 } else {
108 fprintf(stderr, "%s, rabit is configured to keep process running\n", msg);
109 throw dmlc::Error(msg);
110 }
96 fprintf(stderr, "%s, rabit is configured to keep process running\n", msg);
97 throw dmlc::Error(msg);
11198 }
11299 inline void HandlePrint(const char *msg) {
113100 printf("%s", msg);
55 * \author Tianqi Chen, Ignacio Cano, Tianyi Zhou
66 */
77 #define NOMINMAX
8 #include "allreduce_base.h"
89 #include <rabit/base.h>
910 #include <netinet/tcp.h>
1011 #include <cstring>
1112 #include <map>
12 #include "allreduce_base.h"
1313
1414 namespace rabit {
15
16 namespace utils {
17 bool STOP_PROCESS_ON_ERROR = true;
18 }
19
2015 namespace engine {
2116 // constructor
2217 AllreduceBase::AllreduceBase(void) {
3227 version_number = 0;
3328 // 32 K items
3429 reduce_ring_mincount = 32 << 10;
30 // 1M reducer size each time
31 tree_reduce_minsize = 1 << 20;
3532 // tracker URL
3633 task_id = "NULL";
3734 err_link = NULL;
4542 env_vars.push_back("DMLC_TRACKER_URI");
4643 env_vars.push_back("DMLC_TRACKER_PORT");
4744 env_vars.push_back("DMLC_WORKER_CONNECT_RETRY");
48 env_vars.push_back("DMLC_WORKER_STOP_PROCESS_ON_ERROR");
4945 }
5046
5147 // initialization function
186182 if (!strcmp(name, "DMLC_ROLE")) dmlc_role = val;
187183 if (!strcmp(name, "rabit_world_size")) world_size = atoi(val);
188184 if (!strcmp(name, "rabit_hadoop_mode")) hadoop_mode = utils::StringToBool(val);
185 if (!strcmp(name, "rabit_tree_reduce_minsize")) tree_reduce_minsize = atoi(val);
189186 if (!strcmp(name, "rabit_reduce_ring_mincount")) {
190187 reduce_ring_mincount = atoi(val);
191188 utils::Assert(reduce_ring_mincount > 0, "rabit_reduce_ring_mincount should be greater than 0");
195192 }
196193 if (!strcmp(name, "DMLC_WORKER_CONNECT_RETRY")) {
197194 connect_retry = atoi(val);
198 }
199 if (!strcmp(name, "DMLC_WORKER_STOP_PROCESS_ON_ERROR")) {
200 if (!strcmp(val, "true")) {
201 rabit::utils::STOP_PROCESS_ON_ERROR = true;
202 } else if (!strcmp(val, "false")) {
203 rabit::utils::STOP_PROCESS_ON_ERROR = false;
204 } else {
205 throw std::runtime_error("invalid value of DMLC_WORKER_STOP_PROCESS_ON_ERROR");
206 }
207195 }
208196 if (!strcmp(name, "rabit_bootstrap_cache")) {
209197 rabit_bootstrap_cache = utils::StringToBool(val);
503491 size_t size_up_out = 0;
504492 // size of message we received, and send in the down pass
505493 size_t size_down_in = 0;
494 // minimal size of each reducer
495 const size_t eachreduce = (tree_reduce_minsize / type_nbytes * type_nbytes);
496
506497 // initialize the link ring-buffer and pointer
507498 for (int i = 0; i < nlink; ++i) {
508499 if (i != parent_index) {
559550 // read data from children
560551 for (int i = 0; i < nlink; ++i) {
561552 if (i != parent_index && watcher.CheckRead(links[i].sock)) {
562 ReturnType ret = links[i].ReadToRingBuffer(size_up_out, total_size);
563 if (ret != kSuccess) {
564 return ReportError(&links[i], ret);
553 // make sure to receive minimal reducer size
554 // since each child reduce and sends the minimal reducer size
555 while (links[i].size_read < total_size
556 && links[i].size_read - size_up_reduce < eachreduce) {
557 ReturnType ret = links[i].ReadToRingBuffer(size_up_out, total_size);
558 if (ret != kSuccess) {
559 return ReportError(&links[i], ret);
560 }
565561 }
566562 }
567563 }
581577 utils::Assert(buffer_size != 0, "must assign buffer_size");
582578 // round to type_nbytes
583579 max_reduce = (max_reduce / type_nbytes * type_nbytes);
580
581 // if max_reduce is less than total_size, round it down to a multiple of
582 // eachreduce
583 if (max_reduce < total_size)
584 max_reduce = max_reduce - max_reduce % eachreduce;
585
584586 // perform reduce, can be at most two rounds
585587 while (size_up_reduce < max_reduce) {
586588 // start position
604606 // pass message up to parent, can pass data that has already been reduced
605607 if (size_up_out < size_up_reduce) {
606608 ssize_t len = links[parent_index].sock.
607 Send(sendrecvbuf + size_up_out, size_up_reduce - size_up_out);
609 Send(sendrecvbuf + size_up_out, size_up_reduce - size_up_out);
608610 if (len != -1) {
609611 size_up_out += static_cast<size_t>(len);
610612 } else {
617619 // read data from parent
618620 if (watcher.CheckRead(links[parent_index].sock) &&
619621 total_size > size_down_in) {
620 ssize_t len = links[parent_index].sock.
621 Recv(sendrecvbuf + size_down_in, total_size - size_down_in);
622 if (len == 0) {
623 links[parent_index].sock.Close();
624 return ReportError(&links[parent_index], kRecvZeroLen);
625 }
626 if (len != -1) {
627 size_down_in += static_cast<size_t>(len);
628 utils::Assert(size_down_in <= size_up_out,
629 "Allreduce: boundary error");
630 } else {
631 ReturnType ret = Errno2Return();
632 if (ret != kSuccess) {
633 return ReportError(&links[parent_index], ret);
622 size_t left_size = total_size-size_down_in;
623 size_t reduce_size_min = std::min(left_size, eachreduce);
624 size_t recved = 0;
625 while (recved < reduce_size_min) {
626 ssize_t len = links[parent_index].sock.
627 Recv(sendrecvbuf + size_down_in, total_size - size_down_in);
628
629 if (len == 0) {
630 links[parent_index].sock.Close();
631 return ReportError(&links[parent_index], kRecvZeroLen);
632 }
633 if (len != -1) {
634 size_down_in += static_cast<size_t>(len);
635 utils::Assert(size_down_in <= size_up_out,
636 "Allreduce: boundary error");
637 recved+=len;
638
639 // if more data than reduce_size_min was received, part of the next block has arrived too;
640 // extend reduce_size_min by another eachreduce, capped at left_size
641 while (recved > reduce_size_min) {
642 reduce_size_min += std::min(left_size-reduce_size_min, eachreduce);
643 }
644 } else {
645 ReturnType ret = Errno2Return();
646 if (ret != kSuccess) {
647 return ReportError(&links[parent_index], ret);
648 }
634649 }
635650 }
636651 }
564564 int reduce_method;
565565 // minimum count of cells to use ring based method
566566 size_t reduce_ring_mincount;
567 // minimum block size per tree reduce
568 size_t tree_reduce_minsize;
567569 // current rank
568570 int rank;
569571 // world size
166166 * \param size_prev_slice size of the previous slice i.e. slice of node (rank - 1) % world_size
167167 * \param _file caller file name used to generate unique cache key
168168 * \param _line caller line number used to generate unique cache key
169 * \param _caller caller function name used to generate unique cache key
170 */
169 * \param _caller caller function name used to generate unique cache key
170 */
171171 void AllreduceRobust::Allgather(void *sendrecvbuf,
172172 size_t total_size,
173173 size_t slice_begin,
517517 }
518518 // execute checkpoint, note: when checkpoint existing, load will not happen
519519 _assert(RecoverExec(NULL, 0, ActionSummary::kCheckPoint,
520 ActionSummary::kSpecialOp, cur_cache_seq),
521 "check point must return true");
520 ActionSummary::kSpecialOp, cur_cache_seq),
521 "check point must return true");
522522 // this is the critical region where we will change all the stored models
523523 // increase version number
524524 version_number += 1;
549549 delta = utils::GetTime() - start;
550550 // log checkpoint ack latency
551551 if (rabit_debug) {
552 utils::HandleLogInfo("[%d] checkpoint ack finished version %d, take %f seconds\n",
553 rank, version_number, delta);
552 utils::HandleLogInfo(
553 "[%d] checkpoint ack finished version %d, take %f seconds\n", rank,
554 version_number, delta);
554555 }
555556 }
556557 /*!
1111 #include "rabit/internal/engine.h"
1212
1313 namespace rabit {
14
15 namespace utils {
16 bool STOP_PROCESS_ON_ERROR = true;
17 }
18
1914 namespace engine {
2015 /*! \brief EmptyEngine */
2116 class EmptyEngine : public IEngine {
66 * \author Tianqi Chen
77 */
88 #define NOMINMAX
9 #include <mpi.h>
910 #include <rabit/base.h>
10 #include <mpi.h>
1111 #include <cstdio>
12 #include <string>
1213 #include "rabit/internal/engine.h"
1314 #include "rabit/internal/utils.h"
1415
1516 namespace rabit {
16
17 namespace utils {
18 bool STOP_PROCESS_ON_ERROR = true;
19 }
20
2117 namespace engine {
2218 /*! \brief implementation of engine using MPI */
2319 class MPIEngine : public IEngine {
22 add_executable(
33 unit_tests
44 test_io.cc
5 test_utils.cc
56 allreduce_robust_test.cc
67 allreduce_base_test.cc
78 allreduce_mock_test.cc
1616 char* argv[] = {cmd};
1717 m.Init(1, argv);
1818 m.rank = 0;
19 EXPECT_EXIT(m.Allreduce(nullptr,0,0,nullptr,nullptr,nullptr), ::testing::ExitedWithCode(255), "");
19 EXPECT_THROW(m.Allreduce(nullptr,0,0,nullptr,nullptr,nullptr), dmlc::Error);
2020 }
2121
2222 TEST(allreduce_mock, mock_broadcast)
3131 m.rank = 0;
3232 m.version_number=1;
3333 m.seq_counter=2;
34 EXPECT_EXIT(m.Broadcast(nullptr,0,0), ::testing::ExitedWithCode(255), "");
34 EXPECT_THROW(m.Broadcast(nullptr,0,0), dmlc::Error);
3535 }
22
33 #include <string>
44 #include <iostream>
5 #include <dmlc/logging.h>
56 #include "../../src/allreduce_mock.h"
67
78 TEST(allreduce_mock, mock_allreduce)
1617 char* argv[] = {cmd};
1718 m.Init(1, argv);
1819 m.rank = 0;
19 EXPECT_EXIT(m.Allreduce(nullptr,0,0,nullptr,nullptr,nullptr), ::testing::ExitedWithCode(255), "");
20 EXPECT_THROW({m.Allreduce(nullptr,0,0,nullptr,nullptr,nullptr);}, dmlc::Error);
2021 }
2122
2223 TEST(allreduce_mock, mock_broadcast)
3132 m.rank = 0;
3233 m.version_number=1;
3334 m.seq_counter=2;
34 EXPECT_EXIT(m.Broadcast(nullptr,0,0), ::testing::ExitedWithCode(255), "");
35 EXPECT_THROW({m.Broadcast(nullptr,0,0);}, dmlc::Error);
3536 }
3637
3738 TEST(allreduce_mock, mock_gather)
4647 m.rank = 3;
4748 m.version_number=13;
4849 m.seq_counter=22;
49 EXPECT_EXIT(m.Allgather(nullptr,0,0,0,0), ::testing::ExitedWithCode(255), "");
50 EXPECT_THROW({m.Allgather(nullptr,0,0,0,0);}, dmlc::Error);
5051 }
0 #include <gtest/gtest.h>
1 #include <rabit/internal/utils.h>
2
3 TEST(Utils, Assert) {
4 EXPECT_THROW({rabit::utils::Assert(false, "foo");}, dmlc::Error);
5 }