XRootD
XrdClEcHandler.cc
Go to the documentation of this file.
1 /*
2  * XrdClEcHandler.cc
3  *
4  * Created on: Nov 2, 2021
5  * Author: simonm
6  */
7 
8 
9 
#include "XrdCl/XrdClEcHandler.hh"

#include <exception>
#include <mutex>
#include <string>
11 
12 namespace XrdCl
13 {
14 
16  if (getenv("XrdCl_EC_X_RATIO"))
17  {
18  xRatio = atoi(getenv("XrdCl_EC_X_RATIO"));
19  }
20  else
21  {
22  xRatio = 1;
23  }
24  };
25 
27  XrdCl::LocationInfo &newList,
28  uint32_t n)
29  {
30  TryInitExportPaths();
31  AddServers(oldList);
32  UpdateSpaceInfo();
33 
34  lock.lock();
35  if (oldList.GetSize() > n && ! BlindSelect())
36  {
37  for (uint32_t j=0; j<ServerList.size(); j++)
38  {
39  for (uint32_t i=0; i<oldList.GetSize(); i++)
40  {
41  if (ServerList[j].address == oldList.At(i).GetAddress() &&
43  {
44  newList.Add(oldList.At(i));
45  if (newList.GetSize() == n)
46  {
47  lock.unlock();
48  return;
49  }
50  }
51  }
52  }
53  }
54  else
55  {
56  for (uint32_t i=0; i<oldList.GetSize(); i++)
57  {
58  if (Exists(oldList.At(i)) &&
60  {
61  newList.Add(oldList.At(i));
62  }
63  }
64  }
65  lock.unlock();
66  }
67 
69  {
70  for (uint32_t j=0; j<ServerList.size(); j++)
71  {
72  ServerList[j].Dump();
73  }
74  };
75 
76  void ServerSpaceInfo::TryInitExportPaths()
77  {
78  if (initExportPaths) return;
79  lock.lock();
80  if (! initExportPaths && getenv("XRDEXPORTS"))
81  {
82  std::istringstream p(getenv("XRDEXPORTS"));
83  std::string s;
84  while(std::getline(p, s, ' '))
85  {
86  ExportPaths.push_back(s);
87  }
88  initExportPaths = true;
89  }
90  lock.unlock();
91  };
92 
93  uint64_t ServerSpaceInfo::GetFreeSpace(const std::string addr)
94  {
95  XrdCl::FileSystem fs(addr);
96  XrdCl::Buffer queryArgs(1024), *queryResp = nullptr;
97 
98  for (uint32_t i=0; i<ExportPaths.size(); i++)
99  {
100  queryArgs.FromString(ExportPaths[i]);
101  XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Space, queryArgs, queryResp, 0);
102  if (st.IsOK())
103  {
104  std::string resp = queryResp->ToString();
105  int b = resp.find("oss.free=", 0);
106  int e = resp.find("&", b);
107  uint64_t s = 0;
108  std::stringstream sstream0( resp.substr(b+9, e-(b+9)) );
109  sstream0 >> s;
110  if (queryResp) delete queryResp;
111  return s;
112  }
113  if (queryResp) delete queryResp;
114  }
115  return 0;
116  };
117 
118  bool ServerSpaceInfo::BlindSelect() {
119  auto ms_since_epoch = std::chrono::system_clock::now().time_since_epoch() /
120  std::chrono::nanoseconds(1);
121  return (ms_since_epoch % 10 > xRatio ? true : false);
122  };
123 
124  void ServerSpaceInfo::UpdateSpaceInfo()
125  {
126  if (! initExportPaths) return;
127  time_t t = time(NULL);
128  if (t < lastUpdateT + 300) return;
129  lock.lock();
130  if (t > lastUpdateT + 300)
131  {
132  for (uint32_t j=0; j<ServerList.size(); j++)
133  ServerList[j].freeSpace = GetFreeSpace(ServerList[j].address);
134  std::sort(ServerList.begin(), ServerList.end());
135  lastUpdateT = t;
136  }
137  lock.unlock();
138  };
139 
140  bool ServerSpaceInfo::Exists(XrdCl::LocationInfo::Location &loc)
141  {
142  for (uint32_t j=0; j<ServerList.size(); j++)
143  if (loc.GetAddress() == ServerList[j].address)
144  return true;
145  return false;
146  };
147 
148  void ServerSpaceInfo::AddServers(XrdCl::LocationInfo &locInfo)
149  {
150  lock.lock();
151  for (uint32_t i=0; i<locInfo.GetSize(); i++)
152  {
153  if (Exists(locInfo.At(i))) continue;
154  if (locInfo.At(i).GetType() == XrdCl::LocationInfo::ServerOnline)
155  {
156  FreeSpace s;
157  s.address = locInfo.At(i).GetAddress();
158  s.freeSpace = GetFreeSpace( s.address );
159  ServerList.push_back(s);
160  std::sort(ServerList.begin(), ServerList.end());
161  }
162  }
163  lock.unlock();
164  };
165 
166  EcHandler* GetEcHandler( const URL &headnode, const URL &redirurl )
167  {
168  const URL::ParamsMap &params = redirurl.GetParams();
169  // make sure all the xrdec. tokens are present and the values are sane
170  URL::ParamsMap::const_iterator itr = params.find( "xrdec.nbdta" );
171  if( itr == params.end() ) return nullptr;
172  uint8_t nbdta = std::stoul( itr->second );
173 
174  itr = params.find( "xrdec.nbprt" );
175  if( itr == params.end() ) return nullptr;
176  uint8_t nbprt = std::stoul( itr->second );
177 
178  itr = params.find( "xrdec.blksz" );
179  if( itr == params.end() ) return nullptr;
180  uint64_t blksz = std::stoul( itr->second );
181 
182  itr = params.find( "xrdec.plgr" );
183  if( itr == params.end() ) return nullptr;
184  std::vector<std::string> plgr;
185  Utils::splitString( plgr, itr->second, "," );
186  if( plgr.size() < nbdta + nbprt ) return nullptr;
187 
188  itr = params.find( "xrdec.objid" );
189  if( itr == params.end() ) return nullptr;
190  std::string objid = itr->second;
191 
192  itr = params.find( "xrdec.format" );
193  if( itr == params.end() ) return nullptr;
194  size_t format = std::stoul( itr->second );
195  if( format != 1 ) return nullptr; // TODO use constant
196 
197  std::vector<std::string> dtacgi;
198  itr = params.find( "xrdec.dtacgi" );
199  if( itr != params.end() )
200  {
201  Utils::splitString( dtacgi, itr->second, "," );
202  if( plgr.size() != dtacgi.size() ) return nullptr;
203  }
204 
205  std::vector<std::string> mdtacgi;
206  itr = params.find( "xrdec.mdtacgi" );
207  if( itr != params.end() )
208  {
209  Utils::splitString( mdtacgi, itr->second, "," );
210  if( plgr.size() != mdtacgi.size() ) return nullptr;
211  }
212 
213  itr = params.find( "xrdec.cosc" );
214  if( itr == params.end() ) return nullptr;
215  std::string cosc_str = itr->second;
216  if( cosc_str != "true" && cosc_str != "false" ) return nullptr;
217  bool cosc = cosc_str == "true";
218 
219  std::string ckstype;
220  itr = params.find( "xrdec.cksum" );
221  if( cosc && itr == params.end() ) return nullptr;
222  if( cosc )
223  ckstype = itr->second;
224 
225  std::string chdigest;
226  itr = params.find( "xrdec.chdigest" );
227  if( itr == params.end() )
228  chdigest = "crc32c";
229  else
230  chdigest = itr->second;
231  bool usecrc32c = ( chdigest == "crc32c" );
232 
233  bool nomtfile = false;
234  itr = params.find( "xrdec.nomtfile" );
235  if( itr != params.end() )
236  nomtfile = ( itr->second == "true" );
237 
238  XrdEc::ObjCfg *objcfg = new XrdEc::ObjCfg( objid, nbdta, nbprt, blksz / nbdta, usecrc32c );
239  objcfg->plgr = std::move( plgr );
240  objcfg->dtacgi = std::move( dtacgi );
241  objcfg->mdtacgi = std::move( mdtacgi );
242  objcfg->nomtfile = nomtfile;
243 
244  std::unique_ptr<CheckSumHelper> cksHelper( cosc ? new CheckSumHelper( "", ckstype ) : nullptr );
245  if( cksHelper )
246  {
247  auto st = cksHelper->Initialize();
248  if( !st.IsOK() ) return nullptr;
249  }
250 
251  return new EcHandler( headnode, objcfg, std::move( cksHelper ) );
252  }
253 
254 }
255 
void getline(uchar *buff, int blen)
Binary blob representation.
Definition: XrdClBuffer.hh:34
void FromString(const std::string str)
Fill the buffer from a string.
Definition: XrdClBuffer.hh:205
std::string ToString() const
Convert the buffer to a string.
Definition: XrdClBuffer.hh:215
Check sum helper for stdio.
Send file/filesystem queries to an XRootD cluster.
LocationType GetType() const
Get location type.
const std::string & GetAddress() const
Get address.
Path location info.
uint32_t GetSize() const
Get number of locations.
Location & At(uint32_t index)
Get the location at index.
void Add(const Location &location)
Add a location.
@ ServerOnline
server node where the file is online
void SelectLocations(XrdCl::LocationInfo &oldList, XrdCl::LocationInfo &newList, uint32_t n)
URL representation.
Definition: XrdClURL.hh:31
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
@ Space
Query logical space stats.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::vector< std::string > mdtacgi
Definition: XrdEcObjCfg.hh:94
std::vector< std::string > plgr
Definition: XrdEcObjCfg.hh:92
std::vector< std::string > dtacgi
Definition: XrdEcObjCfg.hh:93