LCOV - code coverage report
Current view: top level - backends/glass - glass_databasereplicator.cc (source / functions) Hit Total Coverage
Test: Test Coverage for xapian-core 954b5873a738 Lines: 125 158 79.1 %
Date: 2019-06-30 05:20:33 Functions: 9 9 100.0 %
Branches: 96 310 31.0 %

           Branch data     Line data    Source code
       1                 :            : /** @file glass_databasereplicator.cc
       2                 :            :  * @brief Support for glass database replication
       3                 :            :  */
       4                 :            : /* Copyright 2008 Lemur Consulting Ltd
       5                 :            :  * Copyright 2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
       6                 :            :  *
       7                 :            :  * This program is free software; you can redistribute it and/or
       8                 :            :  * modify it under the terms of the GNU General Public License as
       9                 :            :  * published by the Free Software Foundation; either version 2 of the
      10                 :            :  * License, or (at your option) any later version.
      11                 :            :  *
      12                 :            :  * This program is distributed in the hope that it will be useful,
      13                 :            :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      14                 :            :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      15                 :            :  * GNU General Public License for more details.
      16                 :            :  *
      17                 :            :  * You should have received a copy of the GNU General Public License
      18                 :            :  * along with this program; if not, write to the Free Software
      19                 :            :  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
      20                 :            :  * USA
      21                 :            :  */
      22                 :            : 
      23                 :            : #include <config.h>
      24                 :            : 
      25                 :            : #include "glass_databasereplicator.h"
      26                 :            : 
      27                 :            : #include "xapian/error.h"
      28                 :            : 
      29                 :            : #include "../flint_lock.h"
      30                 :            : #include "glass_defs.h"
      31                 :            : #include "glass_replicate_internal.h"
      32                 :            : #include "glass_version.h"
      33                 :            : #include "compression_stream.h"
      34                 :            : #include "debuglog.h"
      35                 :            : #include "fd.h"
      36                 :            : #include "internaltypes.h"
      37                 :            : #include "io_utils.h"
      38                 :            : #include "pack.h"
      39                 :            : #include "posixy_wrapper.h"
      40                 :            : #include "net/remoteconnection.h"
      41                 :            : #include "replicationprotocol.h"
      42                 :            : #include "str.h"
      43                 :            : #include "stringutils.h"
      44                 :            : 
      45                 :            : #include <algorithm>
      46                 :            : #include <cerrno>
      47                 :            : 
      48                 :            : [[noreturn]]
      49                 :            : static void
      50                 :         45 : throw_connection_closed_unexpectedly()
      51                 :            : {
      52 [ +  - ][ +  - ]:         45 :     throw Xapian::NetworkError("Connection closed unexpectedly");
                 [ +  - ]
      53                 :            : }
      54                 :            : 
      55                 :            : using namespace std;
      56                 :            : using namespace Xapian;
      57                 :            : 
      58                 :            : static const char * dbnames =
      59                 :            :         "/postlist." GLASS_TABLE_EXTENSION "\0"
      60                 :            :         "/docdata." GLASS_TABLE_EXTENSION "\0\0"
      61                 :            :         "/termlist." GLASS_TABLE_EXTENSION "\0"
      62                 :            :         "/position." GLASS_TABLE_EXTENSION "\0"
      63                 :            :         "/spelling." GLASS_TABLE_EXTENSION "\0"
      64                 :            :         "/synonym." GLASS_TABLE_EXTENSION;
      65                 :            : 
      66                 :         77 : GlassDatabaseReplicator::GlassDatabaseReplicator(const string & db_dir_)
      67         [ +  - ]:         77 :     : db_dir(db_dir_)
      68                 :            : {
      69         [ +  - ]:         77 :     std::fill_n(fds, sizeof(fds) / sizeof(fds[0]), -1);
      70                 :         77 : }
      71                 :            : 
      72                 :            : void
      73                 :         13 : GlassDatabaseReplicator::commit() const
      74                 :            : {
      75         [ +  + ]:         91 :     for (size_t i = 0; i != Glass::MAX_; ++i) {
      76                 :         78 :         int fd = fds[i];
      77         [ +  + ]:         78 :         if (fd >= 0) {
      78                 :         47 :             io_sync(fd);
      79                 :            : #if 0 // FIXME: close or keep open?
      80                 :            :             close(fd);
      81                 :            :             fds[i] = -1;
      82                 :            : #endif
      83                 :            :         }
      84                 :            :     }
      85                 :         13 : }
      86                 :            : 
      87                 :        231 : GlassDatabaseReplicator::~GlassDatabaseReplicator()
      88                 :            : {
      89         [ +  + ]:        539 :     for (size_t i = 0; i != Glass::MAX_; ++i) {
      90                 :        462 :         int fd = fds[i];
      91         [ +  + ]:        462 :         if (fd >= 0) {
      92                 :        127 :             close(fd);
      93                 :            :         }
      94                 :            :     }
      95         [ -  + ]:        154 : }
      96                 :            : 
      97                 :            : bool
      98                 :          9 : GlassDatabaseReplicator::check_revision_at_least(const string & rev,
      99                 :            :                                                  const string & target) const
     100                 :            : {
     101                 :            :     LOGCALL(DB, bool, "GlassDatabaseReplicator::check_revision_at_least", rev | target);
     102                 :            : 
     103                 :            :     glass_revision_number_t rev_val;
     104                 :            :     glass_revision_number_t target_val;
     105                 :            : 
     106                 :          9 :     const char * ptr = rev.data();
     107                 :          9 :     const char * end = ptr + rev.size();
     108         [ -  + ]:          9 :     if (!unpack_uint(&ptr, end, &rev_val)) {
     109 [ #  # ][ #  # ]:          0 :         throw NetworkError("Invalid revision string supplied to check_revision_at_least");
                 [ #  # ]
     110                 :            :     }
     111                 :            : 
     112                 :          9 :     ptr = target.data();
     113                 :          9 :     end = ptr + target.size();
     114         [ -  + ]:          9 :     if (!unpack_uint(&ptr, end, &target_val)) {
     115 [ #  # ][ #  # ]:          0 :         throw NetworkError("Invalid revision string supplied to check_revision_at_least");
                 [ #  # ]
     116                 :            :     }
     117                 :            : 
     118                 :          9 :     RETURN(rev_val >= target_val);
     119                 :            : }
     120                 :            : 
     121                 :            : void
     122                 :         13 : GlassDatabaseReplicator::process_changeset_chunk_version(string & buf,
     123                 :            :                                                          RemoteConnection & conn,
     124                 :            :                                                          double end_time) const
     125                 :            : {
     126                 :         13 :     const char *ptr = buf.data();
     127                 :         13 :     const char *end = ptr + buf.size();
     128                 :            : 
     129                 :            :     glass_revision_number_t rev;
     130         [ -  + ]:         13 :     if (!unpack_uint(&ptr, end, &rev))
     131 [ #  # ][ #  # ]:          0 :         throw NetworkError("Invalid revision in changeset");
                 [ #  # ]
     132                 :            : 
     133                 :            :     string::size_type size;
     134         [ -  + ]:         13 :     if (!unpack_uint(&ptr, end, &size))
     135 [ #  # ][ #  # ]:          0 :         throw NetworkError("Invalid version file size in changeset");
                 [ #  # ]
     136                 :            : 
     137                 :            :     // Get the new version file into buf.
     138         [ +  - ]:         13 :     buf.erase(0, ptr - buf.data());
     139         [ +  - ]:         13 :     int res = conn.get_message_chunk(buf, size, end_time);
     140         [ -  + ]:         13 :     if (res <= 0) {
     141         [ #  # ]:          0 :         if (res < 0)
     142                 :          0 :             throw_connection_closed_unexpectedly();
     143 [ #  # ][ #  # ]:          0 :         throw NetworkError("Unexpected end of changeset (6)");
                 [ #  # ]
     144                 :            :     }
     145                 :            : 
     146                 :            :     // Write size bytes from start of buf to new version file.
     147         [ +  - ]:         13 :     string tmpfile = db_dir;
     148         [ +  - ]:         13 :     tmpfile += "/v.rtmp";
     149         [ +  - ]:         13 :     int fd = posixy_open(tmpfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC, 0666);
     150         [ -  + ]:         13 :     if (fd == -1) {
     151         [ #  # ]:          0 :         string msg = "Failed to open ";
     152         [ #  # ]:          0 :         msg += tmpfile;
     153         [ #  # ]:          0 :         throw DatabaseError(msg, errno);
     154                 :            :     }
     155                 :            :     {
     156                 :         13 :         FD closer(fd);
     157         [ +  - ]:         13 :         io_write(fd, buf.data(), size);
     158         [ +  - ]:         13 :         io_sync(fd);
     159                 :            :     }
     160         [ +  - ]:         26 :     string version_file = db_dir;
     161         [ +  - ]:         13 :     version_file += "/iamglass";
     162 [ +  - ][ -  + ]:         13 :     if (!io_tmp_rename(tmpfile, version_file)) {
     163         [ #  # ]:          0 :         string msg("Couldn't create new version file ");
     164         [ #  # ]:          0 :         msg += version_file;
     165         [ #  # ]:          0 :         throw DatabaseError(msg, errno);
     166                 :            :     }
     167                 :            : 
     168         [ +  - ]:         26 :     buf.erase(0, size);
     169                 :         13 : }
     170                 :            : 
     171                 :            : void
     172                 :        220 : GlassDatabaseReplicator::process_changeset_chunk_blocks(Glass::table_type table,
     173                 :            :                                                         unsigned v,
     174                 :            :                                                         string & buf,
     175                 :            :                                                         RemoteConnection & conn,
     176                 :            :                                                         double end_time) const
     177                 :            : {
     178                 :        220 :     const char *ptr = buf.data();
     179                 :        220 :     const char *end = ptr + buf.size();
     180                 :            : 
     181                 :        220 :     unsigned int changeset_blocksize = GLASS_MIN_BLOCKSIZE << v;
     182 [ +  - ][ -  + ]:        220 :     if (changeset_blocksize > 65536 ||
     183                 :        220 :         (changeset_blocksize & (changeset_blocksize - 1))) {
     184 [ #  # ][ #  # ]:          0 :         throw NetworkError("Invalid blocksize in changeset");
                 [ #  # ]
     185                 :            :     }
     186                 :            :     uint4 block_number;
     187         [ -  + ]:        220 :     if (!unpack_uint(&ptr, end, &block_number))
     188 [ #  # ][ #  # ]:          0 :         throw NetworkError("Invalid block number in changeset");
                 [ #  # ]
     189                 :            : 
     190         [ +  - ]:        220 :     buf.erase(0, ptr - buf.data());
     191                 :            : 
     192                 :        220 :     int fd = fds[table];
     193         [ +  + ]:        220 :     if (fd == -1) {
     194         [ +  - ]:        127 :         string db_path = db_dir;
     195         [ +  - ]:        127 :         db_path += dbnames + table * (11 + CONST_STRLEN(GLASS_TABLE_EXTENSION));
     196         [ +  - ]:        127 :         fd = posixy_open(db_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
     197         [ -  + ]:        127 :         if (fd == -1) {
     198         [ #  # ]:          0 :             string msg = "Failed to open ";
     199         [ #  # ]:          0 :             msg += db_path;
     200         [ #  # ]:          0 :             throw DatabaseError(msg, errno);
     201                 :            :         }
     202                 :        127 :         fds[table] = fd;
     203                 :            :     }
     204                 :            : 
     205         [ +  - ]:        220 :     int res = conn.get_message_chunk(buf, changeset_blocksize, end_time);
     206         [ +  + ]:        220 :     if (res <= 0) {
     207         [ +  - ]:          8 :         if (res < 0)
     208                 :          8 :             throw_connection_closed_unexpectedly();
     209 [ #  # ][ #  # ]:          0 :         throw NetworkError("Unexpected end of changeset (4)");
                 [ #  # ]
     210                 :            :     }
     211                 :            : 
     212         [ +  - ]:        212 :     io_write_block(fd, buf.data(), changeset_blocksize, block_number);
     213         [ +  - ]:        212 :     buf.erase(0, changeset_blocksize);
     214                 :        212 : }
     215                 :            : 
     216                 :            : string
     217                 :         59 : GlassDatabaseReplicator::apply_changeset_from_conn(RemoteConnection & conn,
     218                 :            :                                                    double end_time,
     219                 :            :                                                    bool valid) const
     220                 :            : {
     221                 :            :     LOGCALL(DB, string, "GlassDatabaseReplicator::apply_changeset_from_conn", conn | end_time | valid);
     222                 :            : 
     223                 :            :     // Lock the database to perform modifications.
     224         [ +  - ]:         59 :     FlintLock lock(db_dir);
     225         [ +  - ]:        118 :     string explanation;
     226         [ +  - ]:         59 :     FlintLock::reason why = lock.lock(true, false, explanation);
     227         [ -  + ]:         59 :     if (why != FlintLock::SUCCESS) {
     228                 :          0 :         lock.throw_databaselockerror(why, db_dir, explanation);
     229                 :            :     }
     230                 :            : 
     231         [ +  - ]:         59 :     int type = conn.get_message_chunked(end_time);
     232         [ +  + ]:         59 :     if (type < 0)
     233                 :         21 :         throw_connection_closed_unexpectedly();
     234                 :            :     AssertEq(type, REPL_REPLY_CHANGESET);
     235                 :            : 
     236         [ +  - ]:         38 :     string buf;
     237                 :            :     // Read enough to be certain that we've got the header part of the
     238                 :            :     // changeset.
     239                 :            : 
     240 [ +  - ][ -  + ]:         38 :     if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
     241                 :          0 :         throw_connection_closed_unexpectedly();
     242                 :         38 :     const char *ptr = buf.data();
     243                 :         38 :     const char *end = ptr + buf.size();
     244                 :            :     // Check the magic string.
     245         [ -  + ]:         38 :     if (!startswith(buf, CHANGES_MAGIC_STRING)) {
     246 [ #  # ][ #  # ]:          0 :         throw NetworkError("Invalid ChangeSet magic string");
                 [ #  # ]
     247                 :            :     }
     248                 :         38 :     ptr += CONST_STRLEN(CHANGES_MAGIC_STRING);
     249         [ -  + ]:         38 :     if (ptr == end)
     250 [ #  # ][ #  # ]:          0 :         throw NetworkError("Couldn't read a valid version number from changeset");
                 [ #  # ]
     251                 :         38 :     unsigned int changes_version = *ptr++;
     252         [ -  + ]:         38 :     if (changes_version != CHANGES_VERSION)
     253 [ #  # ][ #  # ]:          0 :         throw NetworkError("Unsupported changeset version");
                 [ #  # ]
     254                 :            : 
     255                 :            :     glass_revision_number_t startrev;
     256                 :            :     glass_revision_number_t endrev;
     257                 :            : 
     258         [ -  + ]:         38 :     if (!unpack_uint(&ptr, end, &startrev))
     259 [ #  # ][ #  # ]:          0 :         throw NetworkError("Couldn't read a valid start revision from changeset");
                 [ #  # ]
     260         [ -  + ]:         38 :     if (!unpack_uint(&ptr, end, &endrev))
     261 [ #  # ][ #  # ]:          0 :         throw NetworkError("Couldn't read a valid end revision from changeset");
                 [ #  # ]
     262                 :            : 
     263         [ -  + ]:         38 :     if (endrev <= startrev)
     264 [ #  # ][ #  # ]:          0 :         throw NetworkError("End revision in changeset is not later than start revision");
                 [ #  # ]
     265                 :            : 
     266         [ -  + ]:         38 :     if (ptr == end)
     267 [ #  # ][ #  # ]:          0 :         throw NetworkError("Unexpected end of changeset (1)");
                 [ #  # ]
     268                 :            : 
     269         [ +  - ]:         38 :     if (valid) {
     270                 :            :         // Check the revision number.
     271                 :            :         // If the database was not known to be valid, we cannot
     272                 :            :         // reliably determine its revision number, so must skip this
     273                 :            :         // check.
     274         [ +  - ]:         38 :         GlassVersion version_file(db_dir);
     275         [ +  - ]:         38 :         version_file.read();
     276         [ +  + ]:         38 :         if (startrev != version_file.get_revision())
     277 [ +  - ][ +  - ]:         38 :             throw NetworkError("Changeset supplied is for wrong revision number");
                 [ +  - ]
     278                 :            :     }
     279                 :            : 
     280                 :         37 :     unsigned char changes_type = *ptr++;
     281         [ -  + ]:         37 :     if (changes_type != 0) {
     282 [ #  # ][ #  # ]:          0 :         throw NetworkError("Unsupported changeset type: " + str(changes_type));
         [ #  # ][ #  # ]
     283                 :            :         // FIXME - support changes of type 1, produced when DANGEROUS mode is
     284                 :            :         // on.
     285                 :            :     }
     286                 :            : 
     287                 :            :     // Clear the bits of the buffer which have been read.
     288         [ +  - ]:         37 :     buf.erase(0, ptr - buf.data());
     289                 :            : 
     290                 :            :     // Read the items from the changeset.
     291                 :            :     while (true) {
     292 [ +  - ][ +  + ]:        262 :         if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
     293                 :         16 :             throw_connection_closed_unexpectedly();
     294                 :        246 :         ptr = buf.data();
     295                 :        246 :         end = ptr + buf.size();
     296         [ -  + ]:        246 :         if (ptr == end)
     297 [ #  # ][ #  # ]:          0 :             throw NetworkError("Unexpected end of changeset (3)");
                 [ #  # ]
     298                 :            : 
     299                 :            :         // Read the type of the next chunk of data
     300                 :            :         // chunk type can be (in binary):
     301                 :            :         //
     302                 :            :         // 11111111 - last chunk
     303                 :            :         // 11111110 - version file
     304                 :            :         // 00BBBTTT - table block:
     305                 :            :         //   Block size = (GLASS_MIN_BLOCKSIZE<<BBB) BBB=0..5
     306                 :            :         //   Table TTT=0..(Glass::MAX_-1)
     307                 :        246 :         unsigned char chunk_type = *ptr++;
     308         [ +  + ]:        246 :         if (chunk_type == 0xff)
     309                 :         13 :             break;
     310         [ +  + ]:        233 :         if (chunk_type == 0xfe) {
     311                 :            :             // Version file.
     312         [ +  - ]:         13 :             buf.erase(0, ptr - buf.data());
     313         [ +  - ]:         13 :             process_changeset_chunk_version(buf, conn, end_time);
     314                 :         13 :             continue;
     315                 :            :         }
     316                 :        220 :         size_t table_code = (chunk_type & 0x07);
     317         [ -  + ]:        220 :         if (table_code >= Glass::MAX_)
     318 [ #  # ][ #  # ]:          0 :             throw NetworkError("Bad table code in changeset file");
                 [ #  # ]
     319                 :        220 :         Glass::table_type table = static_cast<Glass::table_type>(table_code);
     320                 :        220 :         unsigned char v = (chunk_type >> 3) & 0x0f;
     321                 :            : 
     322                 :            :         // Process the chunk
     323         [ +  - ]:        220 :         buf.erase(0, ptr - buf.data());
     324         [ +  + ]:        233 :         process_changeset_chunk_blocks(table, v, buf, conn, end_time);
     325                 :            :     }
     326                 :            : 
     327         [ -  + ]:         13 :     if (ptr != end)
     328 [ #  # ][ #  # ]:         25 :         throw NetworkError("Junk found at end of changeset");
                 [ #  # ]
     329                 :            : 
     330         [ +  - ]:         13 :     buf.resize(0);
     331         [ +  - ]:         13 :     pack_uint(buf, endrev);
     332                 :            : 
     333         [ +  - ]:         13 :     commit();
     334                 :            : 
     335                 :         72 :     RETURN(buf);
     336                 :            : }
     337                 :            : 
     338                 :            : string
     339                 :         18 : GlassDatabaseReplicator::get_uuid() const
     340                 :            : {
     341                 :            :     LOGCALL(DB, string, "GlassDatabaseReplicator::get_uuid", NO_ARGS);
     342         [ +  - ]:         18 :     GlassVersion version_file(db_dir);
     343                 :            :     try {
     344         [ +  - ]:         18 :         version_file.read();
     345   [ #  #  #  # ]:          0 :     } catch (const Xapian::DatabaseError &) {
     346         [ #  # ]:          0 :         RETURN(string());
     347                 :            :     }
     348         [ +  - ]:         18 :     RETURN(version_file.get_uuid_string());
     349                 :            : }

Generated by: LCOV version 1.11