25 #ifndef SRC_XRDZIP_XRDZIPINFLCACHE_HH_
26 #define SRC_XRDZIP_XRDZIPINFLCACHE_HH_
62 typedef std::tuple<uint64_t, uint32_t, void*, ResponseHandler*> read_args_t;
63 typedef std::tuple<XRootDStatus, uint64_t, buffer_t> read_resp_t;
65 struct greater_read_resp_t
67 inline bool operator() (
const read_resp_t &lhs,
const read_resp_t &rhs )
const
69 return std::get<1>( lhs ) > std::get<1>( rhs );
73 typedef std::priority_queue<read_resp_t, std::vector<read_resp_t>, greater_read_resp_t> resp_queue_t;
// Start with neither an input nor an output buffer attached to the zlib stream.
83 strm.next_in = Z_NULL;
85 strm.next_out = Z_NULL;
// A negative windowBits value asks zlib for raw deflate data (no zlib/gzip
// header or trailer), per the zlib manual. The handling of rc is not visible
// in this excerpt.
89 int rc = inflateInit2( &strm, -MAX_WBITS );
// Hold mtx while the request queue is modified.
101 std::unique_lock<std::mutex> lck( mtx );
// Append the read request as an (offset, length, buffer, handler) tuple.
102 rdreqs.emplace( offset, length, buffer, handler );
// Hold mtx while the response queue is modified.
108 std::unique_lock<std::mutex> lck( mtx );
// Store the response as a (status, offset, buffer) tuple; the buffer is moved
// into the queue rather than copied.
109 rdrsps.emplace( st, offset, std::move( buffer ) );
115 inline bool HasInput()
const
117 return strm.avail_in != 0;
120 inline bool HasOutput()
const
122 return strm.avail_out != 0;
125 inline void Input(
const read_resp_t &rdrsp )
127 const buffer_t &buffer = std::get<2>( rdrsp );
128 strm.avail_in = buffer.size();
129 strm.next_in = (Bytef*)buffer.data();
132 inline void Output(
const read_args_t &rdreq )
134 strm.avail_out = std::get<1>( rdreq );
135 strm.next_out = (Bytef*)std::get<2>( rdreq );
138 inline bool Consecutive(
const read_resp_t &resp )
const
140 return ( std::get<1>( resp ) == inabsoff );
// Main inflate loop: runs while the stream has an input or output buffer
// attached, or while either queue still holds entries.
145 while( HasInput() || HasOutput() || !rdreqs.empty() || !rdrsps.empty() )
// No output space left: attach the buffer of the request at the front of rdreqs.
147 if( !HasOutput() && !rdreqs.empty() )
148 Output( rdreqs.front() );
// No input left: attach the response at the top of rdrsps, but only if it
// starts exactly at the current absolute input offset (responses may be
// queued for other offsets).
150 if( !HasInput() && !rdrsps.empty() && Consecutive( rdrsps.top() ) )
151 Input( rdrsps.top() );
// Both an input and an output buffer are needed to make progress.
// NOTE(review): a response with a failed status and an empty buffer leaves
// HasInput() false, so this return appears to be taken before the status
// check below - confirm whether such responses can occur.
153 if( !HasInput() || !HasOutput() )
return;
// Forward a failed response status to the handler of the front request.
156 XRootDStatus st = std::get<0>( rdrsps.top() );
157 if( !st.IsOK() )
return CallHandler( st );
// Remember the available input so the number of consumed bytes can be derived.
160 uInt avail_before = strm.avail_in;
162 int rc = inflate( &strm, Z_SYNC_FLUSH );
163 st = ToXRootDStatus( rc,
"inflate" );
// An inflate failure is reported through the same handler path.
164 if( !st.IsOK() )
return CallHandler( st );
// Advance the absolute input offset by the number of bytes inflate consumed.
166 inabsoff += avail_before - strm.avail_in;
// Output buffer completely filled: notify the front request's handler with a
// default-constructed status (presumably OK - confirm).
168 if( !strm.avail_out )
169 CallHandler( XRootDStatus() );
// Input fully consumed while a response is still queued; the statement
// guarded by this condition is not visible in this excerpt.
174 if( !strm.avail_in && !rdrsps.empty() )
// Package a chunk into a response object. A null chunk yields a null
// response; otherwise a new AnyObject is heap-allocated.
// NOTE(review): the remainder of the body (presumably storing the chunk in
// rsp and returning it) is not visible in this excerpt.
179 static inline AnyObject* PkgRsp( ChunkInfo *chunk )
181 if( !chunk )
return nullptr;
182 AnyObject *rsp =
new AnyObject();
// Notify the handler of the request at the front of rdreqs with the given
// status. Does nothing when no request is queued.
187 inline void CallHandler(
const XRootDStatus &st )
189 if( rdreqs.empty() )
return;
// Move the front request tuple into a local.
// NOTE(review): removal of the entry from rdreqs is not visible in this
// excerpt (a listing line is missing here) - confirm against the full file.
190 read_args_t args = std::move( rdreqs.front() );
// A ChunkInfo is only allocated for an OK status; otherwise chunk stays null
// and PkgRsp returns a null response object.
// NOTE(review): one constructor argument line is missing from this excerpt.
193 ChunkInfo *chunk =
nullptr;
194 if( st.IsOK() ) chunk =
new ChunkInfo( std::get<0>( args ),
196 std::get<2>( args ) );
// The handler (tuple element 3) receives a heap-allocated copy of the status
// and the packaged chunk; presumably it takes ownership of both - confirm.
198 ResponseHandler *handler = std::get<3>( args );
199 handler->HandleResponse(
new XRootDStatus( st ), PkgRsp( chunk ) );
// Start the message with a "[zlib] " tag followed by func and " : ".
// func is presumably the name of the zlib call being reported (the
// decompression loop passes "inflate") - confirm against the full file.
204 std::string msg =
"[zlib] " + func +
" : ";
// FIFO queue of pending read requests: entries are appended with emplace()
// and served from front().
224 std::queue<read_args_t> rdreqs;
Handle an async response.
Utility class for inflating a compressed buffer.
void QueueRsp(const XRootDStatus &st, uint64_t offset, buffer_t &&buffer)
void QueueReq(uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler)
std::vector< char > buffer_t
const uint16_t errUnknown
Unknown error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidArgs
const uint16_t suContinue
bool IsOK() const
We're fine.
An exception for carrying the XRootDStatus of InflCache.
XrdCl::XRootDStatus status
ZipError(const XrdCl::XRootDStatus &status)