XRootD
XrdClXCpSrc.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDCL_XRDCLXCPSRC_HH_
26 #define SRC_XRDCL_XRDCLXCPSRC_HH_
27 
28 #include "XrdCl/XrdClFile.hh"
29 #include "XrdCl/XrdClSyncQueue.hh"
30 #include "XrdSys/XrdSysPthread.hh"
31 
32 namespace XrdCl
33 {
34 
35 class XCpCtx;
36 
37 class XCpSrc
38 {
39  friend class ChunkHandler;
40 
41  public:
42 
53  XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx );
54 
58  void Start();
59 
63  void Stop()
64  {
65  pRunning = false;
66  }
67 
71  void Delete()
72  {
73  XrdSysMutexHelper lck( pMtx );
74  --pRefCount;
75  if( !pRefCount )
76  {
77  lck.UnLock();
78  delete this;
79  }
80  }
81 
88  {
89  XrdSysMutexHelper lck( pMtx );
90  ++pRefCount;
91  return this;
92  }
93 
97  bool IsRunning()
98  {
99  return pRunning;
100  }
101 
106  bool HasData()
107  {
108  XrdSysMutexHelper lck( pMtx );
109  return pCurrentOffset < pBlkEnd || !pRecovered.empty() || !pOngoing.empty();
110  }
111 
112 
113 
119  uint64_t TransferRate();
120 
126  static void DeleteChunk( PageInfo *&chunk )
127  {
128  if( chunk )
129  {
130  delete[] static_cast<char*>( chunk->GetBuffer() );
131  delete chunk;
132  chunk = 0;
133  }
134  }
135 
136  private:
137 
143  virtual ~XCpSrc();
144 
148  static void* Run( void* arg );
149 
154  void StartDownloading();
155 
166  XRootDStatus Initialize();
167 
175  XRootDStatus Recover();
176 
189  XRootDStatus ReadChunks();
190 
205  void Steal( XCpSrc *src );
206 
215  XRootDStatus GetWork();
216 
225  void ReportResponse( XRootDStatus *status, PageInfo *chunk, File *handle );
226 
230  template<typename T>
231  static void DeletePtr( T *&obj )
232  {
233  delete obj;
234  obj = 0;
235  }
236 
243  static bool FilesEqual( File *f1, File *f2 )
244  {
245  if( !f1 || !f2 ) return false;
246 
247  const std::string lastURL = "LastURL";
248  std::string url1, url2;
249 
250  f1->GetProperty( lastURL, url1 );
251  f2->GetProperty( lastURL, url2 );
252 
253  // remove cgi information
254  size_t pos = url1.find( '?' );
255  if( pos != std::string::npos )
256  url1 = url1.substr( 0 , pos );
257  pos = url2.find( '?' );
258  if( pos != std::string::npos )
259  url2 = url2.substr( 0 , pos );
260 
261  return url1 == url2;
262  }
263 
267  uint32_t pChunkSize;
268 
272  uint8_t pParallel;
273 
277  int64_t pFileSize;
278 
282  pthread_t pThread;
283 
287  XCpCtx *pCtx;
288 
292  std::string pUrl;
293 
297  File *pFile;
298 
299  std::map<File*, uint8_t> pFailed;
300 
304  uint64_t pCurrentOffset;
305 
309  uint64_t pBlkEnd;
310 
314  uint64_t pDataTransfered;
315 
320  std::map<uint64_t, uint64_t> pOngoing;
321 
326  std::map<uint64_t, uint64_t> pRecovered;
327 
334  SyncQueue<XRootDStatus*> pReports;
335 
339  XrdSysRecMutex pMtx;
340 
344  size_t pRefCount;
345 
351  bool pRunning;
352 
356  time_t pStartTime;
357 
362  time_t pTransferTime;
363 
368  bool pUsePgRead;
369 };
370 
371 } /* namespace XrdCl */
372 
373 #endif /* SRC_XRDCL_XRDCLXCPSRC_HH_ */
XrdOucString File
A file.
Definition: XrdClFile.hh:46
bool IsRunning()
Definition: XrdClXCpSrc.hh:97
static void DeleteChunk(PageInfo *&chunk)
Definition: XrdClXCpSrc.hh:126
XCpSrc(uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx)
Definition: XrdClXCpSrc.cc:110
XCpSrc * Self()
Definition: XrdClXCpSrc.hh:87
uint64_t TransferRate()
Definition: XrdClXCpSrc.cc:584
void * GetBuffer()
Get the buffer.