17 #include <arpa/inet.h> 
 20 #include <netinet/in.h> 
 21 #include <sys/ioctl.h> 
 22 #include <sys/socket.h> 
 32 #include <system_error>  
 33 #include <unordered_map> 
 44 #define IS_MINGW() defined(__MINGW32__) 
 46 #if IS_MINGW() && !defined(POLLRDNORM) && !defined(POLLRDBAND) 
 61 #pragma message("mingw 不支持分布式训练。") 
 62 typedef struct pollfd {
 
 66 } WSAPOLLFD, *PWSAPOLLFD, *LPWSAPOLLFD;
 
 69 #define POLLIN (0x0100 | 0x0200) 
 70 #define POLLPRI 0x0400 
 72 #define POLLOUT 0x0010 
 79 template <
typename PollFD>
 
 80 int PollImpl(PollFD* pfd, 
int nfds, std::chrono::seconds timeout) noexcept(
true) {
 
 86  xgboost::MingWError();
 
 89  return WSAPoll(pfd, nfds, std::chrono::milliseconds(timeout).count());
 
 93  return poll(pfd, nfds, timeout.count() < 0 ? -1 : std::chrono::milliseconds(timeout).count());
 
 99  if ((revents & POLLERR) != 0) {
 
 101  auto str = strerror(err);
 
 104  " 代码:" + std::to_string(err));
 
 106  if ((revents & POLLNVAL) != 0) {
 
 109  if ((revents & POLLHUP) != 0) {
 
 121 #if defined(POLLRDHUP) 
 123  if ((revents & POLLRDHUP) != 0) {
 
 140  pfd.events |= POLLIN;
 
 151  pfd.events |= POLLOUT;
 
 164  pfd.events |= POLLPRI;
 
 174  const auto& pfd = 
fds.find(fd);
 
 175  return pfd != 
fds.end() && ((pfd->second.events & POLLIN) != 0);
 
 186  const auto& pfd = 
fds.find(fd);
 
 187  return pfd != 
fds.end() && ((pfd->second.events & POLLOUT) != 0);
 
 198  bool check_error = 
true) {
 
 199  std::vector<pollfd> fdset;
 
 200  fdset.reserve(
fds.size());
 
 201  for (
auto kv : 
fds) {
 
 202  fdset.push_back(kv.second);
 
 204  std::int32_t ret = 
PollImpl(fdset.data(), fdset.size(), timeout);
 
 207  "轮询超时:" + std::to_string(timeout.count()) + 
" 秒。",
 
 208  std::make_error_code(std::errc::timed_out));
 
 209  } 
else if (ret < 0) {
 
 213  for (
auto& pfd : fdset) {
 
 215  if (check_error && !result.OK()) {
 
 219  auto revents = pfd.revents & pfd.events;
 
 220  fds[pfd.fd].events = revents;
 
 225  std::unordered_map<SOCKET, pollfd> 
fds;
 
 230 #if IS_MINGW() && !defined(POLLRDNORM) && !defined(POLLRDBAND) 
用于简单通信的 TCP 套接字。
定义: socket.h:267
HandleT const & Handle() const
返回原生套接字文件描述符。
定义: socket.h:539
int PollImpl(PollFD *pfd, int nfds, std::chrono::seconds timeout) noexcept(true)
定义: poll_utils.h:80
std::enable_if_t< std::is_integral_v< E >, xgboost::collective::Result > PollError(E const &revents)
定义: poll_utils.h:98
auto Fail(std::string msg, char const *file=__builtin_FILE(), std::int32_t line=__builtin_LINE())
返回失败。
定义: result.h:124
auto Success() noexcept(true)
返回成功。
定义: result.h:120
collective::Result FailWithCode(std::string msg)
定义: socket.h:78
int SOCKET
定义: poll_utils.h:40
size_t sock_size_t
定义: poll_utils.h:41
用于执行轮询的辅助数据结构
定义: poll_utils.h:131
void WatchException(SOCKET fd)
添加文件描述符以监视异常
定义: poll_utils.h:161
bool CheckWrite(xgboost::collective::TCPSocket const &socket) const
定义: poll_utils.h:189
void WatchRead(xgboost::collective::TCPSocket const &socket)
定义: poll_utils.h:142
xgboost::collective::Result Poll(std::chrono::seconds timeout, bool check_error=true)
对定义的集合执行轮询,包括读、写、异常
定义: poll_utils.h:197
void WatchWrite(xgboost::collective::TCPSocket const &socket)
定义: poll_utils.h:153
bool CheckRead(SOCKET fd) const
检查描述符是否准备好读取。
定义: poll_utils.h:173
void WatchException(xgboost::collective::TCPSocket const &socket)
定义: poll_utils.h:166
bool CheckWrite(SOCKET fd) const
检查描述符是否准备好写入。
定义: poll_utils.h:185
void WatchWrite(SOCKET fd)
添加文件描述符以监视写入
定义: poll_utils.h:148
bool CheckRead(xgboost::collective::TCPSocket const &socket) const
定义: poll_utils.h:177
std::unordered_map< SOCKET, pollfd > fds
定义: poll_utils.h:225
void WatchRead(SOCKET fd)
添加文件描述符以监视读取
定义: poll_utils.h:137
一种比抛出 dmlc 异常更容易处理的错误类型。我们可以记录并传播 s...
定义: result.h:67