XRootD
XrdEcStrmWriter.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdEc/XrdEcStrmWriter.hh"
26 #include "XrdEc/XrdEcThreadPool.hh"
27 
28 #include "XrdOuc/XrdOucCRC32C.hh"
29 
30 #include "XrdZip/XrdZipLFH.hh"
31 #include "XrdZip/XrdZipCDFH.hh"
32 #include "XrdZip/XrdZipEOCD.hh"
33 #include "XrdZip/XrdZipUtils.hh"
34 
35 #include <numeric>
36 #include <algorithm>
37 #include <future>
38 
39 namespace XrdEc
40 {
41  //---------------------------------------------------------------------------
42  // Open the data object for writing.
    // One XrdCl::ZipArchive is created per entry in objcfg.plgr and opened
    // (OpenFlags::New | OpenFlags::Write) at objcfg.GetDataUrl( i ). All opens
    // run as one parallel pipeline constrained with AtLeast( objcfg.nbchunks )
    // -- presumably it succeeds once that many archives are open (confirm
    // against the XrdCl ParallelOperation docs). On failure the status is
    // recorded via global_status.report_open(); in either case the user
    // handler receives a heap-allocated copy of the final status.
    //
    // NOTE(review): line 53 of the original listing (the remaining arguments
    // of std::make_shared<XrdCl::ZipArchive>) is missing from this extract.
43  //---------------------------------------------------------------------------
44  void StrmWriter::Open( XrdCl::ResponseHandler *handler, uint16_t timeout )
45  {
46  const size_t size = objcfg.plgr.size();
47 
48  std::vector<XrdCl::Pipeline> opens;
49  opens.reserve( size );
50  // initialize all zip archive objects
51  for( size_t i = 0; i < size; ++i )
52  dataarchs.emplace_back( std::make_shared<XrdCl::ZipArchive>(
54 
    // build one OpenArchive operation per data URL
55  for( size_t i = 0; i < size; ++i )
56  {
57  std::string url = objcfg.GetDataUrl( i );
58  XrdCl::Ctx<XrdCl::ZipArchive> zip( *dataarchs[i] );
59  opens.emplace_back( XrdCl::OpenArchive( zip, url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write ) );
60  }
61 
    // run the opens asynchronously; [=] also captures `this` (global_status
    // is a member), so presumably this object must outlive the pipeline
62  XrdCl::Async( XrdCl::Parallel( opens ).AtLeast( objcfg.nbchunks ) >>
63  [=]( XrdCl::XRootDStatus &st )
64  {
65  if( !st.IsOK() ) global_status.report_open( st );
66  handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
67  }, timeout );
68  }
69 
70  //---------------------------------------------------------------------------
71  // Write data to the data object
72  //---------------------------------------------------------------------------
73  void StrmWriter::Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler )
74  {
75  //-------------------------------------------------------------------------
76  // First, check the global status, if we are in an error state just
77  // fail the request.
78  //-------------------------------------------------------------------------
79  XrdCl::XRootDStatus gst = global_status.get();
80  if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
81 
82  //-------------------------------------------------------------------------
83  // Update the number of bytes left to be written
84  //-------------------------------------------------------------------------
85  global_status.issue_write( size );
86 
87  const char* buffer = reinterpret_cast<const char*>( buff );
88  uint32_t wrtsize = size;
89  while( wrtsize > 0 )
90  {
91  if( !wrtbuff ) wrtbuff.reset( new WrtBuff( objcfg ) );
92  uint64_t written = wrtbuff->Write( wrtsize, buffer );
93  buffer += written;
94  wrtsize -= written;
95  if( wrtbuff->Complete() ) EnqueueBuff( std::move( wrtbuff ) );
96  }
97 
98  //-------------------------------------------------------------------------
99  // We can tell the user it's done as we have the date cached in the
100  // buffer
101  //-------------------------------------------------------------------------
102  ScheduleHandler( handler );
103  }
104 
105  //---------------------------------------------------------------------------
106  // Close the data object
107  //---------------------------------------------------------------------------
108  void StrmWriter::Close( XrdCl::ResponseHandler *handler, uint16_t timeout )
109  {
110  //-------------------------------------------------------------------------
111  // First, check the global status, if we are in an error state just
112  // fail the request.
113  //-------------------------------------------------------------------------
114  XrdCl::XRootDStatus gst = global_status.get();
115  if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
116  //-------------------------------------------------------------------------
117  // Take care of the left-over data ...
118  //-------------------------------------------------------------------------
119  if( wrtbuff && !wrtbuff->Empty() ) EnqueueBuff( std::move( wrtbuff ) );
120  //-------------------------------------------------------------------------
121  // Let the global status handle the close
122  //-------------------------------------------------------------------------
123  global_status.issue_close( handler, timeout );
124  }
125 
126  //---------------------------------------------------------------------------
127  // Issue the write requests for the given write buffer
     // The block is split into objcfg.nbchunks stripes. Each stripe is appended
     // (as file objcfg.GetFileName( blknb, strpnb )) to a data archive whose
     // index is taken from a shuffled queue of archive indices. A failed append
     // takes the next index from the same shared queue, so (presumably) a retry
     // never lands on an archive already used for this block. The call waits
     // in XrdCl::WaitFor for all appends and then reports the block via
     // global_status.report_wrt(). blksize counts only the data stripes
     // (strpnb < objcfg.nbdata).
     //
     // NOTE(review): lines 167, 171 and 203 of the original listing are missing
     // from this extract. They are presumably the declaration of `zip`, the
     // first line of the `err` status, and the retry call in the error handler.
128  //---------------------------------------------------------------------------
129  void StrmWriter::WriteBuff( std::unique_ptr<WrtBuff> buff )
130  {
131  //-------------------------------------------------------------------------
132  // Our buffer with the data block, will be shared between all pipelines
133  // writing to different servers.
134  //-------------------------------------------------------------------------
135  std::shared_ptr<WrtBuff> wrtbuff( std::move( buff ) );
136 
137  //-------------------------------------------------------------------------
138  // Shuffle the servers so every block has a different placement
     // NOTE(review): the engine is a function-local static shared by all calls.
     // If WriteBuff can run concurrently (this file includes XrdEcThreadPool.hh),
     // std::shuffle on it would be a data race -- confirm, and consider a lock
     // or a thread_local engine.
139  //-------------------------------------------------------------------------
140  static std::default_random_engine random_engine( std::chrono::system_clock::now().time_since_epoch().count() );
141  std::shared_ptr<sync_queue<size_t>> servers = std::make_shared<sync_queue<size_t>>();
142  std::vector<size_t> zipid( dataarchs.size() );
143  std::iota( zipid.begin(), zipid.end(), 0 );
144  std::shuffle( zipid.begin(), zipid.end(), random_engine );
145  auto itr = zipid.begin();
146  for( ; itr != zipid.end() ; ++itr ) servers->enqueue( std::move( *itr ) );
147 
148  //-------------------------------------------------------------------------
149  // Create the write pipelines for updating stripes
150  //-------------------------------------------------------------------------
151  const size_t nbchunks = objcfg.nbchunks;
152  std::vector<XrdCl::Pipeline> writes;
153  writes.reserve( nbchunks );
154  size_t blknb = next_blknb++;
155  uint64_t blksize = 0;
156  for( size_t strpnb = 0; strpnb < nbchunks; ++strpnb )
157  {
158  std::string fn = objcfg.GetFileName( blknb, strpnb );
159  uint32_t crc32c = wrtbuff->GetCrc32c( strpnb );
160  uint64_t strpsize = wrtbuff->GetStrpSize( strpnb );
161  char* strpbuff = wrtbuff->GetStrpBuff( strpnb );
     // only data stripes contribute to the reported block size
162  if( strpnb < objcfg.nbdata ) blksize += strpsize;
163 
164  //-----------------------------------------------------------------------
165  // Find a server where we can append the next data chunk
166  //-----------------------------------------------------------------------
168  size_t srvid;
169  if( !servers->dequeue( srvid ) )
170  {
172  0, "No more data servers to try." );
173  //---------------------------------------------------------------------
174  // calculate the full block size, otherwise the user handler
175  // would never be called
176  //---------------------------------------------------------------------
177  for( size_t i = strpnb + 1; i < objcfg.nbdata; ++i )
178  blksize += wrtbuff->GetStrpSize( i );
179  global_status.report_wrt( err, blksize );
180  return;
181  }
182  zip = *dataarchs[srvid];
183 
184  //-----------------------------------------------------------------------
185  // Create the Write request
186  //-----------------------------------------------------------------------
187  XrdCl::Pipeline p = XrdCl::AppendFile( zip, fn, crc32c, strpsize, strpbuff ) >>
188  [=]( XrdCl::XRootDStatus &st ) mutable
189  {
190  //------------------------------------------------
191  // Try to recover from error
192  //------------------------------------------------
193  if( !st.IsOK() )
194  {
195  //----------------------------------------------
196  // Select another server
197  //----------------------------------------------
198  if( !servers->dequeue( srvid ) ) return; // if there are no more servers we simply fail
199  zip = *dataarchs[srvid];
200  //----------------------------------------------
201  // Retry this operation at a different server
     // (original line 203 -- presumably XrdCl::Pipeline::Repeat() -- is
     //  missing from this extract)
202  //----------------------------------------------
204  }
205  //------------------------------------------------
206  // Make sure the buffer is only deallocated
207  // after the handler is called
     // (the [=] capture holds a copy of the shared_ptr; reset drops it here)
208  //------------------------------------------------
209  wrtbuff.reset();
210  };
211  writes.emplace_back( std::move( p ) );
212  }
213 
     // wait for all stripe appends, then report the block status and data size
     // (the status returned by WaitFor itself is not used)
214  XrdCl::WaitFor( XrdCl::Parallel( writes ) >> [=]( XrdCl::XRootDStatus &st ){ global_status.report_wrt( st, blksize ); } );
215  }
216 
217  //---------------------------------------------------------------------------
218  // Get a buffer with metadata (CDFH and EOCD records)
219  //---------------------------------------------------------------------------
220  XrdZip::buffer_t StrmWriter::GetMetadataBuffer()
221  {
222  using namespace XrdZip;
223 
224  const size_t cdcnt = objcfg.plgr.size();
225  std::vector<buffer_t> buffs; buffs.reserve( cdcnt ); // buffers with raw data
226  std::vector<LFH> lfhs; lfhs.reserve( cdcnt ); // LFH records
227  std::vector<CDFH> cdfhs; cdfhs.reserve( cdcnt ); // CDFH records
228 
229  //-------------------------------------------------------------------------
230  // prepare data structures (LFH and CDFH records)
231  //-------------------------------------------------------------------------
232  uint64_t offset = 0;
233  uint64_t cdsize = 0;
234  mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
235  for( size_t i = 0; i < cdcnt; ++i )
236  {
237  std::string fn = std::to_string( i ); // file name (URL of the data archive)
238  buffer_t buff( dataarchs[i]->GetCD() ); // raw data buffer (central directory of the data archive)
239  uint32_t cksum = objcfg.digest( 0, buff.data(), buff.size() ); // digest (crc) of the buffer
240  lfhs.emplace_back( fn, cksum, buff.size(), time( 0 ) ); // LFH record for the buffer
241  LFH &lfh = lfhs.back();
242  cdfhs.emplace_back( &lfh, mode, offset ); // CDFH record for the buffer
243  offset += LFH::lfhBaseSize + fn.size() + buff.size(); // shift the offset
244  cdsize += cdfhs.back().cdfhSize; // update central directory size
245  buffs.emplace_back( std::move( buff ) ); // keep the buffer for later
246  }
247 
248  uint64_t zipsize = offset + cdsize + EOCD::eocdBaseSize;
249  buffer_t zipbuff; zipbuff.reserve( zipsize );
250 
251  //-------------------------------------------------------------------------
252  // write into the final buffer LFH records + raw data
253  //-------------------------------------------------------------------------
254  for( size_t i = 0; i < cdcnt; ++i )
255  {
256  lfhs[i].Serialize( zipbuff );
257  std::copy( buffs[i].begin(), buffs[i].end(), std::back_inserter( zipbuff ) );
258  }
259  //-------------------------------------------------------------------------
260  // write into the final buffer CDFH records
261  //-------------------------------------------------------------------------
262  for( size_t i = 0; i < cdcnt; ++i )
263  cdfhs[i].Serialize( zipbuff );
264  //-------------------------------------------------------------------------
265  // prepare and write into the final buffer the EOCD record
266  //-------------------------------------------------------------------------
267  EOCD eocd( offset, cdcnt, cdsize );
268  eocd.Serialize( zipbuff );
269 
270  return zipbuff;
271  }
272 
273  //---------------------------------------------------------------------------
274  // Close the data object (implementation)
     // Every open data archive gets the xattrs "xrdec.filesize" (GetSize()) and
     // "xrdec.strpver" (close time in seconds, from time(NULL)) and is then
     // closed. Unless objcfg.nomtfile is set, the metadata ZIP built by
     // GetMetadataBuffer() is written and closed at objcfg.GetMetadataUrl( i )
     // for every i. The user handler is attached to the resulting parallel
     // pipeline, which is started with XrdCl::Async.
     //
     // NOTE(review): lines 321 and 346 of the original listing are missing from
     // this extract. They are presumably the operation that starts pipeline `p`
     // (an XrdCl::Open on metadataarchs[i]) and the opening of the outer
     // XrdCl::Parallel( ... ).
275  //---------------------------------------------------------------------------
276  void StrmWriter::CloseImpl( XrdCl::ResponseHandler *handler, uint16_t timeout )
277  {
278  //-------------------------------------------------------------------------
279  // First, check the global status, if we are in an error state just
280  // fail the request.
281  //-------------------------------------------------------------------------
282  XrdCl::XRootDStatus gst = global_status.get();
283  if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
284 
285  const size_t size = objcfg.plgr.size();
286  //-------------------------------------------------------------------------
287  // prepare the metadata (the Central Directory of each data ZIP)
288  //-------------------------------------------------------------------------
289  auto zipbuff = objcfg.nomtfile ? nullptr :
290  std::make_shared<XrdZip::buffer_t>( GetMetadataBuffer() );
291  //-------------------------------------------------------------------------
292  // prepare the pipelines ...
293  //-------------------------------------------------------------------------
294  std::vector<XrdCl::Pipeline> closes;
295  std::vector<XrdCl::Pipeline> save_metadata;
296  closes.reserve( size );
297  std::string closeTime = std::to_string( time(NULL) );
298 
299  std::vector<XrdCl::xattr_t> xav{ {"xrdec.filesize", std::to_string(GetSize())},
300  {"xrdec.strpver", closeTime.c_str()} };
301 
302  for( size_t i = 0; i < size; ++i )
303  {
304  //-----------------------------------------------------------------------
305  // close ZIP archives with data (only those that are actually open)
306  //-----------------------------------------------------------------------
307  if( dataarchs[i]->IsOpen() )
308  {
309  XrdCl::Pipeline p = XrdCl::SetXAttr( dataarchs[i]->GetFile(), xav )
310  | XrdCl::CloseArchive( *dataarchs[i] );
311  closes.emplace_back( std::move( p ) );
312  }
313  //-----------------------------------------------------------------------
314  // replicate the metadata
315  //-----------------------------------------------------------------------
316  if( zipbuff )
317  {
318  std::string url = objcfg.GetMetadataUrl( i );
     // NOTE(review): metadataarchs[i] below assumes metadataarchs was empty on
     // entry (i.e. CloseImpl runs at most once per object) -- confirm.
319  metadataarchs.emplace_back( std::make_shared<XrdCl::File>(
320  Config::Instance().enable_plugins ) );
322  | XrdCl::Write( *metadataarchs[i], 0, zipbuff->size(), zipbuff->data() )
323  | XrdCl::Close( *metadataarchs[i] )
     // the Final callback does nothing itself; capturing zipbuff presumably
     // keeps the metadata buffer alive until the pipeline has finished
324  | XrdCl::Final( [zipbuff]( const XrdCl::XRootDStatus& ){ } );
325 
326  save_metadata.emplace_back( std::move( p ) );
327  }
328  }
329 
330  //-------------------------------------------------------------------------
331  // If we were instructed not to create the additional metadata file
332  // do the simplified close
333  //-------------------------------------------------------------------------
334  if( save_metadata.empty() )
335  {
336  XrdCl::Pipeline p = XrdCl::Parallel( closes ).AtLeast( objcfg.nbchunks ) >> handler;
337  XrdCl::Async( std::move( p ), timeout );
338  return;
339  }
340 
341  //-------------------------------------------------------------------------
342  // compose closes & save_metadata:
343  // - closes must be successful at least for #data + #parity
344  // - save_metadata must be successful at least for #parity + 1
345  //-------------------------------------------------------------------------
347  XrdCl::Parallel( closes ).AtLeast( objcfg.nbchunks ),
348  XrdCl::Parallel( save_metadata ).AtLeast( objcfg.nbparity + 1 )
349  ) >> handler;
350  XrdCl::Async( std::move( p ), timeout );
351  }
352 }
uint32_t crc32c(uint32_t crc, void const *buf, size_t len)
static void Repeat()
Repeat current operation.
Handle an async response.
bool enable_plugins
Definition: XrdEcConfig.hh:77
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
void Open(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
void Write(uint32_t size, const void *buff, XrdCl::ResponseHandler *handler)
void Close(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseArchiveImpl objects.
SetXAttrImpl< false > SetXAttr(Ctx< File > file, Arg< std::string > name, Arg< std::string > value)
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating OpenImpl objects.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
AppendFileImpl< false > AppendFile(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< uint32_t > crc32, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating AppendFileImpl objects.
FinalOperation Final
const uint16_t errNoMoreReplicas
No more replicas to try.
Definition: XrdClStatus.hh:65
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
std::vector< char > buffer_t
Definition: XrdZipUtils.hh:56
@ Write
Open only for writing.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string GetDataUrl(size_t i) const
Definition: XrdEcObjCfg.hh:65
std::vector< std::string > plgr
Definition: XrdEcObjCfg.hh:92
const uint8_t nbdata
Definition: XrdEcObjCfg.hh:87
std::string GetMetadataUrl(size_t i) const
Definition: XrdEcObjCfg.hh:72
std::string GetFileName(size_t blknb, size_t strpnb) const
Definition: XrdEcObjCfg.hh:79
const uint8_t nbchunks
Definition: XrdEcObjCfg.hh:85
const uint8_t nbparity
Definition: XrdEcObjCfg.hh:86
A data structure representing ZIP Local File Header.
Definition: XrdZipLFH.hh:42
void Serialize(buffer_t &buffer)
Serialize the object into a buffer.
Definition: XrdZipLFH.hh:106