XRootD
XrdEc::StrmWriter Class Reference

#include <XrdEcStrmWriter.hh>

+ Collaboration diagram for XrdEc::StrmWriter:

Public Member Functions

 StrmWriter (const ObjCfg &objcfg)
 Constructor. More...
 
virtual ~StrmWriter ()
 Destructor. More...
 
void Close (XrdCl::ResponseHandler *handler, uint16_t timeout=0)
 
uint64_t GetSize ()
 
void Open (XrdCl::ResponseHandler *handler, uint16_t timeout=0)
 
void Write (uint32_t size, const void *buff, XrdCl::ResponseHandler *handler)
 

Detailed Description

The Stream Writer object, responsible for writing erasure-coded data into the selected placement group.

Definition at line 52 of file XrdEcStrmWriter.hh.

Constructor & Destructor Documentation

◆ StrmWriter()

XrdEc::StrmWriter::StrmWriter ( const ObjCfg & objcfg)
inline

Constructor.

Definition at line 64 of file XrdEcStrmWriter.hh.

64  : objcfg( objcfg ),
65  writer_thread_stop( false ),
66  writer_thread( writer_routine, this ),
67  next_blknb( 0 ),
68  global_status( this )
69  {
70  }

◆ ~StrmWriter()

virtual XrdEc::StrmWriter::~StrmWriter ( )
inline virtual

Destructor.

Definition at line 75 of file XrdEcStrmWriter.hh.

76  {
77  writer_thread_stop = true;
78  buffers.interrupt();
79  writer_thread.join();
80  }

References XrdEc::sync_queue< Element >::interrupt().

+ Here is the call graph for this function:

Member Function Documentation

◆ Close()

void XrdEc::StrmWriter::Close ( XrdCl::ResponseHandler * handler,
uint16_t  timeout = 0 
)

Close the data object

Parameters
handler: user callback

Definition at line 108 of file XrdEcStrmWriter.cc.

109  {
110  //-------------------------------------------------------------------------
111  // First, check the global status, if we are in an error state just
112  // fail the request.
113  //-------------------------------------------------------------------------
114  XrdCl::XRootDStatus gst = global_status.get();
115  if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
116  //-------------------------------------------------------------------------
117  // Take care of the left-over data ...
118  //-------------------------------------------------------------------------
119  if( wrtbuff && !wrtbuff->Empty() ) EnqueueBuff( std::move( wrtbuff ) );
120  //-------------------------------------------------------------------------
121  // Let the global status handle the close
122  //-------------------------------------------------------------------------
123  global_status.issue_close( handler, timeout );
124  }
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124

References XrdCl::Status::IsOK(), and XrdEc::ScheduleHandler().

+ Here is the call graph for this function:

◆ GetSize()

uint64_t XrdEc::StrmWriter::GetSize ( )
inline
Returns
: the file size

Definition at line 108 of file XrdEcStrmWriter.hh.

109  {
110  return global_status.get_btswritten();
111  }

◆ Open()

void XrdEc::StrmWriter::Open ( XrdCl::ResponseHandler * handler,
uint16_t  timeout = 0 
)

Open the data object for writing

Parameters
handler: user callback

Definition at line 44 of file XrdEcStrmWriter.cc.

45  {
46  const size_t size = objcfg.plgr.size();
47 
48  std::vector<XrdCl::Pipeline> opens;
49  opens.reserve( size );
50  // initialize all zip archive objects
51  for( size_t i = 0; i < size; ++i )
52  dataarchs.emplace_back( std::make_shared<XrdCl::ZipArchive>(
53  Config::Instance().enable_plugins ) );
54 
55  for( size_t i = 0; i < size; ++i )
56  {
57  std::string url = objcfg.GetDataUrl( i );
58  XrdCl::Ctx<XrdCl::ZipArchive> zip( *dataarchs[i] );
59  opens.emplace_back( XrdCl::OpenArchive( zip, url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write ) );
60  }
61 
62  XrdCl::Async( XrdCl::Parallel( opens ).AtLeast( objcfg.nbchunks ) >>
63  [=]( XrdCl::XRootDStatus &st )
64  {
65  if( !st.IsOK() ) global_status.report_open( st );
66  handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
67  }, timeout );
68  }
bool enable_plugins
Definition: XrdEcConfig.hh:77
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
@ Write
Open only for writing.
std::string GetDataUrl(size_t i) const
Definition: XrdEcObjCfg.hh:65
std::vector< std::string > plgr
Definition: XrdEcObjCfg.hh:92
const uint8_t nbchunks
Definition: XrdEcObjCfg.hh:85

References XrdCl::Async(), XrdEc::Config::enable_plugins, XrdEc::ObjCfg::GetDataUrl(), XrdEc::Config::Instance(), XrdEc::ObjCfg::nbchunks, XrdCl::OpenFlags::New, XrdCl::OpenArchive(), XrdCl::Parallel(), XrdEc::ObjCfg::plgr, and XrdCl::OpenFlags::Write.

+ Here is the call graph for this function:

◆ Write()

void XrdEc::StrmWriter::Write ( uint32_t  size,
const void *  buff,
XrdCl::ResponseHandler * handler 
)

Write data to the data object

Parameters
size: number of bytes to be written
buff: buffer with data to be written
handler: user callback

Definition at line 73 of file XrdEcStrmWriter.cc.

74  {
75  //-------------------------------------------------------------------------
76  // First, check the global status, if we are in an error state just
77  // fail the request.
78  //-------------------------------------------------------------------------
79  XrdCl::XRootDStatus gst = global_status.get();
80  if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
81 
82  //-------------------------------------------------------------------------
83  // Update the number of bytes left to be written
84  //-------------------------------------------------------------------------
85  global_status.issue_write( size );
86 
87  const char* buffer = reinterpret_cast<const char*>( buff );
88  uint32_t wrtsize = size;
89  while( wrtsize > 0 )
90  {
91  if( !wrtbuff ) wrtbuff.reset( new WrtBuff( objcfg ) );
92  uint64_t written = wrtbuff->Write( wrtsize, buffer );
93  buffer += written;
94  wrtsize -= written;
95  if( wrtbuff->Complete() ) EnqueueBuff( std::move( wrtbuff ) );
96  }
97 
98  //-------------------------------------------------------------------------
99  // We can tell the user it's done as we have the data cached in the
100  // buffer
101  //-------------------------------------------------------------------------
102  ScheduleHandler( handler );
103  }

References XrdCl::Status::IsOK(), and XrdEc::ScheduleHandler().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: