diff --git a/DAQController.cc b/DAQController.cc index 75ed2a6..1c25b94 100644 --- a/DAQController.cc +++ b/DAQController.cc @@ -267,7 +267,7 @@ int DAQController::OpenThreads(){ fProcessingThreads.reserve(fNProcessingThreads); for(int i=0; i(fOptions, fLog)); + fFormatters.emplace_back(std::make_unique(fOptions, fLog, fDigitizers)); fProcessingThreads.emplace_back(&StraxFormatter::Process, fFormatters.back().get()); } catch(const std::exception& e) { fLog->Entry(MongoLog::Warning, "Error opening processing threads: %s", diff --git a/DAXHelpers.hh b/DAXHelpers.hh index bb4611a..fd30c5d 100644 --- a/DAXHelpers.hh +++ b/DAXHelpers.hh @@ -14,12 +14,25 @@ public: DAXHelpers(){}; ~DAXHelpers(){}; -static unsigned int StringToHex(std::string str){ +static unsigned int StringToHexOld(std::string str){ + // This function takes ~360ns to run std::stringstream ss(str); u_int32_t result; return ss >> std::hex >> result ? result : 0; }; +static unsigned int StringToHex(const std::string& str) { + // this function takes ~12ns to run + uint32_t result = 0; + int i = 0; + for (auto it = str.rbegin(); it != str.rend(); ++it) { + bool is_letter = *it & 0x40; // see ascii + int val = (!is_letter)*(*it-'0') + is_letter*(9+(*it & 0xF)); // branchless is 2x faster + result += (val << 4*i++); // nibble by nibble + } + return result; +} + const static int Idle = 0; const static int Arming = 1; const static int Armed = 2; diff --git a/Options.cc b/Options.cc index 408d551..2785831 100644 --- a/Options.cc +++ b/Options.cc @@ -308,7 +308,7 @@ int16_t Options::GetChannel(int bid, int cid){ return bson_options["channels"][boardstring][cid].get_int32().value; } catch(std::exception& e){ - fLog->Entry(MongoLog::Error, "Failed to look up board %i ch %i", bid, cid); + fLog->Entry(MongoLog::Local, "Failed to look up board %i ch %i", bid, cid); return -1; } } diff --git a/StraxFormatter.cc b/StraxFormatter.cc index 21ee1dc..b1dd02a 100644 --- a/StraxFormatter.cc +++ b/StraxFormatter.cc @@ -57,7 +57,7 @@ const std::map&, st {"delete", compress_devnull} }; -StraxFormatter::StraxFormatter(std::shared_ptr& opts, std::shared_ptr& log){ +StraxFormatter::StraxFormatter(std::shared_ptr& opts, std::shared_ptr& log, const std::map>>& digis){ fActive = true; fChunkNameLength=6; fStraxHeaderSize=24; @@ -65,37 +65,36 @@ StraxFormatter::StraxFormatter(std::shared_ptr& opts, std::shared_ptrGetDouble("strax_chunk_length", 5)*1e9); // default 5s - fChunkOverlap = long(fOptions->GetDouble("strax_chunk_overlap", 0.5)*1e9); // default 0.5s - fFragmentBytes = fOptions->GetInt("strax_fragment_payload_bytes", 110*2); + fChunkLength = long(opts->GetDouble("strax_chunk_length", 5)*1e9); // default 5s + fChunkOverlap = long(opts->GetDouble("strax_chunk_overlap", 0.5)*1e9); // default 0.5s + fFragmentBytes = opts->GetInt("strax_fragment_payload_bytes", 110*2); fFullFragmentSize = fFragmentBytes + fStraxHeaderSize; try { - fCompressor = compressors.at(fOptions->GetString("compressor", "lz4")); + fCompressor = compressors.at(opts->GetString("compressor", "lz4")); } catch (...) { fLog->Entry(MongoLog::Error, "Invalid compressor specified"); throw std::runtime_error("Invalid compressor"); } fFullChunkLength = fChunkLength+fChunkOverlap; - fHostname = fOptions->Hostname(); - std::string run_name; + fHostname = opts->Hostname(); const int run_name_length = 6; - int run_num = fOptions->GetInt("number", -1); - if (run_num == -1) run_name = "run"; - else { - run_name = std::to_string(run_num); - if (run_name.size() < run_name_length) - run_name.insert(0, run_name_length - run_name.size(), int('0')); - } + std::string run_name(run_name_length+1, '\0'); + fRunNumber = opts->GetInt("number", -1); + if (fRunNumber == -1) run_name = "run"; + else sprintf(run_name.data(), "06d", fRunNumber); // run name length is 6 + + // cache channel map + for (const auto& link : digis) for (const auto& digi: link) for (unsigned ch = 0; ch < digi->GetNumChannels; ++ch) + fChannelMap[digi->fBID()].push_back(Options->GetChannel(digi->fBID(), ch)); fEmptyVerified = 0; fLog = log; - fBufferNumChunks = fOptions->GetInt("strax_buffer_num_chunks", 2); - fWarnIfChunkOlderThan = fOptions->GetInt("strax_chunk_phase_limit", 2); + fBufferNumChunks = opts->GetInt("strax_buffer_num_chunks", 2); + fWarnIfChunkOlderThan = opts->GetInt("strax_chunk_phase_limit", 2); fMutexWaitTime.reserve(1<<20); - std::string output_path = fOptions->GetString("strax_output_path", "./"); + std::string output_path = opts->GetString("strax_output_path", "./"); try{ fs::path op(output_path); op /= run_name; @@ -133,25 +132,6 @@ void StraxFormatter::GetDataPerChan(std::map& ret) { return; } -void StraxFormatter::GenerateArtificialDeadtime(int64_t timestamp, const std::shared_ptr& digi) { - std::string fragment; - fragment.reserve(fFullFragmentSize); - timestamp *= digi->GetClockWidth(); // TODO nv - int32_t length = fFragmentBytes>>1; - int16_t sw = digi->SampleWidth(), channel = digi->GetADChannel(), zero = 0; - fragment.append((char*)×tamp, sizeof(timestamp)); - fragment.append((char*)&length, sizeof(length)); - fragment.append((char*)&sw, sizeof(sw)); - fragment.append((char*)&channel, sizeof(channel)); - fragment.append((char*)&length, sizeof(length)); - fragment.append((char*)&zero, sizeof(zero)); // fragment_i - fragment.append((char*)&zero, sizeof(zero)); // baseline - for (; length > 0; length--) - fragment.append((char*)&zero, sizeof(zero)); // wf - AddFragmentToBuffer(std::move(fragment), 0, 0); - return; -} - void StraxFormatter::ProcessDatapacket(std::unique_ptr dp){ // Take a buffer and break it up into one document per channel auto it = dp->buff.begin(); @@ -174,7 +154,7 @@ void StraxFormatter::ProcessDatapacket(std::unique_ptr dp){ missed = false; // this happens quite rarely, the chance of overwriting ourselves is vanishing // but it's nice to be able to know why we missed an event - std::string filename = std::to_string(fOptions->GetInt("number", -1)) + "_missed"; + std::string filename = std::to_string(fRunNumber) + "_missed"; std::ofstream fout(filename, std::ios::out | std::ios::binary); fout.write((char*)dp->buff.data(), dp->buff.size()*sizeof(dp->buff[0])); fout.close(); @@ -233,7 +213,7 @@ int StraxFormatter::ProcessChannel(std::u32string_view buff, int words_in_event, uint32_t samples_in_pulse = wf.size()*sizeof(char32_t)/sizeof(uint16_t); uint16_t sw = dp->digi->SampleWidth(); int samples_per_frag= fFragmentBytes>>1; - int16_t global_ch = fOptions->GetChannel(dp->digi->bid(), channel); + int16_t global_ch = fChannelMap[dp->digi->bid()][channel]; // Failing to discern which channel we're getting data from seems serious enough to throw if(global_ch==-1) throw std::runtime_error("Failed to parse channel map. I'm gonna just kms now."); @@ -298,11 +278,14 @@ void StraxFormatter::AddFragmentToBuffer(std::string fragment, uint32_t ts, int fOutputBufferSize += fFullFragmentSize; - if(!overlap){ + auto chunks = {&fChunks, &fOverlaps}; + (*chunks[overlap])[chunk_id].emplace_back(std::move(fragment)); + +/* if(!overlap){ fChunks[chunk_id].emplace_back(std::move(fragment)); } else { fOverlaps[chunk_id].emplace_back(std::move(fragment)); - } + }*/ } int StraxFormatter::ReceiveDatapackets(std::list>& in, int bytes) { diff --git a/StraxFormatter.hh b/StraxFormatter.hh index 9db9645..cdfd7e3 100644 --- a/StraxFormatter.hh +++ b/StraxFormatter.hh @@ -52,7 +52,7 @@ class StraxFormatter{ */ public: - StraxFormatter(std::shared_ptr&, std::shared_ptr&); + StraxFormatter(std::shared_ptr&, std::shared_ptr&, const std::map>>&); ~StraxFormatter(); void Close(std::map& ret); @@ -71,7 +71,6 @@ private: void WriteOutChunk(int); void WriteOutChunks(); void End(); - void GenerateArtificialDeadtime(int64_t, const std::shared_ptr&); void AddFragmentToBuffer(std::string, uint32_t, int); std::vector GetChunkNames(int); @@ -89,10 +88,10 @@ private: int fFullFragmentSize; int fBufferNumChunks; int fWarnIfChunkOlderThan; + int fRunNumber; unsigned fChunkNameLength; int64_t fFullChunkLength; std::string fOutputPath, fHostname, fFullHostname; - std::shared_ptr fOptions; std::shared_ptr fLog; std::atomic_bool fActive; std::map> fChunks, fOverlaps; @@ -105,6 +104,7 @@ private: std::map fBytesPerChunk; std::atomic_int fInputBufferSize, fOutputBufferSize; long fBytesProcessed; + std::map> fChannelMap; double fProcTimeDP, fProcTimeEv, fProcTimeCh, fCompTime; std::thread::id fThreadId; diff --git a/V1724.cc b/V1724.cc index 1d5657d..3e205e6 100644 --- a/V1724.cc +++ b/V1724.cc @@ -334,9 +334,10 @@ std::tuple V1724::UnpackChannelHead // More rollover logic here, because channels are independent and the // processing is multithreaded. We leverage the fact that readout windows are // short and polled frequently compared to the rollover timescale, so there - // will never be a large difference in timestamps in one data packet - if (ch_time > 15e8 && header_time < 5e8 && rollovers != 0) rollovers--; - else if (ch_time < 5e8 && header_time > 15e8) rollovers++; + // will never be a large difference in timestamps in one data packet. + // Allegedly + rollovers -= 1*(ch_time > 15e8 && header_time < 5e8 && rollovers != 0); // header rolled while channel hasn't + rollovers += 1*(ch_time < 5e8 && header_time > 15e8); // channel rolled while header hasn't return {((rollovers<<31)+ch_time)*fClockCycle - fDelayPerCh[ch] - fPreTrigPerCh[ch], words, 0, sv.substr(2, words-2)}; }