/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define __STDC_LIMIT_MACROS #include #include #include #include #include #include #include #include #include #include #include "ValidatingCodec.hh" #include "Symbol.hh" #include "Types.hh" #include "ValidSchema.hh" #include "Decoder.hh" #include "Encoder.hh" #include "NodeImpl.hh" namespace rmf_avro { using boost::make_shared; namespace parsing { using boost::shared_ptr; using boost::static_pointer_cast; using std::map; using std::pair; using std::vector; using std::string; using std::reverse; using std::ostringstream; using std::istringstream; using std::stack; using std::find_if; using std::make_pair; typedef pair NodePair; class ResolvingGrammarGenerator : public ValidatingGrammarGenerator { Production doGenerate(const NodePtr& writer, const NodePtr& reader, map > &m, const map > &m2); Production resolveRecords(const NodePtr& writer, const NodePtr& reader, map > &m, const map > &m2); Production resolveUnion(const NodePtr& writer, const NodePtr& reader, map > &m, const map > &m2); static vector > fields(const NodePtr& n) { vector > result; size_t c = n->names(); for (size_t i = 0; i < c; ++i) { result.push_back(make_pair(n->nameAt(i), i)); } return result; } static int bestBranch(const NodePtr& writer, const NodePtr& reader); Production getWriterProduction(const NodePtr& n, const map >& m2); public: Symbol generate( const ValidSchema& writer, const ValidSchema& reader); }; Symbol ResolvingGrammarGenerator::generate( const ValidSchema& writer, const ValidSchema& reader) { map > m2; const NodePtr& rr = reader.root(); const NodePtr& rw = writer.root(); Production backup = ValidatingGrammarGenerator::doGenerate(rw, m2); fixup(backup, m2); map > m; Production main = doGenerate(rr, rw, m, m2); fixup(main, m); return Symbol::rootSymbol(main, backup); } int ResolvingGrammarGenerator::bestBranch(const NodePtr& writer, const NodePtr& reader) { Type t = writer->type(); const size_t c = reader->leaves(); for (size_t j = 0; j < c; ++j) { const NodePtr& r = reader->leafAt(j); if (t == r->type()) { if (r->hasName()) { if (r->name() == writer->name()) { return j; } } else { return j; } } } for (size_t j = 0; j < c; ++j) { const NodePtr& r = reader->leafAt(j); Type rt = r->type(); switch (t) { case AVRO_INT: if (rt == AVRO_LONG || rt == AVRO_DOUBLE || rt == AVRO_FLOAT) { return j; } break; case AVRO_LONG: case AVRO_FLOAT: if (rt == AVRO_DOUBLE) { return j; } break; default: break; } } return -1; } template struct equalsFirst { const T1& v_; equalsFirst(const T1& v) : v_(v) { } bool operator()(const pair& p) { return p.first == v_; } }; Production ResolvingGrammarGenerator::getWriterProduction(const NodePtr& n, const map >& m2) { const NodePtr& nn = (n->type() == AVRO_SYMBOLIC) ? static_cast(*n).getNode() : n; map >::const_iterator it2 = m2.find(nn); return (it2 != m2.end()) ? *(it2->second) : ValidatingGrammarGenerator::generate(nn); } Production ResolvingGrammarGenerator::resolveRecords( const NodePtr& writer, const NodePtr& reader, map >& m, const map >& m2) { Production result; vector > wf = fields(writer); vector > rf = fields(reader); vector fieldOrder; fieldOrder.reserve(reader->names()); for (vector >::const_iterator it = wf.begin(); it != wf.end(); ++it) { vector >::iterator it2 = find_if(rf.begin(), rf.end(), equalsFirst(it->first)); if (it2 != rf.end()) { Production p = doGenerate(writer->leafAt(it->second), reader->leafAt(it2->second), m, m2); copy(p.rbegin(), p.rend(), back_inserter(result)); fieldOrder.push_back(it2->second); rf.erase(it2); } else { Production p = getWriterProduction(writer->leafAt(it->second), m2); result.push_back(Symbol::skipStart()); if (p.size() == 1) { result.push_back(p[0]); } else { result.push_back(Symbol::indirect(boost::make_shared(p))); } } } if (! rf.empty()) { throw Exception("Don't know how to handle excess fields for reader."); } reverse(result.begin(), result.end()); result.push_back(Symbol::sizeListAction(fieldOrder)); result.push_back(Symbol::recordAction()); return result; } Production ResolvingGrammarGenerator::resolveUnion( const NodePtr& writer, const NodePtr& reader, map >& m, const map >& m2) { vector v; size_t c = writer->leaves(); v.reserve(c); for (size_t i = 0; i < c; ++i) { Production p = doGenerate(writer->leafAt(i), reader, m, m2); v.push_back(p); } Symbol r[] = { Symbol::alternative(v), Symbol::writerUnionAction() }; return Production(r, r + 2); } Production ResolvingGrammarGenerator::doGenerate( const NodePtr& writer, const NodePtr& reader, map > &m, const map > &m2) { Type writerType = writer->type(); Type readerType = reader->type(); if (writerType == readerType) { switch (writerType) { case AVRO_NULL: return Production(1, Symbol::nullSymbol()); case AVRO_BOOL: return Production(1, Symbol::boolSymbol()); case AVRO_INT: return Production(1, Symbol::intSymbol()); case AVRO_LONG: return Production(1, Symbol::longSymbol()); case AVRO_FLOAT: return Production(1, Symbol::floatSymbol()); case AVRO_DOUBLE: return Production(1, Symbol::doubleSymbol()); case AVRO_STRING: return Production(1, Symbol::stringSymbol()); case AVRO_BYTES: return Production(1, Symbol::bytesSymbol()); case AVRO_FIXED: if (writer->name() == reader->name() && writer->fixedSize() == reader->fixedSize()) { Symbol r[] = { Symbol::sizeCheckSymbol(reader->fixedSize()), Symbol::fixedSymbol() }; Production result(r, r + 2); m[make_pair(writer, reader)] = boost::make_shared(result); return result; } break; case AVRO_RECORD: if (writer->name() == reader->name()) { const pair key(writer, reader); m.erase(key); Production result = resolveRecords(writer, reader, m, m2); const bool found = m.find(key) != m.end(); shared_ptr p = boost::make_shared(result); m[key] = p; return found ? Production(1, Symbol::indirect(p)) : result; } break; case AVRO_ENUM: if (writer->name() == reader->name()) { Symbol r[] = { Symbol::enumAdjustSymbol(writer, reader), Symbol::enumSymbol(), }; Production result(r, r + 2); m[make_pair(writer, reader)] = boost::make_shared(result); return result; } break; case AVRO_ARRAY: { Production p = getWriterProduction(writer->leafAt(0), m2); Symbol r[] = { Symbol::arrayEndSymbol(), Symbol::repeater( doGenerate(writer->leafAt(0), reader->leafAt(0), m, m2), p, true), Symbol::arrayStartSymbol() }; return Production(r, r + 3); } case AVRO_MAP: { Production v = doGenerate(writer->leafAt(1), reader->leafAt(1), m, m2); v.push_back(Symbol::stringSymbol()); Production v2 = getWriterProduction(writer->leafAt(1), m2); v2.push_back(Symbol::stringSymbol()); Symbol r[] = { Symbol::mapEndSymbol(), Symbol::repeater(v, v2, false), Symbol::mapStartSymbol() }; return Production(r, r + 3); } case AVRO_UNION: return resolveUnion(writer, reader, m, m2); case AVRO_SYMBOLIC: { shared_ptr w = static_pointer_cast(writer); shared_ptr r = static_pointer_cast(reader); NodePair p(w->getNode(), r->getNode()); map >::iterator it = m.find(p); if (it != m.end() && it->second) { return *it->second; } else { m[p] = shared_ptr(); return Production(1, Symbol::placeholder(p)); } } default: throw Exception("Unknown node type"); } } else if (writerType == AVRO_UNION) { return resolveUnion(writer, reader, m, m2); } else { switch (readerType) { case AVRO_LONG: if (writerType == AVRO_INT) { return Production(1, Symbol::resolveSymbol(Symbol::sInt, Symbol::sLong)); } break; case AVRO_FLOAT: if (writerType == AVRO_INT || writerType == AVRO_LONG) { return Production(1, Symbol::resolveSymbol(writerType == AVRO_INT ? Symbol::sInt : Symbol::sLong, Symbol::sFloat)); } break; case AVRO_DOUBLE: if (writerType == AVRO_INT || writerType == AVRO_LONG || writerType == AVRO_FLOAT) { return Production(1, Symbol::resolveSymbol(writerType == AVRO_INT ? Symbol::sInt : writerType == AVRO_LONG ? Symbol::sLong : Symbol::sFloat, Symbol::sDouble)); } break; case AVRO_UNION: { int j = bestBranch(writer, reader); if (j >= 0) { Production p = doGenerate(writer, reader->leafAt(j), m, m2); Symbol r[] = { Symbol::unionAdjustSymbol(j, p), Symbol::unionSymbol() }; return Production(r, r + 2); } } break; case AVRO_NULL: case AVRO_BOOL: case AVRO_INT: case AVRO_STRING: case AVRO_BYTES: case AVRO_ENUM: case AVRO_ARRAY: case AVRO_MAP: case AVRO_RECORD: break; default: throw Exception("Unknown node type"); } } return Production(1, Symbol::error(writer, reader)); } class ResolvingDecoderHandler { Decoder& base_; public: ResolvingDecoderHandler(Decoder& base) : base_(base) { } size_t handle(const Symbol& s) { switch (s.kind()) { case Symbol::sWriterUnion: return base_.decodeUnionIndex(); default: return 0; } } }; template class ResolvingDecoderImpl : public ResolvingDecoder { const DecoderPtr base_; ResolvingDecoderHandler handler_; Parser parser_; void init(InputStream& is); void decodeNull(); bool decodeBool(); int32_t decodeInt(); int64_t decodeLong(); float decodeFloat(); double decodeDouble(); void decodeString(string& value); void skipString(); void decodeBytes(vector& value); void skipBytes(); void decodeFixed(size_t n, vector& value); void skipFixed(size_t n); size_t decodeEnum(); size_t arrayStart(); size_t arrayNext(); size_t skipArray(); size_t mapStart(); size_t mapNext(); size_t skipMap(); size_t decodeUnionIndex(); const vector& fieldOrder(); public: ResolvingDecoderImpl(const ValidSchema& writer, const ValidSchema& reader, const DecoderPtr& base) : base_(base), handler_(*base_), parser_(ResolvingGrammarGenerator().generate(reader, writer), &(*base_), handler_) { } }; template void ResolvingDecoderImpl

