Last active
July 29, 2020 19:38
-
-
Save oktal/8d56812dccd434e8a0c5bb49b05340bc to your computer and use it in GitHub Desktop.
Low Latency Logging
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <algorithm> | |
#include <array> | |
#include <atomic> | |
#include <condition_variable> | |
#include <cstring> | |
#include <deque> | |
#include <functional> | |
#include <iostream> | |
#include <random> | |
#include <sstream> | |
#include <thread> | |
class LogBufferBase; | |
class LogBufferView | |
{ | |
public: | |
LogBufferView(const LogBufferBase& buffer) | |
: buffer{ buffer } | |
{} | |
const char* read(size_t index) const; | |
template<typename T> | |
T readAs(size_t index) const | |
{ | |
return *reinterpret_cast<const T*>(read(index)); | |
} | |
template<typename T> | |
const T * overlayAs(size_t index) const | |
{ | |
return reinterpret_cast<const T*>(read(index)); | |
} | |
private: | |
const LogBufferBase& buffer; | |
}; | |
template<typename T> | |
struct Offset | |
{ | |
Offset() | |
: offset{ 0 } | |
{} | |
explicit Offset(uint16_t offset) | |
: offset{ offset } | |
{} | |
T get(LogBufferView buffer) const | |
{ | |
return buffer.readAs<T>(offset); | |
} | |
private: | |
uint16_t offset; | |
}; | |
namespace tag | |
{ | |
struct String {}; | |
struct Function {}; | |
} | |
template<> | |
struct Offset<tag::String> | |
{ | |
Offset() | |
: offset{ 0 } | |
{} | |
explicit Offset(uint16_t offset) | |
: offset{ offset } | |
{} | |
std::string get(LogBufferView buffer) const | |
{ | |
auto size = buffer.readAs<uint16_t>(offset); | |
const char* data = buffer.read(offset + sizeof(uint16_t)); | |
return std::string(data, size); | |
} | |
const char* data(LogBufferView buffer) const | |
{ | |
return buffer.read(offset + sizeof(uint16_t)); | |
} | |
size_t size(LogBufferView buffer) const | |
{ | |
return buffer.readAs<uint16_t>(offset); | |
} | |
private: | |
uint16_t offset; | |
}; | |
template<> | |
struct Offset<tag::Function> | |
{ | |
Offset() | |
: offset{ 0 } | |
{} | |
explicit Offset(uint16_t offset) | |
: offset{ offset } | |
{} | |
template<typename FuncType, typename... Args> | |
auto invoke(LogBufferView buffer, Args&& ...args) const | |
{ | |
auto* func = buffer.overlayAs<FuncType>(offset); | |
std::invoke(*func, std::forward<Args>(args)...); | |
} | |
private: | |
uint16_t offset; | |
}; | |
using StringOffset = Offset<tag::String>; | |
using FunctionOffset = Offset<tag::Function>; | |
class LogBufferBase | |
{ | |
public: | |
friend class LogBufferView; | |
virtual ~LogBufferBase() = default; | |
template<typename T> | |
Offset<T> write(T value) | |
{ | |
return offsetAt<T>(encode(value)); | |
} | |
template<size_t N> | |
StringOffset write(const char(&str)[N]) | |
{ | |
return writeString(str, N - 1); | |
} | |
StringOffset write(const std::string& str) | |
{ | |
return writeString(str.data(), str.size()); | |
} | |
StringOffset write(const char* str) | |
{ | |
return writeString(str, std::strlen(str)); | |
} | |
template<typename Return, typename... Args> | |
FunctionOffset write(Return (*func)(Args...)) | |
{ | |
return offsetAt<tag::Function>(encodeFunction(func)); | |
} | |
size_t size() const | |
{ | |
return m_cursor; | |
} | |
private: | |
size_t m_cursor{ 0 }; | |
StringOffset writeString(const char* str, size_t size) | |
{ | |
return offsetAt<tag::String>(encodeString(str, size)); | |
} | |
virtual void reserve(size_t capacity) = 0; | |
protected: | |
virtual char* dataAt(size_t index) = 0; | |
const char* dataAt(size_t index) const | |
{ | |
return const_cast<LogBufferBase*>(this)->dataAt(index); | |
} | |
template<typename T> | |
T* overlayAt(size_t offset) | |
{ | |
return reinterpret_cast<T*>(dataAt(offset)); | |
} | |
template<typename T> | |
const T* overlayAt(size_t offset) const | |
{ | |
return reinterpret_cast<const T*>(dataAt(offset)); | |
} | |
template<typename T> | |
size_t encode(T value) | |
{ | |
reserve(size() + sizeof(value)); | |
auto index = m_cursor; | |
*overlayAt<T>(index) = value; | |
m_cursor += sizeof(T); | |
return index; | |
} | |
size_t encodeRaw(const char* bytes, size_t size) | |
{ | |
reserve(this->size() + size); | |
auto index = m_cursor; | |
std::memcpy(dataAt(index), bytes, size); | |
m_cursor += size; | |
return index; | |
} | |
size_t encodeString(const char* str, size_t size) | |
{ | |
auto index = encode(static_cast<uint16_t>(size)); | |
encodeRaw(str, size); | |
return index; | |
} | |
template<typename Function> | |
size_t encodeFunction(Function func) | |
{ | |
reserve(size() + sizeof(Function)); | |
auto index = m_cursor; | |
//*overlayAt<Function>(index) = func; | |
m_cursor += sizeof(Function); | |
return index; | |
} | |
void advance(size_t size) | |
{ | |
m_cursor += size; | |
} | |
size_t cursor() const | |
{ | |
return m_cursor; | |
} | |
template<typename T> | |
Offset<T> offsetAt(size_t index) const | |
{ | |
return Offset<T> { static_cast<uint16_t>(index) }; | |
} | |
}; | |
template<size_t N> | |
class LogBuffer : public LogBufferBase | |
{ | |
public: | |
LogBuffer() | |
: m_data(&m_inlineData[0]) | |
, m_capacity { N } | |
{} | |
~LogBuffer() | |
{ | |
if (!isSmall()) | |
{ | |
std::free(m_data); | |
m_data = nullptr; | |
} | |
} | |
LogBuffer(const LogBuffer& other) | |
{ | |
*this = other; | |
} | |
LogBuffer(LogBuffer&& other) noexcept | |
{ | |
*this = std::move(other); | |
} | |
LogBuffer& operator=(const LogBuffer& other) | |
{ | |
reserve(other.m_capacity); | |
std::memcpy(m_data, other.m_data, other.size()); | |
m_capacity = other.m_capacity; | |
advance(other.cursor()); | |
return *this; | |
} | |
LogBuffer& operator=(LogBuffer&& other) noexcept | |
{ | |
// If the other is not small, let's steal its buffer | |
if (!other.isSmall()) | |
{ | |
// Cleanup our buffer if we were not small | |
if (!isSmall()) | |
std::free(m_data); | |
m_data = other.m_data; | |
other.m_data = nullptr; | |
} | |
// The other is small, which means that we will also be small, so memcpy the bytes | |
// over | |
else | |
{ | |
// Cleanup our buffer if we were not small | |
if (!isSmall()) | |
std::free(m_data); | |
std::memcpy(m_data, other.m_data, cursor()); | |
m_data = m_inlineData.data(); | |
} | |
advance(other.cursor()); | |
m_capacity = other.m_capacity; | |
return* this; | |
} | |
private: | |
using InlineStorage = std::array<char, N>; | |
InlineStorage m_inlineData {}; | |
char* m_data { nullptr }; | |
size_t m_capacity { 0 }; | |
bool isSmall() const | |
{ | |
return m_data == &m_inlineData[0]; | |
} | |
virtual void reserve(size_t capacity) override | |
{ | |
if (capacity <= m_capacity) | |
return; | |
auto newCapacity = std::max(m_capacity * 2, capacity); | |
auto* data = static_cast<char*>(::malloc(newCapacity * sizeof(char))); | |
if (data == nullptr) | |
throw std::bad_alloc(); | |
if (isSmall()) | |
{ | |
std::memcpy(data, m_inlineData.data(), m_inlineData.size()); | |
} | |
else | |
{ | |
std::memcpy(data, m_data, this->cursor()); | |
::free(m_data); | |
} | |
m_data = data; | |
m_capacity = newCapacity; | |
} | |
protected: | |
char* dataAt(size_t index) override | |
{ | |
return m_data + index; | |
} | |
}; | |
inline const char* LogBufferView::read(size_t index) const | |
{ | |
return buffer.dataAt(index); | |
} | |
class FormatCallback | |
{ | |
public: | |
void operator()(const char* data, size_t size) const | |
{ | |
call(data, size); | |
} | |
private: | |
virtual void call(const char* data, size_t size) const = 0; | |
}; | |
template<typename Stream> | |
class StreamCallback : public FormatCallback | |
{ | |
public: | |
StreamCallback(Stream& stream) | |
: stream { stream } | |
{} | |
private: | |
Stream& stream; | |
void call(const char* data, size_t size) const override | |
{ | |
stream.write(data, size); | |
} | |
}; | |
/* | |
+---------+--------------------------------+---------+ | |
| LogFunc | OffsetsIndex | Data | Offsets | | |
+---------+--------------+-----------------+---------+ | |
| | |
^------------------- | |
*/ | |
class EventLogBuffer : public LogBuffer<255> | |
{ | |
public: | |
using LogFunc = void (*)(const LogBufferBase& buffer, uint16_t offsetsIndex, FormatCallback& callback); | |
static constexpr size_t HeaderOffset = 0; | |
struct Header | |
{ | |
LogFunc logFunc; | |
uint16_t offsetsIndex; | |
}; | |
EventLogBuffer() | |
{ | |
LogBufferBase::advance(HeaderOffset + sizeof(Header)); | |
} | |
template<typename T> | |
void writeEvent(const T& value) | |
{ | |
auto offsetsIndex = encode(value); | |
encodeHeader<T>(offsetsIndex); | |
} | |
void format(FormatCallback& callback) const | |
{ | |
const auto* header = decodeHeader(); | |
std::invoke(header->logFunc, *this, header->offsetsIndex, callback); | |
} | |
private: | |
template<typename T> | |
void encodeHeader(size_t offsetsIndex) | |
{ | |
auto* header = LogBufferBase::template overlayAt<Header>(HeaderOffset); | |
header->offsetsIndex = static_cast<uint16_t>(offsetsIndex); | |
header->logFunc = [](const LogBufferBase& buffer, uint16_t offsetsIndex, FormatCallback& callback) { | |
LogBufferView view{ buffer }; | |
const T* obj = view.overlayAs<T>(offsetsIndex); | |
obj->format(view, callback); | |
}; | |
} | |
const Header* decodeHeader() const | |
{ | |
return LogBufferBase::template overlayAt<Header>(HeaderOffset); | |
} | |
}; | |
struct MyEvent | |
{ | |
Offset<uint32_t> providerId; | |
Offset<char> key; | |
StringOffset reason; | |
template<typename FormatCallback> | |
void format(LogBufferView view, FormatCallback& callback) const | |
{ | |
std::ostringstream os; | |
os << "MyEvent { ProviderId: " << providerId.get(view) << ", Key: " << key.get(view) << ", Reason: " << reason.get(view) << " }"; | |
const auto& str = os.str(); | |
callback(str.data(), str.size()); | |
} | |
}; | |
struct AsyncChannel | |
{ | |
~AsyncChannel() | |
{ | |
if (isStarted()) | |
stop(); | |
if (thread.joinable()) | |
thread.join(); | |
} | |
void log(const EventLogBuffer& buffer) | |
{ | |
LockGuard guard(queueLock); | |
queue.push_back(Entry::create(buffer)); | |
cv.notify_one(); | |
} | |
void start() | |
{ | |
bool expected = false; | |
if (!started.compare_exchange_strong(expected, true)) | |
return; | |
thread = std::thread([=] { | |
this->threadMain(); | |
}); | |
} | |
void stop() | |
{ | |
bool expected = true; | |
if (!started.compare_exchange_strong(expected, false)) | |
return; | |
LockGuard guard(queueLock); | |
queue.push_back(Entry::stop()); | |
cv.notify_one(); | |
} | |
bool isStarted() const { | |
return started.load(std::memory_order_acquire); | |
} | |
private: | |
struct Entry | |
{ | |
static Entry create(const EventLogBuffer& buffer) | |
{ | |
return Entry{false, buffer}; | |
} | |
static Entry stop() | |
{ | |
return Entry{true, EventLogBuffer{}}; | |
} | |
bool stopFlag; | |
EventLogBuffer buffer; | |
}; | |
using Lock = std::mutex; | |
using LockGuard = std::unique_lock<Lock>; | |
Lock queueLock; | |
std::condition_variable cv; | |
std::deque<Entry> queue; | |
std::thread thread; | |
std::atomic<bool> started { false }; | |
void threadMain() | |
{ | |
std::ostringstream oss; | |
for (;;) | |
{ | |
bool stopFlag = false; | |
LockGuard guard(queueLock); | |
cv.wait(guard, [&] { return !queue.empty(); }); | |
for (const auto& entry: queue) | |
{ | |
if (entry.stopFlag) | |
{ | |
stopFlag = true; | |
break; | |
} | |
StreamCallback<std::ostringstream> callback { oss }; | |
entry.buffer.format(callback); | |
std::cout << oss.str() << std::endl; | |
oss.str(""); | |
} | |
queue.clear(); | |
if (stopFlag) | |
break; | |
} | |
} | |
}; | |
template<typename T, T... Args> | |
T pickRandom() | |
{ | |
static constexpr T Values[] = { Args... }; | |
static std::random_device rd; | |
static std::default_random_engine engine(rd()); | |
static std::uniform_int_distribution<int> dist(0, sizeof...(Args) - 1); | |
return Values[dist(engine)]; | |
} | |
namespace details | |
{ | |
template<typename T> using OffsetT = decltype(static_cast<LogBufferBase *>(0)->write(std::declval<T>())); | |
template<typename... Args> using OffsetTuple = std::tuple<OffsetT<std::decay_t<Args>>...>; | |
template<typename... Args> | |
auto writeAll(LogBufferBase& buffer, Args&& ...args) | |
{ | |
return std::make_tuple(buffer.write(args)...); | |
} | |
template<typename Func, typename... Args> | |
struct EventWrapper | |
{ | |
using Offsets = OffsetTuple<Args...>; | |
EventWrapper(Offsets offsets, FunctionOffset funcOffset) | |
: offsets { offsets } | |
, funcOffset { funcOffset } | |
{} | |
void format(LogBufferView view, FormatCallback& callback) const | |
{ | |
formatImpl(view, callback, std::index_sequence_for<Args...>()); | |
} | |
private: | |
Offsets offsets; | |
FunctionOffset funcOffset; | |
template<size_t... Indexes> | |
void formatImpl(LogBufferView view, FormatCallback& callback, std::index_sequence<Indexes...>) const | |
{ | |
funcOffset.invoke<Func>(view, view, callback, std::get<Indexes>(offsets)...); | |
} | |
}; | |
} | |
template<typename Func, typename... Args> | |
void logInfo(AsyncChannel& channel, Func func, Args&& ...args) | |
{ | |
using FunctionPtr = void (*)(LogBufferView, FormatCallback&, details::OffsetT<std::decay_t<Args>>...); | |
EventLogBuffer buffer; | |
auto offsets = details::writeAll(buffer, std::forward<Args>(args)...); | |
auto funcOffset = buffer.write(static_cast<FunctionPtr>(func)); | |
details::EventWrapper<Func, Args...> event(offsets, funcOffset); | |
buffer.writeEvent(event); | |
channel.log(buffer); | |
} | |
int main() | |
{ | |
AsyncChannel channel; | |
channel.start(); | |
static constexpr size_t Count = 100; | |
for (size_t i = 0; i < Count; ++i) | |
{ | |
/* | |
EventLogBuffer buffer; | |
MyEvent myEvent{ | |
buffer.write(pickRandom<uint32_t, 10, 32, 45>()), | |
buffer.write(pickRandom<char, 'A', 'F', '!'>()), | |
buffer.write(reason), | |
}; | |
buffer.writeEvent(myEvent); | |
channel.log(buffer); | |
*/ | |
std::string reason("My specific reason"); | |
auto providerId = pickRandom<uint32_t, 10, 32, 45>(); | |
auto key = pickRandom<char, 'A', 'F', '!'>(); | |
logInfo(channel, [](LogBufferView buffer, FormatCallback& callback, auto providerId, auto key, auto reason) { | |
std::ostringstream oss; | |
oss << "Something happened. ProviderId: " << providerId.get(buffer) << " Key: " << key.get(buffer) << " Reason: " << reason.get(buffer); | |
const auto& str = oss.str(); | |
callback(str.data(), str.size()); | |
}, providerId, key, reason); | |
} | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment