/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #ifdef HAVE_BOOST_ASIO #include #endif #include #include #define BUFFER_UNITTEST #include "buffer/BufferStream.hh" #include "buffer/BufferReader.hh" #include "buffer/BufferPrint.hh" using namespace rmf_avro; using std::cout; using std::endl; using detail::kDefaultBlockSize; using detail::kMinBlockSize; using detail::kMaxBlockSize; std::string makeString(size_t len) { std::string newstring; newstring.reserve(len); for(size_t i=0; i < len; ++i) { char newchar = '0' + i%16; if(newchar > '9') { newchar += 7; } newstring.push_back(newchar); } return newstring; } void printBuffer(const InputBuffer &buf) { rmf_avro::istream is(buf); cout << is.rdbuf() << endl; } void TestReserve() { BOOST_MESSAGE( "TestReserve"); { OutputBuffer ob; BOOST_CHECK_EQUAL(ob.size(), 0U); BOOST_CHECK_EQUAL(ob.freeSpace(), 0U); BOOST_CHECK_EQUAL(ob.numChunks(), 0); BOOST_CHECK_EQUAL(ob.numDataChunks(), 0); } { size_t reserveSize = kMinBlockSize/2; OutputBuffer ob (reserveSize); BOOST_CHECK_EQUAL(ob.size(), 0U); BOOST_CHECK_EQUAL(ob.freeSpace(), kMinBlockSize); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 0); // reserve should add a single block reserveSize += 8192; ob.reserve(reserveSize); BOOST_CHECK_EQUAL(ob.size(), 0U); BOOST_CHECK_EQUAL(ob.freeSpace(), reserveSize); BOOST_CHECK_EQUAL(ob.numChunks(), 2); BOOST_CHECK_EQUAL(ob.numDataChunks(), 0); // reserve should add two blocks, one of the maximum size and // one of the minimum size reserveSize += (kMaxBlockSize + kMinBlockSize/2); ob.reserve(reserveSize); BOOST_CHECK_EQUAL(ob.size(), 0U); BOOST_CHECK_EQUAL(ob.freeSpace(), reserveSize + kMinBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 4); BOOST_CHECK_EQUAL(ob.numDataChunks(), 0); } } void addDataToBuffer(OutputBuffer &buf, size_t size) { std::string data = makeString(size); buf.writeTo(data.c_str(), data.size()); } void TestGrow() { BOOST_MESSAGE( "TestGrow"); { OutputBuffer ob; // add exactly one block addDataToBuffer(ob, kDefaultBlockSize); BOOST_CHECK_EQUAL(ob.size(), kDefaultBlockSize); BOOST_CHECK_EQUAL(ob.freeSpace(), 0U); BOOST_CHECK_EQUAL(ob.numChunks(), 0); BOOST_CHECK_EQUAL(ob.numDataChunks(), 1); // add another block, half full addDataToBuffer(ob, kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.size(), kDefaultBlockSize + kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 2); // reserve more capacity size_t reserveSize = ob.freeSpace() + 8192; ob.reserve(reserveSize); BOOST_CHECK_EQUAL(ob.size(), kDefaultBlockSize + kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.freeSpace(), reserveSize); BOOST_CHECK_EQUAL(ob.numChunks(), 2); BOOST_CHECK_EQUAL(ob.numDataChunks(), 2); // fill beyond capacity addDataToBuffer(ob, reserveSize + 1); BOOST_CHECK_EQUAL(ob.size(), kDefaultBlockSize + kDefaultBlockSize/2 + reserveSize +1); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize - 1); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 4); } } void TestDiscard() { BOOST_MESSAGE( "TestDiscard"); { OutputBuffer ob; size_t dataSize = kDefaultBlockSize*2 + kDefaultBlockSize/2; addDataToBuffer(ob, dataSize); BOOST_CHECK_EQUAL(ob.size(), dataSize); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 3); ob.discardData(); BOOST_CHECK_EQUAL(ob.size(), 0U); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 0); } { // discard no bytes OutputBuffer ob; size_t dataSize = kDefaultBlockSize*2 + kDefaultBlockSize/2; addDataToBuffer(ob, dataSize); BOOST_CHECK_EQUAL(ob.size(), dataSize); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 3); ob.discardData(0); BOOST_CHECK_EQUAL(ob.size(), dataSize); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 3); } { // discard exactly one block OutputBuffer ob; size_t dataSize = kDefaultBlockSize*2 + kDefaultBlockSize/2; addDataToBuffer(ob, dataSize); BOOST_CHECK_EQUAL(ob.size(), dataSize); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 3); ob.discardData(kDefaultBlockSize); BOOST_CHECK_EQUAL(ob.size(), dataSize - kDefaultBlockSize); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 2); } { OutputBuffer ob; size_t dataSize = kDefaultBlockSize*2 + kDefaultBlockSize/2; addDataToBuffer(ob, dataSize); BOOST_CHECK_EQUAL(ob.size(), dataSize); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 3); size_t remainder = dataSize % 100; // discard data 100 bytes at a time size_t discarded = 0; while(ob.size() > 100) { ob.discardData(100); dataSize -= 100; discarded += 100; BOOST_CHECK_EQUAL(ob.size(), dataSize); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); int chunks = 3 - (discarded / kDefaultBlockSize); BOOST_CHECK_EQUAL(ob.numDataChunks(), chunks); } BOOST_CHECK_EQUAL(ob.size(), remainder); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 1); try { ob.discardData(ob.size()+1); } catch (std::exception &e) { std::cout << "Intentionally triggered exception: " << e.what() << std::endl; } ob.discardData(ob.size()); BOOST_CHECK_EQUAL(ob.size(), 0U); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 0); } } void TestConvertToInput() { BOOST_MESSAGE( "TestConvertToInput"); { OutputBuffer ob; size_t dataSize = kDefaultBlockSize*2 + kDefaultBlockSize/2; addDataToBuffer(ob, dataSize); InputBuffer ib(ob); BOOST_CHECK_EQUAL(ib.size(), dataSize); BOOST_CHECK_EQUAL(ib.numChunks(), 3); BOOST_CHECK_EQUAL(ob.size(), dataSize); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 3); } } void TestExtractToInput() { BOOST_MESSAGE( "TestExtractToInput"); { OutputBuffer ob; size_t dataSize = kDefaultBlockSize*2 + kDefaultBlockSize/2; addDataToBuffer(ob, dataSize); InputBuffer ib = ob.extractData(); BOOST_CHECK_EQUAL(ib.size(), dataSize); BOOST_CHECK_EQUAL(ib.numChunks(), 3); BOOST_CHECK_EQUAL(ob.size(), 0U); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 0); } { // extract no bytes OutputBuffer ob; size_t dataSize = kDefaultBlockSize*2 + kDefaultBlockSize/2; addDataToBuffer(ob, dataSize); InputBuffer ib = ob.extractData(0); BOOST_CHECK_EQUAL(ib.size(), 0U); BOOST_CHECK_EQUAL(ib.numChunks(), 0); BOOST_CHECK_EQUAL(ob.size(), dataSize); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 3); } { // extract exactly one block OutputBuffer ob; size_t dataSize = kDefaultBlockSize*2 + kDefaultBlockSize/2; addDataToBuffer(ob, dataSize); InputBuffer ib = ob.extractData(kDefaultBlockSize); BOOST_CHECK_EQUAL(ib.size(), kDefaultBlockSize); BOOST_CHECK_EQUAL(ib.numChunks(), 1); BOOST_CHECK_EQUAL(ob.size(), dataSize - kDefaultBlockSize); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 2); } { OutputBuffer ob; size_t dataSize = kDefaultBlockSize*2 + kDefaultBlockSize/2; addDataToBuffer(ob, dataSize); size_t remainder = dataSize % 100; // extract data 100 bytes at a time size_t extracted = 0; while(ob.size() > 100) { ob.extractData(100); dataSize -= 100; extracted += 100; BOOST_CHECK_EQUAL(ob.size(), dataSize); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); int chunks = 3 - (extracted / kDefaultBlockSize); BOOST_CHECK_EQUAL(ob.numDataChunks(), chunks); } BOOST_CHECK_EQUAL(ob.size(), remainder); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 1); try { ob.extractData(ob.size()+1); } catch (std::exception &e) { std::cout << "Intentionally triggered exception: " << e.what() << std::endl; } InputBuffer ib = ob.extractData(remainder); BOOST_CHECK_EQUAL(ib.size(), remainder); BOOST_CHECK_EQUAL(ib.numChunks(), 1); BOOST_CHECK_EQUAL(ob.size(), 0U); BOOST_CHECK_EQUAL(ob.freeSpace(), kDefaultBlockSize/2); BOOST_CHECK_EQUAL(ob.numChunks(), 1); BOOST_CHECK_EQUAL(ob.numDataChunks(), 0); } } void TestAppend() { BOOST_MESSAGE( "TestAppend"); { OutputBuffer ob; size_t dataSize = kDefaultBlockSize + kDefaultBlockSize/2; addDataToBuffer(ob, dataSize); OutputBuffer a; a.append(ob); BOOST_CHECK_EQUAL(a.size(), dataSize); BOOST_CHECK_EQUAL(a.freeSpace(), 0U); BOOST_CHECK_EQUAL(a.numChunks(), 0); BOOST_CHECK_EQUAL(a.numDataChunks(), 2); // reserve on a, then append from an input buffer a.reserve(7000); InputBuffer ib(ob); a.append(ib); BOOST_CHECK_EQUAL(a.size(), dataSize*2); BOOST_CHECK_EQUAL(a.freeSpace(), 7000U); BOOST_CHECK_EQUAL(a.numChunks(), 1); BOOST_CHECK_EQUAL(a.numDataChunks(), 4); } } void TestBufferStream() { BOOST_MESSAGE( "TestBufferStream"); { // write enough bytes to a buffer, to create at least 3 blocks std::string junk = makeString(kDefaultBlockSize); ostream os; int i = 0; for(; i < 3; ++i) { os << junk; } const OutputBuffer &buf = os.getBuffer(); cout << "Buffer has " << buf.size() << " bytes\n"; BOOST_CHECK_EQUAL(buf.size(), junk.size() * i); } } template void TestEof() { // create a message full of eof chars std::vector eofs(sizeof(T) * 3 / 2, -1); OutputBuffer buf1; buf1.writeTo(&eofs[0], eofs.size()); OutputBuffer buf2; buf2.writeTo(&eofs[0], eofs.size()); // append the buffers, so the first // character on a buffer boundary is eof buf1.append(buf2); rmf_avro::istream is(buf1); for(int i = 0; i < 3; ++i) { T d; char *addr = reinterpret_cast(&d); is.read(addr, sizeof(T)); BOOST_CHECK_EQUAL(is.gcount(), static_cast(sizeof(T))); BOOST_CHECK_EQUAL(is.eof(), false); } char c; is.read(&c, sizeof(c)); BOOST_CHECK_EQUAL(is.gcount(), 0); BOOST_CHECK_EQUAL(is.eof(), true); } void TestBufferStreamEof() { BOOST_MESSAGE( "TestBufferStreamEof"); TestEof(); TestEof(); TestEof(); TestEof(); } void TestSeekAndTell() { BOOST_MESSAGE( "TestSeekAndTell"); { std::string junk = makeString(kDefaultBlockSize/2); ostream os; // write enough bytes to a buffer, to create at least 3 blocks int i = 0; for(; i < 5; ++i) { os << junk; } const OutputBuffer &buf = os.getBuffer(); cout << "Buffer has " << buf.size() << " bytes\n"; istream is(os.getBuffer()); BOOST_CHECK_EQUAL(is.getBuffer().size(), junk.size() * i); is.seekg(2000); BOOST_CHECK_EQUAL(is.tellg(), static_cast(2000)); is.seekg(6000); BOOST_CHECK_EQUAL(is.tellg(), static_cast(6000)); is.seekg(is.getBuffer().size()); BOOST_CHECK_EQUAL(is.tellg(), static_cast(is.getBuffer().size())); is.seekg(is.getBuffer().size()+1); BOOST_CHECK_EQUAL(is.tellg(), static_cast(-1)); } } void TestReadSome() { BOOST_MESSAGE( "TestReadSome"); { std::string junk = makeString(kDefaultBlockSize/2); ostream os; // write enough bytes to a buffer, to create at least 3 blocks int i = 0; for(; i < 5; ++i) { os << junk; } cout << "Buffer has " << os.getBuffer().size() << " bytes\n"; istream is(os.getBuffer()); char datain[5000]; while(is.rdbuf()->in_avail()) { size_t bytesAvail = static_cast(is.rdbuf()->in_avail()); cout << "Bytes avail = " << bytesAvail << endl; size_t in = static_cast(is.readsome(datain, sizeof(datain))); cout << "Bytes read = " << in << endl; BOOST_CHECK_EQUAL(bytesAvail, in); } } } void TestSeek() { BOOST_MESSAGE( "TestSeek"); { const std::string str = "SampleMessage"; rmf_avro::OutputBuffer tmp1, tmp2, tmp3; tmp1.writeTo(str.c_str(), 3); // Sam tmp2.writeTo(str.c_str()+3, 7); // pleMess tmp3.writeTo(str.c_str()+10, 3); // age tmp2.append(tmp3); tmp1.append(tmp2); BOOST_CHECK_EQUAL(tmp3.numDataChunks(), 1); BOOST_CHECK_EQUAL(tmp2.numDataChunks(), 2); BOOST_CHECK_EQUAL(tmp1.numDataChunks(), 3); rmf_avro::InputBuffer buf(tmp1); cout << "Starting string: " << str << '\n'; BOOST_CHECK_EQUAL(static_cast( buf.size()) , str.size()); rmf_avro::istream is(buf); const std::string part1 = "Sample"; char buffer[16]; is.read(buffer, part1.size()); std::string sample1(buffer, part1.size()); cout << "After reading bytes: " << sample1 << '\n'; BOOST_CHECK_EQUAL(sample1, part1); const std::string part2 = "Message"; is.read(buffer, part2.size()); std::string sample2(buffer, part2.size()); cout << "After reading remaining bytes: " << sample2 << '\n'; BOOST_CHECK_EQUAL(sample2, part2); cout << "Seeking back " << '\n'; is.seekg( - static_cast(part2.size()), std::ios_base::cur); std::streampos loc = is.tellg(); cout << "Saved loc = " << loc << '\n'; BOOST_CHECK_EQUAL(static_cast( loc ), (str.size()-part2.size())); cout << "Reading remaining bytes: " << is.rdbuf() << '\n'; cout << "bytes avail = " << is.rdbuf()->in_avail() << '\n'; BOOST_CHECK_EQUAL(is.rdbuf()->in_avail(), 0); cout << "Moving to saved loc = " << loc << '\n'; is.seekg(loc); cout << "bytes avail = " << is.rdbuf()->in_avail() << '\n'; std::ostringstream oss; oss << is.rdbuf(); cout << "After reading bytes: " << oss.str() << '\n'; BOOST_CHECK_EQUAL(oss.str(), part2); } } void TestIterator() { BOOST_MESSAGE( "TestIterator"); { OutputBuffer ob(2 * kMaxBlockSize + 10); BOOST_CHECK_EQUAL(ob.numChunks(), 3); BOOST_CHECK_EQUAL(ob.size(), 0U); BOOST_CHECK_EQUAL(ob.freeSpace(), 2 * kMaxBlockSize + kMinBlockSize); BOOST_CHECK_EQUAL (std::distance(ob.begin(), ob.end()), 3); OutputBuffer::const_iterator iter = ob.begin(); BOOST_CHECK_EQUAL( iter->size(), kMaxBlockSize); ++iter; BOOST_CHECK_EQUAL( iter->size(), kMaxBlockSize); ++iter; BOOST_CHECK_EQUAL( iter->size(), kMinBlockSize); ++iter; BOOST_CHECK( iter == ob.end()); size_t toWrite = kMaxBlockSize + kMinBlockSize; ob.wroteTo(toWrite); BOOST_CHECK_EQUAL(ob.size(), toWrite); BOOST_CHECK_EQUAL(ob.freeSpace(), kMaxBlockSize); BOOST_CHECK_EQUAL(ob.numChunks(), 2); BOOST_CHECK_EQUAL(ob.numDataChunks(), 2); InputBuffer ib = ob; BOOST_CHECK_EQUAL (std::distance(ib.begin(), ib.end()), 2); size_t acc = 0; for(OutputBuffer::const_iterator iter = ob.begin(); iter != ob.end(); ++iter) { acc += iter->size(); } BOOST_CHECK_EQUAL(ob.freeSpace(), acc); try { ob.wroteTo(acc+1); } catch (std::exception &e) { std::cout << "Intentionally triggered exception: " << e.what() << std::endl; } } } #ifdef HAVE_BOOST_ASIO void server(boost::barrier &b) { using boost::asio::ip::tcp; boost::asio::io_service io_service; tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), 33333)); tcp::socket sock(io_service); a.listen(); b.wait(); a.accept(sock); rmf_avro::OutputBuffer buf(100); size_t length = sock.receive(buf); buf.wroteTo(length); cout << "Server got " << length << " bytes\n"; InputBuffer rbuf(buf); std::string res; rmf_avro::InputBuffer::const_iterator iter = rbuf.begin(); while(iter != rbuf.end() ) { res.append(boost::asio::buffer_cast(*iter), boost::asio::buffer_size(*iter)); cout << "Received Buffer size: " << boost::asio::buffer_size(*iter) << endl; BOOST_CHECK_EQUAL(length, boost::asio::buffer_size(*iter)); cout << "Received Buffer: \"" << res << '"' << endl; ++iter; } BOOST_CHECK_EQUAL(res, "hello world"); } void TestAsioBuffer() { using boost::asio::ip::tcp; BOOST_MESSAGE( "TestAsioBuffer"); { boost::barrier b(2); boost::thread t(boost::bind(server, boost::ref(b))); b.wait(); // set up the thing boost::asio::io_service io_service; tcp::resolver resolver(io_service); tcp::resolver::query query(tcp::v4(), "localhost", "33333"); tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); tcp::resolver::iterator end; tcp::socket socket(io_service); boost::system::error_code error = boost::asio::error::host_not_found; while (error && endpoint_iterator != end) { socket.close(); socket.connect(*endpoint_iterator++, error); } if (error) { throw error; } std::string hello = "hello "; std::string world = "world"; rmf_avro::OutputBuffer buf; buf.writeTo(hello.c_str(), hello.size()); BOOST_CHECK_EQUAL(buf.size(), hello.size()); rmf_avro::OutputBuffer buf2; buf2.writeTo(world.c_str(), world.size()); BOOST_CHECK_EQUAL(buf2.size(), world.size()); buf.append(buf2); BOOST_CHECK_EQUAL(buf.size(), hello.size() + world.size()); cout << "Distance " << std::distance(buf.begin(), buf.end()) << endl; BOOST_CHECK_EQUAL(std::distance(buf.begin(), buf.end()), 1); const rmf_avro::InputBuffer rbuf(buf); rmf_avro::InputBuffer::const_iterator iter = rbuf.begin(); while(iter != rbuf.end() ) { std::string str(boost::asio::buffer_cast(*iter), boost::asio::buffer_size(*iter)); cout << "Buffer size: " << boost::asio::buffer_size(*iter) << endl; cout << "Buffer: \"" << str << '"' << endl; ++iter; } cout << "Buffer size " << rbuf.size() << endl; std::size_t wrote = boost::asio::write(socket, rbuf); cout << "Wrote " << wrote << endl; BOOST_CHECK_EQUAL(wrote, rbuf.size()); t.join(); } } #else void TestAsioBuffer() { cout << "Skipping asio test\n"; } #endif // HAVE_BOOST_ASIO void TestSplit() { BOOST_MESSAGE( "TestSplit"); { const std::string str = "This message is to be split"; rmf_avro::OutputBuffer buf; buf.writeTo(str.c_str(), str.size()); char datain[12]; rmf_avro::istream is(buf); size_t in = static_cast(is.readsome(datain, sizeof(datain))); BOOST_CHECK_EQUAL(in, sizeof(datain)); BOOST_CHECK_EQUAL(static_cast(is.tellg()), sizeof(datain)); OutputBuffer part2; part2.append(is.getBuffer()); BOOST_CHECK_EQUAL(part2.size(), buf.size()); InputBuffer part1 = part2.extractData(static_cast(is.tellg())); BOOST_CHECK_EQUAL(part2.size(), str.size() - in); printBuffer(part1); printBuffer(part2); } } void TestSplitOnBorder() { BOOST_MESSAGE( "TestSplitOnBorder"); { const std::string part1 = "This message"; const std::string part2 = " is to be split"; rmf_avro::OutputBuffer buf; buf.writeTo(part1.c_str(), part1.size()); size_t firstChunkSize = buf.size(); { rmf_avro::OutputBuffer tmp; tmp.writeTo(part2.c_str(), part2.size()); buf.append(tmp); printBuffer(InputBuffer(buf)); } BOOST_CHECK_EQUAL(buf.numDataChunks(), 2); size_t bufsize = buf.size(); boost::scoped_array datain(new char[firstChunkSize]); rmf_avro::istream is(buf); size_t in = static_cast(is.readsome(&datain[0], firstChunkSize)); BOOST_CHECK_EQUAL(in, firstChunkSize); OutputBuffer newBuf; newBuf.append(is.getBuffer()); newBuf.discardData(static_cast(is.tellg())); BOOST_CHECK_EQUAL(newBuf.numDataChunks(), 1); BOOST_CHECK_EQUAL(newBuf.size(), bufsize - in); cout << is.rdbuf() << endl; printBuffer(newBuf); } } void TestSplitTwice() { BOOST_MESSAGE( "TestSplitTwice"); { const std::string msg1 = makeString(30); rmf_avro::OutputBuffer buf1; buf1.writeTo(msg1.c_str(), msg1.size()); BOOST_CHECK_EQUAL(buf1.size(), msg1.size()); printBuffer(buf1); rmf_avro::istream is(buf1); char buffer[6]; is.readsome(buffer, 5); buffer[5] = 0; std::cout << "buffer =" << buffer << std::endl; buf1.discardData(static_cast(is.tellg())); printBuffer(buf1); rmf_avro::istream is2(buf1); is2.seekg(15); buf1.discardData(static_cast(is2.tellg())); printBuffer(buf1); } } void TestCopy() { BOOST_MESSAGE( "TestCopy"); const std::string msg = makeString(30); // Test1, small data, small buffer { std::cout << "Test1\n"; // put a small amount of data in the buffer rmf_avro::OutputBuffer wb; wb.writeTo(msg.c_str(), msg.size()); BOOST_CHECK_EQUAL(msg.size(), wb.size()); BOOST_CHECK_EQUAL(wb.numDataChunks(), 1); BOOST_CHECK_EQUAL(kDefaultBlockSize - msg.size(), wb.freeSpace()); // copy starting at offset 5 and copying 10 less bytes BufferReader br(wb); br.seek(5); rmf_avro::InputBuffer ib = br.copyData(msg.size() - 10); printBuffer(ib); BOOST_CHECK_EQUAL(ib.numChunks(), 1); BOOST_CHECK_EQUAL(ib.size(), msg.size()-10); // buf 1 should be unchanged BOOST_CHECK_EQUAL(msg.size(), wb.size()); BOOST_CHECK_EQUAL(wb.numDataChunks(), 1); BOOST_CHECK_EQUAL(kDefaultBlockSize - msg.size(), wb.freeSpace()); // make sure wb is still functional wb.reserve(kDefaultBlockSize); BOOST_CHECK_EQUAL(wb.size(), msg.size()); BOOST_CHECK_EQUAL(wb.numChunks(), 2); BOOST_CHECK_EQUAL(kDefaultBlockSize * 2 - msg.size(), wb.freeSpace()); } // Test2, small data, large buffer { std::cout << "Test2\n"; // put a small amount of data in the buffer const OutputBuffer::size_type bufsize= 3*kMaxBlockSize; rmf_avro::OutputBuffer wb(bufsize); BOOST_CHECK_EQUAL(wb.numChunks(), 3); BOOST_CHECK_EQUAL(wb.freeSpace(), bufsize); wb.writeTo(msg.c_str(), msg.size()); BOOST_CHECK_EQUAL(wb.size(), msg.size()); BOOST_CHECK_EQUAL(wb.numDataChunks(), 1); BOOST_CHECK_EQUAL(bufsize - msg.size(), wb.freeSpace()); BufferReader br(wb); br.seek(5); rmf_avro::InputBuffer ib = br.copyData(msg.size() - 10); printBuffer(ib); BOOST_CHECK_EQUAL(ib.numChunks(), 1); BOOST_CHECK_EQUAL(ib.size(), msg.size()-10); // wb should be unchanged BOOST_CHECK_EQUAL(msg.size(), wb.size()); BOOST_CHECK_EQUAL(wb.numChunks(), 3); BOOST_CHECK_EQUAL(wb.numDataChunks(), 1); BOOST_CHECK_EQUAL(bufsize - msg.size(), wb.freeSpace()); // reserving a small amount should have no effect wb.reserve(1); BOOST_CHECK_EQUAL(msg.size(), wb.size()); BOOST_CHECK_EQUAL(wb.numChunks(), 3); BOOST_CHECK_EQUAL(bufsize - msg.size(), wb.freeSpace()); // reserve more (will get extra block) wb.reserve(bufsize); BOOST_CHECK_EQUAL(msg.size(), wb.size()); BOOST_CHECK_EQUAL(wb.numChunks(), 4); BOOST_CHECK_EQUAL(kMaxBlockSize * 3 - msg.size() + kMinBlockSize, wb.freeSpace()); } // Test3 Border case, buffer is exactly full { std::cout << "Test3\n"; const OutputBuffer::size_type bufsize= 2*kDefaultBlockSize; rmf_avro::OutputBuffer wb; for(unsigned i = 0; i < bufsize; ++i) { wb.writeTo('a'); } BOOST_CHECK_EQUAL(wb.size(), bufsize); BOOST_CHECK_EQUAL(wb.freeSpace(), 0U); BOOST_CHECK_EQUAL(wb.numChunks(), 0); BOOST_CHECK_EQUAL(wb.numDataChunks(), 2); // copy where the chunks overlap BufferReader br(wb); br.seek(bufsize/2 - 10); rmf_avro::InputBuffer ib = br.copyData(20); printBuffer(ib); BOOST_CHECK_EQUAL(ib.size(), 20U); BOOST_CHECK_EQUAL(ib.numChunks(), 2); // wb should be unchanged BOOST_CHECK_EQUAL(wb.size(), bufsize); BOOST_CHECK_EQUAL(wb.freeSpace(), 0U); BOOST_CHECK_EQUAL(wb.numDataChunks(), 2); } // Test4, no data { const OutputBuffer::size_type bufsize= 2*kMaxBlockSize; std::cout << "Test4\n"; rmf_avro::OutputBuffer wb(bufsize); BOOST_CHECK_EQUAL(wb.numChunks(), 2); BOOST_CHECK_EQUAL(wb.size(), 0U); BOOST_CHECK_EQUAL(wb.freeSpace(), bufsize); rmf_avro::InputBuffer ib; try { BufferReader br(wb); br.seek(10); } catch (std::exception &e) { cout << "Intentially triggered exception: " << e.what() << endl; } try { BufferReader br(wb); rmf_avro::InputBuffer ib = br.copyData(10); } catch (std::exception &e) { cout << "Intentially triggered exception: " << e.what() << endl; } BOOST_CHECK_EQUAL(ib.numChunks(), 0); BOOST_CHECK_EQUAL(ib.size(), 0U); // wb should keep all blocks remaining BOOST_CHECK_EQUAL(wb.numChunks(), 2); BOOST_CHECK_EQUAL(wb.size(), 0U); BOOST_CHECK_EQUAL(wb.freeSpace(), bufsize); } } // this is reproducing a sequence of steps that caused a crash void TestBug() { BOOST_MESSAGE( "TestBug"); { OutputBuffer rxBuf; OutputBuffer buf; rxBuf.reserve(64 * 1024); rxBuf.wroteTo(2896); { rmf_avro::InputBuffer ib(rxBuf.extractData()); buf.append(ib); } buf.discardData(61); rxBuf.reserve(64 * 1024); rxBuf.wroteTo(381); { rmf_avro::InputBuffer ib(rxBuf.extractData()); buf.append(ib); } buf.discardData(3216); rxBuf.reserve(64 * 1024); } } bool safeToDelete = false; void deleteForeign(const std::string &val) { std::cout << "Deleting foreign string containing " << val << '\n'; BOOST_CHECK(safeToDelete); } void TestForeign () { BOOST_MESSAGE( "TestForeign"); { std::string hello = "hello "; std::string there = "there "; std::string world = "world "; OutputBuffer copy; { OutputBuffer buf; buf.writeTo(hello.c_str(), hello.size()); buf.appendForeignData(there.c_str(), there.size(), boost::bind(&deleteForeign, there)); buf.writeTo(world.c_str(), world.size()); printBuffer(buf); BOOST_CHECK_EQUAL(buf.size(), 18U); copy = buf; } std::cout << "Leaving inner scope\n"; safeToDelete = true; } std::cout << "Leaving outer scope\n"; safeToDelete = false; } void TestForeignDiscard () { BOOST_MESSAGE( "TestForeign"); { std::string hello = "hello "; std::string again = "again "; std::string there = "there "; std::string world = "world "; OutputBuffer buf; buf.writeTo(hello.c_str(), hello.size()); buf.appendForeignData(again.c_str(), again.size(), boost::bind(&deleteForeign, again)); buf.appendForeignData(there.c_str(), there.size(), boost::bind(&deleteForeign, there)); buf.writeTo(world.c_str(), world.size()); printBuffer(buf); BOOST_CHECK_EQUAL(buf.size(), 24U); // discard some data including half the foreign buffer buf.discardData(9); printBuffer(buf); BOOST_CHECK_EQUAL(buf.size(), 15U); // discard some more data, which will lop off the first foreign buffer safeToDelete = true; buf.discardData(6); safeToDelete = false; printBuffer(buf); BOOST_CHECK_EQUAL(buf.size(), 9U); // discard some more data, which will lop off the second foreign buffer safeToDelete = true; buf.discardData(3); safeToDelete = false; printBuffer(buf); BOOST_CHECK_EQUAL(buf.size(), 6U); } } void TestPrinter() { BOOST_MESSAGE( "TestPrinter"); { OutputBuffer ob; addDataToBuffer(ob, 128); std::cout << ob << std::endl; } } struct BufferTestSuite : public boost::unit_test::test_suite { BufferTestSuite() : boost::unit_test::test_suite("BufferTestSuite") { add (BOOST_TEST_CASE( TestReserve )); add (BOOST_TEST_CASE( TestGrow )); add (BOOST_TEST_CASE( TestDiscard )); add (BOOST_TEST_CASE( TestConvertToInput )); add (BOOST_TEST_CASE( TestExtractToInput )); add (BOOST_TEST_CASE( TestAppend )); add (BOOST_TEST_CASE( TestBufferStream )); add (BOOST_TEST_CASE( TestBufferStreamEof )); add (BOOST_TEST_CASE( TestSeekAndTell )); add (BOOST_TEST_CASE( TestReadSome )); add (BOOST_TEST_CASE( TestSeek)); add (BOOST_TEST_CASE( TestIterator)); add (BOOST_TEST_CASE( TestAsioBuffer)); add (BOOST_TEST_CASE( TestSplit)); add (BOOST_TEST_CASE( TestSplitOnBorder)); add (BOOST_TEST_CASE( TestSplitTwice)); add (BOOST_TEST_CASE( TestCopy)); add (BOOST_TEST_CASE( TestBug)); add (BOOST_TEST_CASE( TestForeign)); add (BOOST_TEST_CASE( TestForeignDiscard)); add (BOOST_TEST_CASE( TestPrinter)); } }; boost::unit_test::test_suite* init_unit_test_suite( int, char* [] ) { boost::unit_test::test_suite *test (BOOST_TEST_SUITE ("Buffer Unit Tests")); test->add (new BufferTestSuite() ); return test; }