::init(InputStream& is) { base_->init(is); parser_.reset(); } template void ResolvingDecoderImpl

::decodeNull() { parser_.advance(Symbol::sNull); base_->decodeNull(); } template bool ResolvingDecoderImpl

::decodeBool() { parser_.advance(Symbol::sBool); return base_->decodeBool(); } template int32_t ResolvingDecoderImpl

::decodeInt() { parser_.advance(Symbol::sInt); return base_->decodeInt(); } template int64_t ResolvingDecoderImpl

::decodeLong() { Symbol::Kind k = parser_.advance(Symbol::sLong); return k == Symbol::sInt ? base_->decodeInt() : base_->decodeLong(); } template float ResolvingDecoderImpl

::decodeFloat() { Symbol::Kind k = parser_.advance(Symbol::sFloat); return k == Symbol::sInt ? base_->decodeInt() : k == Symbol::sLong ? base_->decodeLong() : base_->decodeFloat(); } template double ResolvingDecoderImpl

::decodeDouble() { Symbol::Kind k = parser_.advance(Symbol::sDouble); return k == Symbol::sInt ? base_->decodeInt() : k == Symbol::sLong ? base_->decodeLong() : k == Symbol::sFloat ? base_->decodeFloat() : base_->decodeDouble(); } template void ResolvingDecoderImpl

::decodeString(string& value) { parser_.advance(Symbol::sString); base_->decodeString(value); } template void ResolvingDecoderImpl

::skipString() { parser_.advance(Symbol::sString); base_->skipString(); } template void ResolvingDecoderImpl

::decodeBytes(vector& value) { parser_.advance(Symbol::sBytes); base_->decodeBytes(value); } template void ResolvingDecoderImpl

::skipBytes() { parser_.advance(Symbol::sBytes); base_->skipBytes(); } template void ResolvingDecoderImpl

::decodeFixed(size_t n, vector& value) { parser_.advance(Symbol::sFixed); parser_.assertSize(n); return base_->decodeFixed(n, value); } template void ResolvingDecoderImpl

::skipFixed(size_t n) { parser_.advance(Symbol::sFixed); parser_.assertSize(n); base_->skipFixed(n); } template size_t ResolvingDecoderImpl

::decodeEnum() { parser_.advance(Symbol::sEnum); size_t n = base_->decodeEnum(); return parser_.enumAdjust(n); } template size_t ResolvingDecoderImpl

::arrayStart() { parser_.advance(Symbol::sArrayStart); size_t result = base_->arrayStart(); if (result == 0) { parser_.popRepeater(); parser_.advance(Symbol::sArrayEnd); } else { parser_.setRepeatCount(result); } return result; } template size_t ResolvingDecoderImpl

::arrayNext() { size_t result = base_->arrayNext(); if (result == 0) { parser_.popRepeater(); parser_.advance(Symbol::sArrayEnd); } else { parser_.setRepeatCount(result); } return result; } template size_t ResolvingDecoderImpl

::skipArray() { parser_.advance(Symbol::sArrayStart); size_t n = base_->skipArray(); if (n == 0) { parser_.pop(); } else { parser_.setRepeatCount(n); parser_.skip(*base_); } parser_.advance(Symbol::sArrayEnd); return 0; } template size_t ResolvingDecoderImpl

::mapStart() { parser_.advance(Symbol::sMapStart); size_t result = base_->mapStart(); if (result == 0) { parser_.popRepeater(); parser_.advance(Symbol::sMapEnd); } else { parser_.setRepeatCount(result); } return result; } template size_t ResolvingDecoderImpl

::mapNext() { size_t result = base_->mapNext(); if (result == 0) { parser_.popRepeater(); parser_.advance(Symbol::sMapEnd); } else { parser_.setRepeatCount(result); } return result; } template size_t ResolvingDecoderImpl

::skipMap() { parser_.advance(Symbol::sMapStart); size_t n = base_->skipMap(); if (n == 0) { parser_.pop(); } else { parser_.setRepeatCount(n); parser_.skip(*base_); } parser_.advance(Symbol::sMapEnd); return 0; } template size_t ResolvingDecoderImpl

::decodeUnionIndex() { parser_.advance(Symbol::sUnion); return parser_.unionAdjust(); } template const vector& ResolvingDecoderImpl

::fieldOrder() { parser_.advance(Symbol::sRecord); return parser_.sizeList(); } } // namespace parsing ResolvingDecoderPtr resolvingDecoder(const ValidSchema& writer, const ValidSchema& reader, const DecoderPtr& base) { return make_shared > >( writer, reader, base); } } // namespace rmf_avro