Initial Commit - Lesson 31 (Commit #1)

This commit is contained in:
Norman Lansing
2026-02-24 22:39:26 -05:00
commit 9591e7f503
4631 changed files with 1019212 additions and 0 deletions

View File

@@ -0,0 +1,102 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/Combiner.h>
#include <cassert>
void Combiner::Add(MetricMessage message) {
if (message.IsGauge()) {
// We have to do a bit more work for gauges to convert GaugeAdd to GaugeSet
UpdateGauge(message);
} else {
auto it = m_combinedMessages.find(message.Metric);
if (it == std::end(m_combinedMessages)) {
m_combinedMessages.emplace(message.Metric, message);
return;
}
assert(message.IsCounter() || message.IsTimer());
if (message.IsCounter()) {
UpdateCounter(it->second, message);
} else if (message.IsTimer()) {
UpdateTimer(it->second, message);
}
}
}
void Combiner::UpdateGauge(const MetricMessage &message) {
assert(message.Type == MetricMessageType::GaugeSet ||
message.Type == MetricMessageType::GaugeAdd);
if (message.Type == MetricMessageType::GaugeSet) {
// If setting - we just use the new value
m_combinedMessages[message.Metric] = message;
m_gaugeHistory[message.Metric] = message;
} else if (message.Type == MetricMessageType::GaugeAdd) {
// If adding - we get historic value (if available) and add to it
// if there's no historic value, we just add to 0
double currentValue = 0;
auto it = m_gaugeHistory.find(message.Metric);
if (it != std::end(m_gaugeHistory)) {
currentValue = it->second.SubmitDouble.Value;
}
currentValue += message.SubmitDouble.Value;
auto newMessage = MetricMessage::GaugeSet(*message.Metric, currentValue);
m_combinedMessages[message.Metric] = newMessage;
m_gaugeHistory[message.Metric] = newMessage;
}
}
void Combiner::UpdateCounter(MetricMessage &current,
const MetricMessage &newMessage) {
assert(newMessage.Type == MetricMessageType::CounterAdd);
// Just sum the values.
current.SubmitDouble.Value += newMessage.SubmitDouble.Value;
}
void Combiner::UpdateTimer(MetricMessage &current,
const MetricMessage &newMessage) {
assert(newMessage.Type == MetricMessageType::TimerSet);
// Grab or init sample count
auto sampleCountIt = m_timerSampleCount.find(current.Metric);
if (sampleCountIt != std::end(m_timerSampleCount)) {
++(sampleCountIt->second);
} else {
// This is the second sample being added, hence we initialize it to 2.
auto pair = m_timerSampleCount.emplace(current.Metric, 2);
const bool insertSuccess = pair.second;
assert(insertSuccess);
sampleCountIt = pair.first;
}
// Welford's algorithm
// numerically stable mean
//
// Knuth - TACOP Vol 2 pg. 216
//
// https://nullbuffer.com/articles/welford_algorithm.html
const double update =
(newMessage.SubmitDouble.Value - current.SubmitDouble.Value) /
sampleCountIt->second;
current.SubmitDouble.Value += update;
}
void Combiner::Clear() {
m_combinedMessages.clear();
m_timerSampleCount.clear();
}

View File

@@ -0,0 +1,145 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates or
* its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root of this
* distribution (the "License"). All use of this software is governed by the License,
* or, if provided, by the license below or the license accompanying this file. Do not
* remove or modify any license notices. This file is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/CrashReporterClient.h>
#include <aws/gamelift/internal/retry/RetryingCallable.h>
#include <aws/gamelift/internal/retry/JitteredGeometricBackoffRetryStrategy.h>
#include <spdlog/spdlog.h>
#include <stdexcept>
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
namespace Aws {
namespace GameLift {
namespace Metrics {
CrashReporterClient::CrashReporterClient(const std::string& host, int port)
: httpClient(std::make_shared<Aws::GameLift::Internal::HttpClient>()) {
baseUrl = "http://" + host + ":" + std::to_string(port) + "/";
}
CrashReporterClient::CrashReporterClient(std::shared_ptr<Aws::GameLift::Internal::HttpClient> httpClient, const std::string &host, int port)
: httpClient(std::move(httpClient)) {
baseUrl = "http://" + host + ":" + std::to_string(port) + "/";
}
bool CrashReporterClient::isRetryableError(const std::string& errorMessage) const {
return errorMessage.find("Connection refused") != std::string::npos ||
errorMessage.find("Connection failed") != std::string::npos;
}
void CrashReporterClient::RegisterProcessWithRetries() {
#ifdef _WIN32
int processPid = static_cast<int>(GetCurrentProcessId());
#else
int processPid = static_cast<int>(getpid());
#endif
std::string requestUri = baseUrl + RegisterProcessUrlPath + "?" +
ProcessPidParameterName + "=" + std::to_string(processPid);
spdlog::info("Registering process with {} {} in OTEL Collector Crash Reporter", ProcessPidParameterName, processPid);
// 5 retries, 1s base delay with jitter (default)
// Total max wait time: ~1s + 2s + 4s + 8s + 16s = ~31s
Aws::GameLift::Internal::JitteredGeometricBackoffRetryStrategy retryStrategy;
auto callable = [this, &requestUri, processPid]() -> bool {
try {
auto response = httpClient->SendGetRequest(requestUri);
if (response.IsSuccessfulStatusCode()) {
spdlog::info("Successfully registered {} {} to OTEL Collector Crash Reporter", ProcessPidParameterName, processPid);
return true;
} else {
spdlog::error("Failed to register {} {} to OTEL Collector Crash Reporter, Http response: {} - {}",
ProcessPidParameterName, processPid, response.statusCode, response.body);
return true; // Don't retry on HTTP errors (4xx, 5xx)
}
} catch (const std::exception& e) {
std::string errorMsg = e.what();
if (isRetryableError(errorMsg)) {
spdlog::warn("Failed to register {} {} to OTEL Collector Crash Reporter due to connection error: {}",
ProcessPidParameterName, processPid, e.what());
return false; // Retry on connection errors
} else {
spdlog::error("Failed to register {} {} to OTEL Collector Crash Reporter due to error: {}",
ProcessPidParameterName, processPid, e.what());
return true; // Don't retry on other errors
}
}
};
Aws::GameLift::Internal::RetryingCallable::Builder()
.WithRetryStrategy(&retryStrategy)
.WithCallable(callable)
.Build()
.call();
}
void CrashReporterClient::RegisterProcess() {
RegisterProcessWithRetries();
}
void CrashReporterClient::TagGameSession(const std::string& sessionId) {
#ifdef _WIN32
int processPid = static_cast<int>(GetCurrentProcessId());
#else
int processPid = static_cast<int>(getpid());
#endif
std::string requestUri = baseUrl + UpdateProcessUrlPath + "?" +
ProcessPidParameterName + "=" + std::to_string(processPid) + "&" +
SessionIdParameterName + "=" + sessionId;
try {
spdlog::info("Adding {} tag {} to process with {} {} to the OTEL Collector Crash Reporter",
SessionIdParameterName, sessionId, ProcessPidParameterName, processPid);
auto response = httpClient->SendGetRequest(requestUri);
if (!response.IsSuccessfulStatusCode()) {
spdlog::error("Failed to add {} tag {} to process with {} {} in the OTEL Collector Crash Reporter, Http response: {} - {}",
SessionIdParameterName, sessionId, ProcessPidParameterName, processPid,
response.statusCode, response.body);
}
} catch (const std::exception& e) {
spdlog::error("Failed to add {} tag {} to process with {} {} in the OTEL Collector Crash Reporter due to error: {}",
SessionIdParameterName, sessionId, ProcessPidParameterName, processPid, e.what());
}
}
void CrashReporterClient::DeregisterProcess() {
#ifdef _WIN32
int processPid = static_cast<int>(GetCurrentProcessId());
#else
int processPid = static_cast<int>(getpid());
#endif
std::string requestUri = baseUrl + DeregisterProcessUrlPath + "?" +
ProcessPidParameterName + "=" + std::to_string(processPid);
try {
spdlog::info("Unregistering process with {} {} in OTEL Collector Crash Reporter",
ProcessPidParameterName, processPid);
auto response = httpClient->SendGetRequest(requestUri);
if (!response.IsSuccessfulStatusCode()) {
spdlog::error("Failed to deregister {} {} in the OTEL Collector Crash Reporter, Http response: {} - {}",
ProcessPidParameterName, processPid, response.statusCode, response.body);
}
} catch (const std::exception& e) {
spdlog::error("Failed to deregister {} {} in the OTEL Collector Crash Reporter due to error: {}",
ProcessPidParameterName, processPid, e.what());
}
}
} // namespace Metrics
} // namespace GameLift
} // namespace Aws

View File

@@ -0,0 +1,25 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/DerivedMetric.h>
namespace Aws {
namespace GameLift {
namespace Metrics {
IDerivedMetric::~IDerivedMetric() {}
IDerivedMetricVisitor::~IDerivedMetricVisitor() {}
IDerivedMetricCollection::~IDerivedMetricCollection() {}
} // namespace Metrics
} // namespace GameLift
} // namespace Aws

View File

@@ -0,0 +1,33 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/DynamicMetric.h>
#ifndef GAMELIFT_USE_STD
#include <cstring>
namespace Aws {
namespace GameLift {
namespace Metrics {
void DynamicMetric::SetKey(const char *newKey) {
#ifdef _WIN32
strncpy_s(m_key, DynamicMetric::MAXIMUM_KEY_LENGTH, newKey,
DynamicMetric::MAXIMUM_KEY_LENGTH);
#else
strncpy(m_key, newKey, DynamicMetric::MAXIMUM_KEY_LENGTH);
#endif // _WIN32
}
} // namespace Metrics
} // namespace GameLift
} // namespace Aws
#endif // !GAMELIFT_USE_STD

View File

@@ -0,0 +1,177 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/GlobalMetricsProcessor.h>
#include <aws/gamelift/metrics/IMetricsProcessor.h>
#include <aws/gamelift/metrics/MetricsProcessor.h>
#include <aws/gamelift/metrics/LoggerMacros.h>
#include <aws/gamelift/metrics/CrashReporterClient.h>
#include <aws/gamelift/metrics/StatsDClient.h>
#include <aws/gamelift/server/model/GameSession.h>
#include <cassert>
#include <cstdlib>
#include <string>
#ifdef __linux__
#include <unistd.h>
#elif defined(_WIN32) || defined(_WIN64)
#include <windows.h>
#endif
GAMELIFT_METRICS_DEFINE_GAUGE(ServerUpGauge);
namespace {
std::unique_ptr<::Aws::GameLift::Metrics::IMetricsProcessor>
GlobalProcessor(nullptr);
std::shared_ptr<::Aws::GameLift::Metrics::StatsDClient>
GlobalStatsDClient(nullptr);
std::shared_ptr<::Aws::GameLift::Metrics::CrashReporterClient>
GlobalCrashReporter(nullptr);
void InitializeCrashReporter(const MetricsSettings &settings) {
std::string crashReporterHost;
#ifdef GAMELIFT_USE_STD
crashReporterHost = settings.CrashReporterHost;
#else
crashReporterHost = (settings.CrashReporterHost != nullptr) ? std::string(settings.CrashReporterHost) : "";
#endif
// Skip crash reporter initialization if host is empty
if (crashReporterHost.empty()) {
GAMELIFT_METRICS_LOG_INFO("Crash reporter disabled - host not set");
return;
}
int crashReporterPort = settings.CrashReporterPort;
GlobalCrashReporter = std::make_shared<CrashReporterClient>(crashReporterHost, crashReporterPort);
GlobalCrashReporter->RegisterProcess();
}
void InitializeStatsDClient(const MetricsSettings &settings) {
std::string statsdHost;
#ifdef GAMELIFT_USE_STD
statsdHost = settings.StatsDClientHost;
#else
statsdHost = (settings.StatsDClientHost != nullptr) ? std::string(settings.StatsDClientHost) : "";
#endif
// Skip crash reporter initialization if host is empty
if (statsdHost.empty()) {
GAMELIFT_METRICS_LOG_INFO("StatsDClient disabled - host not set");
return;
}
int statsdPort = settings.StatsDClientPort;
GlobalStatsDClient = std::make_shared<StatsDClient>(statsdHost.c_str(), statsdPort);
GAMELIFT_METRICS_LOG_INFO("Created StatsD client for {}:{}", statsdHost, statsdPort);
}
void InitializeDefaultGlobalTags() {
if (GlobalProcessor) {
// Check if GAMELIFT_SDK_PROCESS_ID environment variable is set
const char *processId = std::getenv(ENV_VAR_PROCESS_ID);
if (processId != nullptr && processId[0] != '\0') {
GlobalProcessor->SetGlobalTag("gamelift_process_id", processId);
GAMELIFT_METRICS_LOG_INFO("Set global tag gamelift_process_id: {}",
processId);
}
// Set the OS process ID (Linux and Windows).
#if defined(_WIN32) || defined(_WIN64)
DWORD pid = GetCurrentProcessId();
std::string pidStr = std::to_string(pid);
#else
pid_t pid = getpid();
std::string pidStr = std::to_string(pid);
#endif
GlobalProcessor->SetGlobalTag("process_pid", pidStr.c_str());
GAMELIFT_METRICS_LOG_INFO("Set global tag process_pid: {}", pidStr);
}
}
} // namespace
namespace Aws {
namespace GameLift {
namespace Metrics {
void MetricsInitialize(const MetricsSettings &settings) {
GAMELIFT_METRICS_LOG_INFO("Initializing GameLift Servers Metrics");
assert(!GlobalProcessor);
InitializeCrashReporter(settings);
InitializeStatsDClient(settings);
// Create settings with StatsD callback only if no callback is already set
MetricsSettings settingsWithCallbackOverride = settings;
if (!settings.SendPacketCallback) {
settingsWithCallbackOverride.SendPacketCallback = [](const char* data, int size) {
if (GlobalStatsDClient) {
GlobalStatsDClient->Send(data, size);
} else {
GAMELIFT_METRICS_LOG_ERROR("StatsDClient is not initialized. Cannot send metrics data.");
}
};
}
GlobalProcessor.reset(new MetricsProcessor(settingsWithCallbackOverride));
InitializeDefaultGlobalTags();
GAMELIFT_METRICS_LOG_INFO(
"GameLift Servers Metrics initialized successfully");
}
void MetricsTerminate() {
GAMELIFT_METRICS_SET(ServerUpGauge, 0);
// Process the final metrics before shutting down
if (GlobalProcessor) {
GlobalProcessor->ProcessMetricsNow();
}
if (GlobalCrashReporter) {
GlobalCrashReporter->DeregisterProcess();
GlobalCrashReporter.reset();
}
if (GlobalStatsDClient) {
GlobalStatsDClient.reset();
}
GlobalProcessor.reset();
}
void MetricsProcess() {
assert(GlobalProcessor);
if (GlobalProcessor) {
GlobalProcessor->ProcessMetrics();
}
}
void OnGameSessionStarted(
const Aws::GameLift::Server::Model::GameSession &session) {
assert(GlobalProcessor);
if (GlobalProcessor) {
GlobalProcessor->OnStartGameSession(session);
}
if (GlobalCrashReporter) {
#ifdef GAMELIFT_USE_STD
GlobalCrashReporter->TagGameSession(session.GetGameSessionId());
#else
GlobalCrashReporter->TagGameSession(std::string(session.GetGameSessionId()));
#endif
}
}
} // namespace Metrics
} // namespace GameLift
} // namespace Aws
::Aws::GameLift::Metrics::IMetricsProcessor *GameLiftMetricsGlobalProcessor() {
return GlobalProcessor.get();
}

View File

@@ -0,0 +1,53 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/HighResolutionClock.h>
#ifndef GAMELIFT_USE_STD
#include <aws/gamelift/metrics/InternalTypes.h>
#include <chrono>
/**
* We hide the internal chrono use and report a time as int64 nanoseconds
* publicly.
*
* Then convert back to chrono internally.
*/
namespace {
using Nanoseconds =
std::chrono::duration<Aws::GameLift::Metrics::Int64, std::nano>;
}
namespace Aws {
namespace GameLift {
namespace Metrics {
namespace Internal {
Int64 HighResolutionClock::Now() {
const auto now = std::chrono::high_resolution_clock::now();
return std::chrono::time_point_cast<Nanoseconds>(now)
.time_since_epoch()
.count();
}
double HighResolutionClock::ToMilliseconds(Int64 duration) {
using Milliseconds = std::chrono::duration<double, std::milli>;
return std::chrono::duration_cast<Milliseconds>(Nanoseconds(duration))
.count();
}
} // namespace Internal
} // namespace Metrics
} // namespace GameLift
} // namespace Aws
#endif // !GAMELIFT_USE_STD

View File

@@ -0,0 +1,74 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/IMetricsProcessor.h>
#include <aws/gamelift/metrics/DynamicTag.h>
namespace Aws {
namespace GameLift {
namespace Metrics {
IMetricsEnqueuer::~IMetricsEnqueuer() {}
IMetricsProcessor::~IMetricsProcessor() {}
#ifdef GAMELIFT_USE_STD
MetricMessage MetricMessage::TagSet(IMetric &metric, std::string key,
std::string value) {
DynamicTag *tag = new DynamicTag(std::move(key), std::move(value));
return MetricMessage(MetricMessageType::TagSet, &metric, MetricSetTag(tag));
}
MetricMessage MetricMessage::TagRemove(Aws::GameLift::Metrics::IMetric &metric,
std::string key) {
DynamicTag *tag = new DynamicTag(std::move(key), "");
return MetricMessage(MetricMessageType::TagRemove, &metric,
MetricSetTag(tag));
}
#else
MetricMessage MetricMessage::TagSet(IMetric &metric, const char *key,
const char *value) {
DynamicTag *tag = new DynamicTag(key, value);
return MetricMessage(MetricMessageType::TagSet, &metric, MetricSetTag(tag));
}
MetricMessage MetricMessage::TagRemove(IMetric &metric, const char *key) {
DynamicTag *tag = new DynamicTag(key, "");
return MetricMessage(MetricMessageType::TagRemove, &metric,
MetricSetTag(tag));
}
#endif
bool operator==(const MetricSetTag &a, const MetricSetTag &b) {
return a.Ptr->Key == b.Ptr->Key && a.Ptr->Value == b.Ptr->Value;
}
void CopyTagMessage(const MetricMessage &original, IMetric &destMetric,
IMetricsEnqueuer &enqueuer) {
switch (original.Type) {
case MetricMessageType::TagSet:
enqueuer.Enqueue(MetricMessage::TagSet(destMetric,
original.SetTag.Ptr->Key.c_str(),
original.SetTag.Ptr->Value.c_str()));
break;
case MetricMessageType::TagRemove:
enqueuer.Enqueue(
MetricMessage::TagRemove(destMetric, original.SetTag.Ptr->Key.c_str()));
break;
default:
break;
}
}
} // namespace Metrics
} // namespace GameLift
} // namespace Aws

View File

@@ -0,0 +1,21 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/InternalTypes.h>
namespace Aws {
namespace GameLift {
namespace Metrics {
IMetric::~IMetric() {}
} // namespace Metrics
} // namespace GameLift
} // namespace Aws

View File

@@ -0,0 +1,65 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/KeySuffix.h>
#ifdef GAMELIFT_USE_STD
namespace Aws {
namespace GameLift {
namespace Metrics {
void KeySuffix::Apply(const IMetric &original, DynamicMetric &target) {
target.SetKey(std::string(original.GetKey()) + m_suffix);
}
} // namespace Metrics
} // namespace GameLift
} // namespace Aws
#else
#include <algorithm>
#include <cstring>
#include <string>
namespace Aws {
namespace GameLift {
namespace Metrics {
KeySuffix::KeySuffix() {
std::fill(std::begin(m_suffix), std::end(m_suffix), '\0');
}
KeySuffix::KeySuffix(const char *newSuffix) {
#ifdef _WIN32
strncpy_s(m_suffix, KeySuffix::MAXIMUM_SUFFIX_LENGTH, newSuffix,
KeySuffix::MAXIMUM_SUFFIX_LENGTH);
#else
strncpy(m_suffix, newSuffix, KeySuffix::MAXIMUM_SUFFIX_LENGTH);
#endif // _WIN32
}
void KeySuffix::SetSuffix(const char *newSuffix) {
#ifdef _WIN32
strncpy_s(m_suffix, KeySuffix::MAXIMUM_SUFFIX_LENGTH, newSuffix,
KeySuffix::MAXIMUM_SUFFIX_LENGTH);
#else
strncpy(m_suffix, newSuffix, KeySuffix::MAXIMUM_SUFFIX_LENGTH);
#endif // _WIN32
}
void KeySuffix::Apply(const IMetric &original, DynamicMetric &target) {
std::string key = original.GetKey();
key += m_suffix;
target.SetKey(key.c_str());
}
} // namespace Metrics
} // namespace GameLift
} // namespace Aws
#endif // GAMELIFT_USE_STD

View File

@@ -0,0 +1,148 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/MetricsProcessor.h>
#include <aws/gamelift/metrics/DerivedMetric.h>
#include <aws/gamelift/metrics/GaugeMacros.h>
#include <iterator>
#include <unordered_set>
void MetricsProcessor::ProcessMetrics() {
const auto now = ClockT::now();
if (now < m_nextCaptureTime) {
return;
}
// The server_up metric needs to be set on each flush period, as an untouched
// metric will automatically expire in many metrics backends (e.g. Prometheus)
GAMELIFT_METRICS_SET(ServerUpGauge, 1);
ProcessMetricsNow();
}
void MetricsProcessor::ProcessMetricsNow() {
if (m_preProcessCallback) {
m_preProcessCallback();
}
const auto messageCount = m_messageQueueMPSC.size_approx();
m_messageQueueMPSC.try_dequeue_bulk(std::back_inserter(m_processQueue),
messageCount);
m_combinedMetrics.Clear();
ProcessMessages(m_processQueue);
m_processQueue.clear();
m_enqueuer.Clear();
m_nextCaptureTime = ClockT::now() + m_captureInterval;
}
namespace {
void UpdateDerivedMetrics(std::vector<MetricMessage> &messages,
IMetricsEnqueuer &enqueuer) {
for (auto &message : messages) {
struct HandleMessageVisitor final
: public Aws::GameLift::Metrics::IDerivedMetricVisitor {
MetricMessage &m_message;
IMetricsEnqueuer &m_enqueuer;
explicit HandleMessageVisitor(MetricMessage &message,
IMetricsEnqueuer &enqueuer) noexcept
: m_message(message), m_enqueuer(enqueuer) {}
virtual void VisitDerivedMetric(
Aws::GameLift::Metrics::IDerivedMetric &metric) override {
metric.HandleMessage(m_message, m_enqueuer);
}
};
HandleMessageVisitor visitor(message, enqueuer);
message.Metric->GetDerivedMetrics().Visit(visitor);
}
}
void SubmitDerivedMetrics(std::vector<MetricMessage> &messages,
IMetricsEnqueuer &enqueuer) {
std::unordered_set<const Aws::GameLift::Metrics::IMetric *> submittedMetrics;
for (auto &message : messages) {
const bool derivativeMetricsSubmitted =
submittedMetrics.find(message.Metric) != std::end(submittedMetrics);
if (derivativeMetricsSubmitted) {
continue;
}
submittedMetrics.emplace(message.Metric);
class EmitMessageVisitor final
: public Aws::GameLift::Metrics::IDerivedMetricVisitor {
public:
EmitMessageVisitor(const Aws::GameLift::Metrics::IMetric *originalMetric,
IMetricsEnqueuer &enqueuer) noexcept
: m_originalMetric(originalMetric), m_enqueuer(enqueuer) {}
virtual void VisitDerivedMetric(
Aws::GameLift::Metrics::IDerivedMetric &metric) override {
metric.EmitMetrics(m_originalMetric, m_enqueuer);
}
private:
const Aws::GameLift::Metrics::IMetric *m_originalMetric;
IMetricsEnqueuer &m_enqueuer;
};
EmitMessageVisitor visitor(message.Metric, enqueuer);
message.Metric->GetDerivedMetrics().Visit(visitor);
}
}
} // namespace
void MetricsProcessor::ProcessMessages(std::vector<MetricMessage> &messages) {
// Compute derived metrics and appends their messages to the end
UpdateDerivedMetrics(messages, m_enqueuer);
SubmitDerivedMetrics(messages, m_enqueuer);
std::copy(std::begin(m_enqueuer.m_messages), std::end(m_enqueuer.m_messages),
std::back_inserter(messages));
// Combine all the metrics
for (auto &message : messages) {
if (message.IsTag()) {
m_metricTags.Handle(message);
} else {
m_combinedMetrics.Add(message);
}
}
if (m_combinedMetrics.IsEmpty()) {
return;
}
// Build & send packets
for (const auto &message : m_combinedMetrics) {
m_packet.Append(message, m_globalTags, m_metricTags.GetTags(message.Metric),
m_sendPacket);
}
m_packet.Flush(m_sendPacket);
}
void MetricsProcessor::OnStartGameSession(
const Aws::GameLift::Server::Model::GameSession &session) {
#ifdef GAMELIFT_USE_STD
const char *sessionId = session.GetGameSessionId().c_str();
#else
const char *sessionId = session.GetGameSessionId();
#endif
if (sessionId != nullptr && sessionId[0] != '\0') {
SetGlobalTag("session_id", sessionId);
}
}

View File

@@ -0,0 +1,145 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/MetricsUtils.h>
#include <aws/gamelift/metrics/GlobalMetricsProcessor.h>
#include <aws/gamelift/metrics/MetricsSettings.h>
#include <aws/gamelift/common/GameLiftErrors.h>
#include <cstdlib>
#include <cstring>
#include <spdlog/spdlog.h>
using namespace Aws::GameLift::Server;
namespace Aws {
namespace GameLift {
namespace Metrics {
MetricsParameters CreateMetricsParametersFromEnvironmentOrDefault() {
// Start with default values
const char* statsdHost = DEFAULT_STATSD_HOST;
int statsdPort = DEFAULT_STATSD_PORT;
const char* crashReporterHost = DEFAULT_CRASH_REPORTER_HOST;
int crashReporterPort = DEFAULT_CRASH_REPORTER_PORT;
int flushIntervalMs = DEFAULT_FLUSH_INTERVAL_MS;
int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
// Check environment variables and override defaults
const char* envStatsdHost = std::getenv(ENV_VAR_STATSD_HOST);
if (envStatsdHost && envStatsdHost[0] != '\0') {
statsdHost = envStatsdHost;
spdlog::info("Env override for statsdHost: {}", statsdHost);
}
const char* envStatsdPort = std::getenv(ENV_VAR_STATSD_PORT);
if (envStatsdPort && envStatsdPort[0] != '\0') {
statsdPort = std::atoi(envStatsdPort);
spdlog::info("Env override for statsdPort: {}", statsdPort);
}
const char* envCrashReporterHost = std::getenv(ENV_VAR_CRASH_REPORTER_HOST);
if (envCrashReporterHost && envCrashReporterHost[0] != '\0') {
crashReporterHost = envCrashReporterHost;
spdlog::info("Env override for crashReporterHost: {}", crashReporterHost);
}
const char* envCrashReporterPort = std::getenv(ENV_VAR_CRASH_REPORTER_PORT);
if (envCrashReporterPort && envCrashReporterPort[0] != '\0') {
crashReporterPort = std::atoi(envCrashReporterPort);
spdlog::info("Env override for crashReporterPort: {}", crashReporterPort);
}
const char* envFlushInterval = std::getenv(ENV_VAR_FLUSH_INTERVAL_MS);
if (envFlushInterval && envFlushInterval[0] != '\0') {
flushIntervalMs = std::atoi(envFlushInterval);
spdlog::info("Env override for flushIntervalMs: {}", flushIntervalMs);
}
const char* envMaxPacketSize = std::getenv(ENV_VAR_MAX_PACKET_SIZE);
if (envMaxPacketSize && envMaxPacketSize[0] != '\0') {
maxPacketSize = std::atoi(envMaxPacketSize);
spdlog::info("Env override for maxPacketSize: {}", maxPacketSize);
}
#ifdef GAMELIFT_USE_STD
return MetricsParameters(std::string(statsdHost), statsdPort, std::string(crashReporterHost), crashReporterPort, flushIntervalMs, maxPacketSize);
#else
return MetricsParameters(statsdHost, statsdPort, crashReporterHost, crashReporterPort, flushIntervalMs, maxPacketSize);
#endif
}
MetricsSettings FromMetricsParameters(const Aws::GameLift::Server::MetricsParameters &params) {
MetricsSettings settings;
settings.StatsDClientHost = params.GetStatsDHost();
settings.StatsDClientPort = params.GetStatsDPort();
settings.CrashReporterHost = params.GetCrashReporterHost();
settings.CrashReporterPort = params.GetCrashReporterPort();
settings.MaxPacketSizeBytes = params.GetMaxPacketSize();
settings.CaptureIntervalSec = params.GetFlushIntervalMs() / 1000.0f;
return settings;
}
Aws::GameLift::GenericOutcome ValidateMetricsParameters(const Aws::GameLift::Server::MetricsParameters &params) {
// Validate StatsD host
#ifdef GAMELIFT_USE_STD
if (params.GetStatsDHost().empty()) {
return Aws::GameLift::GenericOutcome(Aws::GameLift::GameLiftError(Aws::GameLift::GAMELIFT_ERROR_TYPE::VALIDATION_EXCEPTION, "StatsDHost cannot be empty"));
}
#else
const char* statsdHost = params.GetStatsDHost();
if (statsdHost == nullptr || statsdHost[0] == '\0') {
return Aws::GameLift::GenericOutcome(Aws::GameLift::GameLiftError(Aws::GameLift::GAMELIFT_ERROR_TYPE::VALIDATION_EXCEPTION, "StatsDHost cannot be empty"));
}
#endif
// Validate StatsD port
int statsdPort = params.GetStatsDPort();
if (statsdPort < PORT_MIN || statsdPort > PORT_MAX) {
return Aws::GameLift::GenericOutcome(Aws::GameLift::GameLiftError(Aws::GameLift::GAMELIFT_ERROR_TYPE::VALIDATION_EXCEPTION, "StatsDPort must be between 1 and 65535"));
}
// Validate CrashReporter host
#ifdef GAMELIFT_USE_STD
if (params.GetCrashReporterHost().empty()) {
return Aws::GameLift::GenericOutcome(Aws::GameLift::GameLiftError(Aws::GameLift::GAMELIFT_ERROR_TYPE::VALIDATION_EXCEPTION, "CrashReporterHost cannot be empty"));
}
#else
const char* crashReporterHost = params.GetCrashReporterHost();
if (crashReporterHost == nullptr || crashReporterHost[0] == '\0') {
return Aws::GameLift::GenericOutcome(Aws::GameLift::GameLiftError(Aws::GameLift::GAMELIFT_ERROR_TYPE::VALIDATION_EXCEPTION, "CrashReporterHost cannot be empty"));
}
#endif
// Validate CrashReporter port
int crashReporterPort = params.GetCrashReporterPort();
if (crashReporterPort < PORT_MIN || crashReporterPort > PORT_MAX) {
return Aws::GameLift::GenericOutcome(Aws::GameLift::GameLiftError(Aws::GameLift::GAMELIFT_ERROR_TYPE::VALIDATION_EXCEPTION, "CrashReporterPort must be between 1 and 65535"));
}
// Validate FlushIntervalMs
int flushIntervalMs = params.GetFlushIntervalMs();
if (flushIntervalMs < 0) {
return Aws::GameLift::GenericOutcome(Aws::GameLift::GameLiftError(Aws::GameLift::GAMELIFT_ERROR_TYPE::VALIDATION_EXCEPTION, "FlushIntervalMs must be non-negative"));
}
// Validate MaxPacketSize
int maxPacketSize = params.GetMaxPacketSize();
if (maxPacketSize < 0) {
return Aws::GameLift::GenericOutcome(Aws::GameLift::GameLiftError(Aws::GameLift::GAMELIFT_ERROR_TYPE::VALIDATION_EXCEPTION, "MaxPacketSize must be non-negative"));
}
return Aws::GameLift::GenericOutcome(nullptr);
}
} // namespace Metrics
} // namespace GameLift
} // namespace Aws

View File

@@ -0,0 +1,230 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/PacketBuilder.h>
#include <aws/gamelift/metrics/Samplers.h>
#include <aws/gamelift/metrics/LoggerMacros.h>
#include <cmath>
#include <iomanip>
#include <iostream>
#include <spdlog/sinks/stdout_color_sinks.h>
namespace {
static constexpr int NullTerminator = 1;
}
PacketBuilder::PacketBuilder(size_t packetSize, int floatPrecision)
: m_packetSize(packetSize), m_floatPrecision(floatPrecision) {
m_formatBuffer << std::setprecision(floatPrecision)
<< std::setiosflags(std::ios_base::fixed);
}
void PacketBuilder::Append(
const MetricMessage &message, const TagMap &globalTags,
const TagMap &metricTags,
Aws::GameLift::Metrics::MetricsSettings::SendPacketFunc sendPacketFunc) {
const auto startPosition = m_formatBuffer.tellp();
AppendToStream(message, m_floatPrecision, globalTags, metricTags,
m_formatBuffer);
const auto endPosition = m_formatBuffer.tellp();
const size_t messageLength = endPosition - startPosition;
if (messageLength > GetPacketSize() - NullTerminator) {
// If there's no way a message can fit the packet size, we drop it and log
// an error.
try {
// Use a simple message that doesn't reference message.Metric
// which could be null in tests
GAMELIFT_METRICS_LOG_WARN(
"Message length ({}) exceeds packet size ({}), message has "
"been dropped.",
messageLength, GetPacketSize() - NullTerminator);
} catch (...) {
// Silently continue if logging fails - don't break tests
}
// Reset the stream to the start position so we can continue appending
m_formatBuffer.seekp(startPosition);
return;
}
const size_t formatBufferLength = endPosition;
if (formatBufferLength > GetPacketSize() - NullTerminator) {
// Likely case:
// this message caused us to exceed packet size
//
// 1. Seek back to the end of the previous message (cut the new message
// out of the stream).
// 2. Flush (sending packet).
// 3. Re-append the message to the (now) empty stream.
m_formatBuffer.seekp(startPosition);
Flush(sendPacketFunc);
AppendToStream(message, m_floatPrecision, globalTags, metricTags,
m_formatBuffer);
} else if (formatBufferLength == GetPacketSize() - NullTerminator) {
// Unlikely case:
// we hit the packet size exactly
// so we can just flush
Flush(sendPacketFunc);
}
}
void PacketBuilder::Flush(
Aws::GameLift::Metrics::MetricsSettings::SendPacketFunc sendPacketFunc) {
m_sendBuffer = m_formatBuffer.str();
m_sendBuffer.resize(m_formatBuffer.tellp());
sendPacketFunc(m_sendBuffer.c_str(),
static_cast<int>(m_sendBuffer.size() + NullTerminator));
m_formatBuffer.seekp(0);
}
namespace {
void WriteTags(const PacketBuilder::TagMap &tags, std::ostream &stream) {
if (tags.size() == 0) {
return;
}
auto it = std::begin(tags);
stream << it->first << ':' << it->second;
for (++it; it != std::end(tags); ++it) {
stream << ',' << it->first << ':' << it->second;
}
}
void WriteTags(const PacketBuilder::TagMap &globalTags,
const PacketBuilder::TagMap &metricTags, std::ostream &stream) {
if (globalTags.size() > 0 || metricTags.size() > 0) {
stream << "|#";
WriteTags(globalTags, stream);
if (globalTags.size() > 0 && metricTags.size() > 0) {
stream << ',';
}
WriteTags(metricTags, stream);
}
}
void WriteValue(double value, int floatPrecision, std::ostream &stream) {
const auto valueAsInteger = static_cast<int64_t>(value);
const double fractionalPart = value - valueAsInteger;
/*
* We check if the fractional part of our value is 0 when rounded for display.
*
* We configured ostream to round to a specific float precision earlier.
* Ie with a precision = 2, 1.425 becomes 1.43
*
* Due to floating point imprecision, we may, at some point, end up with a
* value like 1.0000002. When rounded this ends up as 1.00 (with precision = 2
* as before) and we'd prefer to print it as an integer.
*
* The check is simple: we multiply the fraction by 10^precision and see if
* when rounded the value equals to 0.
*
* 0.425 * 10^2 = 42.5 ~= 43 => not zero so we print 1.425 as a real
* 0.0000002 * 10^2 = 0.00002 ~= 0 => zero so we can print 1.0000002 as an
* integer
*
* This also takes care of integer valued doubles in general. (Fractional part
* is already zero in that case.)
*/
const double thousandths = std::pow(10, floatPrecision);
const bool hasFractionalPart = std::round(fractionalPart * thousandths) != 0;
if (hasFractionalPart) {
stream << value;
} else {
stream << valueAsInteger;
}
}
void WriteSampleRate(const MetricMessage &message, std::ostream &stream) {
// Get the sample rate from the sampler
if (message.Metric) {
Aws::GameLift::Metrics::ISampler &sampler = message.Metric->GetSampler();
float rate = sampler.GetSampleRate();
// Only add sample rate if it's less than 1.0 (1.0 is implicit/default in
// StatsD). Packets with a 0.0 sample rate are not sent.
if (rate < 1.0f) {
// Format using a separate stringstream with default formatting
std::ostringstream rateStream;
rateStream << rate;
stream << "|@" << rateStream.str();
}
}
}
void WriteMessage(const MetricMessage &message, int floatPrecision,
std::ostream &stream) {
static constexpr auto showPositiveSign = std::ios_base::showpos;
stream << message.Metric->GetKey() << ':';
if (message.Type == MetricMessageType::GaugeSet) {
WriteValue(message.SubmitDouble.Value, floatPrecision, stream);
stream << "|g";
} else if (message.Type == MetricMessageType::GaugeAdd) {
stream << std::setiosflags(showPositiveSign);
WriteValue(message.SubmitDouble.Value, floatPrecision, stream);
stream << "|g" << std::resetiosflags(showPositiveSign);
} else if (message.Type == MetricMessageType::CounterAdd) {
WriteValue(message.SubmitDouble.Value, floatPrecision, stream);
stream << "|c" << std::resetiosflags(showPositiveSign);
} else if (message.Type == MetricMessageType::TimerSet) {
WriteValue(message.SubmitDouble.Value, floatPrecision, stream);
stream << "|ms";
}
// Add sample rate if using SampleFraction
WriteSampleRate(message, stream);
}
} // namespace
void AppendToStream(const MetricMessage &message, int floatPrecision,
const PacketBuilder::TagMap &globalTags,
const PacketBuilder::TagMap &metricTags,
std::ostream &stream) {
if (message.Type == MetricMessageType::GaugeSet &&
message.SubmitDouble.Value < 0) {
/*
* There's an intentional amgibuity in `gaugor:-10|g`
* It means 'subtract 10 from gaugor'. So the only way to set a
* gauge to a negative value is by setting it to zero first and
* subtracting:
* gaugor:0|g
* gaugor:-10|g
*
* See:
https://github.com/statsd/statsd/blob/master/docs/metric_types.md#gauges
*/
WriteMessage(MetricMessage::GaugeSet(*message.Metric, 0), floatPrecision,
stream);
WriteTags(globalTags, metricTags, stream);
stream << '\n';
WriteMessage(
MetricMessage::GaugeAdd(*message.Metric, message.SubmitDouble.Value),
floatPrecision, stream);
WriteTags(globalTags, metricTags, stream);
stream << '\n';
} else if (message.IsCounter() && message.SubmitDouble.Value <= 0) {
// Skip non-positive or negative counters
} else {
WriteMessage(message, floatPrecision, stream);
WriteTags(globalTags, metricTags, stream);
stream << '\n';
}
}

View File

@@ -0,0 +1,207 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/Percentiles.h>
#include <aws/gamelift/metrics/LoggerMacros.h>
#include <algorithm>
#include <array>
#include <iomanip>
#include <set>
#include <sstream>
#include <vector>
namespace Aws {
namespace GameLift {
namespace Metrics {
namespace {
struct PercentileMetric {
double m_percentile;
DynamicMetric m_metric;
bool m_metricInitialized = false;
explicit PercentileMetric(double percentile) : m_percentile(percentile) {}
double GetPercentile() const { return m_percentile; }
void SetPercentile(double percentile) { m_percentile = percentile; }
PercentileMetric &WithPercentile(double percentile) {
SetPercentile(percentile);
return *this;
}
};
/**
* INTERNAL: concrete percentile metric implementation
*/
class PercentilesImpl : public IDerivedMetric {
public:
template <class It> PercentilesImpl(It begin, It end) {
std::transform(begin, end, std::back_inserter(m_percentiles),
[](double value) { return PercentileMetric(value); });
}
virtual void HandleMessage(MetricMessage &message,
IMetricsEnqueuer &submitter) override {
switch (message.Type) {
case MetricMessageType::GaugeAdd:
m_currentValue += message.SubmitDouble.Value;
AppendValue(m_currentValue);
break;
case MetricMessageType::GaugeSet:
case MetricMessageType::TimerSet:
m_currentValue = message.SubmitDouble.Value;
AppendValue(m_currentValue);
break;
case MetricMessageType::TagSet:
case MetricMessageType::TagRemove:
for (auto &percentile : m_percentiles) {
CopyTagMessage(message, percentile.m_metric, submitter);
}
break;
default:
break;
}
}
virtual void EmitMetrics(const IMetric *originalMetric,
IMetricsEnqueuer &submitter) override {
if (m_numSeenSinceLastEmitCall == 0) {
return;
}
m_numSeenSinceLastEmitCall = 0;
std::sort(std::begin(m_values), std::end(m_values));
for (auto &percentile : m_percentiles) {
EmitPercentile(originalMetric, percentile, submitter);
}
m_values.clear();
}
private:
void AppendValue(double newCurrentValue) {
m_values.emplace_back(newCurrentValue);
m_numSeenSinceLastEmitCall++;
}
void EmitPercentile(const IMetric *originalMetric,
PercentileMetric &percentile,
IMetricsEnqueuer &submitter) {
const double value = ComputePercentile(percentile.GetPercentile());
if (!percentile.m_metricInitialized) {
percentile.m_metric.SetMetricType(originalMetric->GetMetricType());
std::stringstream stream;
stream << originalMetric->GetKey() << ".p" << std::setw(2)
<< std::setfill('0')
<< static_cast<int>(percentile.GetPercentile() * 100.0);
#ifdef GAMELIFT_USE_STD
percentile.m_metric.SetKey(stream.str());
#else
percentile.m_metric.SetKey(stream.str().c_str());
#endif
percentile.m_metricInitialized = true;
}
switch (percentile.m_metric.GetMetricType()) {
case MetricType::Gauge:
submitter.Enqueue(MetricMessage::GaugeSet(percentile.m_metric, value));
break;
case MetricType::Timer:
submitter.Enqueue(MetricMessage::TimerSet(percentile.m_metric, value));
break;
default:
break;
}
}
double ComputePercentile(double percentile) const {
const double startIndexFloat = percentile * (m_values.size() - 1);
const size_t startIndex = static_cast<size_t>(startIndexFloat);
const double fractionalPart = startIndexFloat - startIndex;
const bool isBetweenValues = fractionalPart != 0;
if (isBetweenValues) {
const double start = m_values[startIndex];
const double end = m_values[startIndex + 1];
return start + fractionalPart * (end - start);
} else {
return m_values[startIndex];
}
}
private:
std::vector<PercentileMetric> m_percentiles;
size_t m_numSeenSinceLastEmitCall = 0;
double m_currentValue = 0;
std::vector<double> m_values;
};
/**
* INTERNAL: validate percentiles and report errors / warnings for debug builds
*
* @param begin The begin iterator of percentile collection
* @param end The past-the-end iterator of percentile collection
*/
template <class InputIt>
inline void ValidatePercentiles(InputIt begin, InputIt end) {
if (begin == end) {
GAMELIFT_METRICS_LOG_CRITICAL("Percentile list is empty.");
}
for (auto it = begin + 1; it != end; ++it) {
if (*(it - 1) == *it) {
GAMELIFT_METRICS_LOG_CRITICAL("Duplicate percentiles detected.", *it);
}
}
for (auto it = begin; it != end; ++it) {
const auto &value = *it;
if (value < 0 || value > 1) {
GAMELIFT_METRICS_LOG_CRITICAL("Percentiles must be in the [0, 1] range.",
value);
}
const auto percentileFloat = value * 100;
const auto percentileInteger = static_cast<int>(percentileFloat);
const double fractionalPart = percentileFloat - percentileInteger;
const double absDistance = 0.0001;
if (std::abs(fractionalPart) >= absDistance) {
// Warn about the user entering too many digits past decimal point.
// Diff might be a tiny value due to float rounding, so we check against
// distance to ignore it.
GAMELIFT_METRICS_LOG_WARN(
"Percentile {} ({}) has too many digits past the decimal "
"point. It will be truncated to {}",
percentileFloat, value, percentileInteger);
}
}
}
} // anonymous namespace
namespace Internal {
PercentilesWrapper PercentilesWrapper::Create(double *begin, double *end) {
std::sort(begin, end); // sort percentiles (mostly for validation but might
// as well keep them in this order)
#ifndef NDEBUG
ValidatePercentiles(begin, end);
#endif
return PercentilesWrapper(new PercentilesImpl(begin, end));
}
} // namespace Internal
} // namespace Metrics
} // namespace GameLift
} // namespace Aws

View File

@@ -0,0 +1,86 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/Samplers.h>
#include <chrono>
#include <mutex>
#include <random>
namespace Aws {
namespace GameLift {
namespace Metrics {
namespace Internal {
/**
* Basic wrapper around seed_seq so we don't have to leak it in the header
*/
struct SeedState {
std::seed_seq m_seed;
// std::seed_seq has internal state that is updated when we first initialize
// the sampler per-thread
std::mutex m_mutex;
explicit SeedState(Int64 seed) : m_seed{seed} {}
};
} // namespace Internal
namespace {
/**
* Per thread sampler with own RNG.
*
* We use this to avoid synchronization in multithreaded scenarios.
* This way we only have to lock when first initializing the thread.
*/
class PerThreadSampler {
public:
explicit PerThreadSampler(Internal::SeedState &state) noexcept
: m_distribution(0.0, 1.0) {
const std::lock_guard<std::mutex> guard(state.m_mutex);
if (state.m_seed.size() > 0) {
m_engine = std::mt19937(state.m_seed);
}
}
bool ShouldTakeSample(float fraction) {
const float randomValue = m_distribution(m_engine);
return randomValue <= fraction;
}
private:
std::uniform_real_distribution<float> m_distribution;
std::mt19937 m_engine;
};
} // namespace
ISampler::~ISampler() {}
SampleFraction::SampleFraction(float fractionToSample, Int64 seed)
: m_fractionToSample(fractionToSample),
m_seed(new Internal::SeedState(seed)) {}
SampleFraction::~SampleFraction() { delete m_seed; }
bool SampleFraction::ShouldTakeSample() {
// We use per-thread RNG to avoid synchronization.
static thread_local PerThreadSampler sampler(*m_seed);
return sampler.ShouldTakeSample(m_fractionToSample);
}
Int64 SampleFraction::DefaultSeed() {
using Nanoseconds = std::chrono::duration<Int64, std::nano>;
return std::chrono::duration_cast<Nanoseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
}
} // namespace Metrics
} // namespace GameLift
} // namespace Aws

View File

@@ -0,0 +1,38 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates or
* its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root of this
* distribution (the "License"). All use of this software is governed by the License,
* or, if provided, by the license below or the license accompanying this file. Do not
* remove or modify any license notices. This file is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/StatsDClient.h>
#include <aws/gamelift/metrics/LoggerMacros.h>
using namespace Aws::GameLift::Metrics;
#ifdef GAMELIFT_USE_STD
StatsDClient::StatsDClient(const std::string& host, int port)
#else
StatsDClient::StatsDClient(const char* host, int port)
#endif
: m_socket(m_io_service)
, m_endpoint(asio::ip::address::from_string(host), port) {
try {
m_socket.open(asio::ip::udp::v4());
} catch (const std::exception& e) {
GAMELIFT_METRICS_LOG_ERROR("Failed to open StatsD socket: {}", e.what());
}
}
void StatsDClient::Send(const char* data, int size) {
try {
m_socket.send_to(asio::buffer(data, size), m_endpoint);
} catch (const std::exception& e) {
GAMELIFT_METRICS_LOG_ERROR("Failed to send StatsD packet: {}", e.what());
}
}

View File

@@ -0,0 +1,64 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/Tags.h>
#include <aws/gamelift/metrics/DynamicTag.h>
#include <algorithm>
#include <cassert>
void Tags::Handle(MetricMessage &message) {
assert(message.Type == MetricMessageType::TagSet ||
message.Type == MetricMessageType::TagRemove);
if (message.Type == MetricMessageType::TagSet) {
HandleSet(message.Metric, message.SetTag);
} else if (message.Type == MetricMessageType::TagRemove) {
HandleRemove(message.Metric, message.SetTag);
}
}
void Tags::HandleSet(const IMetric *metric, MetricSetTag &message) {
// Create tag map for current metric if not exist
auto it = m_tags.find(metric);
if (it == std::end(m_tags)) {
it = m_tags.emplace(metric, std::unordered_map<std::string, std::string>())
.first;
}
assert(it != std::end(m_tags));
auto &metricTags = it->second;
auto tagIt = metricTags.find(message.Ptr->Key);
if (tagIt != std::end(metricTags)) {
// Swap-in the new value to avoid copy
// Old value gets deleted below.
using std::swap;
swap(tagIt->second, message.Ptr->Value);
} else {
// Move key and value into dictionary to avoid copy. (We delete empty shells
// below.)
metricTags.emplace(std::move(message.Ptr->Key),
std::move(message.Ptr->Value));
}
delete message.Ptr;
message.Ptr = nullptr;
}
void Tags::HandleRemove(const IMetric *metric, MetricSetTag &message) {
auto it = m_tags.find(metric);
if (it != std::end(m_tags)) {
it->second.erase(message.Ptr->Key);
}
delete message.Ptr;
message.Ptr = nullptr;
}

View File

@@ -0,0 +1,26 @@
/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates
* or its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root
* of this distribution (the "License"). All use of this software is governed by
* the License, or, if provided, by the license below or the license
* accompanying this file. Do not remove or modify any license notices. This
* file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
* ANY KIND, either express or implied.
*
*/
#include <aws/gamelift/metrics/TypeTraits.h>
#ifndef GAMELIFT_USE_STD
namespace Aws {
namespace GameLift {
namespace Metrics {
namespace Internal {
const bool TrueType::value;
const bool FalseType::value;
} // namespace Internal
} // namespace Metrics
} // namespace GameLift
} // namespace Aws
#endif // !GAMELIFT_USE_STD