XRootD
Loading...
Searching...
No Matches
XrdClFileStateHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClURL.hh"
27#include "XrdCl/XrdClLog.hh"
28#include "XrdCl/XrdClStatus.hh"
35#include "XrdCl/XrdClMonitor.hh"
41#include "XrdCl/XrdClUtils.hh"
42
43#ifdef WITH_XRDEC
45#endif
46
47#include "XrdOuc/XrdOucCRC.hh"
49#include "XrdOuc/XrdOucUtils.hh"
50
54
55#include <sstream>
56#include <memory>
57#include <numeric>
58#include <sys/time.h>
59#include <uuid/uuid.h>
60#include <mutex>
61
62namespace
63{
64 //----------------------------------------------------------------------------
65 // Helper callback for handling PgRead responses
66 //----------------------------------------------------------------------------
67 class PgReadHandler : public XrdCl::ResponseHandler
68 {
69 friend class PgReadRetryHandler;
70
71 public:
72
73 //------------------------------------------------------------------------
74 // Constructor
75 //------------------------------------------------------------------------
76 PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77 XrdCl::ResponseHandler *userHandler,
78 uint64_t orgOffset ) :
79 stateHandler( stateHandler ),
80 userHandler( userHandler ),
81 orgOffset( orgOffset ),
82 maincall( true ),
83 retrycnt( 0 ),
84 nbrepair( 0 )
85 {
86 }
87
88 //------------------------------------------------------------------------
89 // Handle the response
90 //------------------------------------------------------------------------
91 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
92 XrdCl::AnyObject *response,
93 XrdCl::HostList *hostList )
94 {
95 using namespace XrdCl;
96
97 std::unique_lock<std::mutex> lck( mtx );
98
99 if( !maincall )
100 {
101 //--------------------------------------------------------------------
102 // We are serving PgRead retry request
103 //--------------------------------------------------------------------
104 --retrycnt;
105 if( !status->IsOK() )
106 st.reset( status );
107 else
108 {
109 delete status; // by convention other args are null (see PgReadRetryHandler)
110 ++nbrepair; // update number of repaired pages
111 }
112
113 if( retrycnt == 0 )
114 {
115 //------------------------------------------------------------------
116 // All retries came back
117 //------------------------------------------------------------------
118 if( st->IsOK() )
119 {
120 PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
121 pginf.SetNbRepair( nbrepair );
122 userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
123 }
124 else
125 userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
126 lck.unlock();
127 delete this;
128 }
129
130 return;
131 }
132
133 //----------------------------------------------------------------------
134 // We are serving main PgRead request
135 //----------------------------------------------------------------------
136 if( !status->IsOK() )
137 {
138 //--------------------------------------------------------------------
139 // The main PgRead request has failed
140 //--------------------------------------------------------------------
141 userHandler->HandleResponseWithHosts( status, response, hostList );
142 lck.unlock();
143 delete this;
144 return;
145 }
146
147 maincall = false;
148
149 //----------------------------------------------------------------------
150 // Do the integrity check
151 //----------------------------------------------------------------------
152 PageInfo *pginf = 0;
153 response->Get( pginf );
154
155 uint64_t pgoff = pginf->GetOffset();
156 uint32_t bytesRead = pginf->GetLength();
157 std::vector<uint32_t> &cksums = pginf->GetCksums();
158 char *buffer = reinterpret_cast<char*>( pginf->GetBuffer() );
159 size_t nbpages = XrdOucPgrwUtils::csNum( pgoff, bytesRead );
160 uint32_t pgsize = XrdSys::PageSize - pgoff % XrdSys::PageSize;
161 if( pgsize > bytesRead ) pgsize = bytesRead;
162
163 for( size_t pgnb = 0; pgnb < nbpages; ++pgnb )
164 {
165 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
166 if( crcval != cksums[pgnb] )
167 {
168 Log *log = DefaultEnv::GetLog();
169 log->Info( FileMsg, "[%p@%s] Received corrupted page, will retry page #%zu.",
170 (void*)this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
171
172 XRootDStatus st = XrdCl::FileStateHandler::PgReadRetry( stateHandler, pgoff, pgsize, pgnb, buffer, this, 0 );
173 if( !st.IsOK())
174 {
175 *status = st; // the reason for this failure
176 break;
177 }
178 ++retrycnt; // update the retry counter
179 }
180
181 bytesRead -= pgsize;
182 buffer += pgsize;
183 pgoff += pgsize;
184 pgsize = XrdSys::PageSize;
185 if( pgsize > bytesRead ) pgsize = bytesRead;
186 }
187
188
189 if( retrycnt == 0 )
190 {
191 //--------------------------------------------------------------------
192 // All went well!
193 //--------------------------------------------------------------------
194 userHandler->HandleResponseWithHosts( status, response, hostList );
195 lck.unlock();
196 delete this;
197 return;
198 }
199
200 //----------------------------------------------------------------------
201 // We have to wait for retries!
202 //----------------------------------------------------------------------
203 resp.reset( response );
204 hosts.reset( hostList );
205 st.reset( status );
206 }
207
208 void UpdateCksum( size_t pgnb, uint32_t crcval )
209 {
210 if( resp )
211 {
212 XrdCl::PageInfo *pginf = 0;
213 resp->Get( pginf );
214 pginf->GetCksums()[pgnb] = crcval;
215 }
216 }
217
218 private:
219
220 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221 XrdCl::ResponseHandler *userHandler;
222 uint64_t orgOffset;
223
224 std::unique_ptr<XrdCl::AnyObject> resp;
225 std::unique_ptr<XrdCl::HostList> hosts;
226 std::unique_ptr<XrdCl::XRootDStatus> st;
227
228 std::mutex mtx;
229 bool maincall;
230 size_t retrycnt;
231 size_t nbrepair;
232
233 };
234
235 //----------------------------------------------------------------------------
236 // Helper callback for handling PgRead retries
237 //----------------------------------------------------------------------------
238 class PgReadRetryHandler : public XrdCl::ResponseHandler
239 {
240 public:
241
242 PgReadRetryHandler( PgReadHandler *pgReadHandler, size_t pgnb ) : pgReadHandler( pgReadHandler ),
243 pgnb( pgnb )
244 {
245
246 }
247
248 //------------------------------------------------------------------------
249 // Handle the response
250 //------------------------------------------------------------------------
251 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252 XrdCl::AnyObject *response,
253 XrdCl::HostList *hostList )
254 {
255 using namespace XrdCl;
256
257 if( !status->IsOK() )
258 {
259 Log *log = DefaultEnv::GetLog();
260 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
261 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262 pgReadHandler->HandleResponseWithHosts( status, response, hostList );
263 delete this;
264 return;
265 }
266
267 XrdCl::PageInfo *pginf = 0;
268 response->Get( pginf );
269 if( pginf->GetLength() > (uint32_t)XrdSys::PageSize || pginf->GetCksums().size() != 1 )
270 {
271 Log *log = DefaultEnv::GetLog();
272 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
273 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
274 // we retry a page at a time so the length cannot exceed 4KB
275 DeleteArgs( status, response, hostList );
276 pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
277 delete this;
278 return;
279 }
280
281 uint32_t crcval = XrdOucCRC::Calc32C( pginf->GetBuffer(), pginf->GetLength() );
282 if( crcval != pginf->GetCksums().front() )
283 {
284 Log *log = DefaultEnv::GetLog();
285 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
286 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287 DeleteArgs( status, response, hostList );
288 pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
289 delete this;
290 return;
291 }
292
293 Log *log = DefaultEnv::GetLog();
294 log->Info( FileMsg, "[%p@%s] Successfully recovered page #%zu.",
295 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
296
297 DeleteArgs( 0, response, hostList );
298 pgReadHandler->UpdateCksum( pgnb, crcval );
299 pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
300 delete this;
301 }
302
303 private:
304
305 inline void DeleteArgs( XrdCl::XRootDStatus *status,
306 XrdCl::AnyObject *response,
307 XrdCl::HostList *hostList )
308 {
309 delete status;
310 delete response;
311 delete hostList;
312 }
313
314 PgReadHandler *pgReadHandler;
315 size_t pgnb;
316 };
317
318 //----------------------------------------------------------------------------
319 // Handle PgRead substitution with ordinary Read
320 //----------------------------------------------------------------------------
321 class PgReadSubstitutionHandler : public XrdCl::ResponseHandler
322 {
323 public:
324
325 //------------------------------------------------------------------------
326 // Constructor
327 //------------------------------------------------------------------------
328 PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
329 XrdCl::ResponseHandler *userHandler ) :
330 stateHandler( stateHandler ),
331 userHandler( userHandler )
332 {
333 }
334
335 //------------------------------------------------------------------------
336 // Handle the response
337 //------------------------------------------------------------------------
338 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
339 XrdCl::AnyObject *rdresp,
340 XrdCl::HostList *hostList )
341 {
342 using namespace XrdCl;
343
344 if( !status->IsOK() )
345 {
346 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
347 delete this;
348 return;
349 }
350
351
352 ChunkInfo *chunk = nullptr;
353 rdresp->Get( chunk );
354
355 if( !chunk )
356 {
357 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
358 delete this;
359 return;
360 }
361
362 std::vector<uint32_t> cksums;
363 if( stateHandler->pIsChannelEncrypted )
364 {
365 size_t nbpages = chunk->length / XrdSys::PageSize;
366 if( chunk->length % XrdSys::PageSize )
367 ++nbpages;
368 cksums.reserve( nbpages );
369
370 size_t size = chunk->length;
371 char *buffer = reinterpret_cast<char*>( chunk->buffer );
372
373 for( size_t pg = 0; pg < nbpages; ++pg )
374 {
375 size_t pgsize = XrdSys::PageSize;
376 if( pgsize > size ) pgsize = size;
377 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
378 cksums.push_back( crcval );
379 buffer += pgsize;
380 size -= pgsize;
381 }
382 }
383
384 PageInfo *pages = new PageInfo( chunk->offset, chunk->length,
385 chunk->buffer, std::move( cksums ) );
386 delete rdresp;
387 AnyObject *response = new AnyObject();
388 response->Set( pages );
389 userHandler->HandleResponseWithHosts( status, response, hostList );
390
391 delete this;
392 }
393
394 private:
395
396 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
397 XrdCl::ResponseHandler *userHandler;
398 };
399
400 //----------------------------------------------------------------------------
401 // Object that does things to the FileStateHandler when kXR_open returns
402 // and then calls the user handler
403 //----------------------------------------------------------------------------
404 class OpenHandler: public XrdCl::ResponseHandler
405 {
406 public:
407 //------------------------------------------------------------------------
408 // Constructor
409 //------------------------------------------------------------------------
410 OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
411 XrdCl::ResponseHandler *userHandler ):
412 pStateHandler( stateHandler ),
413 pUserHandler( userHandler )
414 {
415 }
416
417 //------------------------------------------------------------------------
418 // Handle the response
419 //------------------------------------------------------------------------
420 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
421 XrdCl::AnyObject *response,
422 XrdCl::HostList *hostList )
423 {
424 using namespace XrdCl;
425
426 //----------------------------------------------------------------------
427 // Extract the statistics info
428 //----------------------------------------------------------------------
429 OpenInfo *openInfo = 0;
430 if( status->IsOK() )
431 response->Get( openInfo );
432#ifdef WITH_XRDEC
433 else
434 //--------------------------------------------------------------------
435 // Handle EC redirect
436 //--------------------------------------------------------------------
437 if( status->code == errRedirect )
438 {
439 std::string ecurl = status->GetErrorMessage();
440 EcHandler *ecHandler = GetEcHandler( hostList->front().url, ecurl );
441 if( ecHandler )
442 {
443 pStateHandler->pPlugin = ecHandler; // set the plugin for the File object
444 ecHandler->Open( pStateHandler->pOpenFlags, pUserHandler, 0/*TODO figure out right value for the timeout*/ );
445 return;
446 }
447 }
448#endif
449 //----------------------------------------------------------------------
450 // Notify the state handler and the client and say bye bye
451 //----------------------------------------------------------------------
452 pStateHandler->OnOpen( status, openInfo, hostList );
453 delete response;
454 if( pUserHandler )
455 pUserHandler->HandleResponseWithHosts( status, 0, hostList );
456 else
457 {
458 delete status;
459 delete hostList;
460 }
461 delete this;
462 }
463
464 private:
465 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
466 XrdCl::ResponseHandler *pUserHandler;
467 };
468
469 //----------------------------------------------------------------------------
470 // Object that does things to the FileStateHandler when kXR_close returns
471 // and then calls the user handler
472 //----------------------------------------------------------------------------
473 class CloseHandler: public XrdCl::ResponseHandler
474 {
475 public:
476 //------------------------------------------------------------------------
477 // Constructor
478 //------------------------------------------------------------------------
479 CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
480 XrdCl::ResponseHandler *userHandler,
481 XrdCl::Message *message ):
482 pStateHandler( stateHandler ),
483 pUserHandler( userHandler ),
484 pMessage( message )
485 {
486 }
487
488 //------------------------------------------------------------------------
490 //------------------------------------------------------------------------
491 virtual ~CloseHandler()
492 {
493 delete pMessage;
494 }
495
496 //------------------------------------------------------------------------
497 // Handle the response
498 //------------------------------------------------------------------------
499 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
500 XrdCl::AnyObject *response,
501 XrdCl::HostList *hostList )
502 {
503 pStateHandler->OnClose( status );
504 if( pUserHandler )
505 pUserHandler->HandleResponseWithHosts( status, response, hostList );
506 else
507 {
508 delete response;
509 delete status;
510 delete hostList;
511 }
512
513 delete this;
514 }
515
516 private:
517 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
518 XrdCl::ResponseHandler *pUserHandler;
519 XrdCl::Message *pMessage;
520 };
521
522 //----------------------------------------------------------------------------
523 // Stateful message handler
524 //----------------------------------------------------------------------------
525 class StatefulHandler: public XrdCl::ResponseHandler
526 {
527 public:
528 //------------------------------------------------------------------------
529 // Constructor
530 //------------------------------------------------------------------------
531 StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
532 XrdCl::ResponseHandler *userHandler,
533 XrdCl::Message *message,
534 const XrdCl::MessageSendParams &sendParams ):
535 pStateHandler( stateHandler ),
536 pUserHandler( userHandler ),
537 pMessage( message ),
538 pSendParams( sendParams )
539 {
540 }
541
542 //------------------------------------------------------------------------
543 // Destructor
544 //------------------------------------------------------------------------
545 virtual ~StatefulHandler()
546 {
547 delete pMessage;
548 delete pSendParams.chunkList;
549 delete pSendParams.kbuff;
550 }
551
552 //------------------------------------------------------------------------
553 // Handle the response
554 //------------------------------------------------------------------------
555 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
556 XrdCl::AnyObject *response,
557 XrdCl::HostList *hostList )
558 {
559 using namespace XrdCl;
560 std::unique_ptr<AnyObject> responsePtr( response );
561 pSendParams.hostList = hostList;
562
563 //----------------------------------------------------------------------
564 // Houston we have a problem...
565 //----------------------------------------------------------------------
566 if( !status->IsOK() )
567 {
568 XrdCl::FileStateHandler::OnStateError( pStateHandler, status, pMessage, this, pSendParams );
569 return;
570 }
571
572 //----------------------------------------------------------------------
573 // We're clear
574 //----------------------------------------------------------------------
575 responsePtr.release();
576 XrdCl::FileStateHandler::OnStateResponse( pStateHandler, status, pMessage, response, hostList );
577 if( pUserHandler )
578 pUserHandler->HandleResponseWithHosts( status, response, hostList );
579 else
580 {
581 delete status,
582 delete response;
583 delete hostList;
584 }
585 delete this;
586 }
587
588 //------------------------------------------------------------------------
590 //------------------------------------------------------------------------
591 XrdCl::ResponseHandler *GetUserHandler()
592 {
593 return pUserHandler;
594 }
595
596 private:
597 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
598 XrdCl::ResponseHandler *pUserHandler;
599 XrdCl::Message *pMessage;
600 XrdCl::MessageSendParams pSendParams;
601 };
602
603 //----------------------------------------------------------------------------
604 // Release-buffer Handler
605 //----------------------------------------------------------------------------
606 class ReleaseBufferHandler: public XrdCl::ResponseHandler
607 {
608 public:
609
610 //------------------------------------------------------------------------
611 // Constructor
612 //------------------------------------------------------------------------
613 ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
614 buffer( std::move( buffer ) ),
615 handler( handler )
616 {
617 }
618
619 //------------------------------------------------------------------------
620 // Handle the response
621 //------------------------------------------------------------------------
622 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
623 XrdCl::AnyObject *response,
624 XrdCl::HostList *hostList )
625 {
626 if (handler)
627 handler->HandleResponseWithHosts( status, response, hostList );
628 }
629
630 //------------------------------------------------------------------------
631 // Get the underlying buffer
632 //------------------------------------------------------------------------
633 XrdCl::Buffer& GetBuffer()
634 {
635 return buffer;
636 }
637
638 private:
639 XrdCl::Buffer buffer;
640 XrdCl::ResponseHandler *handler;
641 };
642}
643
644namespace XrdCl
645{
646 //----------------------------------------------------------------------------
647 // Constructor
648 //----------------------------------------------------------------------------
650 pFileState( Closed ),
651 pStatInfo( 0 ),
652 pFileUrl( 0 ),
653 pDataServer( 0 ),
654 pLoadBalancer( 0 ),
655 pStateRedirect( 0 ),
656 pWrtRecoveryRedir( 0 ),
657 pFileHandle( 0 ),
658 pOpenMode( 0 ),
659 pOpenFlags( 0 ),
660 pSessionId( 0 ),
661 pDoRecoverRead( true ),
662 pDoRecoverWrite( true ),
663 pFollowRedirects( true ),
664 pUseVirtRedirector( true ),
665 pIsChannelEncrypted( false ),
666 pAllowBundledClose( false ),
667 pPlugin( plugin )
668 {
669 pFileHandle = new uint8_t[4];
670 ResetMonitoringVars();
673 pLFileHandler = new LocalFileHandler();
674 }
675
676 //------------------------------------------------------------------------
681 //------------------------------------------------------------------------
682 FileStateHandler::FileStateHandler( bool useVirtRedirector, FilePlugIn *& plugin ):
683 pFileState( Closed ),
684 pStatInfo( 0 ),
685 pFileUrl( 0 ),
686 pDataServer( 0 ),
687 pLoadBalancer( 0 ),
688 pStateRedirect( 0 ),
689 pWrtRecoveryRedir( 0 ),
690 pFileHandle( 0 ),
691 pOpenMode( 0 ),
692 pOpenFlags( 0 ),
693 pSessionId( 0 ),
694 pDoRecoverRead( true ),
695 pDoRecoverWrite( true ),
696 pFollowRedirects( true ),
697 pUseVirtRedirector( useVirtRedirector ),
698 pAllowBundledClose( false ),
699 pPlugin( plugin )
700 {
701 pFileHandle = new uint8_t[4];
702 ResetMonitoringVars();
705 pLFileHandler = new LocalFileHandler();
706 }
707
708 //----------------------------------------------------------------------------
709 // Destructor
710 //----------------------------------------------------------------------------
712 {
713 //--------------------------------------------------------------------------
714 // This, in principle, should never ever happen. Except for the case
715 // when we're interfaced with ROOT that may call this desctructor from
716 // its garbage collector, from its __cxa_finalize, ie. after the XrdCl lib
717 // has been finalized by the linker. So, if we don't have the log object
718 // at this point we just give up the hope.
719 //--------------------------------------------------------------------------
720 if( DefaultEnv::GetLog() && pSessionId && !pDataServer->IsLocalFile() ) // if the file object was bound to a physical connection
721 DefaultEnv::GetPostMaster()->DecFileInstCnt( *pDataServer );
722
725
728
729 if( pFileState != Closed && DefaultEnv::GetLog() )
730 {
731 XRootDStatus st;
732 MonitorClose( &st );
733 ResetMonitoringVars();
734 }
735
736 // check if the logger is still there, this is only for root, as root might
737 // have unload us already so in this case we don't want to do anything
738 if( DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
739 {
741 registry.Release( *pFileUrl );
742 }
743
744 delete pStatInfo;
745 delete pFileUrl;
746 delete pDataServer;
747 delete pLoadBalancer;
748 delete [] pFileHandle;
749 delete pLFileHandler;
750 }
751
752 //----------------------------------------------------------------------------
753 // Open the file pointed to by the given URL
754 //----------------------------------------------------------------------------
755 XRootDStatus FileStateHandler::Open( std::shared_ptr<FileStateHandler> &self,
756 const std::string &url,
757 uint16_t flags,
758 uint16_t mode,
759 ResponseHandler *handler,
760 uint16_t timeout )
761 {
762 XrdSysMutexHelper scopedLock( self->pMutex );
763
764 //--------------------------------------------------------------------------
765 // Check if we can proceed
766 //--------------------------------------------------------------------------
767 if( self->pFileState == Error )
768 return self->pStatus;
769
770 if( self->pFileState == OpenInProgress )
772
773 if( self->pFileState == CloseInProgress || self->pFileState == Opened ||
774 self->pFileState == Recovering )
776
777 self->pFileState = OpenInProgress;
778
779 //--------------------------------------------------------------------------
780 // Check if the parameters are valid
781 //--------------------------------------------------------------------------
782 Log *log = DefaultEnv::GetLog();
783
784 if( self->pFileUrl )
785 {
786 if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
787 {
789 registry.Release( *self->pFileUrl );
790 }
791 delete self->pFileUrl;
792 self->pFileUrl = 0;
793 }
794
795 self->pFileUrl = new URL( url );
796
797 //--------------------------------------------------------------------------
798 // Add unique uuid to each open request so replays due to error/timeout
799 // recovery can be correctly handled.
800 //--------------------------------------------------------------------------
801 URL::ParamsMap cgi = self->pFileUrl->GetParams();
802 uuid_t uuid;
803 char requuid[37]= {0};
804 uuid_generate( uuid );
805 uuid_unparse( uuid, requuid );
806 cgi["xrdcl.requuid"] = requuid;
807 self->pFileUrl->SetParams( cgi );
808
809 if( !self->pFileUrl->IsValid() )
810 {
811 log->Error( FileMsg, "[%p@%s] Trying to open invalid url: %s",
812 (void*)self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
813 self->pStatus = XRootDStatus( stError, errInvalidArgs );
814 self->pFileState = Closed;
815 return self->pStatus;
816 }
817
818 //--------------------------------------------------------------------------
819 // Check if the recovery procedures should be enabled
820 //--------------------------------------------------------------------------
821 const URL::ParamsMap &urlParams = self->pFileUrl->GetParams();
822 URL::ParamsMap::const_iterator it;
823 it = urlParams.find( "xrdcl.recover-reads" );
824 if( (it != urlParams.end() && it->second == "false") ||
825 !self->pDoRecoverRead )
826 {
827 self->pDoRecoverRead = false;
828 log->Debug( FileMsg, "[%p@%s] Read recovery procedures are disabled",
829 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
830 }
831
832 it = urlParams.find( "xrdcl.recover-writes" );
833 if( (it != urlParams.end() && it->second == "false") ||
834 !self->pDoRecoverWrite )
835 {
836 self->pDoRecoverWrite = false;
837 log->Debug( FileMsg, "[%p@%s] Write recovery procedures are disabled",
838 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
839 }
840
841 //--------------------------------------------------------------------------
842 // Open the file
843 //--------------------------------------------------------------------------
844 log->Debug( FileMsg, "[%p@%s] Sending an open command", (void*)self.get(),
845 self->pFileUrl->GetObfuscatedURL().c_str() );
846
847 self->pOpenMode = mode;
848 self->pOpenFlags = flags;
849 OpenHandler *openHandler = new OpenHandler( self, handler );
850
851 Message *msg;
853 std::string path = self->pFileUrl->GetPathWithFilteredParams();
854 MessageUtils::CreateRequest( msg, req, path.length() );
855
856 req->requestid = kXR_open;
857 req->mode = mode;
858 req->options = flags | kXR_async | kXR_retstat;
859 req->dlen = path.length();
860 msg->Append( path.c_str(), path.length(), 24 );
861
863 MessageSendParams params; params.timeout = timeout;
864 params.followRedirects = self->pFollowRedirects;
866
867 XRootDStatus st = self->IssueRequest( *self->pFileUrl, msg, openHandler, params );
868
869 if( !st.IsOK() )
870 {
871 delete openHandler;
872 self->pStatus = st;
873 self->pFileState = Closed;
874 return st;
875 }
876 return st;
877 }
878
879 //----------------------------------------------------------------------------
880 // Close the file object
881 //----------------------------------------------------------------------------
882 XRootDStatus FileStateHandler::Close( std::shared_ptr<FileStateHandler> &self,
883 ResponseHandler *handler,
884 uint16_t timeout )
885 {
886 XrdSysMutexHelper scopedLock( self->pMutex );
887
888 //--------------------------------------------------------------------------
889 // Check if we can proceed
890 //--------------------------------------------------------------------------
891 if( self->pFileState == Error )
892 return self->pStatus;
893
894 if( self->pFileState == CloseInProgress )
896
897 if( self->pFileState == Closed )
899
900 if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
902
903 if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
905
906 self->pFileState = CloseInProgress;
907
908 Log *log = DefaultEnv::GetLog();
909 log->Debug( FileMsg, "[%p@%s] Sending a close command for handle %#x to %s",
910 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
911 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
912
913 //--------------------------------------------------------------------------
914 // Close the file
915 //--------------------------------------------------------------------------
916 Message *msg;
918 MessageUtils::CreateRequest( msg, req );
919
920 req->requestid = kXR_close;
921 memcpy( req->fhandle, self->pFileHandle, 4 );
922
924 msg->SetSessionId( self->pSessionId );
925 CloseHandler *closeHandler = new CloseHandler( self, handler, msg );
926 MessageSendParams params;
927 params.timeout = timeout;
928 params.followRedirects = false;
929 params.stateful = true;
931
932 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
933
934 if( !st.IsOK() )
935 {
936 // an invalid-session error means the connection to the server has been
937 // closed, which in turn means that the server closed the file already
940 st.code == errPollerError || st.code == errSocketError )
941 {
942 self->pFileState = Closed;
943 ResponseJob *job = new ResponseJob( closeHandler, new XRootDStatus(),
944 nullptr, nullptr );
946 return XRootDStatus();
947 }
948
949 delete closeHandler;
950 self->pStatus = st;
951 self->pFileState = Error;
952 return st;
953 }
954 return st;
955 }
956
957 //----------------------------------------------------------------------------
958 // Stat the file
959 //----------------------------------------------------------------------------
960 XRootDStatus FileStateHandler::Stat( std::shared_ptr<FileStateHandler> &self,
961 bool force,
962 ResponseHandler *handler,
963 uint16_t timeout )
964 {
965 XrdSysMutexHelper scopedLock( self->pMutex );
966
967 if( self->pFileState == Error ) return self->pStatus;
968
969 if( self->pFileState != Opened && self->pFileState != Recovering )
971
972 //--------------------------------------------------------------------------
973 // Return the cached info
974 //--------------------------------------------------------------------------
975 if( !force )
976 {
977 AnyObject *obj = new AnyObject();
978 obj->Set( new StatInfo( *self->pStatInfo ) );
979 if (handler)
980 handler->HandleResponseWithHosts( new XRootDStatus(), obj, new HostList() );
981 return XRootDStatus();
982 }
983
984 Log *log = DefaultEnv::GetLog();
985 log->Debug( FileMsg, "[%p@%s] Sending a stat command for handle %#x to %s",
986 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
987 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
988
989 //--------------------------------------------------------------------------
990 // Issue a new stat request
991 // stating a file handle doesn't work (fixed in 3.2.0) so we need to
992 // stat the pat
993 //--------------------------------------------------------------------------
994 Message *msg;
996 std::string path = self->pFileUrl->GetPath();
997 MessageUtils::CreateRequest( msg, req );
998
999 req->requestid = kXR_stat;
1000 memcpy( req->fhandle, self->pFileHandle, 4 );
1001
1002 MessageSendParams params;
1003 params.timeout = timeout;
1004 params.followRedirects = false;
1005 params.stateful = true;
1007
1009 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1010
1011 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1012 }
1013
1014 //----------------------------------------------------------------------------
1015 // Read a data chunk at a given offset - sync
1016 //----------------------------------------------------------------------------
1017 XRootDStatus FileStateHandler::Read( std::shared_ptr<FileStateHandler> &self,
1018 uint64_t offset,
1019 uint32_t size,
1020 void *buffer,
1021 ResponseHandler *handler,
1022 uint16_t timeout )
1023 {
1024 XrdSysMutexHelper scopedLock( self->pMutex );
1025
1026 if( self->pFileState == Error ) return self->pStatus;
1027
1028 if( self->pFileState != Opened && self->pFileState != Recovering )
1030
1031 Log *log = DefaultEnv::GetLog();
1032 log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1033 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1034 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1035
1036 Message *msg;
1037 ClientReadRequest *req;
1038 MessageUtils::CreateRequest( msg, req );
1039
1040 req->requestid = kXR_read;
1041 req->offset = offset;
1042 req->rlen = size;
1043 memcpy( req->fhandle, self->pFileHandle, 4 );
1044
1045 ChunkList *list = new ChunkList();
1046 list->push_back( ChunkInfo( offset, size, buffer ) );
1047
1049 MessageSendParams params;
1050 params.timeout = timeout;
1051 params.followRedirects = false;
1052 params.stateful = true;
1053 params.chunkList = list;
1055 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1056
1057 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1058 }
1059
1060 //------------------------------------------------------------------------
1061 // Read data pages at a given offset
1062 //------------------------------------------------------------------------
1063 XRootDStatus FileStateHandler::PgRead( std::shared_ptr<FileStateHandler> &self,
1064 uint64_t offset,
1065 uint32_t size,
1066 void *buffer,
1067 ResponseHandler *handler,
1068 uint16_t timeout )
1069 {
1070 int issupported = true;
1071 AnyObject obj;
1073 int protver = 0;
1074 XRootDStatus st2 = Utils::GetProtocolVersion( *self->pDataServer, protver );
1075 if( st1.IsOK() && st2.IsOK() )
1076 {
1077 int *ptr = 0;
1078 obj.Get( ptr );
1079 issupported = ( ptr && (*ptr & kXR_suppgrw) ) && ( protver >= kXR_PROTPGRWVERSION );
1080 delete ptr;
1081 }
1082 else
1083 issupported = false;
1084
1085 if( !issupported )
1086 {
1087 DefaultEnv::GetLog()->Debug( FileMsg, "[%p@%s] PgRead not supported; substituting with Read.",
1088 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1089 ResponseHandler *substitHandler = new PgReadSubstitutionHandler( self, handler );
1090 auto st = Read( self, offset, size, buffer, substitHandler, timeout );
1091 if( !st.IsOK() ) delete substitHandler;
1092 return st;
1093 }
1094
1095 ResponseHandler* pgHandler = new PgReadHandler( self, handler, offset );
1096 auto st = PgReadImpl( self, offset, size, buffer, PgReadFlags::None, pgHandler, timeout );
1097 if( !st.IsOK() ) delete pgHandler;
1098 return st;
1099 }
1100
1101 XRootDStatus FileStateHandler::PgReadRetry( std::shared_ptr<FileStateHandler> &self,
1102 uint64_t offset,
1103 uint32_t size,
1104 size_t pgnb,
1105 void *buffer,
1106 PgReadHandler *handler,
1107 uint16_t timeout )
1108 {
1109 if( size > (uint32_t)XrdSys::PageSize )
1110 return XRootDStatus( stError, errInvalidArgs, EINVAL,
1111 "PgRead retry size exceeded 4KB." );
1112
1113 ResponseHandler *retryHandler = new PgReadRetryHandler( handler, pgnb );
1114 XRootDStatus st = PgReadImpl( self, offset, size, buffer, PgReadFlags::Retry, retryHandler, timeout );
1115 if( !st.IsOK() ) delete retryHandler;
1116 return st;
1117 }
1118
1119 XRootDStatus FileStateHandler::PgReadImpl( std::shared_ptr<FileStateHandler> &self,
1120 uint64_t offset,
1121 uint32_t size,
1122 void *buffer,
1123 uint16_t flags,
1124 ResponseHandler *handler,
1125 uint16_t timeout )
1126 {
1127 XrdSysMutexHelper scopedLock( self->pMutex );
1128
1129 if( self->pFileState == Error ) return self->pStatus;
1130
1131 if( self->pFileState != Opened && self->pFileState != Recovering )
1133
1134 Log *log = DefaultEnv::GetLog();
1135 log->Debug( FileMsg, "[%p@%s] Sending a pgread command for handle %#x to %s",
1136 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1137 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1138
1139 Message *msg;
1141 MessageUtils::CreateRequest( msg, req, sizeof( ClientPgReadReqArgs ) );
1142
1143 req->requestid = kXR_pgread;
1144 req->offset = offset;
1145 req->rlen = size;
1146 memcpy( req->fhandle, self->pFileHandle, 4 );
1147
1148 //--------------------------------------------------------------------------
1149 // Now adjust the message size so it can hold PgRead arguments
1150 //--------------------------------------------------------------------------
1151 req->dlen = sizeof( ClientPgReadReqArgs );
1152 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
1153 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
1154 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
1155 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
1156 args->reqflags = flags;
1157
1158 ChunkList *list = new ChunkList();
1159 list->push_back( ChunkInfo( offset, size, buffer ) );
1160
1162 MessageSendParams params;
1163 params.timeout = timeout;
1164 params.followRedirects = false;
1165 params.stateful = true;
1166 params.chunkList = list;
1168 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1169
1170 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1171 }
1172
1173 //----------------------------------------------------------------------------
1174 // Write a data chunk at a given offset - async
1175 //----------------------------------------------------------------------------
1176 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1177 uint64_t offset,
1178 uint32_t size,
1179 const void *buffer,
1180 ResponseHandler *handler,
1181 uint16_t timeout )
1182 {
1183 XrdSysMutexHelper scopedLock( self->pMutex );
1184
1185 if( self->pFileState == Error ) return self->pStatus;
1186
1187 if( self->pFileState != Opened && self->pFileState != Recovering )
1189
1190 Log *log = DefaultEnv::GetLog();
1191 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1192 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1193 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1194
1195 Message *msg;
1196 ClientWriteRequest *req;
1197 MessageUtils::CreateRequest( msg, req );
1198
1199 req->requestid = kXR_write;
1200 req->offset = offset;
1201 req->dlen = size;
1202 memcpy( req->fhandle, self->pFileHandle, 4 );
1203
1204 ChunkList *list = new ChunkList();
1205 list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
1206
1207 MessageSendParams params;
1208 params.timeout = timeout;
1209 params.followRedirects = false;
1210 params.stateful = true;
1211 params.chunkList = list;
1212
1214
1216 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1217
1218 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1219 }
1220
1221 //----------------------------------------------------------------------------
1222 // Write a data chunk at a given offset
1223 //----------------------------------------------------------------------------
1224 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1225 uint64_t offset,
1226 Buffer &&buffer,
1227 ResponseHandler *handler,
1228 uint16_t timeout )
1229 {
1230 //--------------------------------------------------------------------------
1231 // If the memory is not page (4KB) aligned we cannot use the kernel buffer
1232 // so fall back to normal write
1233 //--------------------------------------------------------------------------
1234 if( !XrdSys::KernelBuffer::IsPageAligned( buffer.GetBuffer() ) || self->pIsChannelEncrypted )
1235 {
1236 Log *log = DefaultEnv::GetLog();
1237 log->Info( FileMsg, "[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1238 "cannot convert it to kernel space buffer.", (void*)self.get(),
1239 self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
1240
1241 void *buff = buffer.GetBuffer();
1242 uint32_t size = buffer.GetSize();
1243 ReleaseBufferHandler *wrtHandler =
1244 new ReleaseBufferHandler( std::move( buffer ), handler );
1245 XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1246 if( !st.IsOK() )
1247 {
1248 buffer = std::move( wrtHandler->GetBuffer() );
1249 delete wrtHandler;
1250 }
1251 return st;
1252 }
1253
1254 //--------------------------------------------------------------------------
1255 // Transfer the data from user space to kernel space
1256 //--------------------------------------------------------------------------
1257 uint32_t length = buffer.GetSize();
1258 char *ubuff = buffer.Release();
1259
1260 std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1261 ssize_t ret = XrdSys::Move( ubuff, *kbuff, length );
1262 if( ret < 0 )
1264
1265 //--------------------------------------------------------------------------
1266 // Now create a write request and enqueue it
1267 //--------------------------------------------------------------------------
1268 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1269 }
1270
1271 //----------------------------------------------------------------------------
1272 // Write a data from a given file descriptor at a given offset - async
1273 //----------------------------------------------------------------------------
1274 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1275 uint64_t offset,
1276 uint32_t size,
1277 Optional<uint64_t> fdoff,
1278 int fd,
1279 ResponseHandler *handler,
1280 uint16_t timeout )
1281 {
1282 //--------------------------------------------------------------------------
1283 // Read the data from the file descriptor into a kernel buffer
1284 //--------------------------------------------------------------------------
1285 std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1286 ssize_t ret = fdoff ? XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1287 XrdSys::Read( fd, *kbuff, size );
1288 if( ret < 0 )
1290
1291 //--------------------------------------------------------------------------
1292 // Now create a write request and enqueue it
1293 //--------------------------------------------------------------------------
1294 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1295 }
1296
1297 //----------------------------------------------------------------------------
1298 // Write number of pages at a given offset - async
1299 //----------------------------------------------------------------------------
1300 XRootDStatus FileStateHandler::PgWrite( std::shared_ptr<FileStateHandler> &self,
1301 uint64_t offset,
1302 uint32_t size,
1303 const void *buffer,
1304 std::vector<uint32_t> &cksums,
1305 ResponseHandler *handler,
1306 uint16_t timeout )
1307 {
1308 //--------------------------------------------------------------------------
1309 // Resolve timeout value
1310 //--------------------------------------------------------------------------
1311 if( timeout == 0 )
1312 {
1313 int val = DefaultRequestTimeout;
1314 XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
1315 timeout = val;
1316 }
1317
1318 //--------------------------------------------------------------------------
1319 // Validate the digest vector size
1320 //--------------------------------------------------------------------------
1321 if( cksums.empty() )
1322 {
1323 const char *data = static_cast<const char*>( buffer );
1324 XrdOucPgrwUtils::csCalc( data, offset, size, cksums );
1325 }
1326 else
1327 {
1328 size_t crc32cCnt = XrdOucPgrwUtils::csNum( offset, size );
1329 if( crc32cCnt != cksums.size() )
1330 return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
1331 }
1332
1333 //--------------------------------------------------------------------------
1334 // Create a context for PgWrite operation
1335 //--------------------------------------------------------------------------
1336 struct pgwrt_t
1337 {
1338 pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
1339 {
1340 }
1341
1342 ~pgwrt_t()
1343 {
1344 if( handler )
1345 {
1346 // if all retries were successful no error status was set
1347 if( !status ) status = new XRootDStatus();
1348 handler->HandleResponse( status, nullptr );
1349 }
1350 }
1351
1352 static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1353 {
1354 if( pgoff == offset ) return 0; // we need this if statement because we operate on unsigned integers
1355 return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
1356 }
1357
1358 inline void SetStatus( XRootDStatus* s )
1359 {
1360 if( !status ) status = s;
1361 else delete s;
1362 }
1363
1364 ResponseHandler *handler;
1365 XRootDStatus *status;
1366 };
1367 auto pgwrt = std::make_shared<pgwrt_t>( handler );
1368
1369 int fLen, lLen;
1370 XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
1371 uint32_t fstpglen = fLen;
1372
1373 time_t start = ::time( nullptr );
1374 auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1375 {
1376 std::unique_ptr<AnyObject> scoped( r );
1377 // if the request failed simply pass the status to the
1378 // user handler
1379 if( !s->IsOK() )
1380 {
1381 pgwrt->SetStatus( s );
1382 return; // pgwrt destructor will call the handler
1383 }
1384 // also if the request was sucessful and there were no
1385 // corrupted pages pass the status to the user handler
1386 RetryInfo *inf = nullptr;
1387 r->Get( inf );
1388 if( !inf->NeedRetry() )
1389 {
1390 pgwrt->SetStatus( s );
1391 return; // pgwrt destructor will call the handler
1392 }
1393 delete s;
1394 // first adjust the timeout value
1395 uint16_t elapsed = ::time( nullptr ) - start;
1396 if( elapsed >= timeout )
1397 {
1398 pgwrt->SetStatus( new XRootDStatus( stError, errOperationExpired ) );
1399 return; // pgwrt destructor will call the handler
1400 }
1401 else timeout -= elapsed;
1402 // retransmit the corrupted pages
1403 for( size_t i = 0; i < inf->Size(); ++i )
1404 {
1405 auto tpl = inf->At( i );
1406 uint64_t pgoff = std::get<0>( tpl );
1407 uint32_t pglen = std::get<1>( tpl );
1408 const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
1409 uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1410 auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1411 {
1412 std::unique_ptr<AnyObject> scoped( r );
1413 // if we failed simply set the status
1414 if( !s->IsOK() )
1415 {
1416 pgwrt->SetStatus( s );
1417 return; // the destructor will call the handler
1418 }
1419 delete s;
1420 // otherwise check if the data were not corrupted again
1421 RetryInfo *inf = nullptr;
1422 r->Get( inf );
1423 if( inf->NeedRetry() ) // so we failed in the end
1424 {
1425 DefaultEnv::GetLog()->Warning( FileMsg, "[%p@%s] Failed retransmitting corrupted "
1426 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1427 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1428 pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
1429 "Failed to retransmit corrupted page" ) );
1430 }
1431 else
1432 DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Succesfuly retransmitted corrupted "
1433 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1434 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1435 } );
1436 auto st = PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1437 if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
1438 DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Retransmitting corrupted page: "
1439 "pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1440 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1441 }
1442 } );
1443
1444 auto st = PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1445 if( !st.IsOK() )
1446 {
1447 pgwrt->handler = nullptr;
1448 delete h;
1449 }
1450 return st;
1451 }
1452
1453 //------------------------------------------------------------------------
1454 // Write number of pages at a given offset - async
1455 //------------------------------------------------------------------------
1456 XRootDStatus FileStateHandler::PgWriteRetry( std::shared_ptr<FileStateHandler> &self,
1457 uint64_t offset,
1458 uint32_t size,
1459 const void *buffer,
1460 uint32_t digest,
1461 ResponseHandler *handler,
1462 uint16_t timeout )
1463 {
1464 std::vector<uint32_t> cksums{ digest };
1465 return PgWriteImpl( self, offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
1466 }
1467
1468 //------------------------------------------------------------------------
1469 // Write number of pages at a given offset - async
1470 //------------------------------------------------------------------------
1471 XRootDStatus FileStateHandler::PgWriteImpl( std::shared_ptr<FileStateHandler> &self,
1472 uint64_t offset,
1473 uint32_t size,
1474 const void *buffer,
1475 std::vector<uint32_t> &cksums,
1476 kXR_char flags,
1477 ResponseHandler *handler,
1478 uint16_t timeout )
1479 {
1480 XrdSysMutexHelper scopedLock( self->pMutex );
1481
1482 if( self->pFileState == Error ) return self->pStatus;
1483
1484 if( self->pFileState != Opened && self->pFileState != Recovering )
1486
1487 Log *log = DefaultEnv::GetLog();
1488 log->Debug( FileMsg, "[%p@%s] Sending a pgwrite command for handle %#x to %s",
1489 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1490 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1491
1492 //--------------------------------------------------------------------------
1493 // Create the message
1494 //--------------------------------------------------------------------------
1495 Message *msg;
1497 MessageUtils::CreateRequest( msg, req );
1498
1499 req->requestid = kXR_pgwrite;
1500 req->offset = offset;
1501 req->dlen = size + cksums.size() * sizeof( uint32_t );
1502 req->reqflags = flags;
1503 memcpy( req->fhandle, self->pFileHandle, 4 );
1504
1505 ChunkList *list = new ChunkList();
1506 list->push_back( ChunkInfo( offset, size, (char*)buffer ) );
1507
1508 MessageSendParams params;
1509 params.timeout = timeout;
1510 params.followRedirects = false;
1511 params.stateful = true;
1512 params.chunkList = list;
1513 params.crc32cDigests.swap( cksums );
1514
1516
1518 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1519
1520 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1521 }
1522
1523 //----------------------------------------------------------------------------
1524 // Commit all pending disk writes - async
1525 //----------------------------------------------------------------------------
1526 XRootDStatus FileStateHandler::Sync( std::shared_ptr<FileStateHandler> &self,
1527 ResponseHandler *handler,
1528 uint16_t timeout )
1529 {
1530 XrdSysMutexHelper scopedLock( self->pMutex );
1531
1532 if( self->pFileState == Error ) return self->pStatus;
1533
1534 if( self->pFileState != Opened && self->pFileState != Recovering )
1536
1537 Log *log = DefaultEnv::GetLog();
1538 log->Debug( FileMsg, "[%p@%s] Sending a sync command for handle %#x to %s",
1539 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1540 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1541
1542 Message *msg;
1543 ClientSyncRequest *req;
1544 MessageUtils::CreateRequest( msg, req );
1545
1546 req->requestid = kXR_sync;
1547 memcpy( req->fhandle, self->pFileHandle, 4 );
1548
1549 MessageSendParams params;
1550 params.timeout = timeout;
1551 params.followRedirects = false;
1552 params.stateful = true;
1554
1556 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1557
1558 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1559 }
1560
1561 //----------------------------------------------------------------------------
1562 // Truncate the file to a particular size - async
1563 //----------------------------------------------------------------------------
1564 XRootDStatus FileStateHandler::Truncate( std::shared_ptr<FileStateHandler> &self,
1565 uint64_t size,
1566 ResponseHandler *handler,
1567 uint16_t timeout )
1568 {
1569 XrdSysMutexHelper scopedLock( self->pMutex );
1570
1571 if( self->pFileState == Error ) return self->pStatus;
1572
1573 if( self->pFileState != Opened && self->pFileState != Recovering )
1575
1576 Log *log = DefaultEnv::GetLog();
1577 log->Debug( FileMsg, "[%p@%s] Sending a truncate command for handle %#x to %s",
1578 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1579 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1580
1581 Message *msg;
1583 MessageUtils::CreateRequest( msg, req );
1584
1585 req->requestid = kXR_truncate;
1586 memcpy( req->fhandle, self->pFileHandle, 4 );
1587 req->offset = size;
1588
1589 MessageSendParams params;
1590 params.timeout = timeout;
1591 params.followRedirects = false;
1592 params.stateful = true;
1594
1596 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1597
1598 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1599 }
1600
1601 //----------------------------------------------------------------------------
1602 // Read scattered data chunks in one operation - async
1603 //----------------------------------------------------------------------------
1604 XRootDStatus FileStateHandler::VectorRead( std::shared_ptr<FileStateHandler> &self,
1605 const ChunkList &chunks,
1606 void *buffer,
1607 ResponseHandler *handler,
1608 uint16_t timeout )
1609 {
1610 //--------------------------------------------------------------------------
1611 // Sanity check
1612 //--------------------------------------------------------------------------
1613 XrdSysMutexHelper scopedLock( self->pMutex );
1614
1615 if( self->pFileState == Error ) return self->pStatus;
1616
1617 if( self->pFileState != Opened && self->pFileState != Recovering )
1619
1620 Log *log = DefaultEnv::GetLog();
1621 log->Debug( FileMsg, "[%p@%s] Sending a vector read command for handle %#x to %s",
1622 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1623 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1624
1625 //--------------------------------------------------------------------------
1626 // Build the message
1627 //--------------------------------------------------------------------------
1628 Message *msg;
1629 ClientReadVRequest *req;
1630 MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*chunks.size() );
1631
1632 req->requestid = kXR_readv;
1633 req->dlen = sizeof(readahead_list)*chunks.size();
1634
1635 ChunkList *list = new ChunkList();
1636 char *cursor = (char*)buffer;
1637
1638 //--------------------------------------------------------------------------
1639 // Copy the chunk info
1640 //--------------------------------------------------------------------------
1641 readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
1642 for( size_t i = 0; i < chunks.size(); ++i )
1643 {
1644 dataChunk[i].rlen = chunks[i].length;
1645 dataChunk[i].offset = chunks[i].offset;
1646 memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1647
1648 void *chunkBuffer;
1649 if( cursor )
1650 {
1651 chunkBuffer = cursor;
1652 cursor += chunks[i].length;
1653 }
1654 else
1655 chunkBuffer = chunks[i].buffer;
1656
1657 list->push_back( ChunkInfo( chunks[i].offset,
1658 chunks[i].length,
1659 chunkBuffer ) );
1660 }
1661
1662 //--------------------------------------------------------------------------
1663 // Send the message
1664 //--------------------------------------------------------------------------
1665 MessageSendParams params;
1666 params.timeout = timeout;
1667 params.followRedirects = false;
1668 params.stateful = true;
1669 params.chunkList = list;
1671
1673 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1674
1675 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1676 }
1677
1678 //------------------------------------------------------------------------
1679 // Write scattered data chunks in one operation - async
1680 //------------------------------------------------------------------------
1681 XRootDStatus FileStateHandler::VectorWrite( std::shared_ptr<FileStateHandler> &self,
1682 const ChunkList &chunks,
1683 ResponseHandler *handler,
1684 uint16_t timeout )
1685 {
1686 //--------------------------------------------------------------------------
1687 // Sanity check
1688 //--------------------------------------------------------------------------
1689 XrdSysMutexHelper scopedLock( self->pMutex );
1690
1691 if( self->pFileState == Error ) return self->pStatus;
1692
1693 if( self->pFileState != Opened && self->pFileState != Recovering )
1695
1696 Log *log = DefaultEnv::GetLog();
1697 log->Debug( FileMsg, "[%p@%s] Sending a vector write command for handle %#x to %s",
1698 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1699 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1700
1701 //--------------------------------------------------------------------------
1702 // Determine the size of the payload
1703 //--------------------------------------------------------------------------
1704
1705 // the size of write vector
1706 uint32_t payloadSize = sizeof(XrdProto::write_list) * chunks.size();
1707
1708 //--------------------------------------------------------------------------
1709 // Build the message
1710 //--------------------------------------------------------------------------
1711 Message *msg;
1713 MessageUtils::CreateRequest( msg, req, payloadSize );
1714
1715 req->requestid = kXR_writev;
1716 req->dlen = sizeof(XrdProto::write_list) * chunks.size();
1717
1718 ChunkList *list = new ChunkList();
1719
1720 //--------------------------------------------------------------------------
1721 // Copy the chunk info
1722 //--------------------------------------------------------------------------
1723 XrdProto::write_list *writeList =
1724 reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
1725
1726
1727
1728 for( size_t i = 0; i < chunks.size(); ++i )
1729 {
1730 writeList[i].wlen = chunks[i].length;
1731 writeList[i].offset = chunks[i].offset;
1732 memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1733
1734 list->push_back( ChunkInfo( chunks[i].offset,
1735 chunks[i].length,
1736 chunks[i].buffer ) );
1737 }
1738
1739 //--------------------------------------------------------------------------
1740 // Send the message
1741 //--------------------------------------------------------------------------
1742 MessageSendParams params;
1743 params.timeout = timeout;
1744 params.followRedirects = false;
1745 params.stateful = true;
1746 params.chunkList = list;
1748
1750 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1751
1752 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1753 }
1754
1755 //------------------------------------------------------------------------
1756 // Write scattered buffers in one operation - async
1757 //------------------------------------------------------------------------
1758 XRootDStatus FileStateHandler::WriteV( std::shared_ptr<FileStateHandler> &self,
1759 uint64_t offset,
1760 const struct iovec *iov,
1761 int iovcnt,
1762 ResponseHandler *handler,
1763 uint16_t timeout )
1764 {
1765 XrdSysMutexHelper scopedLock( self->pMutex );
1766
1767 if( self->pFileState == Error ) return self->pStatus;
1768
1769 if( self->pFileState != Opened && self->pFileState != Recovering )
1771
1772 Log *log = DefaultEnv::GetLog();
1773 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1774 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1775 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1776
1777 Message *msg;
1778 ClientWriteRequest *req;
1779 MessageUtils::CreateRequest( msg, req );
1780
1781 ChunkList *list = new ChunkList();
1782
1783 uint32_t size = 0;
1784 for( int i = 0; i < iovcnt; ++i )
1785 {
1786 if( iov[i].iov_len == 0 ) continue;
1787 size += iov[i].iov_len;
1788 list->push_back( ChunkInfo( 0, iov[i].iov_len,
1789 (char*)iov[i].iov_base ) );
1790 }
1791
1792 req->requestid = kXR_write;
1793 req->offset = offset;
1794 req->dlen = size;
1795 memcpy( req->fhandle, self->pFileHandle, 4 );
1796
1797 MessageSendParams params;
1798 params.timeout = timeout;
1799 params.followRedirects = false;
1800 params.stateful = true;
1801 params.chunkList = list;
1802
1804
1806 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1807
1808 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1809 }
1810
1811 //------------------------------------------------------------------------
1812 // Read data into scattered buffers in one operation - async
1813 //------------------------------------------------------------------------
1814 XRootDStatus FileStateHandler::ReadV( std::shared_ptr<FileStateHandler> &self,
1815 uint64_t offset,
1816 struct iovec *iov,
1817 int iovcnt,
1818 ResponseHandler *handler,
1819 uint16_t timeout )
1820 {
1821 XrdSysMutexHelper scopedLock( self->pMutex );
1822
1823 if( self->pFileState == Error ) return self->pStatus;
1824
1825 if( self->pFileState != Opened && self->pFileState != Recovering )
1827
1828 Log *log = DefaultEnv::GetLog();
1829 log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1830 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1831 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1832
1833 Message *msg;
1834 ClientReadRequest *req;
1835 MessageUtils::CreateRequest( msg, req );
1836
1837 // calculate the total read size
1838 size_t size = std::accumulate( iov, iov + iovcnt, 0, []( size_t acc, iovec &rhs )
1839 {
1840 return acc + rhs.iov_len;
1841 } );
1842 req->requestid = kXR_read;
1843 req->offset = offset;
1844 req->rlen = size;
1846 memcpy( req->fhandle, self->pFileHandle, 4 );
1847
1848 ChunkList *list = new ChunkList();
1849 list->reserve( iovcnt );
1850 uint64_t choff = offset;
1851 for( int i = 0; i < iovcnt; ++i )
1852 {
1853 list->emplace_back( choff, iov[i].iov_len, iov[i].iov_base );
1854 choff += iov[i].iov_len;
1855 }
1856
1858 MessageSendParams params;
1859 params.timeout = timeout;
1860 params.followRedirects = false;
1861 params.stateful = true;
1862 params.chunkList = list;
1864 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1865
1866 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1867 }
1868
1869 //----------------------------------------------------------------------------
1870 // Performs a custom operation on an open file, server implementation
1871 // dependent - async
1872 //----------------------------------------------------------------------------
1873 XRootDStatus FileStateHandler::Fcntl( std::shared_ptr<FileStateHandler> &self,
1874 const Buffer &arg,
1875 ResponseHandler *handler,
1876 uint16_t timeout )
1877 {
1878 XrdSysMutexHelper scopedLock( self->pMutex );
1879
1880 if( self->pFileState == Error ) return self->pStatus;
1881
1882 if( self->pFileState != Opened && self->pFileState != Recovering )
1884
1885 Log *log = DefaultEnv::GetLog();
1886 log->Debug( FileMsg, "[%p@%s] Sending a fcntl command for handle %#x to %s",
1887 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1888 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1889
1890 Message *msg;
1891 ClientQueryRequest *req;
1892 MessageUtils::CreateRequest( msg, req, arg.GetSize() );
1893
1894 req->requestid = kXR_query;
1895 req->infotype = kXR_Qopaqug;
1896 req->dlen = arg.GetSize();
1897 memcpy( req->fhandle, self->pFileHandle, 4 );
1898 msg->Append( arg.GetBuffer(), arg.GetSize(), 24 );
1899
1900 MessageSendParams params;
1901 params.timeout = timeout;
1902 params.followRedirects = false;
1903 params.stateful = true;
1905
1907 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1908
1909 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1910 }
1911
1912 //----------------------------------------------------------------------------
1913 // Get access token to a file - async
1914 //----------------------------------------------------------------------------
1915 XRootDStatus FileStateHandler::Visa( std::shared_ptr<FileStateHandler> &self,
1916 ResponseHandler *handler,
1917 uint16_t timeout )
1918 {
1919 XrdSysMutexHelper scopedLock( self->pMutex );
1920
1921 if( self->pFileState == Error ) return self->pStatus;
1922
1923 if( self->pFileState != Opened && self->pFileState != Recovering )
1925
1926 Log *log = DefaultEnv::GetLog();
1927 log->Debug( FileMsg, "[%p@%s] Sending a visa command for handle %#x to %s",
1928 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1929 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1930
1931 Message *msg;
1932 ClientQueryRequest *req;
1933 MessageUtils::CreateRequest( msg, req );
1934
1935 req->requestid = kXR_query;
1936 req->infotype = kXR_Qvisa;
1937 memcpy( req->fhandle, self->pFileHandle, 4 );
1938
1939 MessageSendParams params;
1940 params.timeout = timeout;
1941 params.followRedirects = false;
1942 params.stateful = true;
1944
1946 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1947
1948 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1949 }
1950
1951 //------------------------------------------------------------------------
1952 // Set extended attributes - async
1953 //------------------------------------------------------------------------
1954 XRootDStatus FileStateHandler::SetXAttr( std::shared_ptr<FileStateHandler> &self,
1955 const std::vector<xattr_t> &attrs,
1956 ResponseHandler *handler,
1957 uint16_t timeout )
1958 {
1959 XrdSysMutexHelper scopedLock( self->pMutex );
1960
1961 if( self->pFileState == Error ) return self->pStatus;
1962
1963 if( self->pFileState != Opened && self->pFileState != Recovering )
1965
1966 Log *log = DefaultEnv::GetLog();
1967 log->Debug( FileMsg, "[%p@%s] Sending a fattr set command for handle %#x to %s",
1968 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1969 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1970
1971 //--------------------------------------------------------------------------
1972 // Issue a new fattr get request
1973 //--------------------------------------------------------------------------
1974 return XAttrOperationImpl( self, kXR_fattrSet, 0, attrs, handler, timeout );
1975 }
1976
1977 //------------------------------------------------------------------------
1978 // Get extended attributes - async
1979 //------------------------------------------------------------------------
1980 XRootDStatus FileStateHandler::GetXAttr( std::shared_ptr<FileStateHandler> &self,
1981 const std::vector<std::string> &attrs,
1982 ResponseHandler *handler,
1983 uint16_t timeout )
1984 {
1985 XrdSysMutexHelper scopedLock( self->pMutex );
1986
1987 if( self->pFileState == Error ) return self->pStatus;
1988
1989 if( self->pFileState != Opened && self->pFileState != Recovering )
1991
1992 Log *log = DefaultEnv::GetLog();
1993 log->Debug( FileMsg, "[%p@%s] Sending a fattr get command for handle %#x to %s",
1994 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1995 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1996
1997 //--------------------------------------------------------------------------
1998 // Issue a new fattr get request
1999 //--------------------------------------------------------------------------
2000 return XAttrOperationImpl( self, kXR_fattrGet, 0, attrs, handler, timeout );
2001 }
2002
2003 //------------------------------------------------------------------------
2004 // Delete extended attributes - async
2005 //------------------------------------------------------------------------
2006 XRootDStatus FileStateHandler::DelXAttr( std::shared_ptr<FileStateHandler> &self,
2007 const std::vector<std::string> &attrs,
2008 ResponseHandler *handler,
2009 uint16_t timeout )
2010 {
2011 XrdSysMutexHelper scopedLock( self->pMutex );
2012
2013 if( self->pFileState == Error ) return self->pStatus;
2014
2015 if( self->pFileState != Opened && self->pFileState != Recovering )
2017
2018 Log *log = DefaultEnv::GetLog();
2019 log->Debug( FileMsg, "[%p@%s] Sending a fattr del command for handle %#x to %s",
2020 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2021 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2022
2023 //--------------------------------------------------------------------------
2024 // Issue a new fattr del request
2025 //--------------------------------------------------------------------------
2026 return XAttrOperationImpl( self, kXR_fattrDel, 0, attrs, handler, timeout );
2027 }
2028
2029 //------------------------------------------------------------------------
2030 // List extended attributes - async
2031 //------------------------------------------------------------------------
2032 XRootDStatus FileStateHandler::ListXAttr( std::shared_ptr<FileStateHandler> &self,
2033 ResponseHandler *handler,
2034 uint16_t timeout )
2035 {
2036 XrdSysMutexHelper scopedLock( self->pMutex );
2037
2038 if( self->pFileState == Error ) return self->pStatus;
2039
2040 if( self->pFileState != Opened && self->pFileState != Recovering )
2042
2043 Log *log = DefaultEnv::GetLog();
2044 log->Debug( FileMsg, "[%p@%s] Sending a fattr list command for handle %#x to %s",
2045 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2046 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2047
2048 //--------------------------------------------------------------------------
2049 // Issue a new fattr get request
2050 //--------------------------------------------------------------------------
2051 static const std::vector<std::string> nothing;
2052 return XAttrOperationImpl( self, kXR_fattrList, ClientFattrRequest::aData,
2053 nothing, handler, timeout );
2054 }
2055
2056 //------------------------------------------------------------------------
2066 //------------------------------------------------------------------------
2067 XRootDStatus FileStateHandler::Checkpoint( std::shared_ptr<FileStateHandler> &self,
2068 kXR_char code,
2069 ResponseHandler *handler,
2070 uint16_t timeout )
2071 {
2072 XrdSysMutexHelper scopedLock( self->pMutex );
2073
2074 if( self->pFileState == Error ) return self->pStatus;
2075
2076 if( self->pFileState != Opened && self->pFileState != Recovering )
2078
2079 Log *log = DefaultEnv::GetLog();
2080 log->Debug( FileMsg, "[%p@%s] Sending a checkpoint command for handle %#x to %s",
2081 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2082 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2083
2084 Message *msg;
2086 MessageUtils::CreateRequest( msg, req );
2087
2088 req->requestid = kXR_chkpoint;
2089 req->opcode = code;
2090 memcpy( req->fhandle, self->pFileHandle, 4 );
2091
2092 MessageSendParams params;
2093 params.timeout = timeout;
2094 params.followRedirects = false;
2095 params.stateful = true;
2096
2098
2100 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2101
2102 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2103 }
2104
2105 //------------------------------------------------------------------------
2115 //------------------------------------------------------------------------
2116 XRootDStatus FileStateHandler::ChkptWrt( std::shared_ptr<FileStateHandler> &self,
2117 uint64_t offset,
2118 uint32_t size,
2119 const void *buffer,
2120 ResponseHandler *handler,
2121 uint16_t timeout )
2122 {
2123 XrdSysMutexHelper scopedLock( self->pMutex );
2124
2125 if( self->pFileState == Error ) return self->pStatus;
2126
2127 if( self->pFileState != Opened && self->pFileState != Recovering )
2129
2130 Log *log = DefaultEnv::GetLog();
2131 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2132 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2133 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2134
2135 Message *msg;
2137 MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2138
2139 req->requestid = kXR_chkpoint;
2140 req->opcode = kXR_ckpXeq;
2141 req->dlen = 24; // as specified in the protocol specification
2142 memcpy( req->fhandle, self->pFileHandle, 4 );
2143
2145 wrtreq->requestid = kXR_write;
2146 wrtreq->offset = offset;
2147 wrtreq->dlen = size;
2148 memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2149
2150 ChunkList *list = new ChunkList();
2151 list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
2152
2153 MessageSendParams params;
2154 params.timeout = timeout;
2155 params.followRedirects = false;
2156 params.stateful = true;
2157 params.chunkList = list;
2158
2160
2162 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2163
2164 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2165 }
2166
2167 //------------------------------------------------------------------------
2177 //------------------------------------------------------------------------
2178 XRootDStatus FileStateHandler::ChkptWrtV( std::shared_ptr<FileStateHandler> &self,
2179 uint64_t offset,
2180 const struct iovec *iov,
2181 int iovcnt,
2182 ResponseHandler *handler,
2183 uint16_t timeout )
2184 {
2185 XrdSysMutexHelper scopedLock( self->pMutex );
2186
2187 if( self->pFileState == Error ) return self->pStatus;
2188
2189 if( self->pFileState != Opened && self->pFileState != Recovering )
2191
2192 Log *log = DefaultEnv::GetLog();
2193 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2194 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2195 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2196
2197 Message *msg;
2199 MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2200
2201 req->requestid = kXR_chkpoint;
2202 req->opcode = kXR_ckpXeq;
2203 req->dlen = 24; // as specified in the protocol specification
2204 memcpy( req->fhandle, self->pFileHandle, 4 );
2205
2206 ChunkList *list = new ChunkList();
2207 uint32_t size = 0;
2208 for( int i = 0; i < iovcnt; ++i )
2209 {
2210 if( iov[i].iov_len == 0 ) continue;
2211 size += iov[i].iov_len;
2212 list->push_back( ChunkInfo( 0, iov[i].iov_len,
2213 (char*)iov[i].iov_base ) );
2214 }
2215
2217 wrtreq->requestid = kXR_write;
2218 wrtreq->offset = offset;
2219 wrtreq->dlen = size;
2220 memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2221
2222 MessageSendParams params;
2223 params.timeout = timeout;
2224 params.followRedirects = false;
2225 params.stateful = true;
2226 params.chunkList = list;
2227
2229
2231 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2232
2233 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2234 }
2235
2236 //----------------------------------------------------------------------------
2237 // Check if the file is open
2238 //----------------------------------------------------------------------------
2240 {
2241 XrdSysMutexHelper scopedLock( pMutex );
2242
2243 if( pFileState == Opened || pFileState == Recovering )
2244 return true;
2245 return false;
2246 }
2247
2248 //----------------------------------------------------------------------------
2249 // Set file property
2250 //----------------------------------------------------------------------------
2251 bool FileStateHandler::SetProperty( const std::string &name,
2252 const std::string &value )
2253 {
2254 XrdSysMutexHelper scopedLock( pMutex );
2255 if( name == "ReadRecovery" )
2256 {
2257 if( value == "true" ) pDoRecoverRead = true;
2258 else pDoRecoverRead = false;
2259 return true;
2260 }
2261 else if( name == "WriteRecovery" )
2262 {
2263 if( value == "true" ) pDoRecoverWrite = true;
2264 else pDoRecoverWrite = false;
2265 return true;
2266 }
2267 else if( name == "FollowRedirects" )
2268 {
2269 if( value == "true" ) pFollowRedirects = true;
2270 else pFollowRedirects = false;
2271 return true;
2272 }
2273 else if( name == "BundledClose" )
2274 {
2275 if( value == "true" ) pAllowBundledClose = true;
2276 else pAllowBundledClose = false;
2277 return true;
2278 }
2279 return false;
2280 }
2281
2282 //----------------------------------------------------------------------------
2283 // Get file property
2284 //----------------------------------------------------------------------------
2285 bool FileStateHandler::GetProperty( const std::string &name,
2286 std::string &value ) const
2287 {
2288 XrdSysMutexHelper scopedLock( pMutex );
2289 if( name == "ReadRecovery" )
2290 {
2291 if( pDoRecoverRead ) value = "true";
2292 else value = "false";
2293 return true;
2294 }
2295 else if( name == "WriteRecovery" )
2296 {
2297 if( pDoRecoverWrite ) value = "true";
2298 else value = "false";
2299 return true;
2300 }
2301 else if( name == "FollowRedirects" )
2302 {
2303 if( pFollowRedirects ) value = "true";
2304 else value = "false";
2305 return true;
2306 }
2307 else if( name == "DataServer" && pDataServer )
2308 { value = pDataServer->GetHostId(); return true; }
2309 else if( name == "LastURL" && pDataServer )
2310 { value = pDataServer->GetURL(); return true; }
2311 else if( name == "WrtRecoveryRedir" && pWrtRecoveryRedir )
2312 { value = pWrtRecoveryRedir->GetHostId(); return true; }
2313 value = "";
2314 return false;
2315 }
2316
2317 //----------------------------------------------------------------------------
2318 // Process the results of the opening operation
2319 //----------------------------------------------------------------------------
2321 const OpenInfo *openInfo,
2322 const HostList *hostList )
2323 {
2324 Log *log = DefaultEnv::GetLog();
2325 XrdSysMutexHelper scopedLock( pMutex );
2326
2327 //--------------------------------------------------------------------------
2328 // Assign the data server and the load balancer
2329 //--------------------------------------------------------------------------
2330 std::string lastServer = pFileUrl->GetHostId();
2331 if( hostList )
2332 {
2333 delete pDataServer;
2334 delete pLoadBalancer;
2335 pLoadBalancer = 0;
2336 delete pWrtRecoveryRedir;
2337 pWrtRecoveryRedir = 0;
2338
2339 pDataServer = new URL( hostList->back().url );
2340 pDataServer->SetParams( pFileUrl->GetParams() );
2341 if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2342 lastServer = pDataServer->GetHostId();
2343 HostList::const_iterator itC;
2344 URL::ParamsMap params = pDataServer->GetParams();
2345 for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2346 {
2347 MessageUtils::MergeCGI( params,
2348 itC->url.GetParams(),
2349 true );
2350 }
2351 pDataServer->SetParams( params );
2352
2353 HostList::const_reverse_iterator it;
2354 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2355 if( it->loadBalancer )
2356 {
2357 pLoadBalancer = new URL( it->url );
2358 break;
2359 }
2360
2361 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2362 if( it->flags & kXR_recoverWrts )
2363 {
2364 pWrtRecoveryRedir = new URL( it->url );
2365 break;
2366 }
2367 }
2368
2369 log->Debug(FileMsg, "[%p@%s] Open has returned with status %s",
2370 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), status->ToStr().c_str() );
2371
2372 if( pDataServer && !pDataServer->IsLocalFile() )
2373 {
2374 //------------------------------------------------------------------------
2375 // Check if we are using a secure connection
2376 //------------------------------------------------------------------------
2377 XrdCl::AnyObject isencobj;
2379 QueryTransport( *pDataServer, XRootDQuery::IsEncrypted, isencobj );
2380 if( st.IsOK() )
2381 {
2382 bool *isenc;
2383 isencobj.Get( isenc );
2384 pIsChannelEncrypted = isenc ? *isenc : false;
2385 delete isenc;
2386 }
2387 }
2388
2389 //--------------------------------------------------------------------------
2390 // We have failed
2391 //--------------------------------------------------------------------------
2392 pStatus = *status;
2393 if( !pStatus.IsOK() || !openInfo )
2394 {
2395 log->Debug(FileMsg, "[%p@%s] Error while opening at %s: %s",
2396 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2397 pStatus.ToStr().c_str() );
2398 FailQueuedMessages( pStatus );
2399 pFileState = Error;
2400
2401 //------------------------------------------------------------------------
2402 // Report to monitoring
2403 //------------------------------------------------------------------------
2405 if( mon )
2406 {
2408 i.file = pFileUrl;
2409 i.status = status;
2411 mon->Event( Monitor::EvErrIO, &i );
2412 }
2413 }
2414 //--------------------------------------------------------------------------
2415 // We have succeeded
2416 //--------------------------------------------------------------------------
2417 else
2418 {
2419 //------------------------------------------------------------------------
2420 // Store the response info
2421 //------------------------------------------------------------------------
2422 openInfo->GetFileHandle( pFileHandle );
2423 pSessionId = openInfo->GetSessionId();
2424 if( openInfo->GetStatInfo() )
2425 {
2426 delete pStatInfo;
2427 pStatInfo = new StatInfo( *openInfo->GetStatInfo() );
2428 }
2429
2430 log->Debug( FileMsg, "[%p@%s] successfully opened at %s, handle: %#x, "
2431 "session id: %llu", (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
2432 pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2433 (unsigned long long) pSessionId );
2434
2435 //------------------------------------------------------------------------
2436 // Inform the monitoring about opening success
2437 //------------------------------------------------------------------------
2438 gettimeofday( &pOpenTime, 0 );
2440 if( mon )
2441 {
2443 i.file = pFileUrl;
2444 i.dataServer = pDataServer->GetHostId();
2445 i.oFlags = pOpenFlags;
2446 i.fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2447 mon->Event( Monitor::EvOpen, &i );
2448 }
2449
2450 //------------------------------------------------------------------------
2451 // Resend the queued messages if any
2452 //------------------------------------------------------------------------
2453 ReSendQueuedMessages();
2454 pFileState = Opened;
2455 }
2456 }
2457
2458 //----------------------------------------------------------------------------
2459 // Process the results of the closing operation
2460 //----------------------------------------------------------------------------
2462 {
2463 Log *log = DefaultEnv::GetLog();
2464 XrdSysMutexHelper scopedLock( pMutex );
2465
2466 log->Debug(FileMsg, "[%p@%s] Close returned from %s with: %s", (void*)this,
2467 pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2468 status->ToStr().c_str() );
2469
2470 log->Dump(FileMsg, "[%p@%s] Items in the fly %zu, queued for recovery %zu",
2471 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2472
2473 MonitorClose( status );
2474 ResetMonitoringVars();
2475
2476 pStatus = *status;
2477 pFileState = Closed;
2478 }
2479
2480 //----------------------------------------------------------------------------
2481 // Handle an error while sending a stateful message
2482 //----------------------------------------------------------------------------
2483 void FileStateHandler::OnStateError( std::shared_ptr<FileStateHandler> &self,
2484 XRootDStatus *status,
2485 Message *message,
2486 ResponseHandler *userHandler,
2487 MessageSendParams &sendParams )
2488 {
2489 //--------------------------------------------------------------------------
2490 // It may be a redirection
2491 //--------------------------------------------------------------------------
2492 if( !status->IsOK() && status->code == errRedirect && self->pFollowRedirects )
2493 {
2494 static const std::string root = "root", xroot = "xroot", file = "file",
2495 roots = "roots", xroots = "xroots";
2496 std::string msg = status->GetErrorMessage();
2497 if( !msg.compare( 0, root.size(), root ) ||
2498 !msg.compare( 0, xroot.size(), xroot ) ||
2499 !msg.compare( 0, file.size(), file ) ||
2500 !msg.compare( 0, roots.size(), roots ) ||
2501 !msg.compare( 0, xroots.size(), xroots ) )
2502 {
2503 FileStateHandler::OnStateRedirection( self, msg, message, userHandler, sendParams );
2504 return;
2505 }
2506 }
2507
2508 //--------------------------------------------------------------------------
2509 // Handle error
2510 //--------------------------------------------------------------------------
2511 Log *log = DefaultEnv::GetLog();
2512 XrdSysMutexHelper scopedLock( self->pMutex );
2513 self->pInTheFly.erase( message );
2514
2515 log->Dump( FileMsg, "[%p@%s] File state error encountered. Message %s "
2516 "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2517 message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2518
2519 //--------------------------------------------------------------------------
2520 // Report to monitoring
2521 //--------------------------------------------------------------------------
2523 if( mon )
2524 {
2526 i.file = self->pFileUrl;
2527 i.status = status;
2528
2529 ClientRequest *req = (ClientRequest*)message->GetBuffer();
2530 switch( req->header.requestid )
2531 {
2539 }
2540
2541 mon->Event( Monitor::EvErrIO, &i );
2542 }
2543
2544 //--------------------------------------------------------------------------
2545 // The message is not recoverable
2546 // (message using a kernel buffer is not recoverable by definition)
2547 //--------------------------------------------------------------------------
2548 if( !self->IsRecoverable( *status ) || sendParams.kbuff )
2549 {
2550 log->Error( FileMsg, "[%p@%s] Fatal file state error. Message %s "
2551 "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2552 message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2553
2554 self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2555 delete status;
2556 return;
2557 }
2558
2559 //--------------------------------------------------------------------------
2560 // Insert the message to the recovery queue and start the recovery
2561 // procedure if we don't have any more message in the fly
2562 //--------------------------------------------------------------------------
2563 self->pCloseReason = *status;
2564 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2565 delete status;
2566 }
2567
2568 //----------------------------------------------------------------------------
2569 // Handle stateful redirect
2570 //----------------------------------------------------------------------------
2571 void FileStateHandler::OnStateRedirection( std::shared_ptr<FileStateHandler> &self,
2572 const std::string &redirectUrl,
2573 Message *message,
2574 ResponseHandler *userHandler,
2575 MessageSendParams &sendParams )
2576 {
2577 XrdSysMutexHelper scopedLock( self->pMutex );
2578 self->pInTheFly.erase( message );
2579
2580 //--------------------------------------------------------------------------
2581 // Register the state redirect url and append the new cgi information to
2582 // the file URL
2583 //--------------------------------------------------------------------------
2584 if( !self->pStateRedirect )
2585 {
2586 std::ostringstream o;
2587 self->pStateRedirect = new URL( redirectUrl );
2588 URL::ParamsMap params = self->pFileUrl->GetParams();
2589 MessageUtils::MergeCGI( params,
2590 self->pStateRedirect->GetParams(),
2591 false );
2592 self->pFileUrl->SetParams( params );
2593 }
2594
2595 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2596 }
2597
2598 //----------------------------------------------------------------------------
2599 // Handle stateful response
2600 //----------------------------------------------------------------------------
2601 void FileStateHandler::OnStateResponse( std::shared_ptr<FileStateHandler> &self,
2602 XRootDStatus *status,
2603 Message *message,
2604 AnyObject *response,
2605 HostList */*urlList*/ )
2606 {
2607 Log *log = DefaultEnv::GetLog();
2608 XrdSysMutexHelper scopedLock( self->pMutex );
2609
2610 log->Dump( FileMsg, "[%p@%s] Got state response for message %s",
2611 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2612 message->GetObfuscatedDescription().c_str() );
2613
2614 //--------------------------------------------------------------------------
2615 // Since this message may be the last "in-the-fly" and no recovery
2616 // is done if messages are in the fly, we may need to trigger recovery
2617 //--------------------------------------------------------------------------
2618 self->pInTheFly.erase( message );
2619 RunRecovery( self );
2620
2621 //--------------------------------------------------------------------------
2622 // Play with the actual response before returning it. This is a good
2623 // place to do caching in the future.
2624 //--------------------------------------------------------------------------
2625 ClientRequest *req = (ClientRequest*)message->GetBuffer();
2626 switch( req->header.requestid )
2627 {
2628 //------------------------------------------------------------------------
2629 // Cache the stat response
2630 //------------------------------------------------------------------------
2631 case kXR_stat:
2632 {
2633 StatInfo *info = 0;
2634 response->Get( info );
2635 delete self->pStatInfo;
2636 self->pStatInfo = new StatInfo( *info );
2637 break;
2638 }
2639
2640 //------------------------------------------------------------------------
2641 // Handle read response
2642 //------------------------------------------------------------------------
2643 case kXR_read:
2644 {
2645 ++self->pRCount;
2646 self->pRBytes += req->read.rlen;
2647 break;
2648 }
2649
2650 //------------------------------------------------------------------------
2651 // Handle read response
2652 //------------------------------------------------------------------------
2653 case kXR_pgread:
2654 {
2655 ++self->pRCount;
2656 self->pRBytes += req->pgread.rlen;
2657 break;
2658 }
2659
2660 //------------------------------------------------------------------------
2661 // Handle readv response
2662 //------------------------------------------------------------------------
2663 case kXR_readv:
2664 {
2665 ++self->pVRCount;
2666 size_t segs = req->header.dlen/sizeof(readahead_list);
2667 readahead_list *dataChunk = (readahead_list*)message->GetBuffer( 24 );
2668 for( size_t i = 0; i < segs; ++i )
2669 self->pVRBytes += dataChunk[i].rlen;
2670 self->pVSegs += segs;
2671 break;
2672 }
2673
2674 //------------------------------------------------------------------------
2675 // Handle write response
2676 //------------------------------------------------------------------------
2677 case kXR_write:
2678 {
2679 ++self->pWCount;
2680 self->pWBytes += req->write.dlen;
2681 break;
2682 }
2683
2684 //------------------------------------------------------------------------
2685 // Handle write response
2686 //------------------------------------------------------------------------
2687 case kXR_pgwrite:
2688 {
2689 ++self->pWCount;
2690 self->pWBytes += req->pgwrite.dlen;
2691 break;
2692 }
2693
2694 //------------------------------------------------------------------------
2695 // Handle writev response
2696 //------------------------------------------------------------------------
2697 case kXR_writev:
2698 {
2699 ++self->pVWCount;
2700 size_t size = req->header.dlen/sizeof(readahead_list);
2701 XrdProto::write_list *wrtList =
2702 reinterpret_cast<XrdProto::write_list*>( message->GetBuffer( 24 ) );
2703 for( size_t i = 0; i < size; ++i )
2704 self->pVWBytes += wrtList[i].wlen;
2705 break;
2706 }
2707 };
2708 }
2709
2710 //------------------------------------------------------------------------
2712 //------------------------------------------------------------------------
2713 void FileStateHandler::Tick( time_t now )
2714 {
2715 if (pMutex.CondLock())
2716 {TimeOutRequests( now );
2717 pMutex.UnLock();
2718 }
2719 }
2720
2721 //----------------------------------------------------------------------------
2722 // Declare timeout on requests being recovered
2723 //----------------------------------------------------------------------------
2725 {
2726 if( !pToBeRecovered.empty() )
2727 {
2728 Log *log = DefaultEnv::GetLog();
2729 log->Dump( FileMsg, "[%p@%s] Got a timer event", (void*)this,
2730 pFileUrl->GetObfuscatedURL().c_str() );
2731 RequestList::iterator it;
2733 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2734 {
2735 if( it->params.expires <= now )
2736 {
2737 jobMan->QueueJob( new ResponseJob(
2738 it->handler,
2740 0, it->params.hostList ) );
2741 it = pToBeRecovered.erase( it );
2742 }
2743 else
2744 ++it;
2745 }
2746 }
2747 }
2748
2749 //----------------------------------------------------------------------------
2750 // Called in the child process after the fork
2751 //----------------------------------------------------------------------------
2753 {
2754 Log *log = DefaultEnv::GetLog();
2755
2756 if( pFileState == Closed || pFileState == Error )
2757 return;
2758
2759 if( (IsReadOnly() && pDoRecoverRead) ||
2760 (!IsReadOnly() && pDoRecoverWrite) )
2761 {
2762 log->Debug( FileMsg, "[%p@%s] Putting the file in recovery state in "
2763 "process %d", (void*)this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2764 pFileState = Recovering;
2765 pInTheFly.clear();
2766 pToBeRecovered.clear();
2767 }
2768 else
2769 pFileState = Error;
2770 }
2771
2772 //------------------------------------------------------------------------
2773 // Try other data server
2774 //------------------------------------------------------------------------
2775 XRootDStatus FileStateHandler::TryOtherServer( std::shared_ptr<FileStateHandler> &self, uint16_t timeout )
2776 {
2777 XrdSysMutexHelper scopedLock( self->pMutex );
2778
2779 if( self->pFileState != Opened || !self->pLoadBalancer )
2781
2782 self->pFileState = Recovering;
2783
2784 Log *log = DefaultEnv::GetLog();
2785 log->Debug( FileMsg, "[%p@%s] Reopen file at next data server.",
2786 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2787
2788 // merge CGI
2789 auto lbcgi = self->pLoadBalancer->GetParams();
2790 auto dtcgi = self->pDataServer->GetParams();
2791 MessageUtils::MergeCGI( lbcgi, dtcgi, false );
2792 // update tried CGI
2793 auto itr = lbcgi.find( "tried" );
2794 if( itr == lbcgi.end() )
2795 lbcgi["tried"] = self->pDataServer->GetHostName();
2796 else
2797 {
2798 std::string tried = itr->second;
2799 tried += "," + self->pDataServer->GetHostName();
2800 lbcgi["tried"] = tried;
2801 }
2802 self->pLoadBalancer->SetParams( lbcgi );
2803
2804 return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2805 }
2806
2807 //------------------------------------------------------------------------
2808 // Generic implementation of xattr operation
2809 //------------------------------------------------------------------------
2810 template<typename T>
2811 Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2812 kXR_char subcode,
2813 kXR_char options,
2814 const std::vector<T> &attrs,
2815 ResponseHandler *handler,
2816 uint16_t timeout )
2817 {
2818 //--------------------------------------------------------------------------
2819 // Issue a new fattr request
2820 //--------------------------------------------------------------------------
2821 Message *msg;
2822 ClientFattrRequest *req;
2823 MessageUtils::CreateRequest( msg, req );
2824
2825 req->requestid = kXR_fattr;
2826 req->subcode = subcode;
2827 req->numattr = attrs.size();
2828 req->options = options;
2829 memcpy( req->fhandle, self->pFileHandle, 4 );
2831 if( !st.IsOK() ) return st;
2832
2833 MessageSendParams params;
2834 params.timeout = timeout;
2835 params.followRedirects = false;
2836 params.stateful = true;
2838
2840 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2841
2842 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2843 }
2844
2845 //----------------------------------------------------------------------------
2846 // Send a message to a host or put it in the recovery queue
2847 //----------------------------------------------------------------------------
2848 Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2849 const URL &url,
2850 Message *msg,
2851 ResponseHandler *handler,
2852 MessageSendParams &sendParams )
2853 {
2854 //--------------------------------------------------------------------------
2855 // Recovering
2856 //--------------------------------------------------------------------------
2857 if( self->pFileState == Recovering )
2858 {
2859 return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2860 }
2861
2862 //--------------------------------------------------------------------------
2863 // Trying to send
2864 //--------------------------------------------------------------------------
2865 if( self->pFileState == Opened )
2866 {
2867 msg->SetSessionId( self->pSessionId );
2868 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2869
2870 //------------------------------------------------------------------------
2871 // Invalid session id means that the connection has been broken while we
2872 // were idle so we haven't been informed about this fact earlier.
2873 //------------------------------------------------------------------------
2874 if( !st.IsOK() && st.code == errInvalidSession && self->IsRecoverable( st ) )
2875 return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2876
2877 if( st.IsOK() )
2878 self->pInTheFly.insert(msg);
2879 else
2880 delete handler;
2881 return st;
2882 }
2883 return Status( stError, errInvalidOp );
2884 }
2885
2886 //----------------------------------------------------------------------------
2887 // Check if the stateful error is recoverable
2888 //----------------------------------------------------------------------------
2889 bool FileStateHandler::IsRecoverable( const XRootDStatus &status ) const
2890 {
2891 const auto recoverable_errors = {
2898 };
2899
2900 if (pDoRecoverRead || pDoRecoverWrite)
2901 for (const auto error : recoverable_errors)
2902 if (status.code == error)
2903 return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
2904
2905 return false;
2906 }
2907
2908 //----------------------------------------------------------------------------
2909 // Check if the file is open for read only
2910 //----------------------------------------------------------------------------
2911 bool FileStateHandler::IsReadOnly() const
2912 {
2913 if( (pOpenFlags & kXR_open_read) && !(pOpenFlags & kXR_open_updt) &&
2914 !(pOpenFlags & kXR_open_apnd ) )
2915 return true;
2916 return false;
2917 }
2918
2919 //----------------------------------------------------------------------------
2920 // Recover a message
2921 //----------------------------------------------------------------------------
2922 Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
2923 RequestData rd,
2924 bool callbackOnFailure )
2925 {
2926 self->pFileState = Recovering;
2927
2928 Log *log = DefaultEnv::GetLog();
2929 log->Dump( FileMsg, "[%p@%s] Putting message %s in the recovery list",
2930 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2931 rd.request->GetObfuscatedDescription().c_str() );
2932
2933 Status st = RunRecovery( self );
2934 if( st.IsOK() )
2935 {
2936 self->pToBeRecovered.push_back( rd );
2937 return st;
2938 }
2939
2940 if( callbackOnFailure )
2941 self->FailMessage( rd, st );
2942
2943 return st;
2944 }
2945
2946 //----------------------------------------------------------------------------
2947 // Run the recovery procedure if appropriate
2948 //----------------------------------------------------------------------------
2949 Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
2950 {
2951 if( self->pFileState != Recovering )
2952 return Status();
2953
2954 if( !self->pInTheFly.empty() )
2955 return Status();
2956
2957 Log *log = DefaultEnv::GetLog();
2958 log->Debug( FileMsg, "[%p@%s] Running the recovery procedure", (void*)self.get(),
2959 self->pFileUrl->GetObfuscatedURL().c_str() );
2960
2961 Status st;
2962 if( self->pStateRedirect )
2963 {
2964 SendClose( self, 0 );
2965 st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
2966 delete self->pStateRedirect; self->pStateRedirect = 0;
2967 }
2968 else if( self->IsReadOnly() && self->pLoadBalancer )
2969 st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
2970 else
2971 st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
2972
2973 if( !st.IsOK() )
2974 {
2975 self->pFileState = Error;
2976 self->pStatus = st;
2977 self->FailQueuedMessages( st );
2978 }
2979
2980 return st;
2981 }
2982
2983 //----------------------------------------------------------------------------
2984 // Send a close and ignore the response
2985 //----------------------------------------------------------------------------
2986 XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
2987 uint16_t timeout )
2988 {
2989 Message *msg;
2990 ClientCloseRequest *req;
2991 MessageUtils::CreateRequest( msg, req );
2992
2993 req->requestid = kXR_close;
2994 memcpy( req->fhandle, self->pFileHandle, 4 );
2995
2997 msg->SetSessionId( self->pSessionId );
2998 ResponseHandler *handler = ResponseHandler::Wrap(
2999 [self]( XRootDStatus&, AnyObject& ) mutable { self.reset(); } );
3000 MessageSendParams params;
3001 params.timeout = timeout;
3002 params.followRedirects = false;
3003 params.stateful = true;
3004
3006
3007 return self->IssueRequest( *self->pDataServer, msg, handler, params );
3008 }
3009
3010 //----------------------------------------------------------------------------
3011 // Re-open the current file at a given server
3012 //----------------------------------------------------------------------------
3013 XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3014 const URL &url,
3015 uint16_t timeout )
3016 {
3017 Log *log = DefaultEnv::GetLog();
3018 log->Dump( FileMsg, "[%p@%s] Sending a recovery open command to %s",
3019 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.GetObfuscatedURL().c_str() );
3020
3021 //--------------------------------------------------------------------------
3022 // Remove the kXR_delete and kXR_new flags, as we don't want the recovery
3023 // procedure to delete a file that has been partially updated or fail it
3024 // because a partially uploaded file already exists.
3025 //--------------------------------------------------------------------------
3026 if( self->pOpenFlags & kXR_delete)
3027 {
3028 self->pOpenFlags &= ~kXR_delete;
3029 self->pOpenFlags |= kXR_open_updt;
3030 }
3031
3032 self->pOpenFlags &= ~kXR_new;
3033
3034 Message *msg;
3035 ClientOpenRequest *req;
3036 URL u = url;
3037
3038 if( url.GetPath().empty() )
3039 u.SetPath( self->pFileUrl->GetPath() );
3040
3041 std::string path = u.GetPathWithFilteredParams();
3042 MessageUtils::CreateRequest( msg, req, path.length() );
3043
3044 req->requestid = kXR_open;
3045 req->mode = self->pOpenMode;
3046 req->options = self->pOpenFlags;
3047 req->dlen = path.length();
3048 msg->Append( path.c_str(), path.length(), 24 );
3049
3050 // create a new reopen handler
3051 // (it is not assigned to 'pReOpenHandler' in order not to bump the reference counter
3052 // until we know that 'SendMessage' was successful)
3053 OpenHandler *openHandler = new OpenHandler( self, 0 );
3054 MessageSendParams params; params.timeout = timeout;
3057
3058 //--------------------------------------------------------------------------
3059 // Issue the open request
3060 //--------------------------------------------------------------------------
3061 XRootDStatus st = self->IssueRequest( url, msg, openHandler, params );
3062
3063 // if there was a problem destroy the open handler
3064 if( !st.IsOK() )
3065 {
3066 delete openHandler;
3067 self->pStatus = st;
3068 self->pFileState = Closed;
3069 }
3070 return st;
3071 }
3072
3073 //------------------------------------------------------------------------
3074 // Fail a message
3075 //------------------------------------------------------------------------
3076 void FileStateHandler::FailMessage( RequestData rd, XRootDStatus status )
3077 {
3078 Log *log = DefaultEnv::GetLog();
3079 log->Dump( FileMsg, "[%p@%s] Failing message %s with %s",
3080 (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3081 rd.request->GetObfuscatedDescription().c_str(),
3082 status.ToStr().c_str() );
3083
3084 StatefulHandler *sh = dynamic_cast<StatefulHandler*>(rd.handler);
3085 if( !sh )
3086 {
3087 Log *log = DefaultEnv::GetLog();
3088 log->Error( FileMsg, "[%p@%s] Internal error while recovering %s",
3089 (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3090 rd.request->GetObfuscatedDescription().c_str() );
3091 return;
3092 }
3093
3094 JobManager *jobMan = DefaultEnv::GetPostMaster()->GetJobManager();
3095 ResponseHandler *userHandler = sh->GetUserHandler();
3096 jobMan->QueueJob( new ResponseJob(
3097 userHandler,
3098 new XRootDStatus( status ),
3099 0, rd.params.hostList ) );
3100
3101 delete sh;
3102 }
3103
3104 //----------------------------------------------------------------------------
3105 // Fail queued messages
3106 //----------------------------------------------------------------------------
3107 void FileStateHandler::FailQueuedMessages( XRootDStatus status )
3108 {
3109 RequestList::iterator it;
3110 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3111 FailMessage( *it, status );
3112 pToBeRecovered.clear();
3113 }
3114
3115 //------------------------------------------------------------------------
3116 // Re-send queued messages
3117 //------------------------------------------------------------------------
3118 void FileStateHandler::ReSendQueuedMessages()
3119 {
3120 RequestList::iterator it;
3121 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3122 {
3123 it->request->SetSessionId( pSessionId );
3124 ReWriteFileHandle( it->request );
3125 XRootDStatus st = IssueRequest( *pDataServer, it->request,
3126 it->handler, it->params );
3127 if( !st.IsOK() )
3128 FailMessage( *it, st );
3129 }
3130 pToBeRecovered.clear();
3131 }
3132
3133 //------------------------------------------------------------------------
3134 // Re-write file handle
3135 //------------------------------------------------------------------------
3136 void FileStateHandler::ReWriteFileHandle( Message *msg )
3137 {
3138 ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
3139 switch( hdr->requestid )
3140 {
3141 case kXR_read:
3142 {
3143 ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
3144 memcpy( req->fhandle, pFileHandle, 4 );
3145 break;
3146 }
3147 case kXR_write:
3148 {
3149 ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
3150 memcpy( req->fhandle, pFileHandle, 4 );
3151 break;
3152 }
3153 case kXR_sync:
3154 {
3155 ClientSyncRequest *req = (ClientSyncRequest*)msg->GetBuffer();
3156 memcpy( req->fhandle, pFileHandle, 4 );
3157 break;
3158 }
3159 case kXR_truncate:
3160 {
3161 ClientTruncateRequest *req = (ClientTruncateRequest*)msg->GetBuffer();
3162 memcpy( req->fhandle, pFileHandle, 4 );
3163 break;
3164 }
3165 case kXR_readv:
3166 {
3167 ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
3168 readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
3169 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3170 memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3171 break;
3172 }
3173 case kXR_writev:
3174 {
3175 ClientWriteVRequest *req =
3176 reinterpret_cast<ClientWriteVRequest*>( msg->GetBuffer() );
3177 XrdProto::write_list *wrtList =
3178 reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
3179 size_t size = req->dlen / sizeof(XrdProto::write_list);
3180 for( size_t i = 0; i < size; ++i )
3181 memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3182 break;
3183 }
3184 case kXR_pgread:
3185 {
3186 ClientPgReadRequest *req = (ClientPgReadRequest*) msg->GetBuffer();
3187 memcpy( req->fhandle, pFileHandle, 4 );
3188 break;
3189 }
3190 case kXR_pgwrite:
3191 {
3192 ClientPgWriteRequest *req = (ClientPgWriteRequest*) msg->GetBuffer();
3193 memcpy( req->fhandle, pFileHandle, 4 );
3194 break;
3195 }
3196 }
3197
3198 Log *log = DefaultEnv::GetLog();
3199 log->Dump( FileMsg, "[%p@%s] Rewritten file handle for %s to %#x",
3200 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), msg->GetObfuscatedDescription().c_str(),
3201 *((uint32_t*)pFileHandle) );
3203 }
3204
3205 //----------------------------------------------------------------------------
3206 // Dispatch monitoring information on close
3207 //----------------------------------------------------------------------------
3208 void FileStateHandler::MonitorClose( const XRootDStatus *status )
3209 {
3210 Monitor *mon = DefaultEnv::GetMonitor();
3211 if( mon )
3212 {
3213 Monitor::CloseInfo i;
3214 i.file = pFileUrl;
3215 i.oTOD = pOpenTime;
3216 gettimeofday( &i.cTOD, 0 );
3217 i.rBytes = pRBytes;
3218 i.vrBytes = pVRBytes;
3219 i.wBytes = pWBytes;
3220 i.vwBytes = pVWBytes;
3221 i.vSegs = pVSegs;
3222 i.rCount = pRCount;
3223 i.vCount = pVRCount;
3224 i.wCount = pWCount;
3225 i.status = status;
3226 mon->Event( Monitor::EvClose, &i );
3227 }
3228 }
3229
3230 XRootDStatus FileStateHandler::IssueRequest( const URL &url,
3231 Message *msg,
3232 ResponseHandler *handler,
3233 MessageSendParams &sendParams )
3234 {
3235 // first handle Metalinks
3236 if( pUseVirtRedirector && url.IsMetalink() )
3237 return MessageUtils::RedirectMessage( url, msg, handler,
3238 sendParams, pLFileHandler );
3239
3240 // than local file access
3241 if( url.IsLocalFile() )
3242 return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3243
3244 // and finally ordinary XRootD requests
3245 return MessageUtils::SendMessage( url, msg, handler,
3246 sendParams, pLFileHandler );
3247 }
3248
3249 //------------------------------------------------------------------------
3250 // Send a write request with payload being stored in a kernel buffer
3251 //------------------------------------------------------------------------
3252 XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3253 uint64_t offset,
3254 uint32_t length,
3255 std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3256 ResponseHandler *handler,
3257 uint16_t timeout )
3258 {
3259 //--------------------------------------------------------------------------
3260 // Create the write request
3261 //--------------------------------------------------------------------------
3262 XrdSysMutexHelper scopedLock( self->pMutex );
3263
3264 if( self->pFileState != Opened && self->pFileState != Recovering )
3265 return XRootDStatus( stError, errInvalidOp );
3266
3267 Log *log = DefaultEnv::GetLog();
3268 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
3269 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3270 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3271
3272 Message *msg;
3273 ClientWriteRequest *req;
3274 MessageUtils::CreateRequest( msg, req );
3275
3276 req->requestid = kXR_write;
3277 req->offset = offset;
3278 req->dlen = length;
3279 memcpy( req->fhandle, self->pFileHandle, 4 );
3280
3281 MessageSendParams params;
3282 params.timeout = timeout;
3283 params.followRedirects = false;
3284 params.stateful = true;
3285 params.kbuff = kbuff.release();
3286 params.chunkList = new ChunkList();
3287
3289
3291 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3292
3293 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3294 }
3295}
kXR_unt16 requestid
Definition XProtocol.hh:479
kXR_unt16 requestid
Definition XProtocol.hh:630
kXR_unt16 requestid
Definition XProtocol.hh:806
@ kXR_fattrDel
Definition XProtocol.hh:270
@ kXR_fattrSet
Definition XProtocol.hh:273
@ kXR_fattrList
Definition XProtocol.hh:272
@ kXR_fattrGet
Definition XProtocol.hh:271
#define kXR_suppgrw
kXR_char fhandle[4]
Definition XProtocol.hh:531
kXR_char fhandle[4]
Definition XProtocol.hh:782
struct ClientPgReadRequest pgread
Definition XProtocol.hh:861
kXR_char fhandle[4]
Definition XProtocol.hh:807
kXR_char fhandle[4]
Definition XProtocol.hh:771
kXR_unt16 requestid
Definition XProtocol.hh:644
@ kXR_virtReadv
Definition XProtocol.hh:150
kXR_unt16 options
Definition XProtocol.hh:481
static const int kXR_ckpXeq
Definition XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition XProtocol.hh:862
kXR_unt16 requestid
Definition XProtocol.hh:228
@ kXR_async
Definition XProtocol.hh:458
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_open_read
Definition XProtocol.hh:456
@ kXR_open_updt
Definition XProtocol.hh:457
@ kXR_open_apnd
Definition XProtocol.hh:462
@ kXR_retstat
Definition XProtocol.hh:463
struct ClientRequestHdr header
Definition XProtocol.hh:846
kXR_char fhandle[4]
Definition XProtocol.hh:509
#define kXR_recoverWrts
kXR_char fhandle[4]
Definition XProtocol.hh:645
kXR_char fhandle[4]
Definition XProtocol.hh:229
kXR_unt16 requestid
Definition XProtocol.hh:157
kXR_char fhandle[4]
Definition XProtocol.hh:633
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_fattr
Definition XProtocol.hh:132
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_pgread
Definition XProtocol.hh:142
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_close
Definition XProtocol.hh:115
@ kXR_pgwrite
Definition XProtocol.hh:138
struct ClientReadRequest read
Definition XProtocol.hh:867
kXR_int32 rlen
Definition XProtocol.hh:660
kXR_unt16 requestid
Definition XProtocol.hh:768
kXR_unt16 requestid
Definition XProtocol.hh:781
kXR_int64 offset
Definition XProtocol.hh:661
#define kXR_PROTPGRWVERSION
Definition XProtocol.hh:73
struct ClientWriteRequest write
Definition XProtocol.hh:876
kXR_unt16 requestid
Definition XProtocol.hh:670
@ kXR_Qopaqug
Definition XProtocol.hh:625
@ kXR_Qvisa
Definition XProtocol.hh:622
unsigned char kXR_char
Definition XPtypes.hh:65
static int mapError(int rc)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetSize() const
Get the size of the message.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
An interface for file plug-ins.
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, uint16_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, uint16_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Opened
Opening has succeeded.
@ Recovering
Recovering from an error.
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool IsOpen() const
Check if the file is open.
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, uint16_t timeout)
Try other data server.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
An abstract class to describe the client-side monitoring plugin interface.
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
PgReadSubstitutionHandler(XrdCl::ResponseHandler *a, bool isHttps)
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Call the user callback.
Object stat info.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:465
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
std::string GetPathWithFilteredParams() const
Get the path with params, filteres out 'xrdcl.'.
Definition XrdClURL.cc:331
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition XrdClURL.cc:498
void SetPath(const std::string &path)
Set the path.
Definition XrdClURL.hh:225
bool IsLocalFile() const
Definition XrdClURL.cc:474
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Get the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInProgress
const uint16_t errSocketTimeout
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const uint64_t FileMsg
const uint16_t suAlreadyDone
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
const uint16_t errSocketError
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
T & To(AnyObject &any)
const uint16_t errRedirect
const uint16_t errSocketDisconnected
Response NullRef< Response >::value
XrdSysError Log
Definition XrdConfig.cc:113
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
static const int aData
Definition XProtocol.hh:298
kXR_char fhandle[4]
Definition XProtocol.hh:288
kXR_unt16 requestid
Definition XProtocol.hh:287
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
uint64_t vwBytes
Total number of bytes written vie writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
std::string dataServer
Actual fata server.
uint16_t oFlags
OpenFlags.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted