LCOV - code coverage report
Current view: top level - net - remoteconnection.cc (source / functions) Hit Total Coverage
Test: Test Coverage for xapian-core 954b5873a738 Lines: 173 232 74.6 %
Date: 2019-06-30 05:20:33 Functions: 14 15 93.3 %
Branches: 136 312 43.6 %

           Branch data     Line data    Source code
       1                 :            : /** @file  remoteconnection.cc
       2                 :            :  *  @brief RemoteConnection class used by the remote backend.
       3                 :            :  */
       4                 :            : /* Copyright (C) 2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2017 Olly Betts
       5                 :            :  *
       6                 :            :  * This program is free software; you can redistribute it and/or modify
       7                 :            :  * it under the terms of the GNU General Public License as published by
       8                 :            :  * the Free Software Foundation; either version 2 of the License, or
       9                 :            :  * (at your option) any later version.
      10                 :            :  *
      11                 :            :  * This program is distributed in the hope that it will be useful,
      12                 :            :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      13                 :            :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14                 :            :  * GNU General Public License for more details.
      15                 :            :  *
      16                 :            :  * You should have received a copy of the GNU General Public License
      17                 :            :  * along with this program; if not, write to the Free Software
      18                 :            :  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
      19                 :            :  */
      20                 :            : 
      21                 :            : #include <config.h>
      22                 :            : 
      23                 :            : #include "remoteconnection.h"
      24                 :            : 
      25                 :            : #include <xapian/error.h>
      26                 :            : 
      27                 :            : #include "safefcntl.h"
      28                 :            : #include "safeunistd.h"
      29                 :            : 
      30                 :            : #ifdef HAVE_POLL_H
      31                 :            : # include <poll.h>
      32                 :            : #else
      33                 :            : # include "safesysselect.h"
      34                 :            : #endif
      35                 :            : 
      36                 :            : #include <algorithm>
      37                 :            : #include <cerrno>
      38                 :            : #include <climits>
      39                 :            : #include <cstdint>
      40                 :            : #include <string>
      41                 :            : #ifdef __WIN32__
      42                 :            : # include <type_traits>
      43                 :            : #endif
      44                 :            : 
      45                 :            : #include "debuglog.h"
      46                 :            : #include "fd.h"
      47                 :            : #include "filetests.h"
      48                 :            : #include "omassert.h"
      49                 :            : #include "overflow.h"
      50                 :            : #include "pack.h"
      51                 :            : #include "posixy_wrapper.h"
      52                 :            : #include "realtime.h"
      53                 :            : #include "socket_utils.h"
      54                 :            : 
      55                 :            : using namespace std;
      56                 :            : 
      57                 :            : #define CHUNKSIZE 4096
      58                 :            : 
      59                 :            : [[noreturn]]
      60                 :            : static void
      61                 :         98 : throw_database_closed()
      62                 :            : {
      63 [ +  - ][ +  - ]:         98 :     throw Xapian::DatabaseClosedError("Database has been closed");
                 [ +  - ]
      64                 :            : }
      65                 :            : 
      66                 :            : [[noreturn]]
      67                 :            : static void
      68                 :          0 : throw_network_error_insane_message_length()
      69                 :            : {
      70 [ #  # ][ #  # ]:          0 :     throw Xapian::NetworkError("Insane message length specified!");
                 [ #  # ]
      71                 :            : }
      72                 :            : 
      73                 :            : [[noreturn]]
      74                 :            : static void
      75                 :          2 : throw_timeout(const char* msg, const string& context)
      76                 :            : {
      77 [ +  - ][ +  - ]:          2 :     throw Xapian::NetworkTimeoutError(msg, context);
      78                 :            : }
      79                 :            : 
      80                 :            : #ifdef __WIN32__
      81                 :            : static inline void
      82                 :            : update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
      83                 :            : {
      84                 :            :     if (add_overflows(overlapped.Offset, n, overlapped.Offset))
      85                 :            :         ++overlapped.OffsetHigh;
      86                 :            : }
      87                 :            : #endif
      88                 :            : 
      89                 :       1640 : RemoteConnection::RemoteConnection(int fdin_, int fdout_,
      90                 :            :                                    const string & context_)
      91         [ +  - ]:       1640 :     : fdin(fdin_), fdout(fdout_), context(context_)
      92                 :            : {
      93                 :            : #ifdef __WIN32__
      94                 :            :     memset(&overlapped, 0, sizeof(overlapped));
      95                 :            :     overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
      96                 :            :     if (!overlapped.hEvent)
      97                 :            :         throw Xapian::NetworkError("Failed to setup OVERLAPPED",
      98                 :            :                                    context, -int(GetLastError()));
      99                 :            : 
     100                 :            : #endif
     101                 :       1640 : }
     102                 :            : 
     103                 :            : #ifdef __WIN32__
     104                 :            : RemoteConnection::~RemoteConnection()
     105                 :            : {
     106                 :            :     if (overlapped.hEvent)
     107                 :            :         CloseHandle(overlapped.hEvent);
     108                 :            : }
     109                 :            : #endif
     110                 :            : 
     111                 :            : bool
     112                 :    5435588 : RemoteConnection::read_at_least(size_t min_len, double end_time)
     113                 :            : {
     114                 :            :     LOGCALL(REMOTE, bool, "RemoteConnection::read_at_least", min_len | end_time);
     115                 :            : 
     116         [ +  + ]:    5435588 :     if (buffer.length() >= min_len) return true;
     117                 :            : 
     118                 :            : #ifdef __WIN32__
     119                 :            :     HANDLE hin = fd_to_handle(fdin);
     120                 :            :     do {
     121                 :            :         char buf[CHUNKSIZE];
     122                 :            :         DWORD received;
     123                 :            :         BOOL ok = ReadFile(hin, buf, sizeof(buf), &received, &overlapped);
     124                 :            :         if (!ok) {
     125                 :            :             int errcode = GetLastError();
     126                 :            :             if (errcode != ERROR_IO_PENDING)
     127                 :            :                 throw Xapian::NetworkError("read failed", context, -errcode);
     128                 :            :             // Is asynch - just wait for the data to be received or a timeout.
     129                 :            :             DWORD waitrc;
     130                 :            :             waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
     131                 :            :             if (waitrc != WAIT_OBJECT_0) {
     132                 :            :                 LOGLINE(REMOTE, "read: timeout has expired");
     133                 :            :                 throw_timeout("Timeout expired while trying to read", context);
     134                 :            :             }
     135                 :            :             // Get the final result of the read.
     136                 :            :             if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
     137                 :            :                 throw Xapian::NetworkError("Failed to get overlapped result",
     138                 :            :                                            context, -int(GetLastError()));
     139                 :            :         }
     140                 :            : 
     141                 :            :         if (received == 0) {
     142                 :            :             return false;
     143                 :            :         }
     144                 :            : 
     145                 :            :         buffer.append(buf, received);
     146                 :            : 
     147                 :            :         // We must update the offset in the OVERLAPPED structure manually.
     148                 :            :         update_overlapped_offset(overlapped, received);
     149                 :            :     } while (buffer.length() < min_len);
     150                 :            : #else
     151                 :            :     // If there's no end_time, just use blocking I/O.
     152 [ +  + ][ -  + ]:    4493663 :     if (fcntl(fdin, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
     153                 :            :         throw Xapian::NetworkError("Failed to set fdin non-blocking-ness",
     154 [ #  # ][ #  # ]:          0 :                                    context, errno);
     155                 :            :     }
     156                 :            : 
     157                 :            :     while (true) {
     158                 :            :         char buf[CHUNKSIZE];
     159         [ +  - ]:    8129972 :         ssize_t received = read(fdin, buf, sizeof(buf));
     160                 :            : 
     161         [ +  + ]:    8129972 :         if (received > 0) {
     162         [ +  - ]:    4494449 :             buffer.append(buf, received);
     163         [ +  + ]:    4495093 :             if (buffer.length() >= min_len) return true;
     164                 :       1432 :             continue;
     165                 :            :         }
     166                 :            : 
     167         [ +  + ]:    3635523 :         if (received == 0) {
     168                 :        644 :             return false;
     169                 :            :         }
     170                 :            : 
     171                 :            :         LOGLINE(REMOTE, "read gave errno = " << errno);
     172         [ -  + ]:    3634879 :         if (errno == EINTR) continue;
     173                 :            : 
     174         [ -  + ]:    3634879 :         if (errno != EAGAIN)
     175 [ #  # ][ #  # ]:          0 :             throw Xapian::NetworkError("read failed", context, errno);
     176                 :            : 
     177                 :            :         Assert(end_time != 0.0);
     178                 :            :         while (true) {
     179                 :            :             // Calculate how far in the future end_time is.
     180                 :    3634879 :             double now = RealTime::now();
     181                 :    3634879 :             double time_diff = end_time - now;
     182                 :            :             // Check if the timeout has expired.
     183         [ -  + ]:    3634879 :             if (time_diff < 0) {
     184                 :            :                 LOGLINE(REMOTE, "read: timeout has expired");
     185                 :          0 :                 throw_timeout("Timeout expired while trying to read", context);
     186                 :            :             }
     187                 :            : 
     188                 :            :             // Wait until there is data, an error, or the timeout is reached.
     189                 :            : # ifdef HAVE_POLL
     190                 :            :             struct pollfd fds;
     191                 :    3634879 :             fds.fd = fdin;
     192                 :    3634879 :             fds.events = POLLIN;
     193         [ +  - ]:    3634879 :             int poll_result = poll(&fds, 1, int(time_diff * 1000));
     194         [ +  + ]:    3634879 :             if (poll_result > 0) break;
     195                 :            : 
     196         [ +  - ]:          2 :             if (poll_result == 0)
     197                 :          2 :                 throw_timeout("Timeout expired while trying to read", context);
     198                 :            : 
     199                 :            :             // EINTR means poll was interrupted by a signal.  EAGAIN means that
     200                 :            :             // allocation of internal data structures failed.
     201 [ #  # ][ #  # ]:          0 :             if (errno != EINTR && errno != EAGAIN)
     202                 :            :                 throw Xapian::NetworkError("poll failed during read",
     203 [ #  # ][ #  # ]:          2 :                                            context, errno);
     204                 :            : # else
     205                 :            :             if (fdin >= FD_SETSIZE) {
     206                 :            :                 // We can't block with a timeout, so just sleep and retry.
     207                 :            :                 RealTime::sleep(now + min(0.001, time_diff / 4));
     208                 :            :                 break;
     209                 :            :             }
     210                 :            :             fd_set fdset;
     211                 :            :             FD_ZERO(&fdset);
     212                 :            :             FD_SET(fdin, &fdset);
     213                 :            : 
     214                 :            :             struct timeval tv;
     215                 :            :             RealTime::to_timeval(time_diff, &tv);
     216                 :            :             int select_result = select(fdin + 1, &fdset, 0, 0, &tv);
     217                 :            :             if (select_result > 0) break;
     218                 :            : 
     219                 :            :             if (select_result == 0)
     220                 :            :                 throw_timeout("Timeout expired while trying to read", context);
     221                 :            : 
     222                 :            :             // EINTR means select was interrupted by a signal.  The Linux
     223                 :            :             // select(2) man page says: "Portable programs may wish to check
     224                 :            :             // for EAGAIN and loop, just as with EINTR" and that seems to be
     225                 :            :             // necessary for cygwin at least.
     226                 :            :             if (errno != EINTR && errno != EAGAIN)
     227                 :            :                 throw Xapian::NetworkError("select failed during read",
     228                 :            :                                            context, errno);
     229                 :            : # endif
     230                 :            :         }
     231                 :          0 :     }
     232                 :            : #endif
     233                 :    9071895 :     return true;
     234                 :            : }
     235                 :            : 
     236                 :            : void
     237                 :    2605050 : RemoteConnection::send_message(char type, const string &message,
     238                 :            :                                double end_time)
     239                 :            : {
     240                 :            :     LOGCALL_VOID(REMOTE, "RemoteConnection::send_message", type | message | end_time);
     241         [ +  + ]:    2605050 :     if (fdout == -1)
     242                 :         98 :         throw_database_closed();
     243                 :            : 
     244         [ +  - ]:    2604952 :     string header;
     245         [ +  - ]:    2604952 :     header += type;
     246         [ +  - ]:    2604952 :     pack_uint(header, message.size());
     247                 :            : 
     248                 :            : #ifdef __WIN32__
     249                 :            :     HANDLE hout = fd_to_handle(fdout);
     250                 :            :     const string * str = &header;
     251                 :            : 
     252                 :            :     size_t count = 0;
     253                 :            :     while (true) {
     254                 :            :         DWORD n;
     255                 :            :         BOOL ok = WriteFile(hout, str->data() + count, str->size() - count, &n, &overlapped);
     256                 :            :         if (!ok) {
     257                 :            :             int errcode = GetLastError();
     258                 :            :             if (errcode != ERROR_IO_PENDING)
     259                 :            :                 throw Xapian::NetworkError("write failed", context, -errcode);
     260                 :            :             // Just wait for the data to be sent, or a timeout.
     261                 :            :             DWORD waitrc;
     262                 :            :             waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
     263                 :            :             if (waitrc != WAIT_OBJECT_0) {
     264                 :            :                 LOGLINE(REMOTE, "write: timeout has expired");
     265                 :            :                 throw_timeout("Timeout expired while trying to write", context);
     266                 :            :             }
     267                 :            :             // Get the final result.
     268                 :            :             if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
     269                 :            :                 throw Xapian::NetworkError("Failed to get overlapped result",
     270                 :            :                                            context, -int(GetLastError()));
     271                 :            :         }
     272                 :            : 
     273                 :            :         count += n;
     274                 :            : 
     275                 :            :         // We must update the offset in the OVERLAPPED structure manually.
     276                 :            :         update_overlapped_offset(overlapped, n);
     277                 :            : 
     278                 :            :         if (count == str->size()) {
     279                 :            :             if (str == &message || message.empty()) return;
     280                 :            :             str = &message;
     281                 :            :             count = 0;
     282                 :            :         }
     283                 :            :     }
     284                 :            : #else
     285                 :            :     // If there's no end_time, just use blocking I/O.
     286 [ +  + ][ +  - ]:    2604952 :     if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
                 [ -  + ]
     287                 :            :         throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
     288 [ #  # ][ #  # ]:          0 :                                    context, errno);
     289                 :            :     }
     290                 :            : 
     291                 :    2604952 :     const string * str = &header;
     292                 :            : 
     293                 :    2604952 :     size_t count = 0;
     294                 :            :     while (true) {
     295                 :            :         // We've set write to non-blocking, so just try writing as there
     296                 :            :         // will usually be space.
     297         [ +  - ]:    4838303 :         ssize_t n = write(fdout, str->data() + count, str->size() - count);
     298                 :            : 
     299         [ +  + ]:    4838303 :         if (n >= 0) {
     300                 :    4838301 :             count += n;
     301         [ +  + ]:    4838301 :             if (count == str->size()) {
     302 [ +  + ][ +  + ]:    7443250 :                 if (str == &message || message.empty()) return;
                 [ +  + ]
     303                 :    2233350 :                 str = &message;
     304                 :    2233350 :                 count = 0;
     305                 :            :             }
     306                 :    2233351 :             continue;
     307                 :            :         }
     308                 :            : 
     309                 :            :         LOGLINE(REMOTE, "write gave errno = " << errno);
     310         [ -  + ]:          2 :         if (errno == EINTR) continue;
     311                 :            : 
     312         [ +  - ]:          2 :         if (errno != EAGAIN)
     313 [ +  - ][ +  - ]:          2 :             throw Xapian::NetworkError("write failed", context, errno);
     314                 :            : 
     315                 :          0 :         double now = RealTime::now();
     316                 :          0 :         double time_diff = end_time - now;
     317         [ #  # ]:          0 :         if (time_diff < 0) {
     318                 :            :             LOGLINE(REMOTE, "write: timeout has expired");
     319                 :          0 :             throw_timeout("Timeout expired while trying to write", context);
     320                 :            :         }
     321                 :            : 
     322                 :            :         // Wait until there is space or the timeout is reached.
     323                 :            : # ifdef HAVE_POLL
     324                 :            :         struct pollfd fds;
     325                 :          0 :         fds.fd = fdout;
     326                 :          0 :         fds.events = POLLOUT;
     327         [ #  # ]:          0 :         int result = poll(&fds, 1, int(time_diff * 1000));
     328                 :            : #  define POLLSELECT "poll"
     329                 :            : # else
     330                 :            :         if (fdout >= FD_SETSIZE) {
     331                 :            :             // We can't block with a timeout, so just sleep and retry.
     332                 :            :             RealTime::sleep(now + min(0.001, time_diff / 4));
     333                 :            :             continue;
     334                 :            :         }
     335                 :            : 
     336                 :            :         fd_set fdset;
     337                 :            :         FD_ZERO(&fdset);
     338                 :            :         FD_SET(fdout, &fdset);
     339                 :            : 
     340                 :            :         struct timeval tv;
     341                 :            :         RealTime::to_timeval(time_diff, &tv);
     342                 :            :         int result = select(fdout + 1, 0, &fdset, 0, &tv);
     343                 :            : #  define POLLSELECT "select"
     344                 :            : # endif
     345                 :            : 
     346         [ #  # ]:          0 :         if (result < 0) {
     347 [ #  # ][ #  # ]:          0 :             if (errno == EINTR || errno == EAGAIN) {
     348                 :            :                 // EINTR/EAGAIN means select was interrupted by a signal.
     349                 :            :                 // We could just retry the poll/select, but it's easier to just
     350                 :            :                 // retry the write.
     351                 :          0 :                 continue;
     352                 :            :             }
     353                 :            :             throw Xapian::NetworkError(POLLSELECT " failed during write",
     354 [ #  # ][ #  # ]:          2 :                                        context, errno);
     355                 :            : # undef POLLSELECT
     356                 :            :         }
     357                 :            : 
     358         [ #  # ]:          0 :         if (result == 0)
     359                 :          0 :             throw_timeout("Timeout expired while trying to write", context);
     360                 :    2604952 :     }
     361                 :            : #endif
     362                 :            : }
     363                 :            : 
     364                 :            : void
     365                 :         56 : RemoteConnection::send_file(char type, int fd, double end_time)
     366                 :            : {
     367                 :            :     LOGCALL_VOID(REMOTE, "RemoteConnection::send_file", type | fd | end_time);
     368         [ -  + ]:         56 :     if (fdout == -1)
     369                 :          0 :         throw_database_closed();
     370                 :            : 
     371                 :         56 :     off_t size = file_size(fd);
     372         [ -  + ]:         56 :     if (errno)
     373 [ #  # ][ #  # ]:          0 :         throw Xapian::NetworkError("Couldn't stat file to send", errno);
     374                 :            :     // FIXME: Use sendfile() or similar if available?
     375                 :            : 
     376                 :            :     char buf[CHUNKSIZE];
     377                 :         56 :     buf[0] = type;
     378                 :         56 :     size_t c = 1;
     379                 :            :     {
     380         [ +  - ]:         56 :         string enc_size;
     381         [ +  - ]:         56 :         pack_uint(enc_size, std::make_unsigned<off_t>::type(size));
     382                 :         56 :         c += enc_size.size();
     383                 :            :         // An encoded length should be just a few bytes.
     384                 :            :         AssertRel(c, <=, sizeof(buf));
     385                 :         56 :         memcpy(buf + 1, enc_size.data(), enc_size.size());
     386                 :            :     }
     387                 :            : 
     388                 :            : #ifdef __WIN32__
     389                 :            :     HANDLE hout = fd_to_handle(fdout);
     390                 :            :     size_t count = 0;
     391                 :            :     while (true) {
     392                 :            :         DWORD n;
     393                 :            :         BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
     394                 :            :         if (!ok) {
     395                 :            :             int errcode = GetLastError();
     396                 :            :             if (errcode != ERROR_IO_PENDING)
     397                 :            :                 throw Xapian::NetworkError("write failed", context, -errcode);
     398                 :            :             // Just wait for the data to be sent, or a timeout.
     399                 :            :             DWORD waitrc;
     400                 :            :             waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
     401                 :            :             if (waitrc != WAIT_OBJECT_0) {
     402                 :            :                 LOGLINE(REMOTE, "write: timeout has expired");
     403                 :            :                 throw_timeout("Timeout expired while trying to write", context);
     404                 :            :             }
     405                 :            :             // Get the final result.
     406                 :            :             if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
     407                 :            :                 throw Xapian::NetworkError("Failed to get overlapped result",
     408                 :            :                                            context, -int(GetLastError()));
     409                 :            :         }
     410                 :            : 
     411                 :            :         count += n;
     412                 :            : 
     413                 :            :         // We must update the offset in the OVERLAPPED structure manually.
     414                 :            :         update_overlapped_offset(overlapped, n);
     415                 :            : 
     416                 :            :         if (count == c) {
     417                 :            :             if (size == 0) return;
     418                 :            : 
     419                 :            :             ssize_t res;
     420                 :            :             do {
     421                 :            :                 res = read(fd, buf, sizeof(buf));
     422                 :            :             } while (res < 0 && errno == EINTR);
     423                 :            :             if (res < 0) throw Xapian::NetworkError("read failed", errno);
     424                 :            :             c = size_t(res);
     425                 :            : 
     426                 :            :             size -= c;
     427                 :            :             count = 0;
     428                 :            :         }
     429                 :            :     }
     430                 :            : #else
     431                 :            :     // If there's no end_time, just use blocking I/O.
     432 [ -  + ][ +  - ]:         56 :     if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
                 [ -  + ]
     433                 :            :         throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
     434 [ #  # ][ #  # ]:          0 :                                    context, errno);
     435                 :            :     }
     436                 :            : 
     437                 :         56 :     size_t count = 0;
     438                 :            :     while (true) {
     439                 :            :         // We've set write to non-blocking, so just try writing as there
     440                 :            :         // will usually be space.
     441         [ +  - ]:        172 :         ssize_t n = write(fdout, buf + count, c - count);
     442                 :            : 
     443         [ +  - ]:        172 :         if (n >= 0) {
     444                 :        172 :             count += n;
     445         [ +  - ]:        172 :             if (count == c) {
     446         [ +  + ]:        172 :                 if (size == 0) return;
     447                 :            : 
     448                 :            :                 ssize_t res;
     449         [ -  + ]:        116 :                 do {
     450         [ +  - ]:        116 :                     res = read(fd, buf, sizeof(buf));
     451         [ #  # ]:          0 :                 } while (res < 0 && errno == EINTR);
     452 [ -  + ][ #  # ]:        116 :                 if (res < 0) throw Xapian::NetworkError("read failed", errno);
                 [ #  # ]
     453                 :        116 :                 c = size_t(res);
     454                 :            : 
     455                 :        116 :                 size -= c;
     456                 :        116 :                 count = 0;
     457                 :            :             }
     458                 :        116 :             continue;
     459                 :            :         }
     460                 :            : 
     461                 :            :         LOGLINE(REMOTE, "write gave errno = " << errno);
     462         [ #  # ]:          0 :         if (errno == EINTR) continue;
     463                 :            : 
     464         [ #  # ]:          0 :         if (errno != EAGAIN)
     465 [ #  # ][ #  # ]:          0 :             throw Xapian::NetworkError("write failed", context, errno);
     466                 :            : 
     467                 :          0 :         double now = RealTime::now();
     468                 :          0 :         double time_diff = end_time - now;
     469         [ #  # ]:          0 :         if (time_diff < 0) {
     470                 :            :             LOGLINE(REMOTE, "write: timeout has expired");
     471                 :          0 :             throw_timeout("Timeout expired while trying to write", context);
     472                 :            :         }
     473                 :            : 
     474                 :            :         // Wait until there is space or the timeout is reached.
     475                 :            : # ifdef HAVE_POLL
     476                 :            :         struct pollfd fds;
     477                 :          0 :         fds.fd = fdout;
     478                 :          0 :         fds.events = POLLOUT;
     479         [ #  # ]:          0 :         int result = poll(&fds, 1, int(time_diff * 1000));
     480                 :            : #  define POLLSELECT "poll"
     481                 :            : # else
     482                 :            :         if (fdout >= FD_SETSIZE) {
     483                 :            :             // We can't block with a timeout, so just sleep and retry.
     484                 :            :             RealTime::sleep(now + min(0.001, time_diff / 4));
     485                 :            :             continue;
     486                 :            :         }
     487                 :            : 
     488                 :            :         fd_set fdset;
     489                 :            :         FD_ZERO(&fdset);
     490                 :            :         FD_SET(fdout, &fdset);
     491                 :            : 
     492                 :            :         struct timeval tv;
     493                 :            :         RealTime::to_timeval(time_diff, &tv);
     494                 :            :         int result = select(fdout + 1, 0, &fdset, 0, &tv);
     495                 :            : #  define POLLSELECT "select"
     496                 :            : # endif
     497                 :            : 
     498         [ #  # ]:          0 :         if (result < 0) {
     499 [ #  # ][ #  # ]:          0 :             if (errno == EINTR || errno == EAGAIN) {
     500                 :            :                 // EINTR/EAGAIN means select was interrupted by a signal.
     501                 :            :                 // We could just retry the poll/select, but it's easier to just
     502                 :            :                 // retry the write.
     503                 :          0 :                 continue;
     504                 :            :             }
     505                 :            :             throw Xapian::NetworkError(POLLSELECT " failed during write",
     506 [ #  # ][ #  # ]:          0 :                                        context, errno);
     507                 :            : # undef POLLSELECT
     508                 :            :         }
     509                 :            : 
     510         [ #  # ]:          0 :         if (result == 0)
     511                 :          0 :             throw_timeout("Timeout expired while trying to write", context);
     512                 :        116 :     }
     513                 :            : #endif
     514                 :            : }
     515                 :            : 
     516                 :            : int
     517                 :        186 : RemoteConnection::sniff_next_message_type(double end_time)
     518                 :            : {
     519                 :            :     LOGCALL(REMOTE, int, "RemoteConnection::sniff_next_message_type", end_time);
     520         [ -  + ]:        186 :     if (fdin == -1)
     521                 :          0 :         throw_database_closed();
     522                 :            : 
     523         [ +  + ]:        186 :     if (!read_at_least(1, end_time))
     524                 :          1 :         RETURN(-1);
     525                 :        185 :     unsigned char type = buffer[0];
     526                 :        185 :     RETURN(type);
     527                 :            : }
     528                 :            : 
     529                 :            : int
     530                 :    2605547 : RemoteConnection::get_message(string &result, double end_time)
     531                 :            : {
     532                 :            :     LOGCALL(REMOTE, int, "RemoteConnection::get_message", result | end_time);
     533         [ -  + ]:    2605547 :     if (fdin == -1)
     534                 :          0 :         throw_database_closed();
     535                 :            : 
     536 [ +  + ][ +  + ]:    2605547 :     if (!read_at_least(2, end_time))
     537                 :        598 :         RETURN(-1);
     538                 :            :     // This code assume things about the pack_uint() encoding in order to
     539                 :            :     // handle partial reads.
     540         [ +  - ]:    2604947 :     size_t len = static_cast<unsigned char>(buffer[1]);
     541         [ +  + ]:    2604947 :     if (len < 128) {
     542 [ +  - ][ -  + ]:    2380481 :         if (!read_at_least(len + 2, end_time))
     543                 :          0 :             RETURN(-1);
     544         [ +  - ]:    2380481 :         result.assign(buffer.data() + 2, len);
     545         [ +  - ]:    2380481 :         unsigned char type = buffer[0];
     546         [ +  - ]:    2380481 :         buffer.erase(0, len + 2);
     547                 :    2380481 :         RETURN(type);
     548                 :            :     }
     549                 :            : 
     550                 :            :     // We know the message payload is at least 128 bytes of data, and if we
     551                 :            :     // read that much we'll definitely have the whole of the length.
     552 [ +  - ][ -  + ]:     224466 :     if (!read_at_least(128 + 2, end_time))
     553                 :          0 :         RETURN(-1);
     554                 :     224466 :     const char* p = buffer.data();
     555                 :     224466 :     const char* p_end = p + buffer.size();
     556                 :     224466 :     ++p;
     557         [ -  + ]:     224466 :     if (!unpack_uint(&p, p_end, &len)) {
     558                 :          0 :         RETURN(-1);
     559                 :            :     }
     560                 :     224466 :     size_t header_len = (p - buffer.data());
     561 [ +  - ][ -  + ]:     224466 :     if (!read_at_least(header_len + len, end_time))
     562                 :          0 :         RETURN(-1);
     563         [ +  - ]:     224466 :     result.assign(buffer.data() + header_len, len);
     564         [ +  - ]:     224466 :     unsigned char type = buffer[0];
     565         [ +  - ]:     224466 :     buffer.erase(0, header_len + len);
     566                 :    2605545 :     RETURN(type);
     567                 :            : }
     568                 :            : 
     569                 :            : int
     570                 :        102 : RemoteConnection::get_message_chunked(double end_time)
     571                 :            : {
     572                 :            :     LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunked", end_time);
     573                 :            : 
     574         [ -  + ]:        102 :     if (fdin == -1)
     575                 :          0 :         throw_database_closed();
     576                 :            : 
     577 [ +  - ][ -  + ]:        102 :     if (!read_at_least(2, end_time))
     578                 :          0 :         RETURN(-1);
     579                 :            :     // This code assume things about the pack_uint() encoding in order to
     580                 :            :     // handle partial reads.
     581         [ +  - ]:        102 :     uint_least64_t len = static_cast<unsigned char>(buffer[1]);
     582         [ +  + ]:        102 :     if (len < 128) {
     583                 :          9 :         chunked_data_left = off_t(len);
     584         [ +  - ]:          9 :         char type = buffer[0];
     585         [ +  - ]:          9 :         buffer.erase(0, 2);
     586                 :          9 :         RETURN(type);
     587                 :            :     }
     588                 :            : 
     589                 :            :     // We know the message payload is at least 128 bytes of data, and if we
     590                 :            :     // read that much we'll definitely have the whole of the length.
     591 [ +  - ][ +  + ]:         93 :     if (!read_at_least(128 + 2, end_time))
     592                 :         21 :         RETURN(-1);
     593                 :         72 :     const char* p = buffer.data();
     594                 :         72 :     const char* p_end = p + buffer.size();
     595                 :         72 :     ++p;
     596         [ -  + ]:         72 :     if (!unpack_uint(&p, p_end, &len)) {
     597                 :          0 :         RETURN(-1);
     598                 :            :     }
     599                 :         72 :     chunked_data_left = off_t(len);
     600                 :            :     // Check that the value of len fits in an off_t without loss.
     601         [ -  + ]:         72 :     if (rare(uint_least64_t(chunked_data_left) != len)) {
     602                 :          0 :         throw_network_error_insane_message_length();
     603                 :            :     }
     604                 :         72 :     size_t header_len = (p - buffer.data());
     605         [ +  - ]:         72 :     unsigned char type = buffer[0];
     606         [ +  - ]:         72 :     buffer.erase(0, header_len);
     607                 :        102 :     RETURN(type);
     608                 :            : }
     609                 :            : 
     610                 :            : int
     611                 :        533 : RemoteConnection::get_message_chunk(string &result, size_t at_least,
     612                 :            :                                     double end_time)
     613                 :            : {
     614                 :            :     LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunk", result | at_least | end_time);
     615         [ -  + ]:        533 :     if (fdin == -1)
     616                 :          0 :         throw_database_closed();
     617                 :            : 
     618         [ +  + ]:        533 :     if (at_least <= result.size()) RETURN(true);
     619                 :        189 :     at_least -= result.size();
     620                 :            : 
     621                 :        189 :     bool read_enough = (off_t(at_least) <= chunked_data_left);
     622         [ +  + ]:        189 :     if (!read_enough) at_least = size_t(chunked_data_left);
     623                 :            : 
     624         [ +  + ]:        189 :     if (!read_at_least(at_least, end_time))
     625                 :         24 :         RETURN(-1);
     626                 :            : 
     627         [ +  - ]:        165 :     size_t retlen = min(off_t(buffer.size()), chunked_data_left);
     628                 :        165 :     result.append(buffer, 0, retlen);
     629                 :        165 :     buffer.erase(0, retlen);
     630                 :        165 :     chunked_data_left -= retlen;
     631                 :            : 
     632                 :        533 :     RETURN(int(read_enough));
     633                 :            : }
     634                 :            : 
     635                 :            : /** Write n bytes from block pointed to by p to file descriptor fd. */
     636                 :            : static void
     637                 :         58 : write_all(int fd, const char * p, size_t n)
     638                 :            : {
     639         [ +  + ]:        116 :     while (n) {
     640                 :         58 :         ssize_t c = write(fd, p, n);
     641         [ -  + ]:         58 :         if (c < 0) {
     642         [ #  # ]:          0 :             if (errno == EINTR) continue;
     643 [ #  # ][ #  # ]:          0 :             throw Xapian::NetworkError("Error writing to file", errno);
     644                 :            :         }
     645                 :         58 :         p += c;
     646                 :         58 :         n -= c;
     647                 :            :     }
     648                 :         58 : }
     649                 :            : 
     650                 :            : int
     651                 :         43 : RemoteConnection::receive_file(const string &file, double end_time)
     652                 :            : {
     653                 :            :     LOGCALL(REMOTE, int, "RemoteConnection::receive_file", file | end_time);
     654         [ -  + ]:         43 :     if (fdin == -1)
     655                 :          0 :         throw_database_closed();
     656                 :            : 
     657                 :            :     // FIXME: Do we want to be able to delete the file during writing?
     658         [ +  - ]:         43 :     FD fd(posixy_open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, 0666));
     659         [ -  + ]:         43 :     if (fd == -1)
     660 [ #  # ][ #  # ]:          0 :         throw Xapian::NetworkError("Couldn't open file for writing: " + file, errno);
     661                 :            : 
     662         [ +  - ]:         43 :     int type = get_message_chunked(end_time);
     663         [ +  + ]:         58 :     do {
     664         [ +  - ]:         58 :         off_t min_read = min(chunked_data_left, off_t(CHUNKSIZE));
     665 [ +  - ][ -  + ]:         58 :         if (!read_at_least(min_read, end_time))
     666                 :          0 :             RETURN(-1);
     667         [ +  - ]:         58 :         write_all(fd, buffer.data(), min_read);
     668                 :         58 :         chunked_data_left -= min_read;
     669         [ +  - ]:         58 :         buffer.erase(0, min_read);
     670                 :            :     } while (chunked_data_left);
     671                 :         43 :     RETURN(type);
     672                 :            : }
     673                 :            : 
     674                 :            : void
     675                 :        188 : RemoteConnection::shutdown()
     676                 :            : {
     677                 :            :     LOGCALL_VOID(REMOTE, "RemoteConnection::shutdown", NO_ARGS);
     678                 :            : 
     679         [ +  + ]:        362 :     if (fdin < 0) return;
     680                 :            : 
     681                 :            :     // We can be called from a destructor, so we can't throw an exception.
     682                 :            :     try {
     683 [ +  - ][ +  - ]:        174 :         send_message(MSG_SHUTDOWN, string(), 0.0);
     684                 :            : #ifdef __WIN32__
     685                 :            :         HANDLE hin = fd_to_handle(fdin);
     686                 :            :         char dummy;
     687                 :            :         DWORD received;
     688                 :            :         BOOL ok = ReadFile(hin, &dummy, 1, &received, &overlapped);
     689                 :            :         if (!ok && GetLastError() == ERROR_IO_PENDING) {
     690                 :            :             // Wait for asynchronous read to complete.
     691                 :            :             (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
     692                 :            :         }
     693                 :            : #else
     694                 :            :         // Wait for the connection to be closed - when this happens
     695                 :            :         // poll()/select() will report that a read won't block.
     696                 :            : # ifdef HAVE_POLL
     697                 :            :         struct pollfd fds;
     698                 :        174 :         fds.fd = fdin;
     699                 :        174 :         fds.events = POLLIN;
     700                 :            :         int res;
     701         [ -  + ]:        174 :         do {
     702         [ +  - ]:        174 :             res = poll(&fds, 1, -1);
     703 [ #  # ][ #  # ]:        174 :         } while (res < 0 && (errno == EINTR || errno == EAGAIN));
     704                 :            : # else
     705                 :            :         if (fdin < FD_SETSIZE) {
     706                 :            :             fd_set fdset;
     707                 :            :             FD_ZERO(&fdset);
     708                 :            :             FD_SET(fdin, &fdset);
     709                 :            :             int res;
     710                 :            :             do {
     711                 :            :                 res = select(fdin + 1, &fdset, 0, 0, NULL);
     712                 :            :             } while (res < 0 && (errno == EINTR || errno == EAGAIN));
     713                 :            :         }
     714                 :            : # endif
     715                 :            : #endif
     716                 :          0 :     } catch (...) {
     717                 :            :     }
     718                 :            : }
     719                 :            : 
     720                 :            : void
     721                 :       1570 : RemoteConnection::do_close()
     722                 :            : {
     723                 :            :     LOGCALL_VOID(REMOTE, "RemoteConnection::do_close", NO_ARGS);
     724                 :            : 
     725         [ +  + ]:       1570 :     if (fdin >= 0) {
     726                 :        774 :         close_fd_or_socket(fdin);
     727                 :            : 
     728                 :            :         // If the same fd is used in both directions, don't close it twice.
     729         [ +  - ]:        774 :         if (fdin == fdout) fdout = -1;
     730                 :            : 
     731                 :        774 :         fdin = -1;
     732                 :            :     }
     733                 :            : 
     734         [ -  + ]:       1570 :     if (fdout >= 0) {
     735                 :          0 :         close_fd_or_socket(fdout);
     736                 :          0 :         fdout = -1;
     737                 :            :     }
     738                 :       1570 : }
     739                 :            : 
     740                 :            : #ifdef __WIN32__
     741                 :            : DWORD
     742                 :            : RemoteConnection::calc_read_wait_msecs(double end_time)
     743                 :            : {
     744                 :            :     if (end_time == 0.0)
     745                 :            :         return INFINITE;
     746                 :            : 
     747                 :            :     // Calculate how far in the future end_time is.
     748                 :            :     double time_diff = end_time - RealTime::now();
     749                 :            : 
     750                 :            :     // DWORD is unsigned, so we mustn't try and return a negative value.
     751                 :            :     if (time_diff < 0.0) {
     752                 :            :         throw_timeout("Timeout expired before starting read", context);
     753                 :            :     }
     754                 :            :     return static_cast<DWORD>(time_diff * 1000.0);
     755                 :            : }
     756                 :            : #endif

Generated by: LCOV version 1.